Airflow DAG - как сначала проверить BQ (при необходимости удалить), а затем запустить задание потока данных?

Я использую облачный композитор для организации ETL для файлов, поступающих в GCS и идущих в BigQuery. У меня есть облачная функция, которая запускает даг при поступлении файла, а облачная функция передает имя / местоположение файла в DAG. В моем DAG есть 2 задачи:

1) Используйте DataflowPythonOperator для запуска задания потока данных, которое считывает данные из текста в GCS, преобразует их и вводит в BQ, а 2) перемещает файл в состояние сбоя. / сегмент успеха в зависимости от того, завершилось ли задание успешно или неудачно. У каждого файла есть идентификатор файла, который является столбцом в таблице bigquery. Иногда файл редактируется один или два раза (это не потоковая передача, где это бывает часто), и я хочу иметь возможность сначала удалить существующие записи для этого файла.

Я посмотрел на других операторов воздушного потока, но хотел, чтобы в моем DAG было 2 задачи, прежде чем запускать задание потока данных:

  1. Получите идентификатор файла на основе имени файла (прямо сейчас у меня есть имя файла сопоставления таблицы bigquery -> идентификатор файла, но я также могу просто ввести json, который служит картой, я думаю, если это проще )
  2. Если идентификатор файла уже присутствует в таблице bigquery (таблица, которая выводит преобразованные данные из задания потока данных), удалите его, а затем запустите задание потока данных, чтобы у меня была самая последняя информация. Я знаю, что один из вариантов - просто добавить отметку времени и использовать только самые свежие записи, но поскольку на файл может быть 1 миллион записей, и это не похоже на то, что я удаляю 100 файлов в день (возможно, 1-2 вершины) это может быть запутанным и запутанным.

После задания потока данных, в идеале перед перемещением файла в папку успеха / неудачи, я хотел бы добавить в некоторую таблицу «записей», что эта игра была введена в это время. Это будет мой способ увидеть все вставки, которые произошли. Я пытался найти разные способы сделать это, я новичок в облачном композиторе, поэтому у меня нет четкого представления о том, как это будет работать после 10+ часов исследований, иначе я бы опубликовал код для ввода.

Спасибо, я очень благодарен всем за помощь и прошу прощения, если это не так ясно, как хотелось бы, документация по воздушному потоку очень надежна, но, учитывая, что облачный композитор и bigquery относительно новы, трудно так тщательно изучить, как что-то делать. Специфические задачи GCP.




Ответы (1)


Звучит немного сложно. К счастью, есть операторы практически для каждой службы GCP. Другое дело, когда запускать DAG. Вы уже догадались? Вы хотите, чтобы функция Google Cloud запускалась каждый раз, когда в эту корзину GCS поступает новый файл.

  1. Запуск вашего DAG

Чтобы запустить DAG, вы захотите вызвать его с помощью облачной функции Google, которая использует Завершение объекта или триггеры Обновление метаданных.

  1. Загрузка данных в BigQuery

Если ваш файл уже находится в GCS и в формате JSON или CSV, то использование задания Dataflow является излишним. Вы можете использовать GoogleCloudStorageToBigQueryOperator для загрузки файла в BQ.

  1. Отслеживание идентификатора файла

Вероятно, лучший способ вычислить идентификатор файла - это использовать оператор Bash или Python из Airflow. Можете ли вы получить его непосредственно из имени файла?

Если это так, тогда у вас может быть оператор Python, расположенный выше GoogleCloudStorageObjectSensor, чтобы проверить, находится ли файл в успешном каталоге.

Если это так, то вы можете использовать BigQueryOperator. для выполнения запроса на удаление в BQ.

После этого вы запускаете GoogleCloudStorageToBigQueryOperator.

  1. Перемещение файлов

Если вы перемещаете файлы из GCS в местоположения GCS, тогда GoogleCloudStorageToGoogleCloudStorageOperator сделай то, что тебе нужно. Если ваш оператор загрузки BQ не работает, перейдите в расположение сбойных файлов, а в случае успеха перейдите в расположение успешных заданий.

  1. Ведение журналов задач

Возможно, все, что вам нужно для отслеживания вставок, - это записывать информацию о задачах в GCS. Узнайте, как записывать информацию о задаче в GCS

Это помогает?

person Pablo    schedule 14.01.2019
comment
Итак, я уже выполнил этап облачной функции и задание потока данных (поток данных - это 1 миллион записей, а воздушный поток не был таким быстрым, если я не ошибаюсь). Дело в том, что мне нужно иметь доступ к идентификатору файла после его вычисления. Я могу вычислить его, используя имя файла и выполнив простой поиск в таблице bigquery. Однако после того, как я его вычислю, мне нужно, чтобы мой шаблон знал, что это такое, чтобы использовать его в моем запросе. Имеет ли это смысл? - person WIT; 14.01.2019
comment
Airflow не импортирует сам файл. Он запускает задание загрузки BQ для файла, которое выполняется очень быстро и бесплатно (в отличие от задания потока данных). - Что касается идентификатора файла, позвольте мне немного вернуться к вам - person Pablo; 14.01.2019
comment
Можете ли вы сделать (1) Импортировать данные во временную таблицу - (2) Подождать 1 час обновлений файла (3) Если файл обновится, затем удалить временную таблицу (4) Если файл не обновляется, скопируйте временную таблицу в целевую таблицу ? - person Pablo; 15.01.2019
comment
К сожалению, нет, и часто обновления не будут происходить в течение 1 часа, они могут быть на 24 часа позже, а могут быть на 1 месяц или даже на год позже. - person WIT; 15.01.2019
comment
В этом случае да - у вас должен быть (1) оператор Python, который вычисляет идентификатор файла. (2) BigQueryOperator, который выполняет запрос на удаление для всех строк с этим идентификатором файла. (3). DataflowPythonOperator, который запускает задание, которое добавляет идентификатор файла к строкам и вставляет BigQuery. Как насчет этого? - person Pablo; 15.01.2019
comment
Что касается хранения идентификатора файла, я думаю, вам будет лучше хранить его в GCS, а не в BQ. GCS должен быть дешевле. - person Pablo; 15.01.2019
comment
Я думаю, что это имеет смысл, причина, по которой я хочу искать в BQ идентификатор файла, заключается в том, что существует редкая вероятность, что он может измениться, и я не хочу беспокоиться об обновлении его в GCS, лучше иметь его в BQ - person WIT; 16.01.2019
comment
Я немного запутался, как я буду использовать оператор python, а затем получить само значение для использования в следующей задаче thoguh - person WIT; 16.01.2019
comment
Вы можете использовать xcoms для передачи значений между задачами: airflow.apache.org/concepts.html#xcoms < / а> - person Pablo; 16.01.2019
comment
это помогло? рад продолжить поиски - person Pablo; 17.01.2019