Github user lresende commented on a diff in the pull request:
https://github.com/apache/bahir/pull/71#discussion_r237038699
--- Diff:
streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
---
@@ -17,147 +17,75 @@
package org.apache.spark.streaming.zeromq
+import java.lang.{Iterable => JIterable}
+import java.util.{List => JList}
+
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
-import akka.actor.{ActorSystem, Props, SupervisorStrategy}
-import akka.util.ByteString
-import akka.zeromq.Subscribe
-
-import org.apache.spark.api.java.function.{Function => JFunction,
Function0 => JFunction0}
+import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}
import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream,
JavaStreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
object ZeroMQUtils {
/**
- * Create an input stream that receives messages pushed by a zeromq
publisher.
- * @param ssc StreamingContext object
- * @param publisherUrl Url of remote zeromq publisher
- * @param subscribe Topic to subscribe to
- * @param bytesToObjects A zeroMQ stream publishes sequence of frames
for each topic
+ * Create an input stream that receives messages pushed by a ZeroMQ
publisher.
+ * @param ssc Streaming context
+ * @param publisherUrl URL of remote ZeroMQ publisher
+ * @param connect When positive, connector will try to establish
connectivity with remote server.
+ * Otherwise, it attempts to create and bind local socket.
+ * @param topics List of topics to subscribe
+ * @param bytesToObjects ZeroMQ stream publishes sequence of frames for
each topic
* and each frame has sequence of byte thus it
needs the converter
--- End diff --
Can this be abstracted from the consumer at least by default, enabling them
to configure a complex function if needed (or maybe defining the char encode to
be used) ? Otherwise, maybe a better name, like the one you used in the
examples (messageConverter)
---