Почему spark-submit не работает с AnalysisException: kafka не является допустимым источником данных Spark SQL?

Я использую Spark 2.1.0 с Kafka 0.10.2.1.

Я пишу приложение Spark, которое считывает наборы данных из темы Kafka.

Код выглядит следующим образом:

package com.example;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class MLP {

    public static void main(String[] args) {
        SparkSession spark = SparkSession
            .builder()
            .appName("MLP")
            .getOrCreate();

        Dataset<Row> df = spark
            .read()
            .format("kafka")
            .option("kafka.bootstrap.servers","localhost:9092,localhost:9093")
            .option("subscribe", "resultsTopic")
            .load();
        df.show();
        spark.stop();
    }
}

Мой сценарий развертывания выглядит следующим образом:

spark-submit \
  --verbose \
  --jars${echo /home/hduser1/spark/jars/*.jar | tr ' ' ',') \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.10 \
  --class com.**** \
  --master (Spark Master URL) /path/to/jar 

Однако я получаю сообщение об ошибке:

Exception in thread "main" org.apache.spark.sql.AnalysisException:
kafka is not a valid Spark SQL Data Source.;

Я пытался использовать то же приложение с источником данных, отличным от Jafka, и фрейм данных создается правильно. Я также пытался использовать пряжу в режиме клиента, и я получаю ту же ошибку.




Ответы (2)


Kafka как источник данных для непотокового DataFrame — наборы данных будут доступны в Spark 2.2, ссылка в этом выпуске на Spark JIRA

Как упоминал @JacekLaskowski, измените пакет на (модифицированная версия Jacek для использования 2.2):

--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0

Более того, используйте readStream для чтения потока данных.

Вы не можете использовать show с потоковыми источниками данных, вместо этого используйте формат console.

StreamingQuery query = df.writeStream()
  .outputMode("append")
  .format("console")
  .start();

query.awaitTermination();

см. эту ссылку

person T. Gawęda    schedule 29.06.2017
comment
Я не минусовал вас :(. Я удалил df.show(); и включил запрос и включил исключение, которое будет выброшено в main(). Но теперь я получаю полный режим вывода, который не поддерживается, когда нет потоковых агрегаций в потоковом режиме. DataFrames/Datasets ;; kafka У меня настроен потребитель, который опрашивает тему, и он существует. - person Suf; 29.06.2017
comment
@Suf Вы хотите использовать набор потоковых данных или статический? Для статики необходимо обновить Spark до версии 2.2. - person T. Gawęda; 29.06.2017
comment
Проверьте режим append :) - person T. Gawęda; 29.06.2017

Прежде всего, вы должны заменить --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.10 (что, я сомневаюсь, работает) на следующее:

--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.1

Я не думаю, что версия 2.10 когда-либо была доступна. Возможно, вы подумали о 2.1.0, который мог бы сработать, если бы вы использовали 2.1.0 (а не 2.10).

Во-вторых, удалите --jars${echo /home/hduser1/spark/jars/*.jar | tr ' ' ','), который Spark все равно загружает, за исключением некоторых дополнительных банок, таких как исходный код Kafka.

Это должно дать вам доступ к исходному формату kafka.

person Jacek Laskowski    schedule 29.06.2017
comment
Насколько я знаю, в Spark 2.1 нет источника Kafka для непотоковых наборов данных. - person T. Gawęda; 29.06.2017
comment
Да, см. issues.apache.org/jira/browse/SPARK-20036. . Так что частично ваш ответ правильный, частично мой ;) Однако кто-то считает, что мой ответ неверен o.O. - person T. Gawęda; 29.06.2017
comment
Ой. Верно! Извините за минус. Это был я. Исправлено. - person Jacek Laskowski; 29.06.2017