Hi everybody,

I am trying to restart a Spark Structured Streaming application that reads
from an MQTT source using Apache Bahir.
Here is the code:

val stream = spark.readStream
  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
  .option("topic", "topic")
  .load("tcp://localhost:1883")


After that I am ingesting the data into HDFS.
The error occurs on the Bahir side immediately after the restart.
The stack trace is included below.

Does anybody have any idea what the source of the error is and how to fix
it?

Any help is appreciated.

Best,
Michael



Caused by: MqttException (0) - java.io.FileNotFoundException: /paho193809750754542-tcplocalhost1883/13.msg (No such file or directory)
at org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence.get(MqttDefaultFilePersistence.java:218)
at org.apache.bahir.sql.streaming.mqtt.LocalMessageStore.get(MessageStore.scala:81)
at org.apache.bahir.sql.streaming.mqtt.LocalMessageStore.retrieve(MessageStore.scala:110)
at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1$$anonfun$5.apply(MQTTStreamSource.scala:158)
at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1$$anonfun$5.apply(MQTTStreamSource.scala:158)
at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
at scala.collection.concurrent.TrieMap.getOrElse(TrieMap.scala:633)
at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1.apply$mcZI$sp(MQTTStreamSource.scala:158)
at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1.apply(MQTTStreamSource.scala:157)
at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1.apply(MQTTStreamSource.scala:157)
at scala.collection.immutable.Range.foreach(Range.scala:160)
at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource.getBatch(MQTTStreamSource.scala:157)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$populateStartOffsets$2.apply(StreamExecution.scala:473)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$populateStartOffsets$2.apply(StreamExecution.scala:469)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$populateStartOffsets(StreamExecution.scala:469)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(StreamExecution.scala:297)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:294)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:290)
... 1 more
Caused by: java.io.FileNotFoundException: paho193809750754542-tcplocalhost1883/13.msg (No such file or directory)
at java.io.FileInputStream.open0(Native Method)
at java.io.FileInputStream.open(FileInputStream.java:195)
at java.io.FileInputStream.<init>(FileInputStream.java:138)
at org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence.get(MqttDefaultFilePersistence.java:207)
... 27 more
