Как сопоставить имена файлов с RDD с помощью sc.textFile(s3n://bucket/*.csv)?

Обратите внимание, я должен использовать файл sc.textFile, но я приму любые другие ответы.

Что я хочу сделать, так это просто добавить имя файла, который обрабатывается в RDD... что-то вроде:

var rdd = sc.textFile("s3n://bucket/*.csv").map(line=>filename+","+line)

Очень признателен!

EDIT2: РЕШЕНИЕ ДЛЯ EDIT1 — использовать Hadoop 2.4 или выше. Однако я не проверял это с помощью ведомых устройств... и т. д. Однако некоторые из упомянутых решений работают только для небольших наборов данных. Если вы хотите использовать большие данные, вам придется использовать HadoopRDD.

РЕДАКТИРОВАТЬ: я пробовал следующее, и это не сработало:

:cp symjar/aws-java-sdk-1.9.29.jar
:cp symjar/aws-java-sdk-flow-build-tools-1.9.29.jar

import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.{S3ObjectSummary, ObjectListing, GetObjectRequest}
import com.amazonaws.auth._


def awsAccessKeyId = "AKEY"
def awsSecretAccessKey = "SKEY"

val hadoopConf = sc.hadoopConfiguration;
hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3n.awsAccessKeyId", awsAccessKeyId)
hadoopConf.set("fs.s3n.awsSecretAccessKey", awsSecretAccessKey)

var rdd = sc.wholeTextFiles("s3n://bucket/dir/*.csv").map { case (filename, content) =>  filename }
rdd.count

ПРИМЕЧАНИЕ. Он подключается к S3, и это не проблема (поскольку я проверял это много раз).

Ошибка, которую я получаю:

INFO input.FileInputFormat: Total input paths to process : 4
java.io.FileNotFoundException: File does not exist: /RTLM-918/simple/t1-100.csv
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:517)
    at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat$OneFileInfo.<init>(CombineFileInputFormat.java:489)
    at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getMoreSplits(CombineFileInputFormat.java:280)
    at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:240)
    at org.apache.spark.rdd.WholeTextFileRDD.getPartitions(NewHadoopRDD.scala:267)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1511)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:813)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:38)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:40)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:42)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:44)
    at $iwC$$iwC$$iwC.<init>(<console>:46)
    at $iwC$$iwC.<init>(<console>:48)
    at $iwC.<init>(<console>:50)
    at <init>(<console>:52)
    at .<init>(<console>:56)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:856)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:656)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:664)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:996)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

person 3xCh1_23    schedule 08.04.2015    source источник
comment
Не могли бы вы написать себе собственный InputFormat?   -  person Alister Lee    schedule 09.04.2015


Ответы (3)


Единственный текстовый метод, включающий имя файла, — wholeTextFiles.

sc.wholeTextFiles(path).map { case (filename, content) => ... }
person Marius Soutier    schedule 08.04.2015
comment
Я пробовал это, и он сказал мне, что файл не найден. Возможно, это работает только на локальном хосте. - person 3xCh1_23; 08.04.2015
comment
Я только что попробовал. (Spark 1.3.0, предварительно созданный для Hadoop 2.4, с путем s3n, включая ключ/пароль.) Он работает нормально. Это правильный ответ. Дамир, можешь показать, как ты пытаешься использовать wholeTextFiles? Одна из возможных проблем заключается в том, что ваш пароль S3 содержит не буквенно-цифровые символы. (Не указывайте свой пароль, но укажите, содержит ли он такие символы.) - person Daniel Darabos; 09.04.2015
comment
Я проверил это на простом примере из оболочки, и он не работает. Пожалуйста, смотрите отредактированный (исходный) пост :: РЕДАКТИРОВАТЬ: он также не работает без подстановочных знаков. - person 3xCh1_23; 09.04.2015

Просто передайте его как переменную во время карты (или установите как свойство объекта).

sc.textFile("s3n://bucket/"+fn+".csv").map(line=>set_filepath(line,fn))

person Myles Baker    schedule 08.04.2015
comment
Допустим, я не знаю, что такое файлы в ведре, поэтому я не могу определить str(fn). Это то, что я имел в виду с вопросом, и причина, по которой я включил *.csv. Как бы я это сделал тогда и какой именно будет str def ? - person 3xCh1_23; 08.04.2015
comment
Просто получите содержимое каталога и примените к нему логику. safaribooksonline.com/library/view/scala-cookbook/9781449340292/ - person Myles Baker; 08.04.2015
comment
У меня была та же идея, но должен быть другой способ ... возможно, что-то, что сказало бы нам, какое имя файла обрабатывает исполнитель или предназначен для обработки? - person 3xCh1_23; 08.04.2015
comment
Я не думаю, что вы можете сделать это без определения собственного подкласса HadoopRDD. - person Myles Baker; 08.04.2015

Если вы имеете дело с большими данными, то вам подойдет HadoopRDD. В противном случае, с другими предложениями, это не сработает.

Код:

val text = sc.hadoopFile("s3n://.../", classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
// Cast to a HadoopRDD
val hadoopRdd = text.asInstanceOf[HadoopRDD[LongWritable, Text]]

val fileAndLine = hadoopRdd.mapPartitionsWithInputSplit { (inputSplit, iterator) ⇒
  val file = inputSplit.asInstanceOf[FileSplit]
  iterator.map { tpl ⇒ (file.getPath, tpl._2) }
}
person 3xCh1_23    schedule 08.04.2015