Я использую Spark 2.1.0 с Kafka 0.10.2.1.
Я пишу приложение Spark, которое считывает наборы данных из темы Kafka.
Код выглядит следующим образом:
package com.example;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
public class MLP {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.appName("MLP")
.getOrCreate();
Dataset<Row> df = spark
.read()
.format("kafka")
.option("kafka.bootstrap.servers","localhost:9092,localhost:9093")
.option("subscribe", "resultsTopic")
.load();
df.show();
spark.stop();
}
}
Мой сценарий развертывания выглядит следующим образом:
spark-submit \
--verbose \
--jars${echo /home/hduser1/spark/jars/*.jar | tr ' ' ',') \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.10 \
--class com.**** \
--master (Spark Master URL) /path/to/jar
Однако я получаю сообщение об ошибке:
Exception in thread "main" org.apache.spark.sql.AnalysisException:
kafka is not a valid Spark SQL Data Source.;
Я пытался использовать то же приложение с источником данных, отличным от Jafka, и фрейм данных создается правильно. Я также пытался использовать пряжу в режиме клиента, и я получаю ту же ошибку.