[jira] [Commented] (BAHIR-39) MQTT as a streaming source for SQL Streaming.
[ https://issues.apache.org/jira/browse/BAHIR-39?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15407153#comment-15407153 ] ASF GitHub Bot commented on BAHIR-39: - Github user ScrapCodes commented on a diff in the pull request: https://github.com/apache/bahir/pull/13#discussion_r73461457 --- Diff: sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala --- @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.bahir.sql.streaming.mqtt + +import java.sql.Timestamp +import java.text.SimpleDateFormat +import java.util.Calendar +import java.util.concurrent.CountDownLatch + +import scala.collection.concurrent.TrieMap +import scala.collection.mutable.ArrayBuffer +import scala.util.{Failure, Success, Try} + +import org.apache.bahir.utils.Logging +import org.eclipse.paho.client.mqttv3._ +import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence} + +import org.apache.spark.sql.{DataFrame, SQLContext} +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source} +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider} +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} + + +object MQTTStreamConstants { + + val DATE_FORMAT = new SimpleDateFormat("-MM-dd HH:mm:ss") + + val SCHEMA_DEFAULT = StructType(StructField("value", StringType) +:: StructField("timestamp", TimestampType) :: Nil) +} + +class MQTTTextStreamSource(brokerUrl: String, persistence: MqttClientPersistence, +topic: String, clientId: String, messageParser: Array[Byte] => (String, Timestamp), +sqlContext: SQLContext) extends Source with Logging { + + override def schema: StructType = MQTTStreamConstants.SCHEMA_DEFAULT + + private val store = new LocalMessageStore(persistence, sqlContext.sparkContext.getConf) + + private val messages = new TrieMap[Int, (String, Timestamp)] + + private val initLock = new CountDownLatch(1) + + private var offset = 0 + + private var client: MqttClient = _ + + private def fetchLastProcessedOffset(): Int = { +Try(store.maxProcessedOffset) match { + case Success(x) => +log.info(s"Recovering from last stored offset $x") +x + case Failure(e) => 0 +} + } + + initialize() + private def initialize(): Unit = { + +client = new MqttClient(brokerUrl, clientId, persistence) +val mqttConnectOptions: MqttConnectOptions = new MqttConnectOptions() 
+mqttConnectOptions.setAutomaticReconnect(true) +// This is required to support recovery. TODO: configurable ? +mqttConnectOptions.setCleanSession(false) + +val callback = new MqttCallbackExtended() { + + override def messageArrived(topic_ : String, message: MqttMessage): Unit = synchronized { +initLock.await() // Wait for initialization to complete. +val temp = offset + 1 +messages.put(temp, messageParser(message.getPayload)) +offset = temp +log.trace(s"Message arrived, $topic_ $message") + } + + override def deliveryComplete(token: IMqttDeliveryToken): Unit = { + } + + override def connectionLost(cause: Throwable): Unit = { +log.warn("Connection to mqtt server lost.", cause) + } + + override def connectComplete(reconnect: Boolean, serverURI: String): Unit = { +log.info(s"Connect complete $serverURI. Is it a reconnect?: $reconnect") + } +} +client.setCallback(callback) +client.connect(mqttConnectOptions) +client.subscribe(topic) +// It is not possible to initialize offset without `client.connect` +offset = fetchLastProcessedOffset() --- End diff --
[GitHub] bahir pull request #13: [BAHIR-39] Add SQL Streaming MQTT support.
Github user ScrapCodes commented on a diff in the pull request: https://github.com/apache/bahir/pull/13#discussion_r73461457 --- Diff: sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala --- @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.bahir.sql.streaming.mqtt + +import java.sql.Timestamp +import java.text.SimpleDateFormat +import java.util.Calendar +import java.util.concurrent.CountDownLatch + +import scala.collection.concurrent.TrieMap +import scala.collection.mutable.ArrayBuffer +import scala.util.{Failure, Success, Try} + +import org.apache.bahir.utils.Logging +import org.eclipse.paho.client.mqttv3._ +import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence} + +import org.apache.spark.sql.{DataFrame, SQLContext} +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source} +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider} +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} + + +object MQTTStreamConstants { + + val DATE_FORMAT = new SimpleDateFormat("-MM-dd HH:mm:ss") + + val SCHEMA_DEFAULT = StructType(StructField("value", StringType) +:: StructField("timestamp", TimestampType) :: Nil) +} + +class MQTTTextStreamSource(brokerUrl: String, persistence: MqttClientPersistence, +topic: String, clientId: String, messageParser: Array[Byte] => (String, Timestamp), +sqlContext: SQLContext) extends Source with Logging { + + override def schema: StructType = MQTTStreamConstants.SCHEMA_DEFAULT + + private val store = new LocalMessageStore(persistence, sqlContext.sparkContext.getConf) + + private val messages = new TrieMap[Int, (String, Timestamp)] + + private val initLock = new CountDownLatch(1) + + private var offset = 0 + + private var client: MqttClient = _ + + private def fetchLastProcessedOffset(): Int = { +Try(store.maxProcessedOffset) match { + case Success(x) => +log.info(s"Recovering from last stored offset $x") +x + case Failure(e) => 0 +} + } + + initialize() + private def initialize(): Unit = { + +client = new MqttClient(brokerUrl, clientId, persistence) +val mqttConnectOptions: MqttConnectOptions = new MqttConnectOptions() 
+mqttConnectOptions.setAutomaticReconnect(true) +// This is required to support recovery. TODO: configurable ? +mqttConnectOptions.setCleanSession(false) + +val callback = new MqttCallbackExtended() { + + override def messageArrived(topic_ : String, message: MqttMessage): Unit = synchronized { +initLock.await() // Wait for initialization to complete. +val temp = offset + 1 +messages.put(temp, messageParser(message.getPayload)) +offset = temp +log.trace(s"Message arrived, $topic_ $message") + } + + override def deliveryComplete(token: IMqttDeliveryToken): Unit = { + } + + override def connectionLost(cause: Throwable): Unit = { +log.warn("Connection to mqtt server lost.", cause) + } + + override def connectComplete(reconnect: Boolean, serverURI: String): Unit = { +log.info(s"Connect complete $serverURI. Is it a reconnect?: $reconnect") + } +} +client.setCallback(callback) +client.connect(mqttConnectOptions) +client.subscribe(topic) +// It is not possible to initialize offset without `client.connect` +offset = fetchLastProcessedOffset() --- End diff -- Hi Fred, offset is initialized here because it can only be fetched once we have connected the client. This is also the reason I am holding an `initLock`. --- If your project is set up for it, you can reply to this email and
[jira] [Commented] (BAHIR-39) MQTT as a streaming source for SQL Streaming.
[ https://issues.apache.org/jira/browse/BAHIR-39?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15406625#comment-15406625 ] ASF GitHub Bot commented on BAHIR-39: - Github user frreiss commented on a diff in the pull request: https://github.com/apache/bahir/pull/13#discussion_r73422375 --- Diff: sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala --- @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.bahir.sql.streaming.mqtt + +import java.sql.Timestamp +import java.text.SimpleDateFormat +import java.util.Calendar +import java.util.concurrent.CountDownLatch + +import scala.collection.concurrent.TrieMap +import scala.collection.mutable.ArrayBuffer +import scala.util.{Failure, Success, Try} + +import org.apache.bahir.utils.Logging +import org.eclipse.paho.client.mqttv3._ +import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence} + +import org.apache.spark.sql.{DataFrame, SQLContext} +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source} +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider} +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} + + +object MQTTStreamConstants { + + val DATE_FORMAT = new SimpleDateFormat("-MM-dd HH:mm:ss") + + val SCHEMA_DEFAULT = StructType(StructField("value", StringType) +:: StructField("timestamp", TimestampType) :: Nil) +} + +class MQTTTextStreamSource(brokerUrl: String, persistence: MqttClientPersistence, +topic: String, clientId: String, messageParser: Array[Byte] => (String, Timestamp), +sqlContext: SQLContext) extends Source with Logging { + + override def schema: StructType = MQTTStreamConstants.SCHEMA_DEFAULT + + private val store = new LocalMessageStore(persistence, sqlContext.sparkContext.getConf) + + private val messages = new TrieMap[Int, (String, Timestamp)] + + private val initLock = new CountDownLatch(1) + + private var offset = 0 + + private var client: MqttClient = _ + + private def fetchLastProcessedOffset(): Int = { +Try(store.maxProcessedOffset) match { + case Success(x) => +log.info(s"Recovering from last stored offset $x") +x + case Failure(e) => 0 +} + } + + initialize() + private def initialize(): Unit = { + +client = new MqttClient(brokerUrl, clientId, persistence) +val mqttConnectOptions: MqttConnectOptions = new MqttConnectOptions() 
+mqttConnectOptions.setAutomaticReconnect(true) +// This is required to support recovery. TODO: configurable ? +mqttConnectOptions.setCleanSession(false) + +val callback = new MqttCallbackExtended() { + + override def messageArrived(topic_ : String, message: MqttMessage): Unit = synchronized { +initLock.await() // Wait for initialization to complete. +val temp = offset + 1 +messages.put(temp, messageParser(message.getPayload)) +offset = temp +log.trace(s"Message arrived, $topic_ $message") + } + + override def deliveryComplete(token: IMqttDeliveryToken): Unit = { --- End diff -- I feel like this callback probably needs to do something to ensure that there are no duplicate or out of order messages in the message buffer. What is the interaction between these two callbacks (messageArrived/deliveryComplete) and the different QoS levels in MQTT? > MQTT as a streaming source for SQL Streaming. > - > > Key: BAHIR-39 > URL: https://issues.apache.org/jira/browse/BAHIR-39 > Project: Bahir > Issue Type: New Feature > Components: Spark SQL Data Sources >Affects Versions:
[GitHub] bahir pull request #13: [BAHIR-39] Add SQL Streaming MQTT support.
Github user frreiss commented on a diff in the pull request: https://github.com/apache/bahir/pull/13#discussion_r73422375 --- Diff: sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala --- @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.bahir.sql.streaming.mqtt + +import java.sql.Timestamp +import java.text.SimpleDateFormat +import java.util.Calendar +import java.util.concurrent.CountDownLatch + +import scala.collection.concurrent.TrieMap +import scala.collection.mutable.ArrayBuffer +import scala.util.{Failure, Success, Try} + +import org.apache.bahir.utils.Logging +import org.eclipse.paho.client.mqttv3._ +import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence} + +import org.apache.spark.sql.{DataFrame, SQLContext} +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source} +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider} +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} + + +object MQTTStreamConstants { + + val DATE_FORMAT = new SimpleDateFormat("-MM-dd HH:mm:ss") + + val SCHEMA_DEFAULT = StructType(StructField("value", StringType) +:: StructField("timestamp", TimestampType) :: Nil) +} + +class MQTTTextStreamSource(brokerUrl: String, persistence: MqttClientPersistence, +topic: String, clientId: String, messageParser: Array[Byte] => (String, Timestamp), +sqlContext: SQLContext) extends Source with Logging { + + override def schema: StructType = MQTTStreamConstants.SCHEMA_DEFAULT + + private val store = new LocalMessageStore(persistence, sqlContext.sparkContext.getConf) + + private val messages = new TrieMap[Int, (String, Timestamp)] + + private val initLock = new CountDownLatch(1) + + private var offset = 0 + + private var client: MqttClient = _ + + private def fetchLastProcessedOffset(): Int = { +Try(store.maxProcessedOffset) match { + case Success(x) => +log.info(s"Recovering from last stored offset $x") +x + case Failure(e) => 0 +} + } + + initialize() + private def initialize(): Unit = { + +client = new MqttClient(brokerUrl, clientId, persistence) +val mqttConnectOptions: MqttConnectOptions = new MqttConnectOptions() 
+mqttConnectOptions.setAutomaticReconnect(true) +// This is required to support recovery. TODO: configurable ? +mqttConnectOptions.setCleanSession(false) + +val callback = new MqttCallbackExtended() { + + override def messageArrived(topic_ : String, message: MqttMessage): Unit = synchronized { +initLock.await() // Wait for initialization to complete. +val temp = offset + 1 +messages.put(temp, messageParser(message.getPayload)) +offset = temp +log.trace(s"Message arrived, $topic_ $message") + } + + override def deliveryComplete(token: IMqttDeliveryToken): Unit = { --- End diff -- I feel like this callback probably needs to do something to ensure that there are no duplicate or out of order messages in the message buffer. What is the interaction between these two callbacks (messageArrived/deliveryComplete) and the different QoS levels in MQTT? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] bahir pull request #13: [BAHIR-39] Add SQL Streaming MQTT support.
Github user frreiss commented on a diff in the pull request: https://github.com/apache/bahir/pull/13#discussion_r73422074 --- Diff: sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala --- @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.bahir.sql.streaming.mqtt + +import java.sql.Timestamp +import java.text.SimpleDateFormat +import java.util.Calendar +import java.util.concurrent.CountDownLatch + +import scala.collection.concurrent.TrieMap +import scala.collection.mutable.ArrayBuffer +import scala.util.{Failure, Success, Try} + +import org.apache.bahir.utils.Logging +import org.eclipse.paho.client.mqttv3._ +import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence} + +import org.apache.spark.sql.{DataFrame, SQLContext} +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source} +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider} +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} + + +object MQTTStreamConstants { + + val DATE_FORMAT = new SimpleDateFormat("-MM-dd HH:mm:ss") + + val SCHEMA_DEFAULT = StructType(StructField("value", StringType) +:: StructField("timestamp", TimestampType) :: Nil) +} + +class MQTTTextStreamSource(brokerUrl: String, persistence: MqttClientPersistence, +topic: String, clientId: String, messageParser: Array[Byte] => (String, Timestamp), +sqlContext: SQLContext) extends Source with Logging { + + override def schema: StructType = MQTTStreamConstants.SCHEMA_DEFAULT + + private val store = new LocalMessageStore(persistence, sqlContext.sparkContext.getConf) + + private val messages = new TrieMap[Int, (String, Timestamp)] + + private val initLock = new CountDownLatch(1) + + private var offset = 0 + + private var client: MqttClient = _ + + private def fetchLastProcessedOffset(): Int = { +Try(store.maxProcessedOffset) match { + case Success(x) => +log.info(s"Recovering from last stored offset $x") +x + case Failure(e) => 0 +} + } + + initialize() + private def initialize(): Unit = { + +client = new MqttClient(brokerUrl, clientId, persistence) +val mqttConnectOptions: MqttConnectOptions = new MqttConnectOptions() 
+mqttConnectOptions.setAutomaticReconnect(true) +// This is required to support recovery. TODO: configurable ? +mqttConnectOptions.setCleanSession(false) + +val callback = new MqttCallbackExtended() { + + override def messageArrived(topic_ : String, message: MqttMessage): Unit = synchronized { --- End diff -- Does MQTT guarantee that this method will be called exactly once, even if the client or server crashes and is restarted? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (BAHIR-39) MQTT as a streaming source for SQL Streaming.
[ https://issues.apache.org/jira/browse/BAHIR-39?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15406522#comment-15406522 ] ASF GitHub Bot commented on BAHIR-39: - Github user lresende commented on the issue: https://github.com/apache/bahir/pull/13 @ScrapCodes I added some comments; they should be easy to address. Also, I am OK with not all of the issues you mentioned above being finished before we commit this code, but I would appreciate it if you could raise JIRAs to make sure we address them in the future. > MQTT as a streaming source for SQL Streaming. > - > > Key: BAHIR-39 > URL: https://issues.apache.org/jira/browse/BAHIR-39 > Project: Bahir > Issue Type: New Feature > Components: Spark SQL Data Sources >Affects Versions: 2.1.0 >Reporter: Prashant Sharma > > MQTT compatible streaming source for Spark SQL Streaming. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] bahir pull request #13: [BAHIR-39] Add SQL Streaming MQTT support.
Github user lresende commented on a diff in the pull request: https://github.com/apache/bahir/pull/13#discussion_r73407125 --- Diff: sql-streaming-mqtt/src/main/scala/org/apache/bahir/utils/Logging.scala --- @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.bahir.utils + +import org.slf4j.LoggerFactory + + +trait Logging { --- End diff -- Do we need our own definition of Logging? Or is it OK for us to use the one in Spark (which is what the other extensions are using)? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (BAHIR-39) MQTT as a streaming source for SQL Streaming.
[ https://issues.apache.org/jira/browse/BAHIR-39?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15406494#comment-15406494 ] ASF GitHub Bot commented on BAHIR-39: - Github user lresende commented on a diff in the pull request: https://github.com/apache/bahir/pull/13#discussion_r73406376 --- Diff: sql-streaming-mqtt/README.md --- @@ -0,0 +1,121 @@ +A library for reading data from MQTT Servers using Spark SQL Streaming ( or Structured streaming.). + +## Linking + +Using SBT: + +```scala +libraryDependencies += "org.apache.bahir" %% "spark-sql-streaming-mqtt" % "2.0.0" +``` + +Using Maven: + +```xml + +org.apache.bahir +spark-sql-streaming-mqtt_2.11 +2.0.0 + +``` + +This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option. +For example, to include it when starting the spark shell: + +``` +$ bin/spark-shell --packages org.apache.bahir:spark-sql-streaming-mqtt_2.11:2.0.0 +``` + +Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath. +The `--packages` argument can also be used with `bin/spark-submit`. + +This library is compiled for Scala 2.11 only, and intends to support Spark 2.0 onwards. + +## Examples + +A SQL Stream can be created with data streams received through MQTT Server using, + +```scala +sqlContext.readStream + .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider") + .option("topic", "mytopic") + .load("tcp://localhost:1883") + +``` + +## Enable recovering from failures. + +Setting values for option `localStorage` and `clientId` helps in recovering in case of a restart, by restoring the state where it left off before the shutdown. 
+ +```scala +sqlContext.readStream + .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider") + .option("topic", "mytopic") + .option("localStorage", "/path/to/localdir") + .option("clientId", "some-client-id") + .load("tcp://localhost:1883") + +``` + +### Scala API + +An example, for scala API to count words from incoming message stream. + +```scala +// Create DataFrame representing the stream of input lines from connection to mqtt server +val lines = spark.readStream + .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider") + .option("topic", topic) + .load(brokerUrl).as[(String, Timestamp)] + +// Split the lines into words +val words = lines.map(_._1).flatMap(_.split(" ")) + +// Generate running word count +val wordCounts = words.groupBy("value").count() + +// Start running the query that prints the running counts to the console +val query = wordCounts.writeStream + .outputMode("complete") + .format("console") + .start() + +query.awaitTermination() + +``` +Please see `MQTTStreamWordCount.scala` for full example. + +### Java API + +An example, for Java API to count words from incoming message stream. + +```java + +// Create DataFrame representing the stream of input lines from connection to mqtt server. 
+Dataset lines = spark +.readStream() + .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider") +.option("topic", topic) +.load(brokerUrl).select("value").as(Encoders.STRING()); + +// Split the lines into words +Dataset words = lines.flatMap(new FlatMapFunction() { +@Override +public Iterator call(String x) { +return Arrays.asList(x.split(" ")).iterator(); +} +}, Encoders.STRING()); + +// Generate running word count +Dataset wordCounts = words.groupBy("value").count(); + +// Start running the query that prints the running counts to the console +StreamingQuery query = wordCounts.writeStream() +.outputMode("complete") +.format("console") +.start(); + +query.awaitTermination(); +``` + +Please see `JavaMQTTStreamWordCount.java` for full example. --- End diff -- Please add a link to the example directory. Also, the example file name is MQTTStreamWordCount.java > MQTT as a streaming source for SQL Streaming. > - > > Key: BAHIR-39 > URL:
[GitHub] bahir pull request #13: [BAHIR-39] Add SQL Streaming MQTT support.
Github user lresende commented on a diff in the pull request: https://github.com/apache/bahir/pull/13#discussion_r73405744 --- Diff: sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala --- @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.bahir.sql.streaming.mqtt + +import java.sql.Timestamp +import java.text.SimpleDateFormat +import java.util.Calendar +import java.util.concurrent.CountDownLatch + +import scala.collection.concurrent.TrieMap +import scala.collection.mutable.ArrayBuffer +import scala.util.{Failure, Success, Try} + +import org.apache.bahir.utils.Logging +import org.eclipse.paho.client.mqttv3._ +import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence} + +import org.apache.spark.sql.{DataFrame, SQLContext} +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source} +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider} +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} + + +object MQTTStreamConstants { + + val DATE_FORMAT = new SimpleDateFormat("-MM-dd HH:mm:ss") + + val SCHEMA_DEFAULT = StructType(StructField("value", StringType) +:: StructField("timestamp", TimestampType) :: Nil) +} + +class MQTTTextStreamSource(brokerUrl: String, persistence: MqttClientPersistence, +topic: String, clientId: String, messageParser: Array[Byte] => (String, Timestamp), +sqlContext: SQLContext) extends Source with Logging { + + override def schema: StructType = MQTTStreamConstants.SCHEMA_DEFAULT + + private val store = new LocalMessageStore(persistence, sqlContext.sparkContext.getConf) + + private val messages = new TrieMap[Int, (String, Timestamp)] + + private val initLock = new CountDownLatch(1) + + private var offset = 0 + + private var client: MqttClient = _ + + private def fetchLastProcessedOffset(): Int = { +Try(store.maxProcessedOffset) match { + case Success(x) => +log.info(s"Recovering from last stored offset $x") +x + case Failure(e) => 0 +} + } + + initialize() + private def initialize(): Unit = { + +client = new MqttClient(brokerUrl, clientId, persistence) +val mqttConnectOptions: MqttConnectOptions = new MqttConnectOptions() 
+mqttConnectOptions.setAutomaticReconnect(true) +// This is required to support recovery. TODO: configurable ? +mqttConnectOptions.setCleanSession(false) + +val callback = new MqttCallbackExtended() { + + override def messageArrived(topic_ : String, message: MqttMessage): Unit = synchronized { +initLock.await() // Wait for initialization to complete. +val temp = offset + 1 +messages.put(temp, messageParser(message.getPayload)) +offset = temp +log.trace(s"Message arrived, $topic_ $message") + } + + override def deliveryComplete(token: IMqttDeliveryToken): Unit = { + } + + override def connectionLost(cause: Throwable): Unit = { +log.warn("Connection to mqtt server lost.", cause) + } + + override def connectComplete(reconnect: Boolean, serverURI: String): Unit = { +log.info(s"Connect complete $serverURI. Is it a reconnect?: $reconnect") + } +} +client.setCallback(callback) +client.connect(mqttConnectOptions) +client.subscribe(topic) +// It is not possible to initialize offset without `client.connect` +offset = fetchLastProcessedOffset() +initLock.countDown() // Release. + } + + /** Stop this source and free any resources it has allocated. */ + override def stop(): Unit = { +client.disconnect() +persistence.close() +client.close() + }
[jira] [Commented] (BAHIR-39) MQTT as a streaming source for SQL Streaming.
[ https://issues.apache.org/jira/browse/BAHIR-39?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15406487#comment-15406487 ] ASF GitHub Bot commented on BAHIR-39: - Github user lresende commented on a diff in the pull request: https://github.com/apache/bahir/pull/13#discussion_r73405558 --- Diff: sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala --- @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.bahir.sql.streaming.mqtt + +import java.sql.Timestamp +import java.text.SimpleDateFormat +import java.util.Calendar +import java.util.concurrent.CountDownLatch + +import scala.collection.concurrent.TrieMap +import scala.collection.mutable.ArrayBuffer +import scala.util.{Failure, Success, Try} + +import org.apache.bahir.utils.Logging +import org.eclipse.paho.client.mqttv3._ +import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence} + +import org.apache.spark.sql.{DataFrame, SQLContext} +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source} +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider} +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} + + +object MQTTStreamConstants { + + val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") --- End diff -- Not thread safe, see comments below on calendar usage... > MQTT as a streaming source for SQL Streaming. > - > > Key: BAHIR-39 > URL: https://issues.apache.org/jira/browse/BAHIR-39 > Project: Bahir > Issue Type: New Feature > Components: Spark SQL Data Sources >Affects Versions: 2.1.0 >Reporter: Prashant Sharma > > MQTT compatible streaming source for Spark SQL Streaming. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-31) Add documentation for streaming-zeromq connector
[ https://issues.apache.org/jira/browse/BAHIR-31?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15406464#comment-15406464 ] ASF GitHub Bot commented on BAHIR-31: - Github user lresende closed the pull request at: https://github.com/apache/bahir/pull/15 > Add documentation for streaming-zeromq connector > > > Key: BAHIR-31 > URL: https://issues.apache.org/jira/browse/BAHIR-31 > Project: Bahir > Issue Type: Sub-task > Components: Spark Streaming Connectors >Reporter: Luciano Resende >Assignee: Luciano Resende > Fix For: 2.0.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-29) Add documentation for streaming-mqtt connector
[ https://issues.apache.org/jira/browse/BAHIR-29?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15406466#comment-15406466 ] ASF GitHub Bot commented on BAHIR-29: - Github user lresende closed the pull request at: https://github.com/apache/bahir/pull/17 > Add documentation for streaming-mqtt connector > -- > > Key: BAHIR-29 > URL: https://issues.apache.org/jira/browse/BAHIR-29 > Project: Bahir > Issue Type: Sub-task > Components: Spark Streaming Connectors >Reporter: Luciano Resende >Assignee: Luciano Resende > Fix For: 2.0.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-30) Add documentation for streaming-twitter connector
[ https://issues.apache.org/jira/browse/BAHIR-30?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15406465#comment-15406465 ] ASF GitHub Bot commented on BAHIR-30: - Github user lresende closed the pull request at: https://github.com/apache/bahir/pull/16 > Add documentation for streaming-twitter connector > - > > Key: BAHIR-30 > URL: https://issues.apache.org/jira/browse/BAHIR-30 > Project: Bahir > Issue Type: Sub-task > Components: Spark Streaming Connectors >Reporter: Luciano Resende >Assignee: Luciano Resende > Fix For: 2.0.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] bahir pull request #16: [BAHIR-30] Add basic documentation for Twitter conne...
Github user lresende closed the pull request at: https://github.com/apache/bahir/pull/16 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] bahir pull request #18: [BAHIR-28] Add basic documentation for Akka connecto...
Github user lresende closed the pull request at: https://github.com/apache/bahir/pull/18 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] bahir pull request #17: [BAHIR-29] Add basic documentation for MQTT Connecto...
Github user lresende closed the pull request at: https://github.com/apache/bahir/pull/17 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] bahir pull request #15: [BAHIR-31] Add basic documentation for ZeroMQ connec...
Github user lresende closed the pull request at: https://github.com/apache/bahir/pull/15 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Closed] (BAHIR-29) Add documentation for streaming-mqtt connector
[ https://issues.apache.org/jira/browse/BAHIR-29?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luciano Resende closed BAHIR-29. Resolution: Fixed Assignee: Luciano Resende Fix Version/s: 2.0.0 > Add documentation for streaming-mqtt connector > -- > > Key: BAHIR-29 > URL: https://issues.apache.org/jira/browse/BAHIR-29 > Project: Bahir > Issue Type: Sub-task > Components: Spark Streaming Connectors >Reporter: Luciano Resende >Assignee: Luciano Resende > Fix For: 2.0.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (BAHIR-27) Add documentation for existing streaming connectors
[ https://issues.apache.org/jira/browse/BAHIR-27?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luciano Resende closed BAHIR-27. Resolution: Fixed Assignee: Luciano Resende Fix Version/s: 2.0.0 > Add documentation for existing streaming connectors > --- > > Key: BAHIR-27 > URL: https://issues.apache.org/jira/browse/BAHIR-27 > Project: Bahir > Issue Type: New Feature > Components: Spark Streaming Connectors >Reporter: Luciano Resende >Assignee: Luciano Resende > Fix For: 2.0.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-29) Add documentation for streaming-mqtt connector
[ https://issues.apache.org/jira/browse/BAHIR-29?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15406449#comment-15406449 ] ASF subversion and git services commented on BAHIR-29: -- Commit 619936d39d18b7af45b7acec9af02b599a43b056 in bahir's branch refs/heads/master from [~luciano resende] [ https://git-wip-us.apache.org/repos/asf?p=bahir.git;h=619936d ] [BAHIR-29] Add basic documentation for MQTT Connector > Add documentation for streaming-mqtt connector > -- > > Key: BAHIR-29 > URL: https://issues.apache.org/jira/browse/BAHIR-29 > Project: Bahir > Issue Type: Sub-task > Components: Spark Streaming Connectors >Reporter: Luciano Resende > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-31) Add documentation for streaming-zeromq connector
[ https://issues.apache.org/jira/browse/BAHIR-31?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15406451#comment-15406451 ] ASF subversion and git services commented on BAHIR-31: -- Commit 29d8c7622cf9663e295d7616ae1e1b089fe80da9 in bahir's branch refs/heads/master from [~luciano resende] [ https://git-wip-us.apache.org/repos/asf?p=bahir.git;h=29d8c76 ] [BAHIR-31] Add basic documentation for ZeroMQ connector > Add documentation for streaming-zeromq connector > > > Key: BAHIR-31 > URL: https://issues.apache.org/jira/browse/BAHIR-31 > Project: Bahir > Issue Type: Sub-task > Components: Spark Streaming Connectors >Reporter: Luciano Resende > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (BAHIR-39) MQTT as a streaming source for SQL Streaming.
[ https://issues.apache.org/jira/browse/BAHIR-39?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15405435#comment-15405435 ] ASF GitHub Bot commented on BAHIR-39: - Github user ScrapCodes commented on the issue: https://github.com/apache/bahir/pull/13 @lresende Can you please review this PR ? @mridulm Can you please take a look now and see if the concurrency issue that you sensed earlier exists ? > MQTT as a streaming source for SQL Streaming. > - > > Key: BAHIR-39 > URL: https://issues.apache.org/jira/browse/BAHIR-39 > Project: Bahir > Issue Type: New Feature > Components: Spark SQL Data Sources >Affects Versions: 2.1.0 >Reporter: Prashant Sharma > > MQTT compatible streaming source for Spark SQL Streaming. -- This message was sent by Atlassian JIRA (v6.3.4#6332)