Spark Streaming IllegalStateException: этот потребитель уже закрыт

Итак, используя: - Spark Structured Streaming (2.1.0) - Kafka 0.10.2.0 - Scala 2.11

Я использую API по умолчанию от Kafka, поэтому в основном:

val df = spark.readStream
  .format("kafka")
  .option(...)

Настройка параметров (через SSL) и все такое. Затем я явно применяю несколько действий и т. д. и запускаю поток и т. д. (он работает правильно). Однако время от времени выдает исключение:

17/05/30 11:05:23 WARN TaskSetManager: Lost task 23.0 in stage 77.0 (TID 3329, spark-worker-3, executor 0): java.lang.IllegalStateException: This consumer has already been closed.
at org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1611)
at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1622)
at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1198)
at org.apache.spark.sql.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:278)
at org.apache.spark.sql.kafka010.CachedKafkaConsumer.fetchData(CachedKafkaConsumer.scala:177)
at org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:89)
at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:147)
at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:136)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1.apply(ForeachSink.scala:52)
at org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1.apply(ForeachSink.scala:49)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Любые советы, почему это может не сработать?




Ответы (1)


https://issues.apache.org/jira/browse/SPARK-18682 исправлено при реализации пакетного исходного кода Kafka. Вы не должны видеть его в Spark 2.1.1. Если вы по-прежнему видите эту ошибку в Spark 2.1.1, создайте заявку Spark на https://issues.apache.org/jira/browse/SPARK

person zsxwing    schedule 30.05.2017
comment
Не могли бы вы проверить мое другое сообщение на почту? Заранее спасибо - person ppanero; 26.07.2017
comment
Привет еще раз, я все еще получаю это в 2.2.0. Я открыл вопрос, и он был отклонен, не могли бы вы помочь мне? немного блокирует для меня atm: issues.apache.org/jira/browse/SPARK- 21453 - person ppanero; 03.08.2017
comment
Я вижу это даже с 3.0.2. временами. В основном при повторной попытке задачи, которая была кэширована. - person Grigoriev Nick; 03.07.2021