Я использую облачный композитор для организации ETL для файлов, поступающих в GCS и идущих в BigQuery. У меня есть облачная функция, которая запускает даг при поступлении файла, а облачная функция передает имя / местоположение файла в DAG. В моем DAG есть 2 задачи:
1) Используйте DataflowPythonOperator
для запуска задания потока данных, которое считывает данные из текста в GCS, преобразует их и вводит в BQ, а 2) перемещает файл в состояние сбоя. / сегмент успеха в зависимости от того, завершилось ли задание успешно или неудачно. У каждого файла есть идентификатор файла, который является столбцом в таблице bigquery. Иногда файл редактируется один или два раза (это не потоковая передача, где это бывает часто), и я хочу иметь возможность сначала удалить существующие записи для этого файла.
Я посмотрел на других операторов воздушного потока, но хотел, чтобы в моем DAG было 2 задачи, прежде чем запускать задание потока данных:
- Получите идентификатор файла на основе имени файла (прямо сейчас у меня есть имя файла сопоставления таблицы bigquery -> идентификатор файла, но я также могу просто ввести json, который служит картой, я думаю, если это проще )
- Если идентификатор файла уже присутствует в таблице bigquery (таблица, которая выводит преобразованные данные из задания потока данных), удалите его, а затем запустите задание потока данных, чтобы у меня была самая последняя информация. Я знаю, что один из вариантов - просто добавить отметку времени и использовать только самые свежие записи, но поскольку на файл может быть 1 миллион записей, и это не похоже на то, что я удаляю 100 файлов в день (возможно, 1-2 вершины) это может быть запутанным и запутанным.
После задания потока данных, в идеале перед перемещением файла в папку успеха / неудачи, я хотел бы добавить в некоторую таблицу «записей», что эта игра была введена в это время. Это будет мой способ увидеть все вставки, которые произошли. Я пытался найти разные способы сделать это, я новичок в облачном композиторе, поэтому у меня нет четкого представления о том, как это будет работать после 10+ часов исследований, иначе я бы опубликовал код для ввода.
Спасибо, я очень благодарен всем за помощь и прошу прощения, если это не так ясно, как хотелось бы, документация по воздушному потоку очень надежна, но, учитывая, что облачный композитор и bigquery относительно новы, трудно так тщательно изучить, как что-то делать. Специфические задачи GCP.