Hi everybody, I am trying to restart Spark Streaming application, which is reading from MQTT source using Apache Bahir. Here is the code:
val stream = spark.readStream .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider") .option("topic", "topic") .load("tcp://localhost:1883") After that I am ingesting data into HDFS. The error occurs on Bahir side straight after the restart. I have added the stack trace of the error below. Does anybody has any idea what is the source of the error and how to fix this ? Any help is appreciated. Best, Michael Caused by: MqttException (0) - java.io.FileNotFoundException: /paho193809750754542-tcplocalhost1883/13.msg (No such file or directory) at org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence.get(MqttDefaultFilePersistence.java:218) at org.apache.bahir.sql.streaming.mqtt.LocalMessageStore.get(MessageStore.scala:81) at org.apache.bahir.sql.streaming.mqtt.LocalMessageStore.retrieve(MessageStore.scala:110) at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1$$anonfun$5.apply(MQTTStreamSource.scala:158) at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1$$anonfun$5.apply(MQTTStreamSource.scala:158) at scala.collection.MapLike$class.getOrElse(MapLike.scala:128) at scala.collection.concurrent.TrieMap.getOrElse(TrieMap.scala:633) at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1.apply$mcZI$sp(MQTTStreamSource.scala:158) at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1.apply(MQTTStreamSource.scala:157) at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1.apply(MQTTStreamSource.scala:157) at scala.collection.immutable.Range.foreach(Range.scala:160) at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource.getBatch(MQTTStreamSource.scala:157) at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$populateStartOffsets$2.apply(StreamExecution.scala:473) at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$populateStartOffsets$2.apply(StreamExecution.scala:469) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25) at org.apache.spark.sql.execution.streaming.StreamExecution.org $apache$spark$sql$execution$streaming$StreamExecution$$populateStartOffsets(StreamExecution.scala:469) at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(StreamExecution.scala:297) at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294) at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294) at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:294) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56) at org.apache.spark.sql.execution.streaming.StreamExecution.org $apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:290) ... 1 more Caused by: java.io.FileNotFoundException: paho193809750754542-tcplocalhost1883/13.msg (No such file or directory) at java.io.FileInputStream.open0(Native Method) at java.io.FileInputStream.open(FileInputStream.java:195) at java.io.FileInputStream.<init>(FileInputStream.java:138) at org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence.get(MqttDefaultFilePersistence.java:207) ... 27 more