[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2015-01-29 Thread dragos
Github user dragos commented on the pull request:

https://github.com/apache/spark/pull/3844#issuecomment-72061693
  
See #4270 





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2015-01-28 Thread srowen
Github user srowen commented on the pull request:

https://github.com/apache/spark/pull/3844#issuecomment-71854715
  
@dragos good catch. It sounds like an issue with the test if anything. You 
could just reopen SPARK-4631 with a workaround.





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2015-01-28 Thread dragos
Github user dragos commented on the pull request:

https://github.com/apache/spark/pull/3844#issuecomment-71858250
  
@srowen I commented on the ticket, but I can't re-open it.





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2015-01-28 Thread dragos
Github user dragos commented on the pull request:

https://github.com/apache/spark/pull/3844#issuecomment-71852913
  
There also seems to be a race condition introduced by this test. It fails 
consistently for me (but passes if I add a `Thread.sleep(50)` inside 
`publishData`). I'll open a ticket.

```
[info] - mqtt input stream *** FAILED *** (552 milliseconds)
[info]   org.eclipse.paho.client.mqttv3.MqttException: Too many publishes 
in progress
[info]   at 
org.eclipse.paho.client.mqttv3.internal.ClientState.send(ClientState.java:432)
[info]   at 
org.eclipse.paho.client.mqttv3.internal.ClientComms.internalSend(ClientComms.java:121)
[info]   at 
org.eclipse.paho.client.mqttv3.internal.ClientComms.sendNoWait(ClientComms.java:139)
[info]   at 
org.eclipse.paho.client.mqttv3.MqttTopic.publish(MqttTopic.java:107)
[info]   at 
org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$publishData$1.apply(MQTTStreamSuite.scala:126)
[info]   at 
org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$publishData$1.apply(MQTTStreamSuite.scala:124)
[info]   at scala.collection.immutable.Range.foreach(Range.scala:141)
[info]   at 
org.apache.spark.streaming.mqtt.MQTTStreamSuite.publishData(MQTTStreamSuite.scala:124)
[info]   at 
org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$3.apply$mcV$sp(MQTTStreamSuite.scala:78)
[info]   at 
org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$3.apply(MQTTStreamSuite.scala:66)
[info]   at 
org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$3.apply(MQTTStreamSuite.scala:66)
[info]   at 
org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
[info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
[info]   at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
```
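
For illustration only, a minimal sketch of the throttling workaround described above: the Paho error means the client exceeded its in-flight publish window, so the loop backs off and retries instead of failing. The helper name is hypothetical, the suite fields (`brokerUri`, `topic`, `persistenceDir`) are assumed from the test quoted later in this thread, and the `REASON_CODE_MAX_INFLIGHT` check is an assumption about the Paho client; this is not the fix that was actually committed.

```
import org.eclipse.paho.client.mqttv3._
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence

// Hypothetical throttled variant of the suite's publishData helper.
def publishDataThrottled(data: String): Unit = {
  val persistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath)
  val client = new MqttClient("tcp:" + brokerUri, MqttClient.generateClientId(), persistence)
  client.connect()
  try {
    val msgTopic = client.getTopic(topic)
    val message = new MqttMessage(data.getBytes("utf-8"))
    message.setQos(1)
    for (i <- 0 to 100) {
      var sent = false
      while (!sent) {
        try {
          msgTopic.publish(message)
          sent = true
        } catch {
          // "Too many publishes in progress": wait for the in-flight window to drain, then retry.
          case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT =>
            Thread.sleep(50)
        }
      }
    }
  } finally {
    client.disconnect()
  }
}
```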





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2015-01-12 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/3844#issuecomment-69675455
  
It looks like there may be a port-binding / race condition here?


https://amplab.cs.berkeley.edu/jenkins/job/Spark-Master-SBT/AMPLAB_JENKINS_BUILD_PROFILE=hadoop1.0,label=centos/1356/testReport/

```
sbt.ForkMain$ForkError: Transport Connector could not be registered in JMX: 
Failed to bind to server socket: mqtt://localhost:23456 due to: 
java.net.BindException: Address already in use
at 
org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:27)
at 
org.apache.activemq.broker.BrokerService.registerConnectorMBean(BrokerService.java:1977)
at 
org.apache.activemq.broker.BrokerService.startTransportConnector(BrokerService.java:2468)
at 
org.apache.activemq.broker.BrokerService.startAllConnectors(BrokerService.java:2385)
at 
org.apache.activemq.broker.BrokerService.doStartBroker(BrokerService.java:684)
at 
org.apache.activemq.broker.BrokerService.startBroker(BrokerService.java:642)
at 
org.apache.activemq.broker.BrokerService.start(BrokerService.java:578)
at 
org.apache.spark.streaming.mqtt.MQTTStreamSuite.org$apache$spark$streaming$mqtt$MQTTStreamSuite$$setupMQTT(MQTTStreamSuite.scala:90)
at 
org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$1.apply$mcV$sp(MQTTStreamSuite.scala:53)
at 
org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$1.apply(MQTTStreamSuite.scala:51)
at 
org.apache.spark.streaming.mqtt.MQTTStreamSuite$$anonfun$1.apply(MQTTStreamSuite.scala:51)
at org.scalatest.BeforeAndAfter$class.runTest(BeforeAndAfter.scala:195)
at 
org.apache.spark.streaming.mqtt.MQTTStreamSuite.runTest(MQTTStreamSuite.scala:37)
at 
org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
at 
org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
at 
org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
at 
org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
at 
org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
at org.scalatest.Suite$class.run(Suite.scala:1424)
at 
org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
at 
org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
at 
org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212)
at 
org.apache.spark.streaming.mqtt.MQTTStreamSuite.org$scalatest$BeforeAndAfter$$super$run(MQTTStreamSuite.scala:37)
at org.scalatest.BeforeAndAfter$class.run(BeforeAndAfter.scala:241)
at 
org.apache.spark.streaming.mqtt.MQTTStreamSuite.run(MQTTStreamSuite.scala:37)
at 
org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:462)
at 
org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:671)
at sbt.ForkMain$Run$2.call(ForkMain.java:294)
at sbt.ForkMain$Run$2.call(ForkMain.java:284)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: sbt.ForkMain$ForkError: Failed to bind to server socket: 
mqtt://localhost:23456 due to: java.net.BindException: Address already in use
at 
org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:33)
at 
org.apache.activemq.transport.tcp.TcpTransportServer.bind(TcpTransportServer.java:138)
at 
org.apache.activemq.transport.tcp.TcpTransportFactory.doBind(TcpTransportFactory.java:60)
at 
org.apache.activemq.transport.TransportFactory.bind(TransportFactory.java:124)
at 
org.apache.activemq.broker.TransportConnector.createTransportServer(TransportConnector.java:310)
at 
org.apache.activemq.broker.TransportConnector.getServer(TransportConnector.java:136)
at 
org.apache.activemq.broker.TransportConnector.asManagedConnector(TransportConnector.java:105)
at 
org.apache.activemq.broker.BrokerService.registerConnectorMBean(BrokerService.java:1972)
... 38 more
Caused by: sbt.ForkMain$ForkError: Address 
```
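
One way to sidestep this kind of bind race, sketched here under the assumption that the suite keeps using Spark's `Utils.startServiceOnPort` (the two-argument call mirrors the test code quoted later in this thread): start the broker inside the retry callback, so a failed bind simply advances to the next candidate port instead of racing between a port probe and the actual bind. The method name is hypothetical and this is not the committed fix.

```
import java.net.URI
import org.apache.activemq.broker.{BrokerService, TransportConnector}
import org.apache.spark.util.Utils

// Hypothetical setup that retries the whole broker bind when a port is taken.
private def startBrokerOnFreePort(): (BrokerService, Int) = {
  Utils.startServiceOnPort(23456, (trialPort: Int) => {
    val broker = new BrokerService()
    val connector = new TransportConnector()
    connector.setName("mqtt")
    connector.setUri(new URI("mqtt://localhost:" + trialPort))
    broker.addConnector(connector)
    // start() throws if the port is already bound, which makes startServiceOnPort retry.
    broker.start()
    (broker, trialPort)
  })
}
```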

[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2015-01-12 Thread Bilna
Github user Bilna commented on the pull request:

https://github.com/apache/spark/pull/3844#issuecomment-69694551
  
ok.. I will look into it





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2015-01-05 Thread Bilna
Github user Bilna commented on the pull request:

https://github.com/apache/spark/pull/3844#issuecomment-68686800
  
@tdas, Thanks 






[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2015-01-05 Thread Bilna
Github user Bilna commented on a diff in the pull request:

https://github.com/apache/spark/pull/3844#discussion_r22453522
  
--- Diff: 
external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
 ---
@@ -17,31 +17,111 @@
 
 package org.apache.spark.streaming.mqtt
 
-import org.scalatest.FunSuite
+import java.net.{URI, ServerSocket}
 
-import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.activemq.broker.{TransportConnector, BrokerService}
+import org.apache.spark.util.Utils
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Eventually
+import scala.concurrent.duration._
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.eclipse.paho.client.mqttv3._
+import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
 
-class MQTTStreamSuite extends FunSuite {
-
-  val batchDuration = Seconds(1)
+class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
 
+  private val batchDuration = Milliseconds(500)
   private val master: String = "local[2]"
-
   private val framework: String = this.getClass.getSimpleName
+  private val freePort = findFreePort()
+  private val brokerUri = "//localhost:" + freePort
+  private val topic = "def"
+  private var ssc: StreamingContext = _
+  private val persistenceDir = Utils.createTempDir()
+  private var broker: BrokerService = _
+  private var connector: TransportConnector = _
 
-  test("mqtt input stream") {
-    val ssc = new StreamingContext(master, framework, batchDuration)
-    val brokerUrl = "abc"
-    val topic = "def"
+  before {
+    ssc = new StreamingContext(master, framework, batchDuration)
+    setupMQTT()
+  }
 
-    // tests the API, does not actually test data receiving
-    val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic)
-    val test2: ReceiverInputDStream[String] =
-      MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
+  after {
+    if (ssc != null) {
+      ssc.stop()
+      ssc = null
+    }
+    Utils.deleteRecursively(persistenceDir)
+    tearDownMQTT()
+  }
 
-    // TODO: Actually test receiving data
+  test("mqtt input stream") {
+    val sendMessage = "MQTT demo for spark streaming"
+    val receiveStream: ReceiverInputDStream[String] =
+      MQTTUtils.createStream(ssc, "tcp:" + brokerUri, topic, StorageLevel.MEMORY_ONLY)
+    var receiveMessage: List[String] = List()
+    receiveStream.foreachRDD { rdd =>
+      if (rdd.collect.length > 0) {
+        receiveMessage = receiveMessage ::: List(rdd.first)
+        receiveMessage
+      }
+    }
+    ssc.start()
+    publishData(sendMessage)
+    eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
+      assert(sendMessage.equals(receiveMessage(0)))
+    }
     ssc.stop()
   }
+
+  private def setupMQTT() {
+    broker = new BrokerService()
+    connector = new TransportConnector()
+    connector.setName("mqtt")
+    connector.setUri(new URI("mqtt:" + brokerUri))
+    broker.addConnector(connector)
+    broker.start()
+  }
+
+  private def tearDownMQTT() {
+    if (broker != null) {
+      broker.stop()
+      broker = null
+    }
+    if (connector != null) {
+      connector.stop()
+      connector = null
+    }
+  }
+
+  private def findFreePort(): Int = {
+    Utils.startServiceOnPort(23456, (trialPort: Int) => {
+      val socket = new ServerSocket(trialPort)
+      socket.close()
+      (null, trialPort)
+    })._2
+  }
+
+  def publishData(data: String): Unit = {
+    var client: MqttClient = null
+    try {
+      val persistence: MqttClientPersistence =
+        new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath)
+      client = new MqttClient("tcp:" + brokerUri, MqttClient.generateClientId(), persistence)
+      client.connect()
+      if (client.isConnected) {
+        val msgTopic: MqttTopic = client.getTopic(topic)
+        val message: MqttMessage = new MqttMessage(data.getBytes("utf-8"))
+        message.setQos(1)
+        message.setRetained(true)
+        for (i <- 0 to 100)
+          msgTopic.publish(message)
--- End diff --

Can you explain what the correction is here? Just to understand what went wrong.



[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2015-01-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/3844#discussion_r22459212
  
--- Diff: 
external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
 ---
@@ -17,31 +17,111 @@
 
 package org.apache.spark.streaming.mqtt
 
-import org.scalatest.FunSuite
+import java.net.{URI, ServerSocket}
 
-import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.activemq.broker.{TransportConnector, BrokerService}
+import org.apache.spark.util.Utils
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Eventually
+import scala.concurrent.duration._
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.eclipse.paho.client.mqttv3._
+import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
 
-class MQTTStreamSuite extends FunSuite {
-
-  val batchDuration = Seconds(1)
+class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
 
+  private val batchDuration = Milliseconds(500)
   private val master: String = "local[2]"
-
   private val framework: String = this.getClass.getSimpleName
+  private val freePort = findFreePort()
+  private val brokerUri = "//localhost:" + freePort
+  private val topic = "def"
+  private var ssc: StreamingContext = _
+  private val persistenceDir = Utils.createTempDir()
+  private var broker: BrokerService = _
+  private var connector: TransportConnector = _
 
-  test("mqtt input stream") {
-    val ssc = new StreamingContext(master, framework, batchDuration)
-    val brokerUrl = "abc"
-    val topic = "def"
+  before {
+    ssc = new StreamingContext(master, framework, batchDuration)
+    setupMQTT()
+  }
 
-    // tests the API, does not actually test data receiving
-    val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic)
-    val test2: ReceiverInputDStream[String] =
-      MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
+  after {
+    if (ssc != null) {
+      ssc.stop()
+      ssc = null
+    }
+    Utils.deleteRecursively(persistenceDir)
+    tearDownMQTT()
+  }
 
-    // TODO: Actually test receiving data
+  test("mqtt input stream") {
+    val sendMessage = "MQTT demo for spark streaming"
+    val receiveStream: ReceiverInputDStream[String] =
+      MQTTUtils.createStream(ssc, "tcp:" + brokerUri, topic, StorageLevel.MEMORY_ONLY)
+    var receiveMessage: List[String] = List()
+    receiveStream.foreachRDD { rdd =>
+      if (rdd.collect.length > 0) {
+        receiveMessage = receiveMessage ::: List(rdd.first)
+        receiveMessage
+      }
+    }
+    ssc.start()
+    publishData(sendMessage)
+    eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
+      assert(sendMessage.equals(receiveMessage(0)))
+    }
     ssc.stop()
   }
+
+  private def setupMQTT() {
+    broker = new BrokerService()
+    connector = new TransportConnector()
+    connector.setName("mqtt")
+    connector.setUri(new URI("mqtt:" + brokerUri))
+    broker.addConnector(connector)
+    broker.start()
+  }
+
+  private def tearDownMQTT() {
+    if (broker != null) {
+      broker.stop()
+      broker = null
+    }
+    if (connector != null) {
+      connector.stop()
+      connector = null
+    }
+  }
+
+  private def findFreePort(): Int = {
+    Utils.startServiceOnPort(23456, (trialPort: Int) => {
+      val socket = new ServerSocket(trialPort)
+      socket.close()
+      (null, trialPort)
+    })._2
+  }
+
+  def publishData(data: String): Unit = {
+    var client: MqttClient = null
+    try {
+      val persistence: MqttClientPersistence =
+        new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath)
+      client = new MqttClient("tcp:" + brokerUri, MqttClient.generateClientId(), persistence)
+      client.connect()
+      if (client.isConnected) {
+        val msgTopic: MqttTopic = client.getTopic(topic)
+        val message: MqttMessage = new MqttMessage(data.getBytes("utf-8"))
+        message.setQos(1)
+        message.setRetained(true)
+        for (i <- 0 to 100)
+          msgTopic.publish(message)
--- End diff --

```
for (...) {
  msgTopic.publish(message)
}
```

Such a code block should either be on one line or be wrapped in braces.



[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2015-01-05 Thread Bilna
Github user Bilna commented on a diff in the pull request:

https://github.com/apache/spark/pull/3844#discussion_r22507984
  
--- Diff: 
external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
 ---
@@ -17,31 +17,111 @@
 
 package org.apache.spark.streaming.mqtt
 
-import org.scalatest.FunSuite
+import java.net.{URI, ServerSocket}
 
-import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.activemq.broker.{TransportConnector, BrokerService}
+import org.apache.spark.util.Utils
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Eventually
+import scala.concurrent.duration._
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.eclipse.paho.client.mqttv3._
+import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
 
-class MQTTStreamSuite extends FunSuite {
-
-  val batchDuration = Seconds(1)
+class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
 
+  private val batchDuration = Milliseconds(500)
   private val master: String = "local[2]"
-
   private val framework: String = this.getClass.getSimpleName
+  private val freePort = findFreePort()
+  private val brokerUri = "//localhost:" + freePort
+  private val topic = "def"
+  private var ssc: StreamingContext = _
+  private val persistenceDir = Utils.createTempDir()
+  private var broker: BrokerService = _
+  private var connector: TransportConnector = _
 
-  test("mqtt input stream") {
-    val ssc = new StreamingContext(master, framework, batchDuration)
-    val brokerUrl = "abc"
-    val topic = "def"
+  before {
+    ssc = new StreamingContext(master, framework, batchDuration)
+    setupMQTT()
+  }
 
-    // tests the API, does not actually test data receiving
-    val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic)
-    val test2: ReceiverInputDStream[String] =
-      MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
+  after {
+    if (ssc != null) {
+      ssc.stop()
+      ssc = null
+    }
+    Utils.deleteRecursively(persistenceDir)
+    tearDownMQTT()
+  }
 
-    // TODO: Actually test receiving data
+  test("mqtt input stream") {
+    val sendMessage = "MQTT demo for spark streaming"
+    val receiveStream: ReceiverInputDStream[String] =
+      MQTTUtils.createStream(ssc, "tcp:" + brokerUri, topic, StorageLevel.MEMORY_ONLY)
+    var receiveMessage: List[String] = List()
+    receiveStream.foreachRDD { rdd =>
+      if (rdd.collect.length > 0) {
+        receiveMessage = receiveMessage ::: List(rdd.first)
+        receiveMessage
+      }
+    }
+    ssc.start()
+    publishData(sendMessage)
+    eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
+      assert(sendMessage.equals(receiveMessage(0)))
+    }
     ssc.stop()
   }
+
+  private def setupMQTT() {
+    broker = new BrokerService()
+    connector = new TransportConnector()
+    connector.setName("mqtt")
+    connector.setUri(new URI("mqtt:" + brokerUri))
+    broker.addConnector(connector)
+    broker.start()
+  }
+
+  private def tearDownMQTT() {
+    if (broker != null) {
+      broker.stop()
+      broker = null
+    }
+    if (connector != null) {
+      connector.stop()
+      connector = null
+    }
+  }
+
+  private def findFreePort(): Int = {
+    Utils.startServiceOnPort(23456, (trialPort: Int) => {
+      val socket = new ServerSocket(trialPort)
+      socket.close()
+      (null, trialPort)
+    })._2
+  }
+
+  def publishData(data: String): Unit = {
+    var client: MqttClient = null
+    try {
+      val persistence: MqttClientPersistence =
+        new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath)
+      client = new MqttClient("tcp:" + brokerUri, MqttClient.generateClientId(), persistence)
+      client.connect()
+      if (client.isConnected) {
+        val msgTopic: MqttTopic = client.getTopic(topic)
+        val message: MqttMessage = new MqttMessage(data.getBytes("utf-8"))
+        message.setQos(1)
+        message.setRetained(true)
+        for (i <- 0 to 100)
+          msgTopic.publish(message)
--- End diff --

ok.. thanks.



[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2015-01-04 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3844#issuecomment-68646513
  
  [Test build #25035 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25035/consoleFull)
 for   PR 3844 at commit 
[`acea3a3`](https://github.com/apache/spark/commit/acea3a31eba9d0853cb7484a16f8916219057be0).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class MQTTStreamSuite extends FunSuite with Eventually with 
BeforeAndAfter `






[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2015-01-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3844#issuecomment-68646517
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25035/
Test PASSed.





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2015-01-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/3844#discussion_r22446708
  
--- Diff: 
external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
 ---
@@ -17,31 +17,111 @@
 
 package org.apache.spark.streaming.mqtt
 
-import org.scalatest.FunSuite
+import java.net.{URI, ServerSocket}
 
-import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.activemq.broker.{TransportConnector, BrokerService}
+import org.apache.spark.util.Utils
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Eventually
+import scala.concurrent.duration._
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.eclipse.paho.client.mqttv3._
+import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
 
-class MQTTStreamSuite extends FunSuite {
-
-  val batchDuration = Seconds(1)
+class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
 
+  private val batchDuration = Milliseconds(500)
   private val master: String = "local[2]"
-
   private val framework: String = this.getClass.getSimpleName
+  private val freePort = findFreePort()
+  private val brokerUri = "//localhost:" + freePort
+  private val topic = "def"
+  private var ssc: StreamingContext = _
+  private val persistenceDir = Utils.createTempDir()
+  private var broker: BrokerService = _
+  private var connector: TransportConnector = _
 
-  test("mqtt input stream") {
-    val ssc = new StreamingContext(master, framework, batchDuration)
-    val brokerUrl = "abc"
-    val topic = "def"
+  before {
+    ssc = new StreamingContext(master, framework, batchDuration)
+    setupMQTT()
+  }
 
-    // tests the API, does not actually test data receiving
-    val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic)
-    val test2: ReceiverInputDStream[String] =
-      MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
+  after {
+    if (ssc != null) {
+      ssc.stop()
+      ssc = null
+    }
+    Utils.deleteRecursively(persistenceDir)
+    tearDownMQTT()
+  }
 
-    // TODO: Actually test receiving data
+  test("mqtt input stream") {
+    val sendMessage = "MQTT demo for spark streaming"
+    val receiveStream: ReceiverInputDStream[String] =
+      MQTTUtils.createStream(ssc, "tcp:" + brokerUri, topic, StorageLevel.MEMORY_ONLY)
+    var receiveMessage: List[String] = List()
+    receiveStream.foreachRDD { rdd =>
+      if (rdd.collect.length > 0) {
+        receiveMessage = receiveMessage ::: List(rdd.first)
+        receiveMessage
+      }
+    }
+    ssc.start()
+    publishData(sendMessage)
+    eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
+      assert(sendMessage.equals(receiveMessage(0)))
+    }
     ssc.stop()
   }
+
+  private def setupMQTT() {
+    broker = new BrokerService()
+    connector = new TransportConnector()
+    connector.setName("mqtt")
+    connector.setUri(new URI("mqtt:" + brokerUri))
+    broker.addConnector(connector)
+    broker.start()
+  }
+
+  private def tearDownMQTT() {
+    if (broker != null) {
+      broker.stop()
+      broker = null
+    }
+    if (connector != null) {
+      connector.stop()
+      connector = null
+    }
+  }
+
+  private def findFreePort(): Int = {
+    Utils.startServiceOnPort(23456, (trialPort: Int) => {
+      val socket = new ServerSocket(trialPort)
+      socket.close()
+      (null, trialPort)
+    })._2
+  }
+
+  def publishData(data: String): Unit = {
+    var client: MqttClient = null
+    try {
+      val persistence: MqttClientPersistence =
+        new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath)
+      client = new MqttClient("tcp:" + brokerUri, MqttClient.generateClientId(), persistence)
+      client.connect()
+      if (client.isConnected) {
+        val msgTopic: MqttTopic = client.getTopic(topic)
+        val message: MqttMessage = new MqttMessage(data.getBytes("utf-8"))
+        message.setQos(1)
+        message.setRetained(true)
+        for (i <- 0 to 100)
+          msgTopic.publish(message)
--- End diff --

nit: I missed this in the last pass, but this violates the Scala style that we 
follow. I won't block this PR for this. 



[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2015-01-04 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/3844#issuecomment-68665022
  
Merging this, thanks!





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2015-01-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/3844





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2015-01-03 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3844#issuecomment-68589126
  
  [Test build #25007 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25007/consoleFull)
 for   PR 3844 at commit 
[`fac3904`](https://github.com/apache/spark/commit/fac3904a8e702722acca2a0e7217c5440ecda84a).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class MQTTStreamSuite extends FunSuite with Eventually with 
BeforeAndAfter `






[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2015-01-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3844#issuecomment-68589130
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25007/
Test PASSed.





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2015-01-03 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3844#issuecomment-68587233
  
  [Test build #25007 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25007/consoleFull)
 for   PR 3844 at commit 
[`fac3904`](https://github.com/apache/spark/commit/fac3904a8e702722acca2a0e7217c5440ecda84a).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2015-01-03 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/3844#discussion_r22435908
  
--- Diff: external/mqtt/pom.xml ---
@@ -47,6 +47,11 @@
       <version>1.0.1</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-core</artifactId>
+      <version>5.7.0</version>
+    </dependency>
--- End diff --

Ummm... is this dependency necessary only for the unit test? In that case it 
should be added in test scope (see scalatest below). We really try to avoid 
changing dependencies, since any such change can conflict with other things 
(Spark's own dependencies) and cause unforeseen failures. So if this is only 
needed for the test, please put it in test scope. 





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2015-01-03 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/3844#issuecomment-68616859
  
Only one more comment. 





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2015-01-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/3844#discussion_r22429050
  
--- Diff: 
external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
 ---
@@ -17,31 +17,114 @@
 
 package org.apache.spark.streaming.mqtt
 
-import org.scalatest.FunSuite
+import java.net.{URI, ServerSocket}
 
-import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.activemq.broker.{TransportConnector, BrokerService}
+import org.apache.spark.util.Utils
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Eventually
+import scala.concurrent.duration._
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.eclipse.paho.client.mqttv3._
+import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
 
-class MQTTStreamSuite extends FunSuite {
-
-  val batchDuration = Seconds(1)
+class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
 
+  private val batchDuration = Milliseconds(500)
   private val master: String = "local[2]"
-
   private val framework: String = this.getClass.getSimpleName
+  private val freePort = findFreePort()
+  private val brokerUri = "//localhost:" + freePort
+  private val topic = "def"
+  private var ssc: StreamingContext = _
+  private val persistenceDir = Utils.createTempDir()
+  private var broker: BrokerService = _
+  private var connector: TransportConnector = _
 
-  test("mqtt input stream") {
-    val ssc = new StreamingContext(master, framework, batchDuration)
-    val brokerUrl = "abc"
-    val topic = "def"
+  before {
+    ssc = new StreamingContext(master, framework, batchDuration)
+    setupMQTT
+  }
 
-    // tests the API, does not actually test data receiving
-    val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic)
-    val test2: ReceiverInputDStream[String] =
-      MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
+  after {
+    if (ssc != null) {
+      ssc.stop()
+      ssc = null
+    }
+    Utils.deleteRecursively(persistenceDir)
+    tearDownMQTT
+  }
 
-    // TODO: Actually test receiving data
+  test("mqtt input stream") {
+    val sendMessage = "MQTT demo for spark streaming"
+    val receiveStream: ReceiverInputDStream[String] =
+      MQTTUtils.createStream(ssc, "tcp:" + brokerUri, topic, StorageLevel.MEMORY_ONLY)
+    var receiveMessage: List[String] = List()
+    receiveStream.foreachRDD { rdd =>
+      if (rdd.collect.length > 0) {
+        receiveMessage = receiveMessage ::: List(rdd.first)
+        receiveMessage
+      }
+    }
+    ssc.start()
+    publishData(sendMessage)
+    eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
+      assert(sendMessage.equals(receiveMessage(0)))
+    }
     ssc.stop()
   }
+
+  private def setupMQTT() {
+    broker = new BrokerService()
+    connector = new TransportConnector()
+    connector.setName("mqtt")
+    connector.setUri(new URI("mqtt:" + brokerUri))
+    broker.addConnector(connector)
+    broker.start()
+  }
+
+  private def tearDownMQTT() {
+    if (broker != null) {
+      broker.stop()
+      broker = null
+    }
+    if (connector != null) {
+      connector.stop()
+      connector = null
+    }
+  }
+
+  private def findFreePort(): Int = {
+    Utils.startServiceOnPort(23456, (trialPort: Int) => {
+      val socket = new ServerSocket(trialPort)
+      socket.close()
+      (null, trialPort)
+    })._2
+  }
+
+  def publishData(data: String): Unit = {
+    var client: MqttClient = null
+    try {
+      val persistence: MqttClientPersistence =
+        new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath)
+      client = new MqttClient("tcp:" + brokerUri, MqttClient.generateClientId(), persistence)
+      client.connect()
+      if (client.isConnected) {
+        val msgTopic: MqttTopic = client.getTopic(topic)
+        val message: MqttMessage = new MqttMessage(data.getBytes("utf-8"))
+        message.setQos(1)
+        message.setRetained(true)
+        for (i <- 0 to 10)
+          msgTopic.publish(message)
+      }
+    } catch {
+      case e: MqttException => println("Exception Caught: " + e)
--- End diff --

Why can there be an exception? And if there is an exception, why is it 
being ignored? Printing and not doing anything is essentially ignoring if the 
unit test 
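
A minimal sketch of what the reviewer appears to be asking for, assuming ScalaTest's `fail` (available through `FunSuite`) and the suite's `msgTopic`, `message`, and `brokerUri` are in scope: let a publish failure fail the test instead of being printed and swallowed.

```
try {
  msgTopic.publish(message)
} catch {
  case e: MqttException =>
    // Propagate the failure so the test fails loudly rather than silently timing out.
    fail("Could not publish test message to " + brokerUri, e)
}
```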

[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2015-01-02 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/3844#issuecomment-68581068
  
This is almost looking good. A few more comments and we are ready. :)





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2015-01-02 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3844#issuecomment-68525514
  
  [Test build #24994 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24994/consoleFull)
 for   PR 3844 at commit 
[`04503cf`](https://github.com/apache/spark/commit/04503cfa7f8168038c17198b6e45b16b89591e74).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class MQTTStreamSuite extends FunSuite with Eventually with 
BeforeAndAfter `






[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2015-01-02 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3844#issuecomment-68525516
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24994/
Test PASSed.





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2015-01-02 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3844#issuecomment-68525682
  
  [Test build #24996 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24996/consoleFull)
 for   PR 3844 at commit 
[`4b34ee7`](https://github.com/apache/spark/commit/4b34ee784e7c9c489cf0c22d73311c160bc67c47).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2015-01-02 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3844#issuecomment-68525686
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24996/
Test PASSed.





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2015-01-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/3844#discussion_r22427641
  
--- Diff: 
external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
 ---
@@ -17,31 +17,114 @@
 
 package org.apache.spark.streaming.mqtt
 
-import org.scalatest.FunSuite
+import java.net.{URI, ServerSocket}
 
-import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.activemq.broker.{TransportConnector, BrokerService}
+import org.apache.spark.util.Utils
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Eventually
+import scala.concurrent.duration._
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.eclipse.paho.client.mqttv3._
+import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
 
-class MQTTStreamSuite extends FunSuite {
-
-  val batchDuration = Seconds(1)
+class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
 
+  private val batchDuration = Milliseconds(500)
   private val master: String = "local[2]"
-
   private val framework: String = this.getClass.getSimpleName
+  private val freePort = findFreePort()
+  private val brokerUri = "//localhost:" + freePort
+  private val topic = "def"
+  private var ssc: StreamingContext = _
+  private val persistenceDir = Utils.createTempDir()
+  private var broker: BrokerService = _
+  private var connector: TransportConnector = _
 
-  test("mqtt input stream") {
-    val ssc = new StreamingContext(master, framework, batchDuration)
-    val brokerUrl = "abc"
-    val topic = "def"
+  before {
+    ssc = new StreamingContext(master, framework, batchDuration)
+    setupMQTT
+  }
 
-    // tests the API, does not actually test data receiving
-    val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic)
-    val test2: ReceiverInputDStream[String] =
-      MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
+  after {
+    if (ssc != null) {
+      ssc.stop()
+      ssc = null
+    }
+    Utils.deleteRecursively(persistenceDir)
+    tearDownMQTT
+  }
 
-    // TODO: Actually test receiving data
+  test("mqtt input stream") {
+    val sendMessage = "MQTT demo for spark streaming"
+    val receiveStream: ReceiverInputDStream[String] =
+      MQTTUtils.createStream(ssc, "tcp:" + brokerUri, topic, StorageLevel.MEMORY_ONLY)
+    var receiveMessage: List[String] = List()
+    receiveStream.foreachRDD { rdd =>
+      if (rdd.collect.length > 0) {
+        receiveMessage = receiveMessage ::: List(rdd.first)
+        receiveMessage
+      }
+    }
+    ssc.start()
+    publishData(sendMessage)
+    eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
+      assert(sendMessage.equals(receiveMessage(0)))
+    }
     ssc.stop()
   }
+
+  private def setupMQTT() {
+    broker = new BrokerService()
+    connector = new TransportConnector()
+    connector.setName("mqtt")
+    connector.setUri(new URI("mqtt:" + brokerUri))
+    broker.addConnector(connector)
+    broker.start()
+  }
+
+  private def tearDownMQTT() {
+    if (broker != null) {
+      broker.stop()
+      broker = null
+    }
+    if (connector != null) {
+      connector.stop()
+      connector = null
+    }
+  }
+
+  private def findFreePort(): Int = {
+    Utils.startServiceOnPort(23456, (trialPort: Int) => {
+      val socket = new ServerSocket(trialPort)
+      socket.close()
+      (null, trialPort)
+    })._2
+  }
+
+  def publishData(data: String): Unit = {
+    var client: MqttClient = null
+    try {
+      val persistence: MqttClientPersistence =
+        new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath)
+      client = new MqttClient("tcp:" + brokerUri, MqttClient.generateClientId(), persistence)
+      client.connect()
+      if (client.isConnected) {
+        val msgTopic: MqttTopic = client.getTopic(topic)
+        val message: MqttMessage = new MqttMessage(data.getBytes("utf-8"))
+        message.setQos(1)
+        message.setRetained(true)
+        for (i <- 0 to 10)
+          msgTopic.publish(message)
+      }
+    } catch {
+      case e: MqttException => println("Exception Caught: " + e)
+    }
--- End diff --

nit: `finally` should go on the previous line, right after the closing `}`.
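
For illustration, the layout this nit asks for, assuming (as the comment implies) that the helper closes with a `finally` block that disconnects the client:

```
try {
  // ... publish the messages ...
} catch {
  case e: MqttException => println("Exception Caught: " + e)
} finally {
  client.disconnect()
}
```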



[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2015-01-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/3844#discussion_r22427589
  
--- Diff: 
external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
 ---
@@ -17,31 +17,114 @@
 
 package org.apache.spark.streaming.mqtt
 
-import org.scalatest.FunSuite
+import java.net.{URI, ServerSocket}
 
-import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.activemq.broker.{TransportConnector, BrokerService}
+import org.apache.spark.util.Utils
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Eventually
+import scala.concurrent.duration._
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.eclipse.paho.client.mqttv3._
+import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
 
-class MQTTStreamSuite extends FunSuite {
-
-  val batchDuration = Seconds(1)
+class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
 
+  private val batchDuration = Milliseconds(500)
   private val master: String = "local[2]"
-
   private val framework: String = this.getClass.getSimpleName
+  private val freePort = findFreePort()
+  private val brokerUri = "//localhost:" + freePort
+  private val topic = "def"
+  private var ssc: StreamingContext = _
+  private val persistenceDir = Utils.createTempDir()
+  private var broker: BrokerService = _
+  private var connector: TransportConnector = _
 
-  test("mqtt input stream") {
-    val ssc = new StreamingContext(master, framework, batchDuration)
-    val brokerUrl = "abc"
-    val topic = "def"
+  before {
+    ssc = new StreamingContext(master, framework, batchDuration)
+    setupMQTT
+  }
 
-    // tests the API, does not actually test data receiving
-    val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic)
-    val test2: ReceiverInputDStream[String] =
-      MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
+  after {
+    if (ssc != null) {
+      ssc.stop()
+      ssc = null
+    }
+    Utils.deleteRecursively(persistenceDir)
+    tearDownMQTT
--- End diff --

Same comment as for `setupMQTT`





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2015-01-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/3844#discussion_r22427587
  
--- Diff: 
external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
 ---
@@ -17,31 +17,114 @@
 
 package org.apache.spark.streaming.mqtt
 
-import org.scalatest.FunSuite
+import java.net.{URI, ServerSocket}
 
-import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.activemq.broker.{TransportConnector, BrokerService}
+import org.apache.spark.util.Utils
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Eventually
+import scala.concurrent.duration._
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.eclipse.paho.client.mqttv3._
+import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
 
-class MQTTStreamSuite extends FunSuite {
-
-  val batchDuration = Seconds(1)
+class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
 
+  private val batchDuration = Milliseconds(500)
   private val master: String = "local[2]"
-
   private val framework: String = this.getClass.getSimpleName
+  private val freePort = findFreePort()
+  private val brokerUri = "//localhost:" + freePort
+  private val topic = "def"
+  private var ssc: StreamingContext = _
+  private val persistenceDir = Utils.createTempDir()
+  private var broker: BrokerService = _
+  private var connector: TransportConnector = _
 
-  test("mqtt input stream") {
-    val ssc = new StreamingContext(master, framework, batchDuration)
-    val brokerUrl = "abc"
-    val topic = "def"
+  before {
+    ssc = new StreamingContext(master, framework, batchDuration)
+    setupMQTT
--- End diff --

nit: this should be `setupMQTT()`. See Scala style guide - 
http://docs.scala-lang.org/style/method-invocation.html#arity0 
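
For illustration, the arity-0 convention the style guide describes, sketched with the suite's method name: a method that has side effects (it starts a broker) is declared and invoked with parentheses.

```
// Declared with parens because it performs side effects...
private def setupMQTT(): Unit = {
  // ... start the embedded broker ...
}

// ...and therefore also invoked with parens.
before {
  setupMQTT()
}
```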





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2015-01-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/3844#discussion_r22427535
  
--- Diff: external/mqtt/pom.xml ---
@@ -16,65 +16,71 @@
   ~ limitations under the License.
  -->
 
--- End diff --

This file should not change wholesale like this. I think some incorrect changes 
were made to the indentation. Please revert/fix this. If any changes are 
necessary related to test scope (for the unit test), only those lines should 
change.





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2015-01-01 Thread Bilna
Github user Bilna commented on a diff in the pull request:

https://github.com/apache/spark/pull/3844#discussion_r22405599
  
--- Diff: 
external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
 ---
@@ -17,31 +17,65 @@
 
 package org.apache.spark.streaming.mqtt
 
-import org.scalatest.FunSuite
-
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Eventually
+import scala.concurrent.duration._
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.eclipse.paho.client.mqttv3._
+import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
 
-class MQTTStreamSuite extends FunSuite {
-
-  val batchDuration = Seconds(1)
+class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
 
+  private val batchDuration = Seconds(1)
   private val master: String = "local[2]"
-
   private val framework: String = this.getClass.getSimpleName
+  private val brokerUrl = "tcp://localhost:1883"
--- End diff --

TCP/IP port 1883 is reserved with IANA for use with MQTT. That is why I 
hardcoded it.





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2014-12-31 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3844#issuecomment-68428985
  
  [Test build #24948 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24948/consoleFull)
 for   PR 3844 at commit 
[`b1ac4ad`](https://github.com/apache/spark/commit/b1ac4ad62ff6d537f669699d5da49bc4ee1ab154).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class MQTTStreamSuite extends FunSuite with Eventually with 
BeforeAndAfter `






[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2014-12-31 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3844#issuecomment-68428989
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24948/
Test FAILed.





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2014-12-31 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3844#issuecomment-68429312
  
  [Test build #24949 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24949/consoleFull)
 for   PR 3844 at commit 
[`4b58094`](https://github.com/apache/spark/commit/4b580943de5137e947d1a6cdadd054020932ed8e).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class MQTTStreamSuite extends FunSuite with Eventually with 
BeforeAndAfter `






[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2014-12-31 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3844#issuecomment-68429316
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24949/
Test FAILed.





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2014-12-31 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3844#issuecomment-68433088
  
  [Test build #24956 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24956/consoleFull)
 for   PR 3844 at commit 
[`fc8eb28`](https://github.com/apache/spark/commit/fc8eb286db6aa8e78a567537996011f554eed969).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2014-12-31 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3844#issuecomment-68435869
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24956/
Test FAILed.





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2014-12-31 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3844#issuecomment-68435866
  
  [Test build #24956 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24956/consoleFull)
 for   PR 3844 at commit 
[`fc8eb28`](https://github.com/apache/spark/commit/fc8eb286db6aa8e78a567537996011f554eed969).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class MQTTStreamSuite extends FunSuite with Eventually with 
BeforeAndAfter `






[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2014-12-31 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/3844#discussion_r22397269
  
--- Diff: 
external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
 ---
@@ -17,31 +17,65 @@
 
 package org.apache.spark.streaming.mqtt
 
-import org.scalatest.FunSuite
-
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Eventually
+import scala.concurrent.duration._
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.eclipse.paho.client.mqttv3._
+import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
 
-class MQTTStreamSuite extends FunSuite {
-
-  val batchDuration = Seconds(1)
+class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
 
+  private val batchDuration = Seconds(1)
   private val master: String = "local[2]"
-
   private val framework: String = this.getClass.getSimpleName
+  private val brokerUrl = "tcp://localhost:1883"
--- End diff --

Who is running the broker? Also, this port is hardcoded. There is a small but non-trivial chance that this port may not be free (in Jenkins, where multiple series of tests may be running in parallel), causing the server to fail to bind and the test to fail. Can you find a free port (see [FlumeStreamSuite](https://github.com/apache/spark/blob/master/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala#L78)) and use that instead?
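
For reference, one way a test can pick a free port (a sketch under the assumption that binding to port 0 and reusing the assigned port is acceptable; not necessarily how FlumeStreamSuite does it):

```scala
import java.net.ServerSocket

// Illustrative sketch: ask the OS for an ephemeral port, then hand it to the broker.
// Note there is still a small window between close() and the broker binding the port.
object PortUtils {
  def findFreePort(): Int = {
    val socket = new ServerSocket(0) // port 0 => the OS picks a free port
    try socket.getLocalPort
    finally socket.close()
  }
}

// e.g. val brokerUrl = s"tcp://localhost:${PortUtils.findFreePort()}"
```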





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2014-12-31 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/3844#discussion_r22397277
  
--- Diff: 
external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
 ---
@@ -17,31 +17,65 @@
 
 package org.apache.spark.streaming.mqtt
 
-import org.scalatest.FunSuite
-
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Eventually
+import scala.concurrent.duration._
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.eclipse.paho.client.mqttv3._
+import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
 
-class MQTTStreamSuite extends FunSuite {
-
-  val batchDuration = Seconds(1)
+class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
 
+  private val batchDuration = Seconds(1)
   private val master: String = "local[2]"
-
   private val framework: String = this.getClass.getSimpleName
+  private val brokerUrl = "tcp://localhost:1883"
+  private val topic = "def"
+  private var ssc: StreamingContext = _
 
-  test("mqtt input stream") {
-    val ssc = new StreamingContext(master, framework, batchDuration)
-    val brokerUrl = "abc"
-    val topic = "def"
+  before {
+    ssc = new StreamingContext(master, framework, batchDuration)
+  }
+  after {
+    if (ssc != null) {
+      ssc.stop()
+      ssc = null
+    }
+  }
 
-    // tests the API, does not actually test data receiving
-    val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic)
-    val test2: ReceiverInputDStream[String] =
+  test("mqtt input stream") {
+    val sendMessage = "MQTT demo for spark streaming"
+    publishData(sendMessage)
+    val receiveStream: ReceiverInputDStream[String] =
       MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
-
-    // TODO: Actually test receiving data
+    var receiveMessage: String = ""
+    receiveStream.foreachRDD { rdd =>
+      receiveMessage = rdd.first
+      receiveMessage
+    }
+    ssc.start()
+    eventually(timeout(1 milliseconds), interval(100 milliseconds)) {
+      assert(sendMessage.equals(receiveMessage))
+    }
     ssc.stop()
   }
+
+  def publishData(sendMessage: String): Unit = {
+    try {
+      val persistence: MqttClientPersistence = new MqttDefaultFilePersistence("/tmp")
+      val client: MqttClient = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)
+      client.connect()
+      val msgTopic: MqttTopic = client.getTopic(topic)
+      val message: MqttMessage = new MqttMessage(String.valueOf(sendMessage).getBytes("utf-8"))
+      message.setQos(1)
+      message.setRetained(true)
+      msgTopic.publish(message)
+      println("Published data \ntopic: " + msgTopic.getName() + "\nMessage: " + message)
+      client.disconnect()
+    } catch {
+      case e: MqttException => println("Exception Caught: " + e)
+    }
--- End diff --

Shouldn't there be a `finally` block to close any running servers (client, etc.)?
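
For reference, a minimal sketch of the suggested shape (illustrative only; `brokerUrl` and `topic` are assumed to come from the surrounding suite, as in the diff above): move cleanup into a `finally` so the client is always disconnected.

```scala
import org.eclipse.paho.client.mqttv3._
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence

// Illustrative sketch of publishData with a finally block; not the code in this PR.
object PublishExample {
  private val brokerUrl = "tcp://localhost:1883" // assumed, as in the diff above
  private val topic = "def"

  def publishData(sendMessage: String): Unit = {
    val persistence: MqttClientPersistence = new MqttDefaultFilePersistence("/tmp")
    val client: MqttClient = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)
    try {
      client.connect()
      val msgTopic: MqttTopic = client.getTopic(topic)
      val message: MqttMessage = new MqttMessage(sendMessage.getBytes("utf-8"))
      message.setQos(1)
      message.setRetained(true)
      msgTopic.publish(message)
    } catch {
      case e: MqttException => println("Exception Caught: " + e)
    } finally {
      // Runs whether or not publish succeeded, so the connection is not leaked.
      if (client.isConnected) client.disconnect()
    }
  }
}
```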





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2014-12-30 Thread Bilna
GitHub user Bilna opened a pull request:

https://github.com/apache/spark/pull/3844

[SPARK-4631] unit test for MQTT



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Bilna/spark master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/3844.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3844


commit 86164950acfc794c6c9b1db3663716ac4626c55b
Author: bilna bil...@am.amrita.edu
Date:   2014-12-30T13:06:09Z

[SPARK-4631] unit test for MQTT







[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2014-12-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3844#issuecomment-68355440
  
Can one of the admins verify this patch?





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2014-12-30 Thread prabeesh
Github user prabeesh commented on the pull request:

https://github.com/apache/spark/pull/3844#issuecomment-68355645
  
@tdas verify this patch





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2014-12-30 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/3844#issuecomment-68417426
  
Jenkins, this is ok to test.





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2014-12-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3844#issuecomment-68417591
  
  [Test build #24922 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24922/consoleFull)
 for   PR 3844 at commit 
[`e8b6623`](https://github.com/apache/spark/commit/e8b6623e5bd31fcb583fdeae5f1c954be672403d).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2014-12-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/3844#discussion_r22372726
  
--- Diff: 
external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
 ---
@@ -17,31 +17,58 @@
 
 package org.apache.spark.streaming.mqtt
 
+import org.apache.spark.Logging
 import org.scalatest.FunSuite
-
+import org.scalatest.concurrent.Eventually
+import scala.concurrent.duration._
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.eclipse.paho.client.mqttv3._
+import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
 
-class MQTTStreamSuite extends FunSuite {
+abstract class MQTTStreamSuiteBase extends FunSuite with Eventually with Logging {
 
   val batchDuration = Seconds(1)
+  val master: String = "local[2]"
+  val framework: String = this.getClass.getSimpleName
+  val brokerUrl = "tcp://localhost:1883"
+  val topic = "def"
+
+  def publishData(sendMessage: String): Unit = {
+    try {
+      val persistence: MqttClientPersistence = new MqttDefaultFilePersistence("/tmp")
+      val client: MqttClient = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)
+      client.connect()
+      val msgTopic: MqttTopic = client.getTopic(topic)
+      val message: MqttMessage = new MqttMessage(String.valueOf(sendMessage).getBytes("utf-8"))
+      message.setQos(1)
+      message.setRetained(true)
+      msgTopic.publish(message)
+      println("Published data \ntopic: " + msgTopic.getName() + "\nMessage: " + message)
+      client.disconnect()
+    } catch {
+      case e: MqttException => println("Exception Caught: " + e)
+    }
+  }
+}
 
-  private val master: String = "local[2]"
-
-  private val framework: String = this.getClass.getSimpleName
-
+class MQTTStreamSuite extends MQTTStreamSuiteBase {
   test("mqtt input stream") {
     val ssc = new StreamingContext(master, framework, batchDuration)
--- End diff --

This can leave the SparkContext not shut down if there is an exception in the unit test, causing a leaked SparkContext. Take a look at how it is done in KafkaStreamSuite with BeforeAndAfter.
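
For reference, a bare-bones sketch of the BeforeAndAfter pattern being referenced (illustrative; the real KafkaStreamSuite differs): the context is created in `before` and stopped in `after`, so it is released even when a test body throws.

```scala
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative sketch only; not the actual KafkaStreamSuite code.
class ExampleStreamSuite extends FunSuite with BeforeAndAfter {
  private var ssc: StreamingContext = _

  before {
    ssc = new StreamingContext("local[2]", "ExampleStreamSuite", Seconds(1))
  }

  after {
    if (ssc != null) {
      ssc.stop() // runs even if the test failed, so no SparkContext is leaked
      ssc = null
    }
  }

  test("example") {
    assert(ssc != null)
  }
}
```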





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2014-12-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/3844#discussion_r22372739
  
--- Diff: 
external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
 ---
@@ -17,31 +17,58 @@
 
 package org.apache.spark.streaming.mqtt
 
+import org.apache.spark.Logging
 import org.scalatest.FunSuite
-
+import org.scalatest.concurrent.Eventually
+import scala.concurrent.duration._
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.eclipse.paho.client.mqttv3._
+import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
 
-class MQTTStreamSuite extends FunSuite {
+abstract class MQTTStreamSuiteBase extends FunSuite with Eventually with Logging {
--- End diff --

Why make this class if it is used by only one other class? Are you planning to add another test suite, which would justify this abstract class (the way KafkaStreamSuiteBase is used by KafkaStreamSuite and ReliableKafkaStreamSuite)?
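
For reference, the shape that would justify a shared base (hypothetical names, sketch only): one abstract suite holding the common helpers, extended by more than one concrete suite.

```scala
import org.scalatest.FunSuite

// Hypothetical sketch: an abstract base pays off once two suites share its helpers.
abstract class ExampleStreamSuiteBase extends FunSuite {
  protected def publish(msg: String): Unit = println(s"publishing: $msg") // shared helper
}

class BasicExampleSuite extends ExampleStreamSuiteBase {
  test("basic") { publish("hello") }
}

class ReliableExampleSuite extends ExampleStreamSuiteBase {
  test("reliable") { publish("hello again") }
}
```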





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2014-12-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3844#issuecomment-68419761
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24922/
Test FAILed.





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2014-12-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3844#issuecomment-68419759
  
  [Test build #24922 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24922/consoleFull)
 for   PR 3844 at commit 
[`e8b6623`](https://github.com/apache/spark/commit/e8b6623e5bd31fcb583fdeae5f1c954be672403d).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `abstract class MQTTStreamSuiteBase extends FunSuite with Eventually 
with Logging `
  * `class MQTTStreamSuite extends MQTTStreamSuiteBase `






[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2014-12-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3844#issuecomment-68426613
  
  [Test build #24946 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24946/consoleFull)
 for   PR 3844 at commit 
[`5f6bfd2`](https://github.com/apache/spark/commit/5f6bfd2f4b11c08e76d02ccf5a5594151ccd9af5).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2014-12-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3844#issuecomment-68427068
  
  [Test build #24948 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24948/consoleFull)
 for   PR 3844 at commit 
[`b1ac4ad`](https://github.com/apache/spark/commit/b1ac4ad62ff6d537f669699d5da49bc4ee1ab154).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2014-12-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3844#issuecomment-68427356
  
  [Test build #24949 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24949/consoleFull)
 for   PR 3844 at commit 
[`4b58094`](https://github.com/apache/spark/commit/4b580943de5137e947d1a6cdadd054020932ed8e).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2014-12-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3844#issuecomment-68428438
  
  [Test build #24946 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24946/consoleFull)
 for   PR 3844 at commit 
[`5f6bfd2`](https://github.com/apache/spark/commit/5f6bfd2f4b11c08e76d02ccf5a5594151ccd9af5).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class MQTTStreamSuite extends FunSuite with Eventually with 
BeforeAndAfter `






[GitHub] spark pull request: [SPARK-4631] unit test for MQTT

2014-12-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3844#issuecomment-68428440
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24946/
Test FAILed.

