Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/3844#discussion_r22397277
--- Diff:
external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
---
@@ -17,31 +17,65 @@
package org.apache.spark.streaming.mqtt
-import org.scalatest.FunSuite
-
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Eventually
+import scala.concurrent.duration._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.eclipse.paho.client.mqttv3._
+import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
-class MQTTStreamSuite extends FunSuite {
-
- val batchDuration = Seconds(1)
+class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter
{
+ private val batchDuration = Seconds(1)
private val master: String = "local[2]"
-
private val framework: String = this.getClass.getSimpleName
+ private val brokerUrl = "tcp://localhost:1883"
+ private val topic = "def"
+ private var ssc: StreamingContext = _
- test("mqtt input stream") {
- val ssc = new StreamingContext(master, framework, batchDuration)
- val brokerUrl = "abc"
- val topic = "def"
+ before {
+ ssc = new StreamingContext(master, framework, batchDuration)
+ }
+ after {
+ if (ssc != null) {
+ ssc.stop()
+ ssc = null
+ }
+ }
- // tests the API, does not actually test data receiving
- val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc,
brokerUrl, topic)
- val test2: ReceiverInputDStream[String] =
+ test("mqtt input stream") {
+ val sendMessage = "MQTT demo for spark streaming"
+ publishData(sendMessage)
+ val receiveStream: ReceiverInputDStream[String] =
MQTTUtils.createStream(ssc, brokerUrl, topic,
StorageLevel.MEMORY_AND_DISK_SER_2)
-
- // TODO: Actually test receiving data
+ var receiveMessage: String = ""
+ receiveStream.foreachRDD { rdd =>
+ receiveMessage = rdd.first
+ receiveMessage
+ }
+ ssc.start()
+ eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
+ assert(sendMessage.equals(receiveMessage))
+ }
ssc.stop()
}
+
+ def publishData(sendMessage: String): Unit = {
+ try {
+ val persistence: MqttClientPersistence = new
MqttDefaultFilePersistence("/tmp")
+ val client: MqttClient = new MqttClient(brokerUrl,
MqttClient.generateClientId(), persistence)
+ client.connect()
+ val msgTopic: MqttTopic = client.getTopic(topic)
+ val message: MqttMessage = new
MqttMessage(String.valueOf(sendMessage).getBytes("utf-8"))
+ message.setQos(1)
+ message.setRetained(true)
+ msgTopic.publish(message)
+ println("Published data \ntopic: " + msgTopic.getName() +
"\nMessage: " + message)
+ client.disconnect()
+ } catch {
+ case e: MqttException => println("Exception Caught: " + e)
+ }
--- End diff --
Shouldnt there be a `finally` to close any running servers (client, etc.)?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]