XML-файлы потоковой передачи Spark

У меня есть требование обрабатывать файлы xml, передаваемые в папку S3. В настоящее время я реализовал это следующим образом.

Во-первых, прочитайте файлы, используя файловый поток Spark.

val data = ssc.fileStream[LongWritable, Text, TextInputFormat]("s3://myfolder/",(t: org.apache.hadoop.fs.Path) => true, newFilesOnly = true, hadoopConf).map(_._2.toString())

Для каждого RDD проверьте, был ли прочитан какой-либо файл

if (data.count() !=0)

Запишите строку в новый каталог HDFS

data.coalesce(1).saveAsTextFile(sdir);

Создайте чтение Dataframe из вышеуказанного каталога HDFS.

val loaddata = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "Trans").load(sdir)

Сделайте некоторую обработку в Dataframe и сохраните как JSON

loaddata.write.mode("append").json("s3://mybucket/somefolder")

Почему-то я чувствую, что описанный выше подход очень неэффективен и, откровенно говоря, довольно школьный. Есть ли лучшее решение? Любая помощь будет принята с благодарностью.

Последующий вопрос: как манипулировать полями (не столбцами) в кадре данных? У меня есть очень сложный вложенный xml, и когда я использую описанный выше метод, я получаю Dataframe с 9 столбцами и 50 нечетными внутренними массивами Struct. Это нормально, за исключением необходимости обрезать имена определенных полей. Есть ли способ добиться этого без взрыва фрейма данных, так как мне нужно снова построить ту же структуру?


person Vamsi    schedule 18.11.2016    source источник


Ответы (1)


Если вы используете Spark 2.0, вы можете заставить его работать со структурированной потоковой передачей:

val inputDF = spark.readStream.format("com.databricks.spark.xml")
  .option("rowTag", "Trans")
  .load(path)
person Community    schedule 18.11.2016
comment
Большое спасибо. Моя целевая среда — это стек EMR со Spark 2.0.1. Я попробую ваше предложение на коробке EMR. - person Vamsi; 18.11.2016
comment
пожалуйста, проголосуйте / примите, если вы согласны с решением, упомянутым выше. - person Ram Ghadiyaram; 23.11.2016
comment
val inputDF = spark.readStream.format("com.databricks.spark.xml") .option("rowTag", "Trans") .load(path) Приведенное выше решение не работает со искрой 2.X. - person Parag Chimanpure; 19.10.2020