[jira] [Commented] (BAHIR-39) MQTT as a streaming source for SQL Streaming.

2016-08-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BAHIR-39?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15407153#comment-15407153
 ] 

ASF GitHub Bot commented on BAHIR-39:
-

Github user ScrapCodes commented on a diff in the pull request:

https://github.com/apache/bahir/pull/13#discussion_r73461457
  
--- Diff: 
sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala
 ---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bahir.sql.streaming.mqtt
+
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+import java.util.Calendar
+import java.util.concurrent.CountDownLatch
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.ArrayBuffer
+import scala.util.{Failure, Success, Try}
+
+import org.apache.bahir.utils.Logging
+import org.eclipse.paho.client.mqttv3._
+import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence}
+
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
+import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
+import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
+
+
+object MQTTStreamConstants {
+
+  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+
+  val SCHEMA_DEFAULT = StructType(StructField("value", StringType)
+    :: StructField("timestamp", TimestampType) :: Nil)
+}
+
+class MQTTTextStreamSource(brokerUrl: String, persistence: MqttClientPersistence,
+    topic: String, clientId: String, messageParser: Array[Byte] => (String, Timestamp),
+    sqlContext: SQLContext) extends Source with Logging {
+
+  override def schema: StructType = MQTTStreamConstants.SCHEMA_DEFAULT
+
+  private val store = new LocalMessageStore(persistence, sqlContext.sparkContext.getConf)
+
+  private val messages = new TrieMap[Int, (String, Timestamp)]
+
+  private val initLock = new CountDownLatch(1)
+
+  private var offset = 0
+
+  private var client: MqttClient = _
+
+  private def fetchLastProcessedOffset(): Int = {
+    Try(store.maxProcessedOffset) match {
+      case Success(x) =>
+        log.info(s"Recovering from last stored offset $x")
+        x
+      case Failure(e) => 0
+    }
+  }
+
+  initialize()
+  private def initialize(): Unit = {
+
+    client = new MqttClient(brokerUrl, clientId, persistence)
+    val mqttConnectOptions: MqttConnectOptions = new MqttConnectOptions()
+    mqttConnectOptions.setAutomaticReconnect(true)
+    // This is required to support recovery. TODO: configurable ?
+    mqttConnectOptions.setCleanSession(false)
+
+    val callback = new MqttCallbackExtended() {
+
+      override def messageArrived(topic_ : String, message: MqttMessage): Unit = synchronized {
+        initLock.await() // Wait for initialization to complete.
+        val temp = offset + 1
+        messages.put(temp, messageParser(message.getPayload))
+        offset = temp
+        log.trace(s"Message arrived, $topic_ $message")
+      }
+
+      override def deliveryComplete(token: IMqttDeliveryToken): Unit = {
+      }
+
+      override def connectionLost(cause: Throwable): Unit = {
+        log.warn("Connection to mqtt server lost.", cause)
+      }
+
+      override def connectComplete(reconnect: Boolean, serverURI: String): Unit = {
+        log.info(s"Connect complete $serverURI. Is it a reconnect?: $reconnect")
+      }
+    }
+    client.setCallback(callback)
+    client.connect(mqttConnectOptions)
+    client.subscribe(topic)
+    // It is not possible to initialize offset without `client.connect`
+    offset = fetchLastProcessedOffset()
--- End diff --
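The constructor takes a `messageParser` that turns each raw MQTT payload into the `(value, timestamp)` pair matching `SCHEMA_DEFAULT`. Below is a minimal sketch of such a parser. It assumes UTF-8 payloads and an arrival-time timestamp, and `ExampleMessageParser` is a hypothetical name, not part of the PR. It builds the `Timestamp` directly rather than through a shared `SimpleDateFormat` such as `MQTTStreamConstants.DATE_FORMAT`, because `SimpleDateFormat` instances are not thread-safe.

```scala
import java.nio.charset.StandardCharsets
import java.sql.Timestamp

// Hypothetical parser with the `Array[Byte] => (String, Timestamp)` shape that the
// `messageParser` constructor parameter expects. UTF-8 decoding and an arrival-time
// timestamp are assumptions, not necessarily what the PR's provider passes in.
object ExampleMessageParser {

  /** Decodes the payload as UTF-8 text and pairs it with the current wall-clock time. */
  def parse(payload: Array[Byte]): (String, Timestamp) =
    (new String(payload, StandardCharsets.UTF_8), new Timestamp(System.currentTimeMillis()))

  // Eta-expanded to the function type the source's constructor takes.
  val parser: Array[Byte] => (String, Timestamp) = parse
}
```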

[GitHub] bahir pull request #13: [BAHIR-39] Add SQL Streaming MQTT support.

2016-08-03 Thread ScrapCodes
Github user ScrapCodes commented on a diff in the pull request:

https://github.com/apache/bahir/pull/13#discussion_r73461457
  

Hi Fred, offset is initialized here because it can only be fetched once we
have connected the client. This is also the reason I am holding an `initLock`.
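The ordering described here can be modelled without a broker: connect, subscribe, recover the offset, then release the latch. The sketch below assumes a hypothetical `GatedBuffer` class. Its `onMessage` stands in for `messageArrived`, and its `finishInit` stands in for the end of `initialize()`. A callback that fires early parks on the latch and only assigns an offset after the recovered value is in place.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import scala.collection.concurrent.TrieMap

// Hypothetical, broker-free model of the ordering in initialize(): callbacks may fire as
// soon as the client subscribes, but must not assign offsets until the recovered offset is set.
class GatedBuffer(recoverOffset: () => Int) {
  private val initLock = new CountDownLatch(1)
  private val messages = new TrieMap[Int, String]
  private var offset = 0

  /** Stands in for messageArrived: parks on the latch until initialization is finished. */
  def onMessage(payload: String): Unit = synchronized {
    initLock.await()
    offset += 1
    messages.put(offset, payload)
  }

  /** Stands in for the end of initialize(): set the recovered offset, then release callbacks. */
  def finishInit(): Unit = {
    offset = recoverOffset()
    initLock.countDown() // writes before countDown() are visible after a successful await()
  }

  /** Lock-free read of the buffered messages (TrieMap is thread-safe). */
  def snapshot: Map[Int, String] = messages.toMap
}
```

`finishInit()` does not take the monitor that a parked callback holds, so it cannot deadlock against it. The same holds in the diff, where `messageArrived` synchronizes on the callback object while `initialize()` runs on the constructing thread.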


---
If your project is set up for it, you can reply to this email and 

[jira] [Commented] (BAHIR-39) MQTT as a streaming source for SQL Streaming.

2016-08-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BAHIR-39?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15406625#comment-15406625
 ] 

ASF GitHub Bot commented on BAHIR-39:
-

Github user frreiss commented on a diff in the pull request:

https://github.com/apache/bahir/pull/13#discussion_r73422375
  
--- Diff: 
sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala
 ---
@@ -0,0 +1,191 @@
+package org.apache.bahir.sql.streaming.mqtt
+
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+import java.util.Calendar
+import java.util.concurrent.CountDownLatch
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.ArrayBuffer
+import scala.util.{Failure, Success, Try}
+
+import org.apache.bahir.utils.Logging
+import org.eclipse.paho.client.mqttv3._
+import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence}
+
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
+import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
+import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
+
+
+object MQTTStreamConstants {
+
+  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+
+  val SCHEMA_DEFAULT = StructType(StructField("value", StringType)
+    :: StructField("timestamp", TimestampType) :: Nil)
+}
+
+class MQTTTextStreamSource(brokerUrl: String, persistence: MqttClientPersistence,
+    topic: String, clientId: String, messageParser: Array[Byte] => (String, Timestamp),
+    sqlContext: SQLContext) extends Source with Logging {
+
+  override def schema: StructType = MQTTStreamConstants.SCHEMA_DEFAULT
+
+  private val store = new LocalMessageStore(persistence, sqlContext.sparkContext.getConf)
+
+  private val messages = new TrieMap[Int, (String, Timestamp)]
+
+  private val initLock = new CountDownLatch(1)
+
+  private var offset = 0
+
+  private var client: MqttClient = _
+
+  private def fetchLastProcessedOffset(): Int = {
+    Try(store.maxProcessedOffset) match {
+      case Success(x) =>
+        log.info(s"Recovering from last stored offset $x")
+        x
+      case Failure(e) => 0
+    }
+  }
+
+  initialize()
+  private def initialize(): Unit = {
+
+    client = new MqttClient(brokerUrl, clientId, persistence)
+    val mqttConnectOptions: MqttConnectOptions = new MqttConnectOptions()
+    mqttConnectOptions.setAutomaticReconnect(true)
+    // This is required to support recovery. TODO: configurable ?
+    mqttConnectOptions.setCleanSession(false)
+
+    val callback = new MqttCallbackExtended() {
+
+      override def messageArrived(topic_ : String, message: MqttMessage): Unit = synchronized {
+        initLock.await() // Wait for initialization to complete.
+        val temp = offset + 1
+        messages.put(temp, messageParser(message.getPayload))
+        offset = temp
+        log.trace(s"Message arrived, $topic_ $message")
+      }
+
+      override def deliveryComplete(token: IMqttDeliveryToken): Unit = {
--- End diff --

I feel like this callback probably needs to do something to ensure that
there are no duplicate or out-of-order messages in the message buffer. What is
the interaction between these two callbacks (`messageArrived`/`deliveryComplete`)
and the different QoS levels in MQTT?
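Some background on the QoS question. In the Paho client, `deliveryComplete` reports completion of messages this client *publishes*, so it plays no part in receiving for a subscribe-only source. `MqttClient.subscribe(topic)` subscribes at QoS 1 (at least once), so a redelivered message can reach `messageArrived` twice. QoS 2 is MQTT's exactly-once level. The sketch below shows one way to keep the buffer free of duplicates and gaps. It assumes each message carries a hypothetical application-level key, and `DedupBuffer` is illustrative, not code from the PR.

```scala
import scala.collection.mutable

// Hypothetical buffer: offsets are assigned contiguously in arrival order, and a value whose
// key was already seen (e.g. a QoS 1 redelivery) is dropped. `key` is an assumed application-
// level id carried with each message; MQTT packet identifiers are reused, so they are not
// a safe long-term dedup key.
final class DedupBuffer[K, V] {
  private val seen = mutable.HashSet.empty[K]
  private val byOffset = mutable.HashMap.empty[Long, V]
  private var lastOffset = 0L

  /** Returns the offset assigned to `value`, or None if `key` was already buffered. */
  def add(key: K, value: V): Option[Long] = synchronized {
    if (!seen.add(key)) None
    else {
      lastOffset += 1
      byOffset.put(lastOffset, value)
      Some(lastOffset)
    }
  }

  /** Values with start < offset <= end, in offset order (the shape of a getBatch range). */
  def range(start: Long, end: Long): Seq[V] = synchronized {
    (start + 1 to end).flatMap(o => byOffset.get(o).toList)
  }
}
```

The `seen` set grows without bound here. A real source would need to bound or persist it.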


> MQTT as a streaming source for SQL Streaming.
> -
>
> Key: BAHIR-39
> URL: https://issues.apache.org/jira/browse/BAHIR-39
> Project: Bahir
>  Issue Type: New Feature
>  Components: Spark SQL Data Sources
>Affects Versions: 


[GitHub] bahir pull request #13: [BAHIR-39] Add SQL Streaming MQTT support.

2016-08-03 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/bahir/pull/13#discussion_r73422074
  
--- Diff: 
sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala
 ---
@@ -0,0 +1,191 @@
+package org.apache.bahir.sql.streaming.mqtt
+
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+import java.util.Calendar
+import java.util.concurrent.CountDownLatch
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.ArrayBuffer
+import scala.util.{Failure, Success, Try}
+
+import org.apache.bahir.utils.Logging
+import org.eclipse.paho.client.mqttv3._
+import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence}
+
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
+import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
+import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
+
+
+object MQTTStreamConstants {
+
+  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+
+  val SCHEMA_DEFAULT = StructType(StructField("value", StringType)
+    :: StructField("timestamp", TimestampType) :: Nil)
+}
+
+class MQTTTextStreamSource(brokerUrl: String, persistence: MqttClientPersistence,
+    topic: String, clientId: String, messageParser: Array[Byte] => (String, Timestamp),
+    sqlContext: SQLContext) extends Source with Logging {
+
+  override def schema: StructType = MQTTStreamConstants.SCHEMA_DEFAULT
+
+  private val store = new LocalMessageStore(persistence, sqlContext.sparkContext.getConf)
+
+  private val messages = new TrieMap[Int, (String, Timestamp)]
+
+  private val initLock = new CountDownLatch(1)
+
+  private var offset = 0
+
+  private var client: MqttClient = _
+
+  private def fetchLastProcessedOffset(): Int = {
+    Try(store.maxProcessedOffset) match {
+      case Success(x) =>
+        log.info(s"Recovering from last stored offset $x")
+        x
+      case Failure(e) => 0
+    }
+  }
+
+  initialize()
+  private def initialize(): Unit = {
+
+    client = new MqttClient(brokerUrl, clientId, persistence)
+    val mqttConnectOptions: MqttConnectOptions = new MqttConnectOptions()
+    mqttConnectOptions.setAutomaticReconnect(true)
+    // This is required to support recovery. TODO: configurable ?
+    mqttConnectOptions.setCleanSession(false)
+
+    val callback = new MqttCallbackExtended() {
+
+      override def messageArrived(topic_ : String, message: MqttMessage): Unit = synchronized {
--- End diff --

Does MQTT guarantee that this method will be called exactly once, even if 
the client or server crashes and is restarted?
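Whatever MQTT guarantees on the wire, this source's restart behaviour rests on the last processed offset kept in local storage. `fetchLastProcessedOffset()` returns the stored value, or 0 when none can be read. The sketch below shows that recover-or-start-fresh pattern. It assumes a hypothetical one-file `FileOffsetStore` in place of `LocalMessageStore`, whose storage format is not shown in this thread.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import scala.util.{Failure, Success, Try}

// Hypothetical single-file stand-in for LocalMessageStore; its real storage format is not
// shown in this thread.
final class FileOffsetStore(file: Path) {

  /** Persists the highest offset that has been fully processed. */
  def commit(offset: Int): Unit = {
    Files.write(file, offset.toString.getBytes(StandardCharsets.UTF_8))
    ()
  }

  /** Throws (e.g. NoSuchFileException) if nothing has been committed yet. */
  def maxProcessedOffset: Int =
    new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim.toInt
}

object OffsetRecovery {

  /** Same recover-or-start-at-0 logic as fetchLastProcessedOffset() in the diff. */
  def fetchLastProcessedOffset(store: FileOffsetStore): Int =
    Try(store.maxProcessedOffset) match {
      case Success(x) => x
      case Failure(_) => 0
    }
}
```

The stored offset does not cover messages that arrived after the last commit but before a crash. Whether the broker redelivers those depends on the subscription's QoS and on the session being kept (`setCleanSession(false)` in the diff). The offset alone therefore does not give exactly-once behaviour.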




[jira] [Commented] (BAHIR-39) MQTT as a streaming source for SQL Streaming.

2016-08-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BAHIR-39?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15406522#comment-15406522
 ] 

ASF GitHub Bot commented on BAHIR-39:
-

Github user lresende commented on the issue:

https://github.com/apache/bahir/pull/13
  
@ScrapCodes I added some comments; they should be easy to address. Also, I
am OK if not all of the issues you mentioned above are resolved before we commit
this code, but I would appreciate it if you could raise JIRAs to make sure we
address them in the future.


> MQTT as a streaming source for SQL Streaming.
> -
>
> Key: BAHIR-39
> URL: https://issues.apache.org/jira/browse/BAHIR-39
> Project: Bahir
>  Issue Type: New Feature
>  Components: Spark SQL Data Sources
>Affects Versions: 2.1.0
>Reporter: Prashant Sharma
>
> MQTT compatible streaming source for Spark SQL Streaming.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] bahir pull request #13: [BAHIR-39] Add SQL Streaming MQTT support.

2016-08-03 Thread lresende
Github user lresende commented on a diff in the pull request:

https://github.com/apache/bahir/pull/13#discussion_r73407125
  
--- Diff: 
sql-streaming-mqtt/src/main/scala/org/apache/bahir/utils/Logging.scala ---
@@ -0,0 +1,25 @@
+package org.apache.bahir.utils
+
+import org.slf4j.LoggerFactory
+
+
+trait Logging {
--- End diff --

Do we need our own definition of `Logging`? Or is it OK for us to use the one
in Spark (which is what the other extensions are using)?
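A logging mix-in of this kind usually provides a lazily created logger named after the concrete class. It also takes messages as by-name parameters, so messages for disabled levels are never built. The sketch below is for comparison while deciding, and assumes a hypothetical `SimpleLogging` trait. It is neither the PR's `org.apache.bahir.utils.Logging` nor Spark's. It uses `java.util.logging` instead of SLF4J only so that it runs without extra dependencies.

```scala
import java.util.logging.{Level, Logger}

// Hypothetical minimal logging mix-in. The PR's trait is built on org.slf4j.LoggerFactory;
// java.util.logging is used here only so the sketch runs without extra dependencies.
trait SimpleLogging {

  // Transient and lazy, so a deserialized instance recreates its logger on first use
  // instead of trying to serialize it.
  @transient private lazy val logger: Logger = Logger.getLogger(getClass.getName)

  protected def logInfo(msg: => String): Unit =
    if (logger.isLoggable(Level.INFO)) logger.info(msg) // message built only if enabled

  protected def logWarning(msg: => String, e: Throwable): Unit =
    if (logger.isLoggable(Level.WARNING)) logger.log(Level.WARNING, msg, e)

  def loggerName: String = logger.getName
}
```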




[jira] [Commented] (BAHIR-39) MQTT as a streaming source for SQL Streaming.

2016-08-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BAHIR-39?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15406494#comment-15406494
 ] 

ASF GitHub Bot commented on BAHIR-39:
-

Github user lresende commented on a diff in the pull request:

https://github.com/apache/bahir/pull/13#discussion_r73406376
  
--- Diff: sql-streaming-mqtt/README.md ---
@@ -0,0 +1,121 @@
+A library for reading data from MQTT Servers using Spark SQL Streaming (or Structured Streaming).
+
+## Linking
+
+Using SBT:
+
+```scala
+libraryDependencies += "org.apache.bahir" %% "spark-sql-streaming-mqtt" % "2.0.0"
+```
+
+Using Maven:
+
+```xml
+<dependency>
+    <groupId>org.apache.bahir</groupId>
+    <artifactId>spark-sql-streaming-mqtt_2.11</artifactId>
+    <version>2.0.0</version>
+</dependency>
+```
+
+This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
+For example, to include it when starting the spark shell:
+
+```
+$ bin/spark-shell --packages org.apache.bahir:spark-sql-streaming-mqtt_2.11:2.0.0
+```
+
+Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
+The `--packages` argument can also be used with `bin/spark-submit`.
+
+This library is compiled for Scala 2.11 only, and intends to support Spark 2.0 onwards.
+
+## Examples
+
+A SQL Stream can be created with data streams received through MQTT Server using:
+
+```scala
+sqlContext.readStream
+  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
+  .option("topic", "mytopic")
+  .load("tcp://localhost:1883")
+
+```
+
+## Enable recovering from failures.
+
+Setting values for option `localStorage` and `clientId` helps in recovering in case of a restart, by restoring the state where it left off before the shutdown.
+
+```scala
+sqlContext.readStream
+  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
+  .option("topic", "mytopic")
+  .option("localStorage", "/path/to/localdir")
+  .option("clientId", "some-client-id")
+  .load("tcp://localhost:1883")
+
+```
+
+### Scala API
+
+An example for the Scala API to count words from the incoming message stream.
+
+```scala
+// Create DataFrame representing the stream of input lines from connection to mqtt server
+val lines = spark.readStream
+  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
+  .option("topic", topic)
+  .load(brokerUrl).as[(String, Timestamp)]
+
+// Split the lines into words
+val words = lines.map(_._1).flatMap(_.split(" "))
+
+// Generate running word count
+val wordCounts = words.groupBy("value").count()
+
+// Start running the query that prints the running counts to the console
+val query = wordCounts.writeStream
+  .outputMode("complete")
+  .format("console")
+  .start()
+
+query.awaitTermination()
+
+```
+Please see `MQTTStreamWordCount.scala` for the full example.
+
+### Java API
+
+An example for the Java API to count words from the incoming message stream.
+
+```java
+
+// Create DataFrame representing the stream of input lines from connection to mqtt server.
+Dataset<String> lines = spark
+    .readStream()
+    .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
+    .option("topic", topic)
+    .load(brokerUrl).select("value").as(Encoders.STRING());
+
+// Split the lines into words
+Dataset<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+    @Override
+    public Iterator<String> call(String x) {
+        return Arrays.asList(x.split(" ")).iterator();
+    }
+}, Encoders.STRING());
+
+// Generate running word count
+Dataset<Row> wordCounts = words.groupBy("value").count();
+
+// Start running the query that prints the running counts to the console
+StreamingQuery query = wordCounts.writeStream()
+    .outputMode("complete")
+    .format("console")
+    .start();
+
+query.awaitTermination();
+```
+
+Please see `JavaMQTTStreamWordCount.java` for the full example.
--- End diff --

Please add a link to the example directory. Also, the example file name is
`MQTTStreamWordCount.java`.
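The README's word count can be sanity-checked without a broker or a Spark session. The sketch below assumes plain Scala collections stand in for the streaming `Dataset`, and takes rows of the source's `(value, timestamp)` shape. `WordCountSketch` is hypothetical, not the `MQTTStreamWordCount` example the README points to. Each step is annotated with the README call it mirrors.

```scala
import java.sql.Timestamp

// Hypothetical, Spark-free mirror of the README's word count over one batch of rows with
// the source's (value, timestamp) shape.
object WordCountSketch {
  def count(rows: Seq[(String, Timestamp)]): Map[String, Int] =
    rows
      .map(_._1)                                    // lines.map(_._1)
      .flatMap(_.split(" "))                        // .flatMap(_.split(" "))
      .groupBy(identity)                            // words.groupBy("value")
      .map { case (word, ws) => word -> ws.size }   // .count()
}
```

The streaming query keeps these counts as running totals across triggers (`outputMode("complete")`), whereas this sketch counts a single batch.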



[GitHub] bahir pull request #13: [BAHIR-39] Add SQL Streaming MQTT support.

2016-08-03 Thread lresende
Github user lresende commented on a diff in the pull request:

https://github.com/apache/bahir/pull/13#discussion_r73405744
  
--- Diff: 
sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala
 ---
@@ -0,0 +1,191 @@
+package org.apache.bahir.sql.streaming.mqtt
+
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+import java.util.Calendar
+import java.util.concurrent.CountDownLatch
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.ArrayBuffer
+import scala.util.{Failure, Success, Try}
+
+import org.apache.bahir.utils.Logging
+import org.eclipse.paho.client.mqttv3._
+import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, 
MqttDefaultFilePersistence}
+
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, 
Source}
+import org.apache.spark.sql.sources.{DataSourceRegister, 
StreamSourceProvider}
+import org.apache.spark.sql.types.{StringType, StructField, StructType, 
TimestampType}
+
+
+object MQTTStreamConstants {
+
+  val DATE_FORMAT = new SimpleDateFormat("-MM-dd HH:mm:ss")
+
+  val SCHEMA_DEFAULT = StructType(StructField("value", StringType)
+:: StructField("timestamp", TimestampType) :: Nil)
+}
+
+class MQTTTextStreamSource(brokerUrl: String, persistence: 
MqttClientPersistence,
+topic: String, clientId: String, messageParser: Array[Byte] => 
(String, Timestamp),
+sqlContext: SQLContext) extends Source with Logging {
+
+  override def schema: StructType = MQTTStreamConstants.SCHEMA_DEFAULT
+
+  private val store = new LocalMessageStore(persistence, 
sqlContext.sparkContext.getConf)
+
+  private val messages = new TrieMap[Int, (String, Timestamp)]
+
+  private val initLock = new CountDownLatch(1)
+
+  private var offset = 0
+
+  private var client: MqttClient = _
+
+  private def fetchLastProcessedOffset(): Int = {
+Try(store.maxProcessedOffset) match {
+  case Success(x) =>
+log.info(s"Recovering from last stored offset $x")
+x
+  case Failure(e) => 0
+}
+  }
+
+  initialize()
+  private def initialize(): Unit = {
+
+    client = new MqttClient(brokerUrl, clientId, persistence)
+    val mqttConnectOptions: MqttConnectOptions = new MqttConnectOptions()
+    mqttConnectOptions.setAutomaticReconnect(true)
+    // This is required to support recovery. TODO: configurable ?
+    mqttConnectOptions.setCleanSession(false)
+
+    val callback = new MqttCallbackExtended() {
+
+      override def messageArrived(topic_ : String, message: MqttMessage): Unit = synchronized {
+        initLock.await() // Wait for initialization to complete.
+        val temp = offset + 1
+        messages.put(temp, messageParser(message.getPayload))
+        offset = temp
+        log.trace(s"Message arrived, $topic_ $message")
+      }
+
+      override def deliveryComplete(token: IMqttDeliveryToken): Unit = {
+      }
+
+      override def connectionLost(cause: Throwable): Unit = {
+        log.warn("Connection to mqtt server lost.", cause)
+      }
+
+      override def connectComplete(reconnect: Boolean, serverURI: String): Unit = {
+        log.info(s"Connect complete $serverURI. Is it a reconnect?: $reconnect")
+      }
+    }
+    client.setCallback(callback)
+    client.connect(mqttConnectOptions)
+    client.subscribe(topic)
+    // It is not possible to initialize offset without `client.connect`
+    offset = fetchLastProcessedOffset()
+    initLock.countDown() // Release.
+  }
+
+  /** Stop this source and free any resources it has allocated. */
+  override def stop(): Unit = {
+    client.disconnect()
+    persistence.close()
+    client.close()
+  }
  

[jira] [Commented] (BAHIR-39) MQTT as a streaming source for SQL Streaming.

2016-08-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BAHIR-39?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15406487#comment-15406487
 ] 

ASF GitHub Bot commented on BAHIR-39:
-

Github user lresende commented on a diff in the pull request:

https://github.com/apache/bahir/pull/13#discussion_r73405558
  
--- Diff: 
sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala
 ---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bahir.sql.streaming.mqtt
+
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+import java.util.Calendar
+import java.util.concurrent.CountDownLatch
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.ArrayBuffer
+import scala.util.{Failure, Success, Try}
+
+import org.apache.bahir.utils.Logging
+import org.eclipse.paho.client.mqttv3._
+import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence}
+
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
+import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
+import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
+
+
+object MQTTStreamConstants {
+
+  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
--- End diff --

Not thread-safe (`SimpleDateFormat` instances must not be shared across threads); see the comments below on `Calendar` usage.
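For illustration, one common thread-safe alternative is `java.time.format.DateTimeFormatter` (Java 8+), which is immutable and can be shared freely across threads. The sketch below is hypothetical and not part of the PR: `ThreadSafeFormatting`, `parseMessage` and `format` are illustrative names, and `parseMessage` only mimics the `Array[Byte] => (String, Timestamp)` shape of the `messageParser` parameter in the diff, assuming UTF-8 payloads stamped with their arrival time.

```scala
import java.nio.charset.StandardCharsets
import java.sql.Timestamp
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Hypothetical helper, not the PR's code: a single DateTimeFormatter can be
// shared by concurrent MQTT callback threads, unlike java.text.SimpleDateFormat.
object ThreadSafeFormatting {
  val DateFormat: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Same shape as `messageParser: Array[Byte] => (String, Timestamp)`:
  // decode the payload as UTF-8 (assumption) and stamp it with the current time.
  def parseMessage(payload: Array[Byte]): (String, Timestamp) =
    (new String(payload, StandardCharsets.UTF_8), Timestamp.valueOf(LocalDateTime.now()))

  // Render a Timestamp with the shared, immutable formatter.
  def format(ts: Timestamp): String = ts.toLocalDateTime.format(DateFormat)
}
```

A `ThreadLocal[SimpleDateFormat]` is another common fix; `DateTimeFormatter` simply avoids keeping per-thread state.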


> MQTT as a streaming source for SQL Streaming.
> -
>
> Key: BAHIR-39
> URL: https://issues.apache.org/jira/browse/BAHIR-39
> Project: Bahir
>  Issue Type: New Feature
>  Components: Spark SQL Data Sources
>Affects Versions: 2.1.0
>Reporter: Prashant Sharma
>
> MQTT compatible streaming source for Spark SQL Streaming.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (BAHIR-31) Add documentation for streaming-zeromq connector

2016-08-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BAHIR-31?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15406464#comment-15406464
 ] 

ASF GitHub Bot commented on BAHIR-31:
-

Github user lresende closed the pull request at:

https://github.com/apache/bahir/pull/15


> Add documentation for streaming-zeromq connector
> 
>
> Key: BAHIR-31
> URL: https://issues.apache.org/jira/browse/BAHIR-31
> Project: Bahir
>  Issue Type: Sub-task
>  Components: Spark Streaming Connectors
>Reporter: Luciano Resende
>Assignee: Luciano Resende
> Fix For: 2.0.0
>
>






[jira] [Commented] (BAHIR-29) Add documentation for streaming-mqtt connector

2016-08-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BAHIR-29?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15406466#comment-15406466
 ] 

ASF GitHub Bot commented on BAHIR-29:
-

Github user lresende closed the pull request at:

https://github.com/apache/bahir/pull/17


> Add documentation for streaming-mqtt connector
> --
>
> Key: BAHIR-29
> URL: https://issues.apache.org/jira/browse/BAHIR-29
> Project: Bahir
>  Issue Type: Sub-task
>  Components: Spark Streaming Connectors
>Reporter: Luciano Resende
>Assignee: Luciano Resende
> Fix For: 2.0.0
>
>






[jira] [Commented] (BAHIR-30) Add documentation for streaming-twitter connector

2016-08-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BAHIR-30?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15406465#comment-15406465
 ] 

ASF GitHub Bot commented on BAHIR-30:
-

Github user lresende closed the pull request at:

https://github.com/apache/bahir/pull/16


> Add documentation for streaming-twitter connector
> -
>
> Key: BAHIR-30
> URL: https://issues.apache.org/jira/browse/BAHIR-30
> Project: Bahir
>  Issue Type: Sub-task
>  Components: Spark Streaming Connectors
>Reporter: Luciano Resende
>Assignee: Luciano Resende
> Fix For: 2.0.0
>
>






[GitHub] bahir pull request #16: [BAHIR-30] Add basic documentation for Twitter conne...

2016-08-03 Thread lresende
Github user lresende closed the pull request at:

https://github.com/apache/bahir/pull/16


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] bahir pull request #18: [BAHIR-28] Add basic documentation for Akka connecto...

2016-08-03 Thread lresende
Github user lresende closed the pull request at:

https://github.com/apache/bahir/pull/18




[GitHub] bahir pull request #17: [BAHIR-29] Add basic documentation for MQTT Connecto...

2016-08-03 Thread lresende
Github user lresende closed the pull request at:

https://github.com/apache/bahir/pull/17




[GitHub] bahir pull request #15: [BAHIR-31] Add basic documentation for ZeroMQ connec...

2016-08-03 Thread lresende
Github user lresende closed the pull request at:

https://github.com/apache/bahir/pull/15




[jira] [Closed] (BAHIR-29) Add documentation for streaming-mqtt connector

2016-08-03 Thread Luciano Resende (JIRA)

 [ 
https://issues.apache.org/jira/browse/BAHIR-29?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luciano Resende closed BAHIR-29.

   Resolution: Fixed
 Assignee: Luciano Resende
Fix Version/s: 2.0.0

> Add documentation for streaming-mqtt connector
> --
>
> Key: BAHIR-29
> URL: https://issues.apache.org/jira/browse/BAHIR-29
> Project: Bahir
>  Issue Type: Sub-task
>  Components: Spark Streaming Connectors
>Reporter: Luciano Resende
>Assignee: Luciano Resende
> Fix For: 2.0.0
>
>






[jira] [Closed] (BAHIR-27) Add documentation for existing streaming connectors

2016-08-03 Thread Luciano Resende (JIRA)

 [ 
https://issues.apache.org/jira/browse/BAHIR-27?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luciano Resende closed BAHIR-27.

   Resolution: Fixed
 Assignee: Luciano Resende
Fix Version/s: 2.0.0

> Add documentation for existing streaming connectors
> ---
>
> Key: BAHIR-27
> URL: https://issues.apache.org/jira/browse/BAHIR-27
> Project: Bahir
>  Issue Type: New Feature
>  Components: Spark Streaming Connectors
>Reporter: Luciano Resende
>Assignee: Luciano Resende
> Fix For: 2.0.0
>
>






[jira] [Commented] (BAHIR-29) Add documentation for streaming-mqtt connector

2016-08-03 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/BAHIR-29?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15406449#comment-15406449
 ] 

ASF subversion and git services commented on BAHIR-29:
--

Commit 619936d39d18b7af45b7acec9af02b599a43b056 in bahir's branch refs/heads/master from [~luciano resende]
[ https://git-wip-us.apache.org/repos/asf?p=bahir.git;h=619936d ]

[BAHIR-29] Add basic documentation for MQTT Connector


> Add documentation for streaming-mqtt connector
> --
>
> Key: BAHIR-29
> URL: https://issues.apache.org/jira/browse/BAHIR-29
> Project: Bahir
>  Issue Type: Sub-task
>  Components: Spark Streaming Connectors
>Reporter: Luciano Resende
>






[jira] [Commented] (BAHIR-31) Add documentation for streaming-zeromq connector

2016-08-03 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/BAHIR-31?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15406451#comment-15406451
 ] 

ASF subversion and git services commented on BAHIR-31:
--

Commit 29d8c7622cf9663e295d7616ae1e1b089fe80da9 in bahir's branch refs/heads/master from [~luciano resende]
[ https://git-wip-us.apache.org/repos/asf?p=bahir.git;h=29d8c76 ]

[BAHIR-31] Add basic documentation for ZeroMQ connector


> Add documentation for streaming-zeromq connector
> 
>
> Key: BAHIR-31
> URL: https://issues.apache.org/jira/browse/BAHIR-31
> Project: Bahir
>  Issue Type: Sub-task
>  Components: Spark Streaming Connectors
>Reporter: Luciano Resende
>






[jira] [Commented] (BAHIR-39) MQTT as a streaming source for SQL Streaming.

2016-08-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BAHIR-39?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15405435#comment-15405435
 ] 

ASF GitHub Bot commented on BAHIR-39:
-

Github user ScrapCodes commented on the issue:

https://github.com/apache/bahir/pull/13
  
@lresende Can you please review this PR?
@mridulm Can you please take a look now and check whether the concurrency issue you sensed earlier still exists?
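To make the question concrete, the initialization gate in `MQTTTextStreamSource` from the diff above (a `CountDownLatch` awaited inside a `synchronized` `messageArrived`, released only after the recovered offset is set) can be reduced to the standalone sketch below. It is a simplified, hypothetical model rather than the code under review: `GatedBuffer`, `onMessage`, `initialize` and `currentOffset` are illustrative names, and MQTT, persistence and Spark are left out.

```scala
import java.util.concurrent.CountDownLatch
import scala.collection.concurrent.TrieMap

// Hypothetical model of the PR's pattern, not the actual source class.
class GatedBuffer {
  private val initLock = new CountDownLatch(1)
  private val messages = new TrieMap[Int, String]
  private var offset = 0 // updated under `this`; recovered value published via the latch

  // Mirrors `messageArrived`: block until initialized, then store at offset + 1.
  def onMessage(payload: String): Unit = synchronized {
    initLock.await()
    val next = offset + 1
    messages.put(next, payload)
    offset = next
  }

  // Mirrors the end of `initialize()`: set the recovered offset, then release waiters.
  def initialize(recoveredOffset: Int): Unit = {
    offset = recoveredOffset
    initLock.countDown()
  }

  // Blocks while a callback is parked on the latch, since onMessage holds the monitor.
  def currentOffset: Int = synchronized(offset)

  def get(i: Int): Option[String] = messages.get(i)
}
```

Because `onMessage` holds the monitor while parked on the latch, any other `synchronized` method (here `currentOffset`) waits until `initialize` runs; the `countDown`/`await` pair is what makes the unsynchronized write of the recovered offset visible to callback threads.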


> MQTT as a streaming source for SQL Streaming.
> -
>
> Key: BAHIR-39
> URL: https://issues.apache.org/jira/browse/BAHIR-39
> Project: Bahir
>  Issue Type: New Feature
>  Components: Spark SQL Data Sources
>Affects Versions: 2.1.0
>Reporter: Prashant Sharma
>
> MQTT compatible streaming source for Spark SQL Streaming.


