У меня есть требование обрабатывать файлы xml, передаваемые в папку S3. В настоящее время я реализовал это следующим образом.
Во-первых, прочитайте файлы, используя файловый поток Spark.
val data = ssc.fileStream[LongWritable, Text, TextInputFormat]("s3://myfolder/",(t: org.apache.hadoop.fs.Path) => true, newFilesOnly = true, hadoopConf).map(_._2.toString())
Для каждого RDD проверьте, был ли прочитан какой-либо файл
if (data.count() !=0)
Запишите строку в новый каталог HDFS
data.coalesce(1).saveAsTextFile(sdir);
Создайте чтение Dataframe из вышеуказанного каталога HDFS.
val loaddata = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "Trans").load(sdir)
Сделайте некоторую обработку в Dataframe и сохраните как JSON
loaddata.write.mode("append").json("s3://mybucket/somefolder")
Почему-то я чувствую, что описанный выше подход очень неэффективен и, откровенно говоря, довольно школьный. Есть ли лучшее решение? Любая помощь будет принята с благодарностью.
Последующий вопрос: как манипулировать полями (не столбцами) в кадре данных? У меня есть очень сложный вложенный xml, и когда я использую описанный выше метод, я получаю Dataframe с 9 столбцами и 50 нечетными внутренними массивами Struct. Это нормально, за исключением необходимости обрезать имена определенных полей. Есть ли способ добиться этого без взрыва фрейма данных, так как мне нужно снова построить ту же структуру?