Неверные имена столбцов в наборе данных Spark

У меня есть следующий класс case:

case class Data[T](field1: String, field2: T)

Я использую сериализатор крио со следующими имплицитами:

implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)

implicit def tuple2[A1, A2](implicit e1: Encoder[A1], e2: Encoder[A2]): Encoder[(A1, A2)] =
        Encoders.tuple[A1, A2](e1, e2)

...

И я попытался сделать следующее соединение:

val ds1 = someDataframe1.as[(String, T)].map(row => Data(row._1, row._2))
val ds1 = someDataframe2.as[(String, T)].map(row => Data(row._1, row._2))
ds1.joinWith(ds2, col("field1") === col("field1"), "left_outer")

После этого я получил следующее исключение:

org.apache.spark.sql.AnalysisException: cannot resolve 'field1' given input columns: [value, value];

Что случилось с именами столбцов в моих наборах данных?

UPD: когда я позвонил ds1.schema, я получил следующий вывод:

StructField(name = value,dataType = BinaryType, nullable = true)

Я думаю, что у меня проблема с сериализацией крио (нет метаданных схемы, класс case был сериализован как одно поле blob без имени). Также я заметил, что все работает хорошо, когда T является крио-известным классом (Int, String) или классом case. Но когда T - это какой-то Java-бин, я получаю свою схему набора данных в виде одного безымянного поля большого двоичного объекта.

Искра версии 1.6.1


person Cortwave    schedule 19.09.2016    source источник
comment
Я получаю другое сообщение об ошибке (версия Spark?). Во всяком случае, вы пробовали col("_1.field1") === col("_2.field2) в качестве условия присоединения? Таким образом, это работает для меня.   -  person Beryllium    schedule 19.09.2016
comment
@Beryllium проверьте обновленный вопрос, пожалуйста   -  person Cortwave    schedule 19.09.2016


Ответы (1)


Вы только что создали набор данных только с одним типом столбца Data, и вы не можете получить доступ к полям (fields1) внутри этого объекта.

dataframe: |value|
           |------
           |Data |
           |------
           |Data |
           |------

Вы можете попробовать это, чтобы преобразовать ваш фрейм данных в dataest: val ds1 = someDataframe1.as[Data]

dataframe: |field1|field2|
           |------
           |String|  T   |
           |------
           |String|  T   |
           |------

Или, если вы все еще хотите использовать свой фрейм данных, попробуйте изменить критерии поиска:

ds.joinWith(ds2, df.col("value").field1 === df2.col("value").field2)
person Charles Song    schedule 27.01.2017