Scala java.lang.String не может быть преобразован в java.lang. Двойная ошибка при преобразовании кадра данных двойного типа в LabeledPoint в Spark

У меня есть набор данных переменных 2002 года. Все переменные числовые. Сначала я прочитал набор данных для Spark 1.5.0 и создал кадр данных Double Type, следуя инструкции здесь . Затем я преобразовал фрейм данных в LabeledPoint, следуя инструкциям здесь и здесь. Однако, когда я попытался распечатать образцы строк в сгенерированном LabeledPoint, я получил ошибку "java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Double". Ниже приведен код Scala, который я использовал. Извините за длинный код, но я надеюсь, что это поможет отладке.

Может ли кто-нибудь сказать мне, откуда берется ошибка и как решить проблему? Большое спасибо за Вашу помощь!

Ниже приведен код Scala, который я использовал:

// Read in dataset but drop the header row
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val trainRDD = sc.textFile("train.txt").filter(line => !line.contains("target"))
// Read in header file to get column names. Store in an Array.
val dictFile = "header.txt"
var arrName = new Array[String](2002)
for (line <- Source.fromFile(dictFile).getLines) {
    arrName = line.split('\t').map(_.trim).toArray
}

// Create dataframe using programmatically specifying the schema method
// Encode schema in a string
var schemaString = arrName.mkString(" ")
// Import Row
import org.apache.spark.sql.Row
// Import RDD
import org.apache.spark.rdd.RDD
// Import Spark SQL data types
import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType,LongType,FloatType,DoubleType}
// Generate the Double Type schema based on the string of schema
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, DoubleType, true)))
// Create rowRDD and convert String type to Double type
val arrVar = sc.broadcast(0 to 2001 toArray)
def createRowRDD(rdd:RDD[String], anArray:org.apache.spark.broadcast.Broadcast[Array[Int]]) : org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = {
    val rowRDD = rdd.map(_.split("\t")).map(_.map({y => y.toDouble})).map(p => Row.fromSeq(anArray.value map p))
    return rowRDD
}
val rowRDDTrain = createRowRDD(trainRDD, arrVar)
// Apply the schema to the RDD.
val trainDF = sqlContext.createDataFrame(rowRDDTrain, schema)
trainDF.printSchema
// Verified all 2002 variables are in "double (nullable = true)" format

// Define toLabeledPoint( ) to convert dataframe to LabeledPoint format
// Reference: https://stackoverflow.com/questions/31638770/rdd-to-labeledpoint-conversion
def toLabeledPoint(dataDF:org.apache.spark.sql.DataFrame) : org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = {
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint
    val targetInd = dataDF.columns.indexOf("target")
    val ignored = List("target")
    val featInd = dataDF.columns.diff(ignored).map(dataDF.columns.indexOf(_))
    val dataLP = dataDF.rdd.map(r => LabeledPoint(r.getDouble(targetInd),
     Vectors.dense(featInd.map(r.getDouble(_)).toArray)))
    return dataLP
}

// Create LabeledPoint from dataframe
val trainLP = toLabeledPoint(trainDF)
// Print out sammple rows in the generated LabeledPoint
trainLP.take(5).foreach(println)
// Failed: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Double

Обновление:

Большое спасибо за комментарии Дэвида Гриффина и zero323 ниже. Дэвид прав. Я считаю, что исключение действительно вызвано значениями null в данных. Я заменил следующий исходный код:

 def createRowRDD(rdd:RDD[String], anArray:org.apache.spark.broadcast.Broadcast[Array[Int]]) : org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = {
    val rowRDD = rdd.map(_.split("\t")).map(_.map({y => y.toDouble})).map(p => Row.fromSeq(anArray.value map p))
    return rowRDD
}

с этим, чтобы приписать значения null к 0,0, и тогда проблема исчезнет:

def createRowRDD(rdd:RDD[String], anArray:org.apache.spark.broadcast.Broadcast[Array[Int]]) : org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = {
    val rowRDD = rdd.map(_.split("\t")).map(_.map({y => try {y.toDouble} catch {case _ : Throwable => 0.0}})).map(p => Row.fromSeq(anArray.value map p))
    return rowRDD
}

person machine_learner    schedule 16.03.2016    source источник
comment
Не могли бы вы сократить это до минимально воспроизводимого примера?   -  person zero323    schedule 16.03.2016
comment
У вас есть null значения в ваших данных? Если вы приведете строку null к Long, возможно, это вызовет проблему.   -  person David Griffin    schedule 16.03.2016
comment
Спасибо, Дэвид и zero323! Дэвид прав. Проблема вызвана значениями null в данных. Я обновил свой исходный пост, чтобы добавить решение для вменения значений null в 0,0. Эта проблема теперь решена так быстро благодаря вашей огромной помощи!   -  person machine_learner    schedule 16.03.2016