Как получить разные фреймы данных Spark за одно задание искры

Я хочу написать конвейер ETL с искровой обработкой различных источников ввода, но используя как можно меньше вычислительных ресурсов, и у меня возникнут проблемы с использованием «традиционного» подхода Spark ETL.

У меня есть несколько источников потоковых данных, которые необходимо сохранить в таблицах DeltaLake. Каждый источник данных - это просто папка в s3 с файлами avro. У каждого источника данных своя схема. Каждый источник данных должен храниться в собственной таблице DeltaLake. Требуется небольшое преобразование, кроме avro -> delta, только обогащение некоторыми дополнительными полями, полученными из имени файла. Новые файлы добавляются с умеренной скоростью, от одного раза в минуту до одного раза в день, в зависимости от источника данных. У меня есть уведомление kafka, когда появляются новые данные, с описанием того, какие данные и путь к файлу s3.

Предположим, есть два источника данных - A и B. A - это файлы s3: // bucket / A / *, B - s3: // bucket / B / *. Всякий раз, когда добавляются новые файлы, у меня появляется сообщение кафки с полезной нагрузкой {'источник данных': 'A', имя файла: 's3: // bucket / A / file1', ... другие поля}. Файлы должны идти в дельта-таблицу s3: // delta / A /, B - s3: // delta / B /

Как я могу проглотить их все в одном приложении с минимальной задержкой? Поскольку необходимые данные постоянно поступают, звучат как потоковая передача. Но при потоковой передаче искр нужно заранее определить схему потока, а у меня есть разные источники с разными схемами, которые заранее не известны.

Создание специального приложения Spark для каждого источника данных не вариант - существует более 100 источников данных с поступающими очень маленькими файлами. Наличие более 100 искровых приложений - пустая трата денег. Все должны быть загружены с использованием одного кластера среднего размера.

Единственная идея, которая у меня есть сейчас: в процессе драйвера запустить обычного потребителя kafka, для каждой записи прочитать фрейм данных, обогатить дополнительными полями и сохранить его в дельта-таблице. Больше параллелизма - используйте несколько сообщений и запускайте их во фьючерсах, чтобы несколько заданий выполнялись одновременно. Какой-то псевдокод в процессе драйвера:

val consumer = KafkaConsumer(...)
consumer.foreach{record =>
    val ds = record.datasource
    val file = record.filename
    val df = spark.read.format(avro).load(file)
        .withColumn('id', record.id)
    val dest = s"s3://delta/${record.datasourceName}"
    df.write.format('delta').save(dest)
    consumer.commit(offset from record)
}

Звучит хорошо (и PoC показывает, что это работает), но мне интересно, есть ли другие варианты? Любые другие идеи приветствуются. Spark работает на платформе DataBricks.


person Vyacheslav Krot    schedule 04.05.2020    source источник


Ответы (1)


Spark не ограничивает вас наличием приложения Spark для каждого приема источника данных, вы можете сгруппировать источники данных в несколько приложений Spark или использовать одно приложение Spark для всех источников данных, что является осуществимым подходом, если у приложения Spark достаточно ресурсов для принимать и обрабатывать весь источник данных.

Вы можете сделать что-то вроде:

object StreamingJobs extends SparkApp {

  // consume from Kafka Topic 1
  StreamProcess_1.runStream(spark)

  // consume from Kafka Topic 2
  StreamProcess_2.runStream(spark)

  //  consume from Kafka Topic n
  StreamProcess_N.runStream(spark)

  // wait until termination
  spark.streams.awaitAnyTermination()

}

и, возможно, еще одна искровая работа для пакетной обработки

object BatchedJobs extends SparkApp {

  // consume from data source 1
  BatchedProcess_1.run(spark)

  // consume from  data source 2
  BatchedProcess_2.run(spark)

  //  consume from  data source n
  BatchedProcess_N.run(spark) 

}
person dumitru    schedule 05.05.2020