Использование custom UDF withColumn в наборе данных Spark ‹Row›; java.lang.String нельзя преобразовать в org.apache.spark.sql.Row

У меня есть файл JSON, содержащий много полей. Я прочитал файл с помощью набора данных Spark в java.

  • Spark версии 2.2.0

  • Java JDK 1.8.0_121

Ниже приведен код.

SparkSession spark = SparkSession
              .builder()
              .appName("Java Spark SQL basic example")
              .config("spark.some.config.option", "some-value")
              .master("local")
              .getOrCreate();

Dataset<Row> df = spark.read().json("jsonfile.json");

Я хотел бы использовать функцию withColumn с настраиваемым UDF для добавления нового столбца.

UDF1 someudf = new UDF1<Row,String>(){
        public String call(Row fin) throws Exception{
            String some_str = fin.getAs("String");
            return some_str;
        }
    };
spark.udf().register( "some_udf", someudf, DataTypes.StringType );
df.withColumn( "procs", callUDF( "some_udf", col("columnx") ) ).show();

Я получаю сообщение об ошибке при выполнении приведенного выше кода. java.lang.String нельзя преобразовать в org.apache.spark.sql.Row

Вопросов:

1. Является ли считывание набора данных единственным вариантом? Я могу преобразовать df в df строк. но я не смогу выбирать поля.

2 - Пытался, но не смог определить определяемый пользователем тип данных. Мне не удалось зарегистрировать UDF с этим пользовательским типом UDDatatype. мне нужны здесь пользовательские типы данных?

3 - и главный вопрос, как я могу преобразовать String в Row?

Часть журнала скопирована ниже:

Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.sql.Row
    at Risks.readcsv$1.call(readcsv.java:1)
    at org.apache.spark.sql.UDFRegistration$$anonfun$27.apply(UDFRegistration.scala:512)
        ... 16 more

Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$27: (string) => string)

Мы будем благодарны за вашу помощь.


person valearner    schedule 25.08.2017    source источник


Ответы (2)


Вы получаете это исключение, потому что UDF будет выполняться с типом данных столбца, который не является Row. Представьте, что у нас есть Dataset<Row> ds, в котором есть два столбца col1 и col2, оба типа String. Теперь, если мы хотим преобразовать значение col2 в верхний регистр, используя UDF.

Мы можем зарегистрироваться и позвонить UDF, как показано ниже.

spark.udf().register("toUpper", toUpper, DataTypes.StringType);
ds.select(col("*"),callUDF("toUpper", col("col2"))).show();

Или используя withColumn

ds.withColumn("Upper",callUDF("toUpper", col("col2"))).show();

И UDF должно быть так, как показано ниже.

private static UDF1 toUpper = new UDF1<String, String>() {
    public String call(final String str) throws Exception {
        return str.toUpperCase();
    }
};
person abaghel    schedule 25.08.2017
comment
потрясающе, мне нужно внимательнее читать документы. Спасибо большое - person valearner; 25.08.2017
comment
У меня это не работает. UDF вообще не вызывается. - person Adam Arold; 20.06.2018
comment
@abaghel любые советы по этой проблеме UDF plz stackoverflow.com/questions/63935600/ - person BdEngineer; 17.09.2020

Улучшение того, что написал @abaghel. Если вы используете следующий импорт

import org.apache.spark.sql.functions;

При использовании withColumn код должен быть следующим:

ds.withColumn("Upper",functions.callUDF("toUpper", ds.col("col2"))).show();
person Batuhan Tüter    schedule 26.12.2018
comment
Как мы можем вызвать UDF для всех столбцов вместо определенного столбца? - person deals my; 24.08.2020
comment
@Batuhan Tüter любые советы по этой проблеме UDF, пожалуйста, stackoverflow.com/questions/63935600/ - person BdEngineer; 17.09.2020