Проблема со схемой соединителя структурированной потоковой передачи ApacheBahir в потоковой передаче ApacheSpark

Я пытаюсь подключить структурированный поток Apache Spark к теме MQTT (в данном случае IBM Watson IoT Platform на IBM Bluemix).

Я создаю структурированный поток следующим образом:

val df = spark.readStream 
    .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
    .option("username","<username>")
    .option("password","<password>")
    .option("clientId","a:vy0z2s:a-vy0z2s-zfzzckrnqf")
    .option("topic", "iot-2/type/WashingMachine/id/Washer02/evt/voltage/fmt/json")
    .load("tcp://vy0z2s.messaging.internetofthings.ibmcloud.com:1883")

Пока все хорошо, в REPL я возвращаю этот объект df следующим образом:

df: org.apache.spark.sql.DataFrame = [value: string, timestamp: timestamp]

Но если я начну читать из потока, используя эту строку:

val query = df.writeStream
    .outputMode("append")
    .format("console")
    .start()

Я получаю следующую ошибку:

scala> 17/02/03 07:32:23 ERROR StreamExecution: Query query-1
terminated with error java.lang.ClassCastException: scala.Tuple2
cannot be cast to scala.runtime.Nothing$    at
org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1$$anonfun$3.apply(MQTTStreamSource.scala:156)
    at
org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1$$anonfun$3.apply(MQTTStreamSource.scala:156)
    at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)  at
scala.collection.concurrent.TrieMap.getOrElse(TrieMap.scala:633)    at
org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1.apply$mcZI$sp(MQTTStreamSource.scala:156)
    at
org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1.apply(MQTTStreamSource.scala:155)
    at
org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1.apply(MQTTStreamSource.scala:155)
    at scala.collection.immutable.Range.foreach(Range.scala:160)    at
org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource.getBatch(MQTTStreamSource.scala:155)
    at
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$5.apply(StreamExecution.scala:332)
    at
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$5.apply(StreamExecution.scala:329)
    at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)  at
scala.collection.AbstractIterator.foreach(Iterator.scala:1336)  at
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)  at
org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
    at
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at
org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
    at
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:329)
    at
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:194)
    at
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
    at
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:184)
    at
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:120)
17/02/03 07:32:24 WARN MQTTTextStreamSource: Connection to mqtt server
lost. Connection lost (32109) - java.io.EOFException    at
org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:146)
    at java.lang.Thread.run(Thread.java:745) Caused by:
java.io.EOFException    at
java.io.DataInputStream.readByte(DataInputStream.java:267)  at
org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:65)
    at
org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:107)
    ... 1 more 17/02/03 07:32:28 WARN MQTTTextStreamSource: Connection to
mqtt server lost.

Моя интуиция подсказывает, что со схемой что-то не так, поэтому я добавил:

import org.apache.spark.sql.types._ val 
schema = StructType(
    StructField("count",LongType,true)::
    StructField("flowrate",LongType,true)::
    StructField("fluidlevel",StringType,true)::
    StructField("frequency",LongType,true)::
    StructField("hardness",LongType,true)::
    StructField("speed",LongType,true)::
    StructField("temperature",LongType,true)::
    StructField("ts",LongType,true)::
    StructField("voltage",LongType,true):: Nil)

val df = spark.readStream 
    .schema(schema)
    .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
    .option("username","<username>")
    .option("password","<password>")
    .option("clientId","a:vy0z2s:a-vy0z2s-zfzzckrnqf")
    .option("topic", "iot-2/type/WashingMachine/id/Washer02/evt/voltage/fmt/json")
    .load("tcp://vy0z2s.messaging.internetofthings.ibmcloud.com:1883")

Но это не помогает, есть идеи?


person Romeo Kienzler    schedule 03.02.2017    source источник
comment
Мне кажется, что это проблема версии. Какую версию MQTT и Spark вы используете?   -  person Yuval Itzchakov    schedule 03.02.2017
comment
spark-2.0.0-bin-hadoop2.7, Watson IoT использует MQTT V3.1.1 imho   -  person Romeo Kienzler    schedule 03.02.2017
comment
Похоже, вы опубликовали свое имя пользователя и пароль. Убедитесь, что вы немедленно отозвали эти учетные данные, так как теперь ими может пользоваться кто угодно.   -  person hardillb    schedule 03.02.2017
comment
Я изменил PW : прежде чем я опубликовал :)   -  person Romeo Kienzler    schedule 03.02.2017
comment
теперь попробовал это с mosquitto, это работает - так что, похоже, это проблема с брокером IBM Watson IoT ... есть идеи, как отлаживать?   -  person Romeo Kienzler    schedule 03.02.2017


Ответы (1)


Кажется, ваша проблема в том, что вы повторно используете один и тот же идентификатор клиента для последующих подключений.

Closing TCP connection:   ClientID="a:vy0z2s:a-vy0z2s-xxxxxxxxxx" Protocol=mqtt4-tcp Endpoint="mqtt"   RC=288 Reason="The client ID was reused."  

Для одного clientID допускается только одно уникальное соединение; у вас не может быть двух одновременных подключений с использованием одного и того же идентификатора.

Проверьте идентификатор клиента и убедитесь, что несколько экземпляров одного и того же приложения используют уникальный идентификатор клиента. Приложения могут использовать один и тот же ключ API, но MQTT требует, чтобы идентификатор клиента всегда был уникальным.

person ValerieLampkin    schedule 03.02.2017