[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user dragos commented on the pull request: https://github.com/apache/spark/pull/3844#issuecomment-72061693 See #4270 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user srowen commented on the pull request: https://github.com/apache/spark/pull/3844#issuecomment-71854715 @dragos good catch. It sounds like an issue with the test if anything. You could just reopen SPARK-4631 with a workaround. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user dragos commented on the pull request: https://github.com/apache/spark/pull/3844#issuecomment-71858250 @srowen I commented on the ticket, but I can't re-open it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user dragos commented on the pull request: https://github.com/apache/spark/pull/3844#issuecomment-71852913 There also seems to be a race condition introduced by this test. It fails consistently for me (but passes if I add a `Thread.sleep(50)` inside `publishData`). I'll open a ticket. ``` [info] - mqtt input stream *** FAILED *** (552 milliseconds) [info] org.eclipse.paho.client.mqttv3.MqttException: Too many publishes in progress [info] at org.eclipse.paho.client.mqttv3.internal.ClientState.send(ClientState.java:432) [info] at org.eclipse.paho.client.mqttv3.internal.ClientComms.internalSend(ClientComms.java:121) [info] at org.eclipse.paho.client.mqttv3.internal.ClientComms.sendNoWait(ClientComms.java:139) [info] at org.eclipse.paho.client.mqttv3.MqttTopic.publish(MqttTopic.java:107) [info] at org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$publishData$1.apply(MQTTStreamSuite.scala:126) [info] at org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$publishData$1.apply(MQTTStreamSuite.scala:124) [info] at scala.collection.immutable.Range.foreach(Range.scala:141) [info] at org.apache.spark.streaming.mqtt.MQTTStreamSuite.publishData(MQTTStreamSuite.scala:124) [info] at org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$3.apply$mcV$sp(MQTTStreamSuite.scala:78) [info] at org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$3.apply(MQTTStreamSuite.scala:66) [info] at org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$3.apply(MQTTStreamSuite.scala:66) [info] at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22) [info] at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85) [info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) [info] at org.scalatest.Transformer.apply(Transformer.scala:22) [info] at org.scalatest.Transformer.apply(Transformer.scala:20) [info] at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166) [info] at org.scalatest.Suite$class.withFixture(Suite.scala:1122) ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user JoshRosen commented on the pull request: https://github.com/apache/spark/pull/3844#issuecomment-69675455 It looks like there's maybe a port-binding / racing issue here? https://amplab.cs.berkeley.edu/jenkins/job/Spark-Master-SBT/AMPLAB_JENKINS_BUILD_PROFILE=hadoop1.0,label=centos/1356/testReport/ ``` sbt.ForkMain$ForkError: Transport Connector could not be registered in JMX: Failed to bind to server socket: mqtt://localhost:23456 due to: java.net.BindException: Address already in use at org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:27) at org.apache.activemq.broker.BrokerService.registerConnectorMBean(BrokerService.java:1977) at org.apache.activemq.broker.BrokerService.startTransportConnector(BrokerService.java:2468) at org.apache.activemq.broker.BrokerService.startAllConnectors(BrokerService.java:2385) at org.apache.activemq.broker.BrokerService.doStartBroker(BrokerService.java:684) at org.apache.activemq.broker.BrokerService.startBroker(BrokerService.java:642) at org.apache.activemq.broker.BrokerService.start(BrokerService.java:578) at org.apache.spark.streaming.mqtt.MQTTStreamSuite.org$apache$spark$streaming$mqtt$MQTTStreamSuite$$setupMQTT(MQTTStreamSuite.scala:90) at org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$1.apply$mcV$sp(MQTTStreamSuite.scala:53) at org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$1.apply(MQTTStreamSuite.scala:51) at org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$1.apply(MQTTStreamSuite.scala:51) at org.scalatest.BeforeAndAfter$class.runTest(BeforeAndAfter.scala:195) at org.apache.spark.streaming.mqtt.MQTTStreamSuite.runTest(MQTTStreamSuite.scala:37) at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208) at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208) at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413) at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401) at scala.collection.immutable.List.foreach(List.scala:318) at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401) at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396) at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483) at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208) at org.scalatest.FunSuite.runTests(FunSuite.scala:1555) at org.scalatest.Suite$class.run(Suite.scala:1424) at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555) at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212) at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212) at org.scalatest.SuperEngine.runImpl(Engine.scala:545) at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212) at org.apache.spark.streaming.mqtt.MQTTStreamSuite.org$scalatest$BeforeAndAfter$$super$run(MQTTStreamSuite.scala:37) at org.scalatest.BeforeAndAfter$class.run(BeforeAndAfter.scala:241) at org.apache.spark.streaming.mqtt.MQTTStreamSuite.run(MQTTStreamSuite.scala:37) at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:462) at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:671) at sbt.ForkMain$Run$2.call(ForkMain.java:294) at sbt.ForkMain$Run$2.call(ForkMain.java:284) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: sbt.ForkMain$ForkError: Failed to bind to server socket: mqtt://localhost:23456 due to: java.net.BindException: Address already in use at org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:33) at org.apache.activemq.transport.tcp.TcpTransportServer.bind(TcpTransportServer.java:138) at org.apache.activemq.transport.tcp.TcpTransportFactory.doBind(TcpTransportFactory.java:60) at org.apache.activemq.transport.TransportFactory.bind(TransportFactory.java:124) at org.apache.activemq.broker.TransportConnector.createTransportServer(TransportConnector.java:310) at org.apache.activemq.broker.TransportConnector.getServer(TransportConnector.java:136) at org.apache.activemq.broker.TransportConnector.asManagedConnector(TransportConnector.java:105) at org.apache.activemq.broker.BrokerService.registerConnectorMBean(BrokerService.java:1972) ... 38 more Caused by: sbt.ForkMain$ForkError: Address
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user Bilna commented on the pull request: https://github.com/apache/spark/pull/3844#issuecomment-69694551 ok.. I will look into it --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user Bilna commented on the pull request: https://github.com/apache/spark/pull/3844#issuecomment-68686800 @tdas, Thanks --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user Bilna commented on a diff in the pull request: https://github.com/apache/spark/pull/3844#discussion_r22453522 --- Diff: external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala --- @@ -17,31 +17,111 @@ package org.apache.spark.streaming.mqtt -import org.scalatest.FunSuite +import java.net.{URI, ServerSocket} -import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.activemq.broker.{TransportConnector, BrokerService} +import org.apache.spark.util.Utils +import org.scalatest.{BeforeAndAfter, FunSuite} +import org.scalatest.concurrent.Eventually +import scala.concurrent.duration._ +import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.eclipse.paho.client.mqttv3._ +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence -class MQTTStreamSuite extends FunSuite { - - val batchDuration = Seconds(1) +class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter { + private val batchDuration = Milliseconds(500) private val master: String = local[2] - private val framework: String = this.getClass.getSimpleName + private val freePort = findFreePort() + private val brokerUri = //localhost: + freePort + private val topic = def + private var ssc: StreamingContext = _ + private val persistenceDir = Utils.createTempDir() + private var broker: BrokerService = _ + private var connector: TransportConnector = _ - test(mqtt input stream) { -val ssc = new StreamingContext(master, framework, batchDuration) -val brokerUrl = abc -val topic = def + before { +ssc = new StreamingContext(master, framework, batchDuration) +setupMQTT() + } -// tests the API, does not actually test data receiving -val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic) -val test2: ReceiverInputDStream[String] = - MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2) + after { +if (ssc != null) { + ssc.stop() + ssc = null +} +Utils.deleteRecursively(persistenceDir) +tearDownMQTT() + } -// TODO: Actually test receiving data + test(mqtt input stream) { +val sendMessage = MQTT demo for spark streaming +val receiveStream: ReceiverInputDStream[String] = + MQTTUtils.createStream(ssc, tcp: + brokerUri, topic, StorageLevel.MEMORY_ONLY) +var receiveMessage: List[String] = List() +receiveStream.foreachRDD { rdd = + if (rdd.collect.length 0) { +receiveMessage = receiveMessage ::: List(rdd.first) +receiveMessage + } +} +ssc.start() +publishData(sendMessage) +eventually(timeout(1 milliseconds), interval(100 milliseconds)) { + assert(sendMessage.equals(receiveMessage(0))) +} ssc.stop() } + + private def setupMQTT() { +broker = new BrokerService() +connector = new TransportConnector() +connector.setName(mqtt) +connector.setUri(new URI(mqtt: + brokerUri)) +broker.addConnector(connector) +broker.start() + } + + private def tearDownMQTT() { +if (broker != null) { + broker.stop() + broker = null +} +if (connector != null) { + connector.stop() + connector = null +} + } + + private def findFreePort(): Int = { +Utils.startServiceOnPort(23456, (trialPort: Int) = { + val socket = new ServerSocket(trialPort) + socket.close() + (null, trialPort) +})._2 + } + + def publishData(data: String): Unit = { +var client: MqttClient = null +try { + val persistence: MqttClientPersistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath) + client = new MqttClient(tcp: + brokerUri, MqttClient.generateClientId(), persistence) + client.connect() + if (client.isConnected) { +val msgTopic: MqttTopic = client.getTopic(topic) +val message: MqttMessage = new MqttMessage(data.getBytes(utf-8)) +message.setQos(1) +message.setRetained(true) +for (i - 0 to 100) + msgTopic.publish(message) --- End diff -- Can you explain what is the correction here. Just to understand what went wrong. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/3844#discussion_r22459212 --- Diff: external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala --- @@ -17,31 +17,111 @@ package org.apache.spark.streaming.mqtt -import org.scalatest.FunSuite +import java.net.{URI, ServerSocket} -import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.activemq.broker.{TransportConnector, BrokerService} +import org.apache.spark.util.Utils +import org.scalatest.{BeforeAndAfter, FunSuite} +import org.scalatest.concurrent.Eventually +import scala.concurrent.duration._ +import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.eclipse.paho.client.mqttv3._ +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence -class MQTTStreamSuite extends FunSuite { - - val batchDuration = Seconds(1) +class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter { + private val batchDuration = Milliseconds(500) private val master: String = local[2] - private val framework: String = this.getClass.getSimpleName + private val freePort = findFreePort() + private val brokerUri = //localhost: + freePort + private val topic = def + private var ssc: StreamingContext = _ + private val persistenceDir = Utils.createTempDir() + private var broker: BrokerService = _ + private var connector: TransportConnector = _ - test(mqtt input stream) { -val ssc = new StreamingContext(master, framework, batchDuration) -val brokerUrl = abc -val topic = def + before { +ssc = new StreamingContext(master, framework, batchDuration) +setupMQTT() + } -// tests the API, does not actually test data receiving -val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic) -val test2: ReceiverInputDStream[String] = - MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2) + after { +if (ssc != null) { + ssc.stop() + ssc = null +} +Utils.deleteRecursively(persistenceDir) +tearDownMQTT() + } -// TODO: Actually test receiving data + test(mqtt input stream) { +val sendMessage = MQTT demo for spark streaming +val receiveStream: ReceiverInputDStream[String] = + MQTTUtils.createStream(ssc, tcp: + brokerUri, topic, StorageLevel.MEMORY_ONLY) +var receiveMessage: List[String] = List() +receiveStream.foreachRDD { rdd = + if (rdd.collect.length 0) { +receiveMessage = receiveMessage ::: List(rdd.first) +receiveMessage + } +} +ssc.start() +publishData(sendMessage) +eventually(timeout(1 milliseconds), interval(100 milliseconds)) { + assert(sendMessage.equals(receiveMessage(0))) +} ssc.stop() } + + private def setupMQTT() { +broker = new BrokerService() +connector = new TransportConnector() +connector.setName(mqtt) +connector.setUri(new URI(mqtt: + brokerUri)) +broker.addConnector(connector) +broker.start() + } + + private def tearDownMQTT() { +if (broker != null) { + broker.stop() + broker = null +} +if (connector != null) { + connector.stop() + connector = null +} + } + + private def findFreePort(): Int = { +Utils.startServiceOnPort(23456, (trialPort: Int) = { + val socket = new ServerSocket(trialPort) + socket.close() + (null, trialPort) +})._2 + } + + def publishData(data: String): Unit = { +var client: MqttClient = null +try { + val persistence: MqttClientPersistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath) + client = new MqttClient(tcp: + brokerUri, MqttClient.generateClientId(), persistence) + client.connect() + if (client.isConnected) { +val msgTopic: MqttTopic = client.getTopic(topic) +val message: MqttMessage = new MqttMessage(data.getBytes(utf-8)) +message.setQos(1) +message.setRetained(true) +for (i - 0 to 100) + msgTopic.publish(message) --- End diff -- ``` for (...) { msgTopic.publish(message) } ``` Such code block should either be in one line or be within braces. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user Bilna commented on a diff in the pull request: https://github.com/apache/spark/pull/3844#discussion_r22507984 --- Diff: external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala --- @@ -17,31 +17,111 @@ package org.apache.spark.streaming.mqtt -import org.scalatest.FunSuite +import java.net.{URI, ServerSocket} -import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.activemq.broker.{TransportConnector, BrokerService} +import org.apache.spark.util.Utils +import org.scalatest.{BeforeAndAfter, FunSuite} +import org.scalatest.concurrent.Eventually +import scala.concurrent.duration._ +import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.eclipse.paho.client.mqttv3._ +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence -class MQTTStreamSuite extends FunSuite { - - val batchDuration = Seconds(1) +class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter { + private val batchDuration = Milliseconds(500) private val master: String = local[2] - private val framework: String = this.getClass.getSimpleName + private val freePort = findFreePort() + private val brokerUri = //localhost: + freePort + private val topic = def + private var ssc: StreamingContext = _ + private val persistenceDir = Utils.createTempDir() + private var broker: BrokerService = _ + private var connector: TransportConnector = _ - test(mqtt input stream) { -val ssc = new StreamingContext(master, framework, batchDuration) -val brokerUrl = abc -val topic = def + before { +ssc = new StreamingContext(master, framework, batchDuration) +setupMQTT() + } -// tests the API, does not actually test data receiving -val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic) -val test2: ReceiverInputDStream[String] = - MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2) + after { +if (ssc != null) { + ssc.stop() + ssc = null +} +Utils.deleteRecursively(persistenceDir) +tearDownMQTT() + } -// TODO: Actually test receiving data + test(mqtt input stream) { +val sendMessage = MQTT demo for spark streaming +val receiveStream: ReceiverInputDStream[String] = + MQTTUtils.createStream(ssc, tcp: + brokerUri, topic, StorageLevel.MEMORY_ONLY) +var receiveMessage: List[String] = List() +receiveStream.foreachRDD { rdd = + if (rdd.collect.length 0) { +receiveMessage = receiveMessage ::: List(rdd.first) +receiveMessage + } +} +ssc.start() +publishData(sendMessage) +eventually(timeout(1 milliseconds), interval(100 milliseconds)) { + assert(sendMessage.equals(receiveMessage(0))) +} ssc.stop() } + + private def setupMQTT() { +broker = new BrokerService() +connector = new TransportConnector() +connector.setName(mqtt) +connector.setUri(new URI(mqtt: + brokerUri)) +broker.addConnector(connector) +broker.start() + } + + private def tearDownMQTT() { +if (broker != null) { + broker.stop() + broker = null +} +if (connector != null) { + connector.stop() + connector = null +} + } + + private def findFreePort(): Int = { +Utils.startServiceOnPort(23456, (trialPort: Int) = { + val socket = new ServerSocket(trialPort) + socket.close() + (null, trialPort) +})._2 + } + + def publishData(data: String): Unit = { +var client: MqttClient = null +try { + val persistence: MqttClientPersistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath) + client = new MqttClient(tcp: + brokerUri, MqttClient.generateClientId(), persistence) + client.connect() + if (client.isConnected) { +val msgTopic: MqttTopic = client.getTopic(topic) +val message: MqttMessage = new MqttMessage(data.getBytes(utf-8)) +message.setQos(1) +message.setRetained(true) +for (i - 0 to 100) + msgTopic.publish(message) --- End diff -- ok.. thanks. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3844#issuecomment-68646513 [Test build #25035 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25035/consoleFull) for PR 3844 at commit [`acea3a3`](https://github.com/apache/spark/commit/acea3a31eba9d0853cb7484a16f8916219057be0). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter ` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/3844#issuecomment-68646517 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25035/ Test PASSed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/3844#discussion_r22446708 --- Diff: external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala --- @@ -17,31 +17,111 @@ package org.apache.spark.streaming.mqtt -import org.scalatest.FunSuite +import java.net.{URI, ServerSocket} -import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.activemq.broker.{TransportConnector, BrokerService} +import org.apache.spark.util.Utils +import org.scalatest.{BeforeAndAfter, FunSuite} +import org.scalatest.concurrent.Eventually +import scala.concurrent.duration._ +import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.eclipse.paho.client.mqttv3._ +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence -class MQTTStreamSuite extends FunSuite { - - val batchDuration = Seconds(1) +class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter { + private val batchDuration = Milliseconds(500) private val master: String = local[2] - private val framework: String = this.getClass.getSimpleName + private val freePort = findFreePort() + private val brokerUri = //localhost: + freePort + private val topic = def + private var ssc: StreamingContext = _ + private val persistenceDir = Utils.createTempDir() + private var broker: BrokerService = _ + private var connector: TransportConnector = _ - test(mqtt input stream) { -val ssc = new StreamingContext(master, framework, batchDuration) -val brokerUrl = abc -val topic = def + before { +ssc = new StreamingContext(master, framework, batchDuration) +setupMQTT() + } -// tests the API, does not actually test data receiving -val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic) -val test2: ReceiverInputDStream[String] = - MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2) + after { +if (ssc != null) { + ssc.stop() + ssc = null +} +Utils.deleteRecursively(persistenceDir) +tearDownMQTT() + } -// TODO: Actually test receiving data + test(mqtt input stream) { +val sendMessage = MQTT demo for spark streaming +val receiveStream: ReceiverInputDStream[String] = + MQTTUtils.createStream(ssc, tcp: + brokerUri, topic, StorageLevel.MEMORY_ONLY) +var receiveMessage: List[String] = List() +receiveStream.foreachRDD { rdd = + if (rdd.collect.length 0) { +receiveMessage = receiveMessage ::: List(rdd.first) +receiveMessage + } +} +ssc.start() +publishData(sendMessage) +eventually(timeout(1 milliseconds), interval(100 milliseconds)) { + assert(sendMessage.equals(receiveMessage(0))) +} ssc.stop() } + + private def setupMQTT() { +broker = new BrokerService() +connector = new TransportConnector() +connector.setName(mqtt) +connector.setUri(new URI(mqtt: + brokerUri)) +broker.addConnector(connector) +broker.start() + } + + private def tearDownMQTT() { +if (broker != null) { + broker.stop() + broker = null +} +if (connector != null) { + connector.stop() + connector = null +} + } + + private def findFreePort(): Int = { +Utils.startServiceOnPort(23456, (trialPort: Int) = { + val socket = new ServerSocket(trialPort) + socket.close() + (null, trialPort) +})._2 + } + + def publishData(data: String): Unit = { +var client: MqttClient = null +try { + val persistence: MqttClientPersistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath) + client = new MqttClient(tcp: + brokerUri, MqttClient.generateClientId(), persistence) + client.connect() + if (client.isConnected) { +val msgTopic: MqttTopic = client.getTopic(topic) +val message: MqttMessage = new MqttMessage(data.getBytes(utf-8)) +message.setQos(1) +message.setRetained(true) +for (i - 0 to 100) + msgTopic.publish(message) --- End diff -- nit: I missed this in the last pass, but this violates the Scala syntax that we follow. I wont block this PR for this. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/3844#issuecomment-68665022 Merging this, thanks! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/3844 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3844#issuecomment-68589126 [Test build #25007 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25007/consoleFull) for PR 3844 at commit [`fac3904`](https://github.com/apache/spark/commit/fac3904a8e702722acca2a0e7217c5440ecda84a). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter ` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/3844#issuecomment-68589130 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25007/ Test PASSed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3844#issuecomment-68587233 [Test build #25007 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25007/consoleFull) for PR 3844 at commit [`fac3904`](https://github.com/apache/spark/commit/fac3904a8e702722acca2a0e7217c5440ecda84a). * This patch merges cleanly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/3844#discussion_r22435908 --- Diff: external/mqtt/pom.xml --- @@ -47,6 +47,11 @@ version1.0.1/version /dependency dependency + groupIdorg.apache.activemq/groupId + artifactIdactivemq-core/artifactId + version5.7.0/version +/dependency --- End diff -- Ummm.. is this dependency necessary only for unit test? In that case it should be added to test scope (See scalatest below). We really try to avoid changing dependencies as any such change can cause conflicts with other stuff (spark's code dependencies) causing unforeseen failures. So if this is only necessary for test, please put it in test scope. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/3844#issuecomment-68616859 Only one more comment. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/3844#discussion_r22429050 --- Diff: external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala --- @@ -17,31 +17,114 @@ package org.apache.spark.streaming.mqtt -import org.scalatest.FunSuite +import java.net.{URI, ServerSocket} -import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.activemq.broker.{TransportConnector, BrokerService} +import org.apache.spark.util.Utils +import org.scalatest.{BeforeAndAfter, FunSuite} +import org.scalatest.concurrent.Eventually +import scala.concurrent.duration._ +import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.eclipse.paho.client.mqttv3._ +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence -class MQTTStreamSuite extends FunSuite { - - val batchDuration = Seconds(1) +class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter { + private val batchDuration = Milliseconds(500) private val master: String = local[2] - private val framework: String = this.getClass.getSimpleName + private val freePort = findFreePort() + private val brokerUri = //localhost: + freePort + private val topic = def + private var ssc: StreamingContext = _ + private val persistenceDir = Utils.createTempDir() + private var broker: BrokerService = _ + private var connector: TransportConnector = _ - test(mqtt input stream) { -val ssc = new StreamingContext(master, framework, batchDuration) -val brokerUrl = abc -val topic = def + before { +ssc = new StreamingContext(master, framework, batchDuration) +setupMQTT + } -// tests the API, does not actually test data receiving -val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic) -val test2: ReceiverInputDStream[String] = - MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2) + after { +if (ssc != null) { + ssc.stop() + ssc = null +} +Utils.deleteRecursively(persistenceDir) +tearDownMQTT + } -// TODO: Actually test receiving data + test(mqtt input stream) { +val sendMessage = MQTT demo for spark streaming +val receiveStream: ReceiverInputDStream[String] = + MQTTUtils.createStream(ssc, tcp: + brokerUri, topic, StorageLevel.MEMORY_ONLY) +var receiveMessage: List[String] = List() +receiveStream.foreachRDD { rdd = + if (rdd.collect.length 0) { +receiveMessage = receiveMessage ::: List(rdd.first) +receiveMessage + } +} +ssc.start() +publishData(sendMessage) +eventually(timeout(1 milliseconds), interval(100 milliseconds)) { + assert(sendMessage.equals(receiveMessage(0))) +} ssc.stop() } + + private def setupMQTT() { +broker = new BrokerService() +connector = new TransportConnector() +connector.setName(mqtt) +connector.setUri(new URI(mqtt: + brokerUri)) +broker.addConnector(connector) +broker.start() + } + + private def tearDownMQTT() { +if (broker != null) { + broker.stop() + broker = null +} +if (connector != null) { + connector.stop() + connector = null +} + } + + private def findFreePort(): Int = { +Utils.startServiceOnPort(23456, (trialPort: Int) = { + val socket = new ServerSocket(trialPort) + socket.close() + (null, trialPort) +})._2 + } + + def publishData(data: String): Unit = { +var client: MqttClient = null +try { + val persistence: MqttClientPersistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath) + client = new MqttClient(tcp: + brokerUri, MqttClient.generateClientId(), persistence) + client.connect() + if (client.isConnected) { +val msgTopic: MqttTopic = client.getTopic(topic) +val message: MqttMessage = new MqttMessage(data.getBytes(utf-8)) +message.setQos(1) +message.setRetained(true) +for (i - 0 to 10) + msgTopic.publish(message) + } +} catch { + case e: MqttException = println(Exception Caught: + e) --- End diff -- Why can there be an exception? And if there is an exception, why is it being ignored? Printing and not doing anything is essentially ignoring if the unit test
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/3844#issuecomment-68581068 This is almost looking good. few more comments and we are ready. :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3844#issuecomment-68525514 [Test build #24994 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24994/consoleFull) for PR 3844 at commit [`04503cf`](https://github.com/apache/spark/commit/04503cfa7f8168038c17198b6e45b16b89591e74). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter ` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/3844#issuecomment-68525516 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24994/ Test PASSed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3844#issuecomment-68525682 [Test build #24996 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24996/consoleFull) for PR 3844 at commit [`4b34ee7`](https://github.com/apache/spark/commit/4b34ee784e7c9c489cf0c22d73311c160bc67c47). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds no public classes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/3844#issuecomment-68525686 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24996/ Test PASSed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/3844#discussion_r22427641 --- Diff: external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala --- @@ -17,31 +17,114 @@ package org.apache.spark.streaming.mqtt -import org.scalatest.FunSuite +import java.net.{URI, ServerSocket} -import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.activemq.broker.{TransportConnector, BrokerService} +import org.apache.spark.util.Utils +import org.scalatest.{BeforeAndAfter, FunSuite} +import org.scalatest.concurrent.Eventually +import scala.concurrent.duration._ +import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.eclipse.paho.client.mqttv3._ +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence -class MQTTStreamSuite extends FunSuite { - - val batchDuration = Seconds(1) +class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter { + private val batchDuration = Milliseconds(500) private val master: String = local[2] - private val framework: String = this.getClass.getSimpleName + private val freePort = findFreePort() + private val brokerUri = //localhost: + freePort + private val topic = def + private var ssc: StreamingContext = _ + private val persistenceDir = Utils.createTempDir() + private var broker: BrokerService = _ + private var connector: TransportConnector = _ - test(mqtt input stream) { -val ssc = new StreamingContext(master, framework, batchDuration) -val brokerUrl = abc -val topic = def + before { +ssc = new StreamingContext(master, framework, batchDuration) +setupMQTT + } -// tests the API, does not actually test data receiving -val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic) -val test2: ReceiverInputDStream[String] = - MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2) + after { +if (ssc != null) { + ssc.stop() + ssc = null +} +Utils.deleteRecursively(persistenceDir) +tearDownMQTT + } -// TODO: Actually test receiving data + test(mqtt input stream) { +val sendMessage = MQTT demo for spark streaming +val receiveStream: ReceiverInputDStream[String] = + MQTTUtils.createStream(ssc, tcp: + brokerUri, topic, StorageLevel.MEMORY_ONLY) +var receiveMessage: List[String] = List() +receiveStream.foreachRDD { rdd = + if (rdd.collect.length 0) { +receiveMessage = receiveMessage ::: List(rdd.first) +receiveMessage + } +} +ssc.start() +publishData(sendMessage) +eventually(timeout(1 milliseconds), interval(100 milliseconds)) { + assert(sendMessage.equals(receiveMessage(0))) +} ssc.stop() } + + private def setupMQTT() { +broker = new BrokerService() +connector = new TransportConnector() +connector.setName(mqtt) +connector.setUri(new URI(mqtt: + brokerUri)) +broker.addConnector(connector) +broker.start() + } + + private def tearDownMQTT() { +if (broker != null) { + broker.stop() + broker = null +} +if (connector != null) { + connector.stop() + connector = null +} + } + + private def findFreePort(): Int = { +Utils.startServiceOnPort(23456, (trialPort: Int) = { + val socket = new ServerSocket(trialPort) + socket.close() + (null, trialPort) +})._2 + } + + def publishData(data: String): Unit = { +var client: MqttClient = null +try { + val persistence: MqttClientPersistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath) + client = new MqttClient(tcp: + brokerUri, MqttClient.generateClientId(), persistence) + client.connect() + if (client.isConnected) { +val msgTopic: MqttTopic = client.getTopic(topic) +val message: MqttMessage = new MqttMessage(data.getBytes(utf-8)) +message.setQos(1) +message.setRetained(true) +for (i - 0 to 10) + msgTopic.publish(message) + } +} catch { + case e: MqttException = println(Exception Caught: + e) +} --- End diff -- nit: finally should be previous line after } --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/3844#discussion_r22427589 --- Diff: external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala --- @@ -17,31 +17,114 @@ package org.apache.spark.streaming.mqtt -import org.scalatest.FunSuite +import java.net.{URI, ServerSocket} -import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.activemq.broker.{TransportConnector, BrokerService} +import org.apache.spark.util.Utils +import org.scalatest.{BeforeAndAfter, FunSuite} +import org.scalatest.concurrent.Eventually +import scala.concurrent.duration._ +import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.eclipse.paho.client.mqttv3._ +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence -class MQTTStreamSuite extends FunSuite { - - val batchDuration = Seconds(1) +class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter { + private val batchDuration = Milliseconds(500) private val master: String = local[2] - private val framework: String = this.getClass.getSimpleName + private val freePort = findFreePort() + private val brokerUri = //localhost: + freePort + private val topic = def + private var ssc: StreamingContext = _ + private val persistenceDir = Utils.createTempDir() + private var broker: BrokerService = _ + private var connector: TransportConnector = _ - test(mqtt input stream) { -val ssc = new StreamingContext(master, framework, batchDuration) -val brokerUrl = abc -val topic = def + before { +ssc = new StreamingContext(master, framework, batchDuration) +setupMQTT + } -// tests the API, does not actually test data receiving -val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic) -val test2: ReceiverInputDStream[String] = - MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2) + after { +if (ssc != null) { + ssc.stop() + ssc = null +} +Utils.deleteRecursively(persistenceDir) +tearDownMQTT --- End diff -- Same comment as for `setupMQTT` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/3844#discussion_r22427587 --- Diff: external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala --- @@ -17,31 +17,114 @@ package org.apache.spark.streaming.mqtt -import org.scalatest.FunSuite +import java.net.{URI, ServerSocket} -import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.activemq.broker.{TransportConnector, BrokerService} +import org.apache.spark.util.Utils +import org.scalatest.{BeforeAndAfter, FunSuite} +import org.scalatest.concurrent.Eventually +import scala.concurrent.duration._ +import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.eclipse.paho.client.mqttv3._ +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence -class MQTTStreamSuite extends FunSuite { - - val batchDuration = Seconds(1) +class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter { + private val batchDuration = Milliseconds(500) private val master: String = local[2] - private val framework: String = this.getClass.getSimpleName + private val freePort = findFreePort() + private val brokerUri = //localhost: + freePort + private val topic = def + private var ssc: StreamingContext = _ + private val persistenceDir = Utils.createTempDir() + private var broker: BrokerService = _ + private var connector: TransportConnector = _ - test(mqtt input stream) { -val ssc = new StreamingContext(master, framework, batchDuration) -val brokerUrl = abc -val topic = def + before { +ssc = new StreamingContext(master, framework, batchDuration) +setupMQTT --- End diff -- nit: this should be `setupMQTT()`. See Scala style guide - http://docs.scala-lang.org/style/method-invocation.html#arity0 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/3844#discussion_r22427535 --- Diff: external/mqtt/pom.xml --- @@ -16,65 +16,71 @@ ~ limitations under the License. -- --- End diff -- This file should not change completely like this. I think there has been some incorrect changes in the indentation. Please revert/fix this. If there are any changes necessary related to test scope (for unit test) only those lines should change. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user Bilna commented on a diff in the pull request: https://github.com/apache/spark/pull/3844#discussion_r22405599 --- Diff: external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala --- @@ -17,31 +17,65 @@ package org.apache.spark.streaming.mqtt -import org.scalatest.FunSuite - +import org.scalatest.{BeforeAndAfter, FunSuite} +import org.scalatest.concurrent.Eventually +import scala.concurrent.duration._ import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.eclipse.paho.client.mqttv3._ +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence -class MQTTStreamSuite extends FunSuite { - - val batchDuration = Seconds(1) +class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter { + private val batchDuration = Seconds(1) private val master: String = local[2] - private val framework: String = this.getClass.getSimpleName + private val brokerUrl = tcp://localhost:1883 --- End diff -- TCP/IP port 1883 is reserved with IANA for use with MQTT. That is why I hardcoded it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3844#issuecomment-68428985 [Test build #24948 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24948/consoleFull) for PR 3844 at commit [`b1ac4ad`](https://github.com/apache/spark/commit/b1ac4ad62ff6d537f669699d5da49bc4ee1ab154). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter ` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/3844#issuecomment-68428989 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24948/ Test FAILed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3844#issuecomment-68429312 [Test build #24949 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24949/consoleFull) for PR 3844 at commit [`4b58094`](https://github.com/apache/spark/commit/4b580943de5137e947d1a6cdadd054020932ed8e). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter ` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/3844#issuecomment-68429316 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24949/ Test FAILed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3844#issuecomment-68433088 [Test build #24956 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24956/consoleFull) for PR 3844 at commit [`fc8eb28`](https://github.com/apache/spark/commit/fc8eb286db6aa8e78a567537996011f554eed969). * This patch merges cleanly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/3844#issuecomment-68435869 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24956/ Test FAILed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3844#issuecomment-68435866 [Test build #24956 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24956/consoleFull) for PR 3844 at commit [`fc8eb28`](https://github.com/apache/spark/commit/fc8eb286db6aa8e78a567537996011f554eed969). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter ` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/3844#discussion_r22397269 --- Diff: external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala --- @@ -17,31 +17,65 @@ package org.apache.spark.streaming.mqtt -import org.scalatest.FunSuite - +import org.scalatest.{BeforeAndAfter, FunSuite} +import org.scalatest.concurrent.Eventually +import scala.concurrent.duration._ import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.eclipse.paho.client.mqttv3._ +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence -class MQTTStreamSuite extends FunSuite { - - val batchDuration = Seconds(1) +class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter { + private val batchDuration = Seconds(1) private val master: String = local[2] - private val framework: String = this.getClass.getSimpleName + private val brokerUrl = tcp://localhost:1883 --- End diff -- Who is running the broker? Also this port is hardcoded. There is a small, non-trivial chance that this port may not be free (in Jenkins, where multiple series of test maybe running in parallel) causing the server to not bind thus failing test. Can you find a free port (see [FlumeStreamSuite](https://github.com/apache/spark/blob/master/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala#L78)) and use that instead? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/3844#discussion_r22397277 --- Diff: external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala --- @@ -17,31 +17,65 @@ package org.apache.spark.streaming.mqtt -import org.scalatest.FunSuite - +import org.scalatest.{BeforeAndAfter, FunSuite} +import org.scalatest.concurrent.Eventually +import scala.concurrent.duration._ import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.eclipse.paho.client.mqttv3._ +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence -class MQTTStreamSuite extends FunSuite { - - val batchDuration = Seconds(1) +class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter { + private val batchDuration = Seconds(1) private val master: String = local[2] - private val framework: String = this.getClass.getSimpleName + private val brokerUrl = tcp://localhost:1883 + private val topic = def + private var ssc: StreamingContext = _ - test(mqtt input stream) { -val ssc = new StreamingContext(master, framework, batchDuration) -val brokerUrl = abc -val topic = def + before { +ssc = new StreamingContext(master, framework, batchDuration) + } + after { +if (ssc != null) { + ssc.stop() + ssc = null +} + } -// tests the API, does not actually test data receiving -val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic) -val test2: ReceiverInputDStream[String] = + test(mqtt input stream) { +val sendMessage = MQTT demo for spark streaming +publishData(sendMessage) +val receiveStream: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2) - -// TODO: Actually test receiving data +var receiveMessage: String = +receiveStream.foreachRDD { rdd = + receiveMessage = rdd.first + receiveMessage +} +ssc.start() +eventually(timeout(1 milliseconds), interval(100 milliseconds)) { + assert(sendMessage.equals(receiveMessage)) +} ssc.stop() } + + def publishData(sendMessage: String): Unit = { +try { + val persistence: MqttClientPersistence = new MqttDefaultFilePersistence(/tmp) + val client: MqttClient = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence) + client.connect() + val msgTopic: MqttTopic = client.getTopic(topic) + val message: MqttMessage = new MqttMessage(String.valueOf(sendMessage).getBytes(utf-8)) + message.setQos(1) + message.setRetained(true) + msgTopic.publish(message) + println(Published data \ntopic: + msgTopic.getName() + \nMessage: + message) + client.disconnect() +} catch { + case e: MqttException = println(Exception Caught: + e) +} --- End diff -- Shouldnt there be a `finally` to close any running servers (client, etc.)? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
GitHub user Bilna opened a pull request: https://github.com/apache/spark/pull/3844 [SPARK-4631] unit test for MQTT You can merge this pull request into a Git repository by running: $ git pull https://github.com/Bilna/spark master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/3844.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3844 commit 86164950acfc794c6c9b1db3663716ac4626c55b Author: bilna bil...@am.amrita.edu Date: 2014-12-30T13:06:09Z [SPARK-4631] unit test for MQTT --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/3844#issuecomment-68355440 Can one of the admins verify this patch? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user prabeesh commented on the pull request: https://github.com/apache/spark/pull/3844#issuecomment-68355645 @tdas verify this patch --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/3844#issuecomment-68417426 Jenksin, this is ok to test. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3844#issuecomment-68417591 [Test build #24922 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24922/consoleFull) for PR 3844 at commit [`e8b6623`](https://github.com/apache/spark/commit/e8b6623e5bd31fcb583fdeae5f1c954be672403d). * This patch merges cleanly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/3844#discussion_r22372726 --- Diff: external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala --- @@ -17,31 +17,58 @@ package org.apache.spark.streaming.mqtt +import org.apache.spark.Logging import org.scalatest.FunSuite - +import org.scalatest.concurrent.Eventually +import scala.concurrent.duration._ import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.eclipse.paho.client.mqttv3._ +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence -class MQTTStreamSuite extends FunSuite { +abstract class MQTTStreamSuiteBase extends FunSuite with Eventually with Logging { val batchDuration = Seconds(1) + val master: String = local[2] + val framework: String = this.getClass.getSimpleName + val brokerUrl = tcp://localhost:1883 + val topic = def + + def publishData(sendMessage: String): Unit = { +try { + val persistence: MqttClientPersistence = new MqttDefaultFilePersistence(/tmp) + val client: MqttClient = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence) + client.connect() + val msgTopic: MqttTopic = client.getTopic(topic) + val message: MqttMessage = new MqttMessage(String.valueOf(sendMessage).getBytes(utf-8)) + message.setQos(1) + message.setRetained(true) + msgTopic.publish(message) + println(Published data \ntopic: + msgTopic.getName() + \nMessage: + message) + client.disconnect() +} catch { + case e: MqttException = println(Exception Caught: + e) +} + } +} - private val master: String = local[2] - - private val framework: String = this.getClass.getSimpleName - +class MQTTStreamSuite extends MQTTStreamSuiteBase { test(mqtt input stream) { val ssc = new StreamingContext(master, framework, batchDuration) --- End diff -- This can cause the SparkContext to be not shutdown if there is an exception in the unit test, causing a leaked SparkContext. Take a look at how it is done in the KafkaStreamSuite with BeforeAndAfter. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/3844#discussion_r22372739 --- Diff: external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala --- @@ -17,31 +17,58 @@ package org.apache.spark.streaming.mqtt +import org.apache.spark.Logging import org.scalatest.FunSuite - +import org.scalatest.concurrent.Eventually +import scala.concurrent.duration._ import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.eclipse.paho.client.mqttv3._ +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence -class MQTTStreamSuite extends FunSuite { +abstract class MQTTStreamSuiteBase extends FunSuite with Eventually with Logging { --- End diff -- Why make this class if this is used by only one other class? Are you planning to another testsuite, which would justify this abstract class (like KafkaStreamSuiteBase is used by KafkaStreamSuite and ReliableKafkaStreamSuite)? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/3844#issuecomment-68419761 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24922/ Test FAILed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3844#issuecomment-68419759 [Test build #24922 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24922/consoleFull) for PR 3844 at commit [`e8b6623`](https://github.com/apache/spark/commit/e8b6623e5bd31fcb583fdeae5f1c954be672403d). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `abstract class MQTTStreamSuiteBase extends FunSuite with Eventually with Logging ` * `class MQTTStreamSuite extends MQTTStreamSuiteBase ` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3844#issuecomment-68426613 [Test build #24946 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24946/consoleFull) for PR 3844 at commit [`5f6bfd2`](https://github.com/apache/spark/commit/5f6bfd2f4b11c08e76d02ccf5a5594151ccd9af5). * This patch merges cleanly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3844#issuecomment-68427068 [Test build #24948 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24948/consoleFull) for PR 3844 at commit [`b1ac4ad`](https://github.com/apache/spark/commit/b1ac4ad62ff6d537f669699d5da49bc4ee1ab154). * This patch merges cleanly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3844#issuecomment-68427356 [Test build #24949 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24949/consoleFull) for PR 3844 at commit [`4b58094`](https://github.com/apache/spark/commit/4b580943de5137e947d1a6cdadd054020932ed8e). * This patch merges cleanly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3844#issuecomment-68428438 [Test build #24946 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24946/consoleFull) for PR 3844 at commit [`5f6bfd2`](https://github.com/apache/spark/commit/5f6bfd2f4b11c08e76d02ccf5a5594151ccd9af5). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter ` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4631] unit test for MQTT
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/3844#issuecomment-68428440 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24946/ Test FAILed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org