[
https://issues.apache.org/jira/browse/BAHIR-66?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703987#comment-16703987
]
ASF GitHub Bot commented on BAHIR-66:
-
Github user lukasz-antoniak commented on a diff in the pull request:
https://github.com/apache/bahir/pull/71#discussion_r237691415
--- Diff:
streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala
---
@@ -15,105 +15,123 @@
* limitations under the License.
*/
-// scalastyle:off println awaitresult
package org.apache.spark.examples.streaming.zeromq
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.language.implicitConversions
+import scala.util.Random
-import akka.actor.ActorSystem
-import akka.actor.actorRef2Scala
-import akka.util.ByteString
-import akka.zeromq._
-import akka.zeromq.Subscribe
import org.apache.log4j.{Level, Logger}
+import org.zeromq.ZContext
+import org.zeromq.ZMQ
+import org.zeromq.ZMQException
+import org.zeromq.ZMsg
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.zeromq._
+import org.apache.spark.streaming.zeromq.ZeroMQUtils
/**
- * A simple publisher for demonstration purposes, repeatedly publishes
random Messages
- * every one second.
+ * Simple publisher for demonstration purposes,
+ * repeatedly publishes random messages every one second.
*/
object SimpleZeroMQPublisher {
-
def main(args: Array[String]): Unit = {
if (args.length < 2) {
- System.err.println("Usage: SimpleZeroMQPublisher
")
+ // scalastyle:off println
+ System.err.println("Usage: SimpleZeroMQPublisher
")
+ // scalastyle:on println
System.exit(1)
}
val Seq(url, topic) = args.toSeq
-val acs: ActorSystem = ActorSystem()
-
-val pubSocket = ZeroMQExtension(acs).newSocket(SocketType.Pub,
Bind(url))
-implicit def stringToByteString(x: String): ByteString = ByteString(x)
-val messages: List[ByteString] = List("words ", "may ", "count ")
-while (true) {
- Thread.sleep(1000)
- pubSocket ! ZMQMessage(ByteString(topic) :: messages)
-}
-Await.result(acs.whenTerminated, Duration.Inf)
+val context = new ZContext
+val socket = context.createSocket(ZMQ.PUB)
+socket.bind(url)
+
+val zmqThread = new Thread(new Runnable {
+ def run() {
+val messages = List("words", "may", "count infinitely")
+val random = new Random
+while (!Thread.currentThread.isInterrupted) {
+ try {
+Thread.sleep(random.nextInt(1000))
+val msg1 = new ZMsg
+msg1.add(topic.getBytes)
+msg1.add(messages(random.nextInt(messages.size)).getBytes)
+msg1.send(socket)
+ } catch {
+case e: ZMQException if ZMQ.Error.ETERM.getCode ==
e.getErrorCode =>
+ Thread.currentThread.interrupt()
+case e: InterruptedException =>
+case e: Throwable => throw e
+ }
+}
+ }
+})
+
+sys.addShutdownHook( {
+ context.destroy()
+ zmqThread.interrupt()
+ zmqThread.join()
+} )
+
+zmqThread.start()
}
}
-// scalastyle:off
/**
- * A sample wordcount with ZeroMQStream stream
- *
- * To work with zeroMQ, some native libraries have to be installed.
- * Install zeroMQ (release 2.1) core libraries. [ZeroMQ Install guide]
- * (http://www.zeromq.org/intro:get-the-software)
+ * Sample word count with ZeroMQ stream.
*
- * Usage: ZeroMQWordCount
- *and describe where zeroMq publisher is running.
+ * Usage: ZeroMQWordCount
+ *describes where ZeroMQ publisher is running
+ *defines logical message type
*
- * To run this example locally, you may run publisher as
+ * To run this example locally, you may start publisher as:
*`$ bin/run-example \
* org.apache.spark.examples.streaming.zeromq.SimpleZeroMQPublisher
tcp://127.0.0.1:1234 foo`
- * and run the example as
+ * and run the example as:
*`$ bin/run-example \
* org.apache.spark.examples.streaming.zeromq.ZeroMQWordCount
tcp://127.0.0.1:1234 foo`
*/
-// scalastyle:on
object ZeroMQWordCount {
def main(args: Array[String]) {
if (args.length < 2) {
- System.err.println("Usage: ZeroMQWordCount ")
+ // scalastyle:off println
+ System.err.println("Usage: ZeroMQWordCount