Следуя тематике статей, в которых мы подробно объясняли некоторые практические примеры как проводить различные процессы, автоматизацию и интеграцию с YepCode, сегодня мы поговорим о том, как автоматизировать перевод больших объемов данных от Google BigQuery до Snowflake наиболее эффективным способом.

Перенос больших объемов данных из Google BigQuery в Snowflake не является задачей гибкой разработки.

На рынке существует множество инструментов NoCode ETL, которые позволяют перемещать информацию между несколькими источниками данных (как локальными службами, так и SaaS). Но распространенной проблемой является то, что пользователи не могут полностью адаптировать эти процессы загрузки ко всем своим потребностям.

Подумайте, например, о сценарии, в котором информация генерируется каждый день в Google BigQuery (т. е. информация о событиях Google Analytics). Представьте, что вы хотите скопировать эту информацию в базу данных Снежинка в ночное время.

Обязательным требованием должно быть копирование только новых событий, которые ранее не копировались. Прежде чем перейти к BigQuery, вам нужно перейти в Snowflake, чтобы получить дату последней загрузки. Таким образом, вы сможете использовать его позже для построения SQL-предложения BigQuery. Как правило, это может быть проверка максимального значения столбца даты или использование контрольной таблицы, которая отслеживает каждое выполнение загрузки.

Вам также может потребоваться уведомление (с помощью уведомления электронной почты или Slack) о том, как была выполнена эта загрузка (например, отчет о количестве новых загруженных событий ).

Последнее требование может заключаться в том, чтобы учитывать, что миллионы событий генерируются каждый день. В этом случае простой подход запуска SQL для BigQuery и выполнения вставки каждой строки в Snowflake может не сработать.

// This is a bad approach to load millions of rows

const [job] = await bigqueryClient.createQueryJob({
    query: `SELECT column1, column2 FROM table1`
});
const [rows] = await job.getQueryResults();

snowflakeClient.execute({
    sqlText: 'insert into table1(column1, column2) values(?, ?, ?)',
    binds: rows.map((row)=> [row.column1, row.column2]])
});

Мы используем интеграцию Yepcode, чтобы создать наиболее эффективный рабочий процесс для этого процесса.

В этой ситуации процесс, который мы бы предложили с использованием интеграции YepCode, будет включать следующие шаги:

  • Запустите синхронный выбор SQL для Snowflake, чтобы получить дату последней загрузки, а также текущее количество событий. Мы могли бы использовать повторно используемую функцию для этого подхода syncExec и включить ее в один модуль YepCode JS:
exports.snowflakeSyncExec = (snowflakeClient, sqlText, binds = [], rest = {}) => {
  return new Promise((resolve, reject) => {
    snowflakeClient.execute({
      sqlText,
      binds,
      ...rest,
      complete: (err, stmt, rows) => {
        if (err) {
          reject(err);
          return;
        }
        resolve([stmt, rows]);
      },
    });
  });
}
  • С помощью этой многократно используемой функции последнюю загруженную дату и текущее количество строк можно получить примерно так:
const [, rows] = await snowflakeSyncExec(snowflakeClient, "SELECT MAX(DATE) AS LAST_LOADED_DATE, COUNT(1) AS ROWS_AMOUNT FROM table1");

const LAST_LOADED_DATE = rows[0].LAST_LOADED_DATE
const ROWS_AMOUNT = rows[0].ROWS_AMOUNT
  • Используя ранее полученную дату, мы могли бы создать предложение Google BigQuery SQL, чтобы получить новые события и выполнить этот запрос в BigQuery.
  • Вместо того, чтобы возвращать строки, мы оставим их в Google Cloud Bucket в формате CSV.
  • Для этого мы должны использовать функцию экспорта данных, которая может оставлять строки, возвращаемые запросом, в CSV-файле в Google Cloud Bucket (сопутствующие документы) .
  • Пример кода может быть таким:
const exportQuery =
  `EXPORT DATA OPTIONS(
  uri='gs://my-google-cloud-bucket-name/my-table-export-file_*',
  format='CSV',
  header=true,
  compression='GZIP',
  field_delimiter=',') AS
  SELECT column1, column2 FROM table1 WHERE date > @last_loaded_date
  `;

googleBigQueryClient.createQueryJob({
  query: exportQuery,
  params: {last_loaded_date: LAST_LOADED_DATE}
});
  • Обратите внимание, что этот подход экспорта данных можно также использовать с сжатием GZIP для сокращения использования сети.
  • Имея результат запроса, экспортированный в файл в корзине, мы можем напрямую загрузить этот CSV-файл в Snowflake, используя предварительно настроенный этап. Это замечательная функция в Snowflake (сопутствующие документы).
  • Часть кода может быть:
const importCSVSqlSentence =
  `COPY INTO my_table_name
  FROM @stage_for_google_cloud_bucket
  PATTERN='my-table-export-file_.*';`;

snowflakeSyncExec(snowflakeClient, importCSVSqlSentence);
  • Загрузив новую информацию в Snowflake, мы могли удалить ранее сгенерированные файлы CSV. Интеграция Google Cloud Storage может помочь нам:
googleCloudStorageClient.bucket('my-google-cloud-bucket-name').deleteFiles({
    prefix: 'my-table-export-file_'
});
  • В качестве последнего шага мы могли бы выполнить еще один запрос в Snowflake, чтобы загрузить новое количество событий, и с этой информацией мы могли бы создать уведомление и доставить его по электронной почте или с помощью слабая интеграция (сопутствующие документы).
  • Фрагмент кода для отправки этого сообщения может быть таким:
await slackBotClient.chat.postMessage({
  channel: "bigQuery-snowflake-load",
  text: `Load successfully finished with ${rowsAmount} new rows copied!`,
});

Чтобы реализовать весь процесс в YepCode, вам необходимо настроить сервисный аккаунт Google с доступом к Google BigQuery и Google Cloud Bucket. Кроме того, следуйте инструкциям в Snowflake, чтобы настроить этап, который может считывать информацию из этого Bucket.

После создания этой конфигурации лучшим вариантом может быть создание общего процесса, который мог бы выполнять перемещение информации параметризованным способом, получая следующие параметры:

  • Предложение SQL для выполнения в Google BigQuery (с датой начала загрузки в качестве параметра)
  • Предложение SQL для выполнения в Snowflake для получения последней загруженной даты и количества событий
  • База данных Snowflake и имя таблицы