Github user ckadner commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/37#discussion_r104118613
  
    --- Diff: 
streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala 
---
    @@ -199,7 +199,181 @@ object MQTTUtils {
         createStream(jssc.ssc, brokerUrl, topic, 
StorageLevel.MEMORY_AND_DISK_SER_2, Option(clientId),
           Option(username), Option(password), Option(cleanSession), None, 
None, None, None)
       }
    +  /**
    +   * Create an input stream that receives messages pushed by a MQTT 
publisher.
    +   * @param ssc           StreamingContext object
    +   * @param brokerUrl     Url of remote MQTT publisher
    +   * @param topics        Array of topic names to subscribe to
    +   * @param storageLevel  RDD storage level. Defaults to 
StorageLevel.MEMORY_AND_DISK_SER_2.
    +   */
    +  def createPairedStream(
    +      ssc: StreamingContext,
    +      brokerUrl: String,
    +      topics: Array[String],
    +      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    +    ): ReceiverInputDStream[(String, String)] = {
    +    new MQTTPairedInputDStream(ssc, brokerUrl, topics, storageLevel)
    +  }
    +
     
    +  /**
    +   * Create an input stream that receives messages pushed by a MQTT 
publisher.
    +   * @param ssc                StreamingContext object
    +   * @param brokerUrl          Url of remote MQTT publisher
    +   * @param topics             Array of topic names to subscribe to
    +   * @param storageLevel       RDD storage level. Defaults to 
StorageLevel.MEMORY_AND_DISK_SER_2.
    +   * @param clientId           ClientId to use for the mqtt connection
    +   * @param username           Username for authentication to the mqtt 
publisher
    +   * @param password           Password for authentication to the mqtt 
publisher
    +   * @param cleanSession       Sets the mqtt cleanSession parameter
    +   * @param qos                Quality of service to use for the topic 
subscription
    +   * @param connectionTimeout  Connection timeout for the mqtt connection
    +   * @param keepAliveInterval  Keepalive interal for the mqtt connection
    +   * @param mqttVersion        Version to use for the mqtt connection
    +   */
    +  def createPairedStream(
    +      ssc: StreamingContext,
    +      brokerUrl: String,
    +      topics: Array[String],
    +      storageLevel: StorageLevel,
    +      clientId: Option[String],
    +      username: Option[String],
    +      password: Option[String],
    +      cleanSession: Option[Boolean],
    +      qos: Option[Int],
    +      connectionTimeout: Option[Int],
    +      keepAliveInterval: Option[Int],
    +      mqttVersion: Option[Int]
    +    ): ReceiverInputDStream[(String, String)] = {
    +    new MQTTPairedInputDStream(ssc, brokerUrl, topics, storageLevel, 
clientId, username, password,
    +          cleanSession, qos, connectionTimeout, keepAliveInterval, 
mqttVersion)
    +  }
    +
    +  /**
    +   * Create an input stream that receives messages pushed by a MQTT 
publisher.
    +   * Storage level of the data will be the default 
StorageLevel.MEMORY_AND_DISK_SER_2.
    +   * @param jssc      JavaStreamingContext object
    +   * @param brokerUrl Url of remote MQTT publisher
    +   * @param topic     Array of topic names to subscribe to
    --- End diff --
    
    should be `@param topics` (plural)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to