[GitHub] bahir issue #13: [BAHIR-39] Add SQL Streaming MQTT support.
Github user frreiss commented on the issue: https://github.com/apache/bahir/pull/13 LGTM. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] bahir pull request #13: [BAHIR-39] Add SQL Streaming MQTT support.
Github user frreiss commented on a diff in the pull request: https://github.com/apache/bahir/pull/13#discussion_r73423700 --- Diff: sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala --- @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.bahir.sql.streaming.mqtt + +import java.sql.Timestamp +import java.text.SimpleDateFormat +import java.util.Calendar +import java.util.concurrent.CountDownLatch + +import scala.collection.concurrent.TrieMap +import scala.collection.mutable.ArrayBuffer +import scala.util.{Failure, Success, Try} + +import org.apache.bahir.utils.Logging +import org.eclipse.paho.client.mqttv3._ +import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence} + +import org.apache.spark.sql.{DataFrame, SQLContext} +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source} +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider} +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} + + +object MQTTStreamConstants { + + val DATE_FORMAT = new SimpleDateFormat("-MM-dd HH:mm:ss") + + val SCHEMA_DEFAULT = StructType(StructField("value", StringType) +:: StructField("timestamp", TimestampType) :: Nil) +} + +class MQTTTextStreamSource(brokerUrl: String, persistence: MqttClientPersistence, +topic: String, clientId: String, messageParser: Array[Byte] => (String, Timestamp), +sqlContext: SQLContext) extends Source with Logging { + + override def schema: StructType = MQTTStreamConstants.SCHEMA_DEFAULT + + private val store = new LocalMessageStore(persistence, sqlContext.sparkContext.getConf) + + private val messages = new TrieMap[Int, (String, Timestamp)] + + private val initLock = new CountDownLatch(1) + + private var offset = 0 --- End diff -- I think you need to initialize this variable from something stored in a checkpoint. Otherwise you'll get a different answer at a given offset into the stream if the source dies and is restarted. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] bahir pull request #13: [BAHIR-39] Add SQL Streaming MQTT support.
Github user frreiss commented on a diff in the pull request: https://github.com/apache/bahir/pull/13#discussion_r73422375 --- Diff: sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala --- @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.bahir.sql.streaming.mqtt + +import java.sql.Timestamp +import java.text.SimpleDateFormat +import java.util.Calendar +import java.util.concurrent.CountDownLatch + +import scala.collection.concurrent.TrieMap +import scala.collection.mutable.ArrayBuffer +import scala.util.{Failure, Success, Try} + +import org.apache.bahir.utils.Logging +import org.eclipse.paho.client.mqttv3._ +import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence} + +import org.apache.spark.sql.{DataFrame, SQLContext} +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source} +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider} +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} + + +object MQTTStreamConstants { + + val DATE_FORMAT = new SimpleDateFormat("-MM-dd HH:mm:ss") + + val SCHEMA_DEFAULT = StructType(StructField("value", StringType) +:: StructField("timestamp", TimestampType) :: Nil) +} + +class MQTTTextStreamSource(brokerUrl: String, persistence: MqttClientPersistence, +topic: String, clientId: String, messageParser: Array[Byte] => (String, Timestamp), +sqlContext: SQLContext) extends Source with Logging { + + override def schema: StructType = MQTTStreamConstants.SCHEMA_DEFAULT + + private val store = new LocalMessageStore(persistence, sqlContext.sparkContext.getConf) + + private val messages = new TrieMap[Int, (String, Timestamp)] + + private val initLock = new CountDownLatch(1) + + private var offset = 0 + + private var client: MqttClient = _ + + private def fetchLastProcessedOffset(): Int = { +Try(store.maxProcessedOffset) match { + case Success(x) => +log.info(s"Recovering from last stored offset $x") +x + case Failure(e) => 0 +} + } + + initialize() + private def initialize(): Unit = { + +client = new MqttClient(brokerUrl, clientId, persistence) +val mqttConnectOptions: MqttConnectOptions = new MqttConnectOptions() 
+mqttConnectOptions.setAutomaticReconnect(true) +// This is required to support recovery. TODO: configurable ? +mqttConnectOptions.setCleanSession(false) + +val callback = new MqttCallbackExtended() { + + override def messageArrived(topic_ : String, message: MqttMessage): Unit = synchronized { +initLock.await() // Wait for initialization to complete. +val temp = offset + 1 +messages.put(temp, messageParser(message.getPayload)) +offset = temp +log.trace(s"Message arrived, $topic_ $message") + } + + override def deliveryComplete(token: IMqttDeliveryToken): Unit = { --- End diff -- I feel like this callback probably needs to do something to ensure that there are no duplicate or out of order messages in the message buffer. What is the interaction between these two callbacks (messageArrived/deliveryComplete) and the different QoS levels in MQTT? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] bahir pull request #13: [BAHIR-39] Add SQL Streaming MQTT support.
Github user frreiss commented on a diff in the pull request: https://github.com/apache/bahir/pull/13#discussion_r73422074 --- Diff: sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala --- @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.bahir.sql.streaming.mqtt + +import java.sql.Timestamp +import java.text.SimpleDateFormat +import java.util.Calendar +import java.util.concurrent.CountDownLatch + +import scala.collection.concurrent.TrieMap +import scala.collection.mutable.ArrayBuffer +import scala.util.{Failure, Success, Try} + +import org.apache.bahir.utils.Logging +import org.eclipse.paho.client.mqttv3._ +import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence} + +import org.apache.spark.sql.{DataFrame, SQLContext} +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source} +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider} +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} + + +object MQTTStreamConstants { + + val DATE_FORMAT = new SimpleDateFormat("-MM-dd HH:mm:ss") + + val SCHEMA_DEFAULT = StructType(StructField("value", StringType) +:: StructField("timestamp", TimestampType) :: Nil) +} + +class MQTTTextStreamSource(brokerUrl: String, persistence: MqttClientPersistence, +topic: String, clientId: String, messageParser: Array[Byte] => (String, Timestamp), +sqlContext: SQLContext) extends Source with Logging { + + override def schema: StructType = MQTTStreamConstants.SCHEMA_DEFAULT + + private val store = new LocalMessageStore(persistence, sqlContext.sparkContext.getConf) + + private val messages = new TrieMap[Int, (String, Timestamp)] + + private val initLock = new CountDownLatch(1) + + private var offset = 0 + + private var client: MqttClient = _ + + private def fetchLastProcessedOffset(): Int = { +Try(store.maxProcessedOffset) match { + case Success(x) => +log.info(s"Recovering from last stored offset $x") +x + case Failure(e) => 0 +} + } + + initialize() + private def initialize(): Unit = { + +client = new MqttClient(brokerUrl, clientId, persistence) +val mqttConnectOptions: MqttConnectOptions = new MqttConnectOptions() 
+mqttConnectOptions.setAutomaticReconnect(true) +// This is required to support recovery. TODO: configurable ? +mqttConnectOptions.setCleanSession(false) + +val callback = new MqttCallbackExtended() { + + override def messageArrived(topic_ : String, message: MqttMessage): Unit = synchronized { --- End diff -- Does MQTT guarantee that this method will be called exactly once, even if the client or server crashes and is restarted? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] bahir issue #13: Add SQL Streaming MQTT support.
Github user frreiss commented on the issue: https://github.com/apache/bahir/pull/13 I don't think a warning is going to be enough. As far as I can see, this code will exhaust the Java heap and crash the Spark executor processes every time it runs. And when those processes restart, they will not be able to replay any lost data, because there is no code to repopulate the messages buffer. Even if there were code to repopulate the buffer, filling the buffer up again would only result in exhausting the heap again. Can MQTT's client-side persistence be used to provide the ability to replay arbitrarily far back in time? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] bahir issue #13: Add SQL Streaming MQTT support.
Github user frreiss commented on the issue: https://github.com/apache/bahir/pull/13 I don't see any code in this diff to remove old messages from the in-memory buffer after those messages have been consumed. Is there something I'm missing? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] bahir pull request #13: Add SQL Streaming MQTT support.
Github user frreiss commented on a diff in the pull request: https://github.com/apache/bahir/pull/13#discussion_r72210666 --- Diff: streaming-mqtt/sql/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStream.scala --- @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.bahir.sql.streaming.mqtt + +import java.sql.Timestamp +import java.text.SimpleDateFormat +import java.util.Calendar +import java.util.concurrent.atomic.AtomicLong +import javax.annotation.concurrent.GuardedBy + +import scala.collection.mutable.ArrayBuffer + +import org.eclipse.paho.client.mqttv3._ +import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence} + +import org.apache.spark.sql.{DataFrame, Encoder, SQLContext} +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source} +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider} +import org.apache.spark.sql.types.{TimestampType, StringType, StructField, StructType} + + +object MQTTStream { + + val DATE_FORMAT = new SimpleDateFormat("-MM-dd HH:mm:ss") + + val SCHEMA_DEFAULT = StructType(StructField("value", StringType) +:: StructField("timestamp", TimestampType) :: Nil) +} + +class MQTTTextStream(brokerUrl: String, persistence: MqttClientPersistence, +topic: String, messageParser: Array[Byte] => (String, Timestamp), +sqlContext: SQLContext) extends Source with Logging { + + private val offset = new AtomicLong(0) + override def schema: StructType = MQTTStream.SCHEMA_DEFAULT + + @GuardedBy("this") + private var messages = new ArrayBuffer[(String, Timestamp)] + initialize() + private def initialize(): Unit = { + +val client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence) +val mqttConnectOptions: MqttConnectOptions = new MqttConnectOptions() +mqttConnectOptions.setAutomaticReconnect(true) +val callback = new MqttCallbackExtended() { + + override def messageArrived(topic_ : String, message: MqttMessage): Unit = { +offset.getAndIncrement() +messages += messageParser(message.getPayload) --- End diff -- I don't think the @GuardedBy annotation adds `synchronized` blocks automatically. Maybe you meant to make the `messages` field synchronized? 
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] bahir pull request #13: Add SQL Streaming MQTT support.
Github user frreiss commented on a diff in the pull request: https://github.com/apache/bahir/pull/13#discussion_r72195316 --- Diff: streaming-mqtt/sql/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStream.scala --- @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.bahir.sql.streaming.mqtt + +import java.sql.Timestamp +import java.text.SimpleDateFormat +import java.util.Calendar +import java.util.concurrent.atomic.AtomicLong +import javax.annotation.concurrent.GuardedBy + +import scala.collection.mutable.ArrayBuffer + +import org.eclipse.paho.client.mqttv3._ +import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence} + +import org.apache.spark.sql.{DataFrame, Encoder, SQLContext} +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source} +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider} +import org.apache.spark.sql.types.{TimestampType, StringType, StructField, StructType} + + +object MQTTStream { + + val DATE_FORMAT = new SimpleDateFormat("-MM-dd HH:mm:ss") + + val SCHEMA_DEFAULT = StructType(StructField("value", StringType) +:: StructField("timestamp", TimestampType) :: Nil) +} + +class MQTTTextStream(brokerUrl: String, persistence: MqttClientPersistence, +topic: String, messageParser: Array[Byte] => (String, Timestamp), +sqlContext: SQLContext) extends Source with Logging { + + private val offset = new AtomicLong(0) + override def schema: StructType = MQTTStream.SCHEMA_DEFAULT + + @GuardedBy("this") + private var messages = new ArrayBuffer[(String, Timestamp)] + initialize() + private def initialize(): Unit = { + +val client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence) +val mqttConnectOptions: MqttConnectOptions = new MqttConnectOptions() +mqttConnectOptions.setAutomaticReconnect(true) +val callback = new MqttCallbackExtended() { + + override def messageArrived(topic_ : String, message: MqttMessage): Unit = { +offset.getAndIncrement() +messages += messageParser(message.getPayload) --- End diff -- Shouldn't there be some locking code here? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] bahir pull request #13: Add SQL Streaming MQTT support.
Github user frreiss commented on a diff in the pull request: https://github.com/apache/bahir/pull/13#discussion_r72194637 --- Diff: streaming-mqtt/dstream/python/dstream.py --- @@ -0,0 +1,643 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import sys +import operator +import time +from itertools import chain +from datetime import datetime + +if sys.version < "3": +from itertools import imap as map, ifilter as filter + +from py4j.protocol import Py4JJavaError + +from pyspark import RDD +from pyspark.storagelevel import StorageLevel +from pyspark.streaming.util import rddToFileName, TransformFunction +from pyspark.rdd import portable_hash +from pyspark.resultiterable import ResultIterable + +__all__ = ["DStream"] + + +class DStream(object): --- End diff -- Should this class be in Bahir? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---