Форматировать (удалить класс/скобки) Вывод Spark CSV saveAsTextFile?

Я пытаюсь удалить текст класса или массива обертки из данных CSV, сохраненных с помощью saveAsTextFile, без необходимости выполнять этап постобработки, отличный от Spark.

У меня есть некоторые данные TSV в больших файлах, которые я передаю в RDD.

 val testRdd = sc.textFile(_input).filter(!_.startsWith("unique_transaction_id")).map(x => x.toLowerCase).map(x => x.split('\t')).map(x => Test(x(0),x(1)))

testRdd.saveAsTextFile("test")

Это сохраняет данные, обернутые именем класса:

head -n 1 part-00000
Test("1969720fb3100608b38297aad8b3be93","active")

Я также пытался использовать его в безымянном классе (?) вместо класса case.

val testRdd = sc.textFile(_input).filter(!_.startsWith("unique_transaction_id")).map(x => x.toLowerCase).map(x => x.split('\t')).map(x => (x(0),x(1)))

testRdd.saveAsTextFile ("test2")

Это дает

("1969720fb3100608b38297aad8b3be93","active")

который все еще нуждается в постобработке, чтобы удалить обертывающие скобки.

Чтобы удалить символы переноса, я попробовал flatMap(), но RDD, по-видимому, имеет неправильный тип:

testRdd.flatMap(identity).saveAsTextFile("test3")
<console>:17: error: type mismatch;
 found   : ((String, String)) => (String, String)
 required: ((String, String)) => TraversableOnce[?]
              testRdd.flatMap(identity).saveAsTextFile("test3")

Итак... мне нужно преобразовать RDD в какой-либо другой тип RDD, или есть другой способ сохранить RDD как CSV, чтобы текст переноса был удален?

Спасибо!


person user2029783    schedule 06.05.2015    source источник


Ответы (4)


Вы можете попробовать следующее:

val testRdd = sc.textFile(_input).filter(!_.startsWith("unique_transaction_id"))
                                 .map(x => x.toLowerCase.split('\t'))
                                 .map(x => x(0)+","+x(1))

Что мы слышали, так это то, что после фильтрации вашего заголовка вы можете использовать строчные буквы в том же отрывке карты, сохраняя при этом некоторые ненужные дополнительные сопоставления.

Это создаст RDD[String], который вы можете сохранить в формате CSV.

ПС:

  • Расширение вывода сохраненного rdd не csv, а формат!

  • Это не оптимальное и единственное решение, но оно сделает эту работу за вас!

person eliasah    schedule 06.05.2015
comment
Идеальный. Спасибо не только за ответ на вопрос, но и за указание на упрощение выполнения нескольких шагов в одном вызове map(). - person user2029783; 06.05.2015

val testRdd = sc.textFile(_input).filter(!_.startsWith("unique_transaction_id")).map(x => x.toLowerCase).map(x => x.split('\t')).map(x => x(0)+","+x(1))

Это запишет вывод как csv

person None    schedule 06.05.2015

Вы можете ознакомиться с библиотекой Spark CSV.

person Laurent Magnin    schedule 08.02.2016

val logFile = "/input.csv"

val conf = new SparkConf().set("spark.driver.allowMultipleContexts", "true")

val sc = новый SparkContext(master="local", appName="Mi app", conf)

val logData = sc.textFile(logFile, 2).cache()

значение ниже = logData.map (строка => строка.toLowerCase)

person Anxo P    schedule 20.05.2016