[jira] [Commented] (BAHIR-66) Add test that ZeroMQ streaming connector can receive data

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/BAHIR-66?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703989#comment-16703989
 ] 

ASF GitHub Bot commented on BAHIR-66:
-

Github user lukasz-antoniak commented on a diff in the pull request:

https://github.com/apache/bahir/pull/71#discussion_r237691550
  
--- Diff: streaming-zeromq/pom.xml ---
@@ -45,22 +45,18 @@
   ${spark.version}
   provided
 
+
+  org.zeromq
+  jeromq
+  0.4.3
+
 
--- End diff --

Which files do you refer to? Could not locate them.


> Add test that ZeroMQ streaming connector can receive data
> -
>
> Key: BAHIR-66
> URL: https://issues.apache.org/jira/browse/BAHIR-66
> Project: Bahir
>  Issue Type: Sub-task
>  Components: Spark Streaming Connectors
>Reporter: Christian Kadner
>Priority: Major
>  Labels: test
>
> Add test cases that verify that the *ZeroMQ streaming connector* can receive 
> streaming data.
> See [BAHIR-63|https://issues.apache.org/jira/browse/BAHIR-63]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BAHIR-66) Add test that ZeroMQ streaming connector can receive data

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/BAHIR-66?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16703987#comment-16703987
 ] 

ASF GitHub Bot commented on BAHIR-66:
-

Github user lukasz-antoniak commented on a diff in the pull request:

https://github.com/apache/bahir/pull/71#discussion_r237691415
  
--- Diff: 
streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala
 ---
@@ -15,105 +15,123 @@
  * limitations under the License.
  */
 
-// scalastyle:off println awaitresult
 package org.apache.spark.examples.streaming.zeromq
 
 import scala.concurrent.Await
 import scala.concurrent.duration.Duration
 import scala.language.implicitConversions
+import scala.util.Random
 
-import akka.actor.ActorSystem
-import akka.actor.actorRef2Scala
-import akka.util.ByteString
-import akka.zeromq._
-import akka.zeromq.Subscribe
 import org.apache.log4j.{Level, Logger}
+import org.zeromq.ZContext
+import org.zeromq.ZMQ
+import org.zeromq.ZMQException
+import org.zeromq.ZMsg
 
 import org.apache.spark.SparkConf
 import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.zeromq._
+import org.apache.spark.streaming.zeromq.ZeroMQUtils
 
 /**
- * A simple publisher for demonstration purposes, repeatedly publishes 
random Messages
- * every one second.
+ * Simple publisher for demonstration purposes,
+ * repeatedly publishes random messages every one second.
  */
 object SimpleZeroMQPublisher {
-
   def main(args: Array[String]): Unit = {
 if (args.length < 2) {
-  System.err.println("Usage: SimpleZeroMQPublisher   
")
+  // scalastyle:off println
+  System.err.println("Usage: SimpleZeroMQPublisher  
")
+  // scalastyle:on println
   System.exit(1)
 }
 
 val Seq(url, topic) = args.toSeq
-val acs: ActorSystem = ActorSystem()
-
-val pubSocket = ZeroMQExtension(acs).newSocket(SocketType.Pub, 
Bind(url))
-implicit def stringToByteString(x: String): ByteString = ByteString(x)
-val messages: List[ByteString] = List("words ", "may ", "count ")
-while (true) {
-  Thread.sleep(1000)
-  pubSocket ! ZMQMessage(ByteString(topic) :: messages)
-}
-Await.result(acs.whenTerminated, Duration.Inf)
+val context = new ZContext
+val socket = context.createSocket(ZMQ.PUB)
+socket.bind(url)
+
+val zmqThread = new Thread(new Runnable {
+  def run() {
+val messages = List("words", "may", "count infinitely")
+val random = new Random
+while (!Thread.currentThread.isInterrupted) {
+  try {
+Thread.sleep(random.nextInt(1000))
+val msg1 = new ZMsg
+msg1.add(topic.getBytes)
+msg1.add(messages(random.nextInt(messages.size)).getBytes)
+msg1.send(socket)
+  } catch {
+case e: ZMQException if ZMQ.Error.ETERM.getCode == 
e.getErrorCode =>
+  Thread.currentThread.interrupt()
+case e: InterruptedException =>
+case e: Throwable => throw e
+  }
+}
+  }
+})
+
+sys.addShutdownHook( {
+  context.destroy()
+  zmqThread.interrupt()
+  zmqThread.join()
+} )
+
+zmqThread.start()
   }
 }
 
-// scalastyle:off
 /**
- * A sample wordcount with ZeroMQStream stream
- *
- * To work with zeroMQ, some native libraries have to be installed.
- * Install zeroMQ (release 2.1) core libraries. [ZeroMQ Install guide]
- * (http://www.zeromq.org/intro:get-the-software)
+ * Sample word count with ZeroMQ stream.
  *
- * Usage: ZeroMQWordCount  
- *and  describe where zeroMq publisher is running.
+ * Usage: ZeroMQWordCount  
+ *describes where ZeroMQ publisher is running
+ *defines logical message type
  *
- * To run this example locally, you may run publisher as
+ * To run this example locally, you may start publisher as:
  *`$ bin/run-example \
  *  org.apache.spark.examples.streaming.zeromq.SimpleZeroMQPublisher 
tcp://127.0.0.1:1234 foo`
- * and run the example as
+ * and run the example as:
  *`$ bin/run-example \
  *  org.apache.spark.examples.streaming.zeromq.ZeroMQWordCount 
tcp://127.0.0.1:1234 foo`
  */
-// scalastyle:on
 object ZeroMQWordCount {
   def main(args: Array[String]) {
 if (args.length < 2) {
-  System.err.println("Usage: ZeroMQWordCount  ")
+  // scalastyle:off println
+  System.err.println("Usage: ZeroMQWordCount 

Re: Release 2.3.x

2018-11-29 Thread Jean-Baptiste Onofré
Nothing from my side. +1 for the release.

Regards
JB

On 29/11/2018 10:40, Luciano Resende wrote:
> I am planning to start working on getting the 2.3.x releases soon, any
> jiras or prs that I should wait for ?
> 
> Thanks
> 

-- 
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


Release 2.3.x

2018-11-29 Thread Luciano Resende
I am planning to start working on getting the 2.3.x releases soon, any
jiras or prs that I should wait for ?

Thanks
-- 
Sent from my Mobile device