[jira] [Commented] (BAHIR-183) Using HDFS for saving message for mqtt source

2019-06-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/BAHIR-183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16870495#comment-16870495
 ] 

ASF GitHub Bot commented on BAHIR-183:
--

yanlin-Lynn commented on pull request #84: [BAHIR-183] [WIP] HDFS based MQTT 
client persistence
URL: https://github.com/apache/bahir/pull/84#discussion_r296471045
 
 

 ##
 File path: sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MessageStore.scala
 ##
 @@ -148,3 +141,113 @@ private[mqtt] class LocalMessageStore(val persistentStore: MqttClientPersistence
    }
  
  }
+
+private[mqtt] class HdfsMqttClientPersistence(config: Configuration)
+    extends MqttClientPersistence {
+
+  var rootPath: Path = _
+  var fileSystem: FileSystem = _
+
+  override def open(clientId: String, serverURI: String): Unit = {
+    try {
+      rootPath = new Path("mqtt/" + clientId + "/" + serverURI.replaceAll("[^a-zA-Z0-9]", "_"))
+      fileSystem = FileSystem.get(config)
+      if (!fileSystem.exists(rootPath)) {
+        fileSystem.mkdirs(rootPath)
+      }
+    }
+    catch {
+      case e: Exception => throw new MqttPersistenceException(e)
+    }
+  }
+
+  override def close(): Unit = {
+    try {
+      fileSystem.close()
+    }
+    catch {
+      case e: Exception => throw new MqttPersistenceException(e)
+    }
+  }
+
+  override def put(key: String, persistable: MqttPersistable): Unit = {
+    try {
+      val path = getPath(key)
 
 Review comment:
   I am just worried about the performance cost of creating a file for each 
incoming message.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Using HDFS for saving message for mqtt source
> -
>
> Key: BAHIR-183
> URL: https://issues.apache.org/jira/browse/BAHIR-183
> Project: Bahir
>  Issue Type: Improvement
>  Components: Spark Structured Streaming Connectors
>Affects Versions: Spark-2.2.0
>Reporter: Wang Yanlin
>Assignee: Wang Yanlin
>Priority: Major
> Fix For: Spark-2.4.0
>
>
> Currently in spark-sql-streaming-mqtt, each received MQTT message is saved 
> in a local file by the driver. This risks losing data in cluster mode when 
> application master failover occurs. Saving incoming MQTT messages in a 
> directory in the checkpoint location will solve this problem.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BAHIR-183) Using HDFS for saving message for mqtt source

2019-06-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/BAHIR-183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16870500#comment-16870500
 ] 

ASF GitHub Bot commented on BAHIR-183:
--

yanlin-Lynn commented on issue #84: [BAHIR-183] [WIP] HDFS based MQTT client 
persistence
URL: https://github.com/apache/bahir/pull/84#issuecomment-504732887
 
 
   Also, I think we should keep the ability to do flow control, e.g., to cap 
the maximum number of messages in each batch.
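
To illustrate the kind of flow control meant here, the following is a minimal sketch, not Bahir's implementation. It assumes that persisted messages carry consecutive numeric offsets; the names `BatchLimiter`, `nextEndOffset`, `lastCommitted`, `latestStored` and `maxPerBatch` are hypothetical and do not come from the pull request.

```scala
// Hypothetical flow-control helper; none of these names exist in Bahir.
object BatchLimiter {

  /** Returns the inclusive end offset of the next batch.
    *
    * @param lastCommitted offset of the last message already processed (-1 if none)
    * @param latestStored  offset of the newest message persisted so far
    * @param maxPerBatch   upper bound on the number of messages in one batch
    */
  def nextEndOffset(lastCommitted: Long, latestStored: Long, maxPerBatch: Long): Long = {
    require(maxPerBatch > 0, "maxPerBatch must be positive")
    // Never read past what is stored, and never take more than maxPerBatch messages.
    math.min(latestStored, lastCommitted + maxPerBatch)
  }
}
```

With ten stored messages (offsets 0 to 9) and a cap of four, successive batches under this sketch would end at offsets 3, 7 and 9.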
 



[jira] [Commented] (BAHIR-183) Using HDFS for saving message for mqtt source

2019-06-23 Thread Wang Yanlin (JIRA)


[ 
https://issues.apache.org/jira/browse/BAHIR-183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16870492#comment-16870492
 ] 

Wang Yanlin commented on BAHIR-183:
---

Hi [~lukasz.antoniak], regarding your concern about lost messages: I have 
checked the method *messageArrived* in the interface 
*org.eclipse.paho.client.mqttv3.MqttCallback*. Any QoS 1 or 2 message will be 
redelivered by the server if we throw an exception before returning. I save 
the message data to the HDFS back end inside the *messageArrived* 
implementation, so I think we will not lose any messages in this situation.
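
The persist-before-return idea can be sketched as follows. This is a simplified illustration, not the code from the pull request: `ArrivalHandler` is a hypothetical stand-in that only mirrors the shape of the callback, `PersistingHandler` is an invented name, and a local directory accessed through `java.nio.file` stands in for the HDFS back end.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

// Hypothetical stand-in for the callback shape; this is NOT the Paho interface.
trait ArrivalHandler {
  def messageArrived(topic: String, payload: Array[Byte]): Unit
}

// Hypothetical handler that writes each payload to storage before returning.
class PersistingHandler(dir: Path) extends ArrivalHandler {
  private var nextId = 0L

  override def messageArrived(topic: String, payload: Array[Byte]): Unit = {
    // If the write fails, the IOException propagates to the caller,
    // so the method never returns normally for an unsaved message.
    Files.write(dir.resolve(s"$nextId.msg"), payload)
    // The counter advances only after the write has succeeded.
    nextId += 1
  }

  // Number of messages written successfully so far.
  def stored: Long = nextId
}
```

The design point is only the ordering: the write happens before the method returns, so a failed write surfaces as an exception rather than as a silently dropped message.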
