Я хочу написать конвейер ETL с искровой обработкой различных источников ввода, но используя как можно меньше вычислительных ресурсов, и у меня возникнут проблемы с использованием «традиционного» подхода Spark ETL.
У меня есть несколько источников потоковых данных, которые необходимо сохранить в таблицах DeltaLake. Каждый источник данных - это просто папка в s3 с файлами avro. У каждого источника данных своя схема. Каждый источник данных должен храниться в собственной таблице DeltaLake. Требуется небольшое преобразование, кроме avro -> delta, только обогащение некоторыми дополнительными полями, полученными из имени файла. Новые файлы добавляются с умеренной скоростью, от одного раза в минуту до одного раза в день, в зависимости от источника данных. У меня есть уведомление kafka, когда появляются новые данные, с описанием того, какие данные и путь к файлу s3.
Предположим, есть два источника данных - A и B. A - это файлы s3: // bucket / A / *, B - s3: // bucket / B / *. Всякий раз, когда добавляются новые файлы, у меня появляется сообщение кафки с полезной нагрузкой {'источник данных': 'A', имя файла: 's3: // bucket / A / file1', ... другие поля}. Файлы должны идти в дельта-таблицу s3: // delta / A /, B - s3: // delta / B /
Как я могу проглотить их все в одном приложении с минимальной задержкой? Поскольку необходимые данные постоянно поступают, звучат как потоковая передача. Но при потоковой передаче искр нужно заранее определить схему потока, а у меня есть разные источники с разными схемами, которые заранее не известны.
Создание специального приложения Spark для каждого источника данных не вариант - существует более 100 источников данных с поступающими очень маленькими файлами. Наличие более 100 искровых приложений - пустая трата денег. Все должны быть загружены с использованием одного кластера среднего размера.
Единственная идея, которая у меня есть сейчас: в процессе драйвера запустить обычного потребителя kafka, для каждой записи прочитать фрейм данных, обогатить дополнительными полями и сохранить его в дельта-таблице. Больше параллелизма - используйте несколько сообщений и запускайте их во фьючерсах, чтобы несколько заданий выполнялись одновременно. Какой-то псевдокод в процессе драйвера:
val consumer = KafkaConsumer(...)
consumer.foreach{record =>
val ds = record.datasource
val file = record.filename
val df = spark.read.format(avro).load(file)
.withColumn('id', record.id)
val dest = s"s3://delta/${record.datasourceName}"
df.write.format('delta').save(dest)
consumer.commit(offset from record)
}
Звучит хорошо (и PoC показывает, что это работает), но мне интересно, есть ли другие варианты? Любые другие идеи приветствуются. Spark работает на платформе DataBricks.