[jira] [Commented] (BAHIR-183) Using HDFS for saving message for mqtt source
[ https://issues.apache.org/jira/browse/BAHIR-183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16870495#comment-16870495 ]

ASF GitHub Bot commented on BAHIR-183:
--------------------------------------

yanlin-Lynn commented on pull request #84: [BAHIR-183] [WIP] HDFS based MQTT client persistence
URL: https://github.com/apache/bahir/pull/84#discussion_r296471045

## File path: sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MessageStore.scala ##
@@ -148,3 +141,113 @@ private[mqtt] class LocalMessageStore(val persistentStore: MqttClientPersistence
   }
 }
+
+private[mqtt] class HdfsMqttClientPersistence(config: Configuration)
+    extends MqttClientPersistence {
+
+  var rootPath: Path = _
+  var fileSystem: FileSystem = _
+
+  override def open(clientId: String, serverURI: String): Unit = {
+    try {
+      rootPath = new Path("mqtt/" + clientId + "/" + serverURI.replaceAll("[^a-zA-Z0-9]", "_"))
+      fileSystem = FileSystem.get(config)
+      if (!fileSystem.exists(rootPath)) {
+        fileSystem.mkdirs(rootPath)
+      }
+    }
+    catch {
+      case e: Exception => throw new MqttPersistenceException(e)
+    }
+  }
+
+  override def close(): Unit = {
+    try {
+      fileSystem.close()
+    }
+    catch {
+      case e: Exception => throw new MqttPersistenceException(e)
+    }
+  }
+
+  override def put(key: String, persistable: MqttPersistable): Unit = {
+    try {
+      val path = getPath(key)

Review comment:
I am worried about the performance cost of creating a file for each incoming message.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Using HDFS for saving message for mqtt source
> ---------------------------------------------
>
>                 Key: BAHIR-183
>                 URL: https://issues.apache.org/jira/browse/BAHIR-183
>             Project: Bahir
>          Issue Type: Improvement
>          Components: Spark Structured Streaming Connectors
>    Affects Versions: Spark-2.2.0
>            Reporter: Wang Yanlin
>            Assignee: Wang Yanlin
>            Priority: Major
>             Fix For: Spark-2.4.0
>
>
> Currently in spark-sql-streaming-mqtt, each received MQTT message is saved in
> a local file by the driver. This risks losing data in cluster mode when
> application master failover occurs. Saving incoming MQTT messages in a
> directory under the checkpoint will solve this problem.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
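
For readers following the diff in this message, the directory that `open` derives can be reproduced without Hadoop. The sketch below copies only the string handling from the diff; the object name `PersistencePathSketch`, the helper `persistenceDir`, and the sample client ID and server URI are hypothetical and are not part of the pull request.

```scala
// Standalone sketch: reproduces only the path-building expression from `open`
// in the diff. `persistenceDir` is a hypothetical helper name.
object PersistencePathSketch {
  def persistenceDir(clientId: String, serverURI: String): String =
    "mqtt/" + clientId + "/" + serverURI.replaceAll("[^a-zA-Z0-9]", "_")

  def main(args: Array[String]): Unit = {
    // Every character of the server URI outside [a-zA-Z0-9] becomes "_";
    // the client ID is used unchanged.
    println(persistenceDir("client-1", "tcp://localhost:1883"))
    // prints mqtt/client-1/tcp___localhost_1883
  }
}
```

Because the path is relative, Hadoop would resolve it against the file system's working directory, so each client and broker pair gets its own directory there.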
[jira] [Commented] (BAHIR-183) Using HDFS for saving message for mqtt source
[ https://issues.apache.org/jira/browse/BAHIR-183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16870500#comment-16870500 ]

ASF GitHub Bot commented on BAHIR-183:
--------------------------------------

yanlin-Lynn commented on issue #84: [BAHIR-183] [WIP] HDFS based MQTT client persistence
URL: https://github.com/apache/bahir/pull/84#issuecomment-504732887

Also, I think we had better keep the ability to do flow control, e.g. to cap the maximum number of messages in each batch.
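
The flow-control idea in the comment above (capping the number of messages per batch) could look roughly like the following. This is a minimal sketch under assumptions: it treats stored messages as a contiguous range of numeric offsets, and the names `FlowControlSketch`, `nextEndOffset`, and `maxPerBatch` are hypothetical, not options or methods of the connector.

```scala
// Hypothetical sketch of per-batch flow control; not code from the pull request.
object FlowControlSketch {
  /** Returns the exclusive end offset of the next batch: at most
    * `maxPerBatch` messages past `start`, and never beyond `latest`. */
  def nextEndOffset(start: Long, latest: Long, maxPerBatch: Long): Long = {
    require(maxPerBatch > 0, "maxPerBatch must be positive")
    require(start <= latest, "start must not exceed latest")
    math.min(latest, start + maxPerBatch)
  }

  def main(args: Array[String]): Unit = {
    // 25 stored messages with at most 10 per batch:
    // the batches cover [0, 10), [10, 20) and [20, 25).
    var start = 0L
    while (start < 25L) {
      val end = nextEndOffset(start, 25L, 10L)
      println(s"batch covers offsets [$start, $end)")
      start = end
    }
  }
}
```

Capping the end offset rather than dropping messages means a backlog is drained over several smaller batches instead of one large one.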
[jira] [Commented] (BAHIR-183) Using HDFS for saving message for mqtt source
[ https://issues.apache.org/jira/browse/BAHIR-183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16870492#comment-16870492 ]

Wang Yanlin commented on BAHIR-183:
-----------------------------------

Hi [~lukasz.antoniak], regarding your concern about lost messages: I have checked the method *messageArrived* in the interface *org.eclipse.paho.client.mqttv3.MqttCallback*. Any QoS 1 or 2 message will be redelivered by the server if we throw an exception before returning. I save the message data to the HDFS back end inside the *messageArrived* implementation, so I think we will not lose any messages in this situation.
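
The persist-before-return argument in the comment above can be illustrated without the Paho or Hadoop libraries. The sketch below is an assumption-laden model, not the pull request's code: `MessageSink`, `InMemorySink`, `FailingSink`, and `onMessageArrived` are hypothetical stand-ins for the HDFS-backed store and the body of the *messageArrived* callback. It shows only that a storage failure surfaces as an exception instead of a normal return; redelivery itself is the broker-side behaviour described in the comment and is not simulated.

```scala
import scala.collection.mutable
import scala.util.Try

// Hypothetical stand-ins; neither the pull request's classes nor the Paho API.
object PersistBeforeReturnSketch {
  trait MessageSink {
    def write(id: Long, payload: Array[Byte]): Unit // may throw on I/O failure
  }

  // Stand-in for a durable store that accepts every write.
  final class InMemorySink extends MessageSink {
    val stored = mutable.Map.empty[Long, Array[Byte]]
    def write(id: Long, payload: Array[Byte]): Unit = stored.update(id, payload)
  }

  // Stand-in for a store whose writes always fail.
  object FailingSink extends MessageSink {
    def write(id: Long, payload: Array[Byte]): Unit =
      throw new java.io.IOException("simulated storage failure")
  }

  /** Models the body of a message-arrived callback: persist first and let any
    * exception propagate, so the callback never returns normally for a
    * message that was not stored. */
  def onMessageArrived(sink: MessageSink, id: Long, payload: Array[Byte]): Unit =
    sink.write(id, payload)

  def main(args: Array[String]): Unit = {
    val sink = new InMemorySink
    onMessageArrived(sink, 1L, "hello".getBytes("UTF-8"))
    println(sink.stored.contains(1L)) // true: stored before the callback returned

    val failed = Try(onMessageArrived(FailingSink, 2L, Array.emptyByteArray))
    println(failed.isFailure) // true: the failure reaches the caller
  }
}
```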