Github user ckadner commented on a diff in the pull request:
https://github.com/apache/bahir/pull/37#discussion_r104118613
--- Diff:
streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
---
@@ -199,7 +199,181 @@ object MQTTUtils {
createStream(jssc.ssc, brokerUrl, topic,
StorageLevel.MEMORY_AND_DISK_SER_2, Option(clientId),
Option(username), Option(password), Option(cleanSession), None,
None, None, None)
}
+ /**
+ * Create an input stream that receives messages pushed by a MQTT
publisher.
+ * @param ssc StreamingContext object
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topics Array of topic names to subscribe to
+ * @param storageLevel RDD storage level. Defaults to
StorageLevel.MEMORY_AND_DISK_SER_2.
+ */
+ def createPairedStream(
+ ssc: StreamingContext,
+ brokerUrl: String,
+ topics: Array[String],
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): ReceiverInputDStream[(String, String)] = {
+ new MQTTPairedInputDStream(ssc, brokerUrl, topics, storageLevel)
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a MQTT
publisher.
+ * @param ssc StreamingContext object
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topics Array of topic names to subscribe to
+ * @param storageLevel RDD storage level. Defaults to
StorageLevel.MEMORY_AND_DISK_SER_2.
+ * @param clientId ClientId to use for the mqtt connection
+ * @param username Username for authentication to the mqtt
publisher
+ * @param password Password for authentication to the mqtt
publisher
+ * @param cleanSession Sets the mqtt cleanSession parameter
+ * @param qos Quality of service to use for the topic
subscription
+ * @param connectionTimeout Connection timeout for the mqtt connection
+ * @param keepAliveInterval Keepalive interval for the mqtt connection
+ * @param mqttVersion Version to use for the mqtt connection
+ */
+ def createPairedStream(
+ ssc: StreamingContext,
+ brokerUrl: String,
+ topics: Array[String],
+ storageLevel: StorageLevel,
+ clientId: Option[String],
+ username: Option[String],
+ password: Option[String],
+ cleanSession: Option[Boolean],
+ qos: Option[Int],
+ connectionTimeout: Option[Int],
+ keepAliveInterval: Option[Int],
+ mqttVersion: Option[Int]
+ ): ReceiverInputDStream[(String, String)] = {
+ new MQTTPairedInputDStream(ssc, brokerUrl, topics, storageLevel,
clientId, username, password,
+ cleanSession, qos, connectionTimeout, keepAliveInterval,
mqttVersion)
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a MQTT
publisher.
+ * Storage level of the data will be the default
StorageLevel.MEMORY_AND_DISK_SER_2.
+ * @param jssc JavaStreamingContext object
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topic Array of topic names to subscribe to
--- End diff --
should be `@param topics` (plural)
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---