Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/4229#discussion_r35410104
--- Diff: python/pyspark/streaming/tests.py ---
@@ -827,6 +828,67 @@ def test_flume_polling_multiple_hosts(self):
self._testMultipleTimes(self._testFlumePollingMultipleHosts)
+class MQTTStreamTests(PySparkStreamingTestCase):
+ timeout = 20 # seconds
+ duration = 1
+
+ def setUp(self):
+ super(MQTTStreamTests, self).setUp()
+
+ MQTTTestUtilsClz =
self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
+ .loadClass("org.apache.spark.streaming.mqtt.MQTTTestUtils")
+ self._MQTTTestUtils = MQTTTestUtilsClz.newInstance()
+ self._MQTTTestUtils.setup()
+
+ def tearDown(self):
+ if self._MQTTTestUtils is not None:
+ self._MQTTTestUtils.teardown()
+ self._MQTTTestUtils = None
+
+ super(MQTTStreamTests, self).tearDown()
+
+ def _randomTopic(self):
+ return "topic-%d" % random.randint(0, 10000)
+
+ def _startContext(self, topic):
+ # Start the StreamingContext and also collect the result
+ stream = MQTTUtils.createStream(self.ssc, "tcp://" +
self._MQTTTestUtils.brokerUri(), topic)
+ result = []
+
+ def getOutput(_, rdd):
+ for data in rdd.collect():
+ result.append(data)
+
+ stream.foreachRDD(getOutput)
+ self.ssc.start()
+ return result
+
+ def _publishData(self, topic, data):
+ start_time = time.time()
+ while True:
+ try:
+ self._MQTTTestUtils.publishData(topic, data)
+ break
+ except:
+ if time.time() - start_time < self.timeout:
+ time.sleep(0.01)
+ else:
+ raise
+
+ def _validateStreamResult(self, sendData, result):
+ receiveData = ''.join(result[0])
+ self.assertEqual(sendData, receiveData)
+
+ def test_mqtt_stream(self):
+ """Test the Python MQTT stream API."""
+ sendData = "MQTT demo for spark streaming"
+ topic = self._randomTopic()
+ result = self._startContext(topic)
+ self._publishData(topic, sendData)
--- End diff --
Finally found why the test failed. If `self._publishData` finishes before
starting the MQTT receiver, the receiver won't receive the data. You need to
wait for starting the receiver before publishing data.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]