Сопоставление UUID в соединителе Spark Cassandra

У меня есть следующий код для сохранения RDD в кассандре:

 JavaRDD<UserByID> mapped = ......

CassandraJavaUtil.javaFunctions(mapped)
.writerBuilder("mykeyspace", "user_by_id", mapToRow(UserByID.class)).saveToCassandra();

И UserByID - это обычный сериализуемый POJO со следующей переменной с геттерами и сеттерами

private UUID userid;

Таблица Cassandra имеет точно такие же имена переменных класса UserByID, а идентификатор пользователя имеет тип uuid в таблице Cassandra, я успешно загружаю данные из таблицы, используя то же сопоставление классов.

CassandraJavaRDD<UserByID> UserByIDRDD = javaFunctions(spark)
 .cassandraTable("mykeyspace", "user_by_id", mapRowTo(UserByID.class));

однако, когда я вызываю функцию saveToCassandra выше, я получаю следующее исключение:

org.apache.spark.SparkException: Job aborted due to stage failure: Task
0 in stage 227.0 failed 1 times, most recent failure: Lost task 0.0
in stage 227.0 (TID 12721, localhost, executor driver): 
java.lang.IllegalArgumentException: 
The value (4e22e71a-a387-4de8-baf1-0ef6e65fe33e) of the type 
(java.util.UUID) cannot be converted to 
struct<leastSignificantBits:bigint,mostSignificantBits:bigint> 

Чтобы решить эту проблему, я зарегистрировал кодек UUID, но это не помогло, я использую spark-cassandra-connector_2.11 версию 2.4.0 и ту же версию для spark-core_2.11. Есть ли предложения?

моя ссылка находится здесь, но там нет Пример Java UUID, ваша помощь приветствуется.


person Hasson    schedule 12.12.2018    source источник


Ответы (1)


Это действительно странная ошибка - она ​​отлично работает с соединителем 2.4.0 и Spark 2.2.1 со следующим примером:

Определение таблицы:

CREATE TABLE test.utest (
    id int PRIMARY KEY,
    u uuid
);

Класс POJO:

public class UUIDData {
    private UUID u;
    private int id;
    ...
    // getters/setters
};

Spark job:

public static void main(String[] args) {
    SparkSession spark = SparkSession
            .builder()
            .appName("UUIDTest")
            .getOrCreate();

    CassandraJavaRDD<UUIDData> uuids = javaFunctions(spark.sparkContext())
            .cassandraTable("test", "utest", mapRowTo(UUIDData.class));

    JavaRDD<UUIDData> uuids2 = uuids.map(x -> new UUIDData(x.getId() + 10, x.getU()));

    CassandraJavaUtil.javaFunctions(uuids2)
            .writerBuilder("test", "utest", mapToRow(UUIDData.class))
            .saveToCassandra();
}

Я заметил, что в вашем коде вы используете функции mapRowTo и mapToRow без вызова .class в POJO - вы уверены, что ваш код скомпилирован и вы не запускаете старую версию кода?

person Alex Ott    schedule 28.12.2018
comment
Спасибо за ваш ответ, у меня уже есть .class, но я забыл добавить их здесь, в вопросе, поскольку я изменил имя класса с того, что у меня есть на самом деле. Я все равно попробую. - person Hasson; 01.01.2019