Я пытаюсь удалить текст класса или массива обертки из данных CSV, сохраненных с помощью saveAsTextFile, без необходимости выполнять этап постобработки, отличный от Spark.
У меня есть некоторые данные TSV в больших файлах, которые я передаю в RDD.
val testRdd = sc.textFile(_input).filter(!_.startsWith("unique_transaction_id")).map(x => x.toLowerCase).map(x => x.split('\t')).map(x => Test(x(0),x(1)))
testRdd.saveAsTextFile("test")
Это сохраняет данные, обернутые именем класса:
head -n 1 part-00000
Test("1969720fb3100608b38297aad8b3be93","active")
Я также пытался использовать его в безымянном классе (?) вместо класса case.
val testRdd = sc.textFile(_input).filter(!_.startsWith("unique_transaction_id")).map(x => x.toLowerCase).map(x => x.split('\t')).map(x => (x(0),x(1)))
testRdd.saveAsTextFile ("test2")
Это дает
("1969720fb3100608b38297aad8b3be93","active")
который все еще нуждается в постобработке, чтобы удалить обертывающие скобки.
Чтобы удалить символы переноса, я попробовал flatMap(), но RDD, по-видимому, имеет неправильный тип:
testRdd.flatMap(identity).saveAsTextFile("test3")
<console>:17: error: type mismatch;
found : ((String, String)) => (String, String)
required: ((String, String)) => TraversableOnce[?]
testRdd.flatMap(identity).saveAsTextFile("test3")
Итак... мне нужно преобразовать RDD в какой-либо другой тип RDD, или есть другой способ сохранить RDD как CSV, чтобы текст переноса был удален?
Спасибо!