У меня есть следующий класс case:
case class Data[T](field1: String, field2: T)
Я использую сериализатор крио со следующими имплицитами:
implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)
implicit def tuple2[A1, A2](implicit e1: Encoder[A1], e2: Encoder[A2]): Encoder[(A1, A2)] =
Encoders.tuple[A1, A2](e1, e2)
...
И я попытался сделать следующее соединение:
val ds1 = someDataframe1.as[(String, T)].map(row => Data(row._1, row._2))
val ds1 = someDataframe2.as[(String, T)].map(row => Data(row._1, row._2))
ds1.joinWith(ds2, col("field1") === col("field1"), "left_outer")
После этого я получил следующее исключение:
org.apache.spark.sql.AnalysisException: cannot resolve 'field1' given input columns: [value, value];
Что случилось с именами столбцов в моих наборах данных?
UPD: когда я позвонил ds1.schema
, я получил следующий вывод:
StructField(name = value,dataType = BinaryType, nullable = true)
Я думаю, что у меня проблема с сериализацией крио (нет метаданных схемы, класс case был сериализован как одно поле blob без имени). Также я заметил, что все работает хорошо, когда T
является крио-известным классом (Int, String) или классом case. Но когда T
- это какой-то Java-бин, я получаю свою схему набора данных в виде одного безымянного поля большого двоичного объекта.
Искра версии 1.6.1
col("_1.field1") === col("_2.field2)
в качестве условия присоединения? Таким образом, это работает для меня. - person Beryllium   schedule 19.09.2016