Spark: как преобразовать RDD в Seq для использования в конвейере

Я хочу использовать реализацию конвейера в MLlib. Раньше у меня был файл RDD, и я передал его при создании модели, но теперь, чтобы использовать конвейер, должна быть последовательность LabeledDocument для передачи в конвейер.

У меня есть RDD, который создается следующим образом:

val data = sc.textFile("/test.csv");
val parsedData = data.map { line =>
        val parts = line.split(',')
        LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail))
        }.cache()

В примере конвейера Руководство по программированию Spark конвейеру необходимы следующие данные:

// Prepare training documents, which are labeled.
val training = sparkContext.parallelize(Seq(
  LabeledDocument(0L, "a b c d e spark", 1.0),
  LabeledDocument(1L, "b d", 0.0),
  LabeledDocument(2L, "spark f g h", 1.0),
  LabeledDocument(3L, "hadoop mapreduce", 0.0),
  LabeledDocument(4L, "b spark who", 1.0),
  LabeledDocument(5L, "g d a y", 0.0),
  LabeledDocument(6L, "spark fly", 1.0),
  LabeledDocument(7L, "was mapreduce", 0.0),
  LabeledDocument(8L, "e spark program", 1.0),
  LabeledDocument(9L, "a e c l", 0.0),
  LabeledDocument(10L, "spark compile", 1.0),
  LabeledDocument(11L, "hadoop software", 0.0)))

Мне нужен способ изменить мой RDD (parsedData) на последовательность LabeledDocuments (например, обучение в примере).

Я ценю вашу помощь.


person Mohammad    schedule 19.06.2015    source источник


Ответы (2)


Я нашел ответ на этот вопрос.

Я могу преобразовать свой RDD (parsedData) в SchemaRDD, который представляет собой последовательность LabeledDocuments, с помощью следующего кода:

val rddSchema = parsedData.toSchemaRDD;

Теперь проблема изменилась! Я хочу разделить новую rddSchema на обучение (80%) и тестирование (20%). Если я использую randomSplit, он возвращает Array[RDD[Row]] вместо SchemaRDD.

Новая проблема: как преобразовать Array[RDD[Row]] в SchemaRDD -- ИЛИ -- как разделить SchemaRDD, в результате чего быть SchemaRDD?

person Mohammad    schedule 19.06.2015
comment
Рад, что вы нашли ответ, возможно, вы захотите принять его как окончательный ответ. Если у вас есть какие-либо другие вопросы, вы также можете создать новую тему, так как не рекомендуется задавать несколько вопросов одновременно. - person Mikel Urkia; 19.06.2015

Я пробовал следовать в pyspark-

def myFunc(s):
    # words = s.split(",")
    s = re.sub("\"", "", s)
    words = [s for s in s.split(",")]
    val = words[0]
    lbl = 0.0
    if val == 4 or val == "4":
        lbl = 0.0
    elif val == 0 or val == "0":
        lbl = 1.0

    cleanlbl = cleanLine(words[5], True, val)
    # print "cleanlblcleanlbl ",cleanlbl
    return LabeledPoint(lbl, htf.transform(cleanlbl.split(" ")))


sparseList = sc.textFile("hdfs:///stats/training.1600000.processed.noemoticon.csv").map(myFunc)

sparseList.cache()  # Cache data since Logistic Regression is an iterative algorithm.


# for data in dataset:
trainfeats, testfeats = sparseList.randomSplit([0.8, 0.2], 10)

Вы можете разделить при анализе данных, вы можете взломать и изменить в соответствии с вашими потребностями

person Abhishek Choudhary    schedule 19.06.2015
comment
myFunc возвращает те же данные, что и parsedData (пока без проблем). Затем randomSplit возвращает Array[RDD[Row]] вместо SchemaRDD. это моя проблема! - person Mohammad; 19.06.2015