[jira] [Commented] (SPARK-8337) KafkaUtils.createDirectStream for python is lacking API/feature parity with the Scala/Java version
[ https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156559#comment-16156559 ]

Saurabh Bidwai commented on SPARK-8337:
---------------------------------------

For this call I'm getting the error below:

{code}
kstream = KafkaUtils.createDirectStream(ssc, topics = ['twitterstream'],
    kafkaParams = {"metadata.broker.list": ["dn1001:6667","dn2001:6667","dn3001:6667","dn4001:6667"]})
{code}

{code}
Py4JJavaError                             Traceback (most recent call last)
in ()
----> 1 streamer(sc)

in streamer(sc)
      5     pwords = load_wordlist("/home/daasuser/twitter/kafkatweets/Twitter-Sentiment-Analysis-Using-Spark-Streaming-And-Kafka/Dataset/positive.txt")
      6     nwords = load_wordlist("/home/daasuser/twitter/kafkatweets/Twitter-Sentiment-Analysis-Using-Spark-Streaming-And-Kafka/Dataset/negative.txt")
----> 7     counts = stream(ssc, pwords, nwords, 600)
      8     make_plot(counts)

in stream(ssc, pwords, nwords, duration)
      1 def stream(ssc, pwords, nwords, duration):
----> 2     kstream = KafkaUtils.createDirectStream(ssc, topics = ['twitterstream'], kafkaParams = {"metadata.broker.list": ["dn1001:6667","dn2001:6667","dn3001:6667","dn4001:6667"]})
      3     tweets = kstream.map(lambda x: x[1].encode("utf-8", "ignore"))
      4
      5     # Each element of tweets will be the text of a tweet.

/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/streaming/kafka.py in createDirectStream(ssc, topics, kafkaParams, fromOffsets, keyDecoder, valueDecoder, messageHandler)
    150             if 'ClassNotFoundException' in str(e.java_exception):
    151                 KafkaUtils._printErrorMsg(ssc.sparkContext)
--> 152             raise e
    153
    154         stream = DStream(jstream, ssc, ser).map(func)

Py4JJavaError: An error occurred while calling o40.loadClass.
: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:209)
	at java.lang.Thread.run(Thread.java:745)
{code}

> KafkaUtils.createDirectStream for python is lacking API/feature parity with
> the Scala/Java version
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-8337
>                 URL: https://issues.apache.org/jira/browse/SPARK-8337
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams, PySpark
>    Affects Versions: 1.4.0
>            Reporter: Amit Ramesh
>            Priority: Critical
>
> See the following thread for context.
> http://apache-spark-developers-list.1001551.n3.nabble.com/Re-Spark-1-4-Python-API-for-getting-Kafka-offsets-in-direct-mode-tt12714.html

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
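The class that fails to load, org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper, lives in the external spark-streaming-kafka module, so a ClassNotFoundException like the one above usually means that module's jar is not on the classpath at submit time. A hedged example of including it via --packages (the artifact coordinates below are an assumption for a Spark 1.6 / Scala 2.10 build, and the script name is hypothetical; adjust both to your cluster):

```shell
# Pull the Kafka streaming module from Maven Central at submit time.
# Adjust org.apache.spark:spark-streaming-kafka_2.10:1.6.0 to match
# the Spark and Scala versions of your installation.
spark-submit \
  --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.0 \
  kafkatweets.py
```

Passing the corresponding spark-streaming-kafka-assembly jar via --jars is an equivalent option when the cluster has no Maven access.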
[ https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15570186#comment-15570186 ]

Cody Koeninger commented on SPARK-8337:
---------------------------------------

Can this be closed, given that the subtasks are resolved and any future discussion of Python DStream Kafka support seems to be in SPARK-16534?
[ https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14627728#comment-14627728 ]

Saisai Shao commented on SPARK-8337:
------------------------------------

OK, thanks TD.
[ https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14627725#comment-14627725 ]

Tathagata Das commented on SPARK-8337:
--------------------------------------

[~jerryshao] Never mind, I made it myself.
[ https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14627722#comment-14627722 ]

Tathagata Das commented on SPARK-8337:
--------------------------------------

[~jerryshao] Could you make a separate sub-task of this JIRA for the message handler API fix (to maintain consistency with the other parity-related subtasks), and update the PR with that JIRA number?
[ https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14627451#comment-14627451 ]

Apache Spark commented on SPARK-8337:
-------------------------------------

User 'jerryshao' has created a pull request for this issue:
https://github.com/apache/spark/pull/7410
[ https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14621253#comment-14621253 ]

Tathagata Das commented on SPARK-8337:
--------------------------------------

Now that SPARK-8389 has been fixed, I am open to discussion for the messageHandler function.
[ https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14609102#comment-14609102 ]

Tathagata Das commented on SPARK-8337:
--------------------------------------

I will let you guys figure out the implementation among yourselves. Let me chime in and say that I am more concerned about exposing offset ranges than supporting access to MessageAndMetadata. So it would be great if you can make a PR with the first, and then we can focus on the latter in a later iteration. In that respect, if you are making a PR, please use the other JIRA, SPARK-8389, which was specifically for the offset ranges. I am reopening that JIRA and marking it as a sub-task of this one.
[ https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14605588#comment-14605588 ]

Juan Rodríguez Hortalá commented on SPARK-8337:
-----------------------------------------------

Hi [~jerryshao],

That is a good idea; I should have paid more attention to the discussion in the duplicated issue. I will try that way and tell you how it went.

Greetings,

Juan
[ https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14605241#comment-14605241 ]

Saisai Shao commented on SPARK-8337:
------------------------------------

Hi [~juanrh], I think the best choice is to keep the Python programming model similar to Scala/Java. In Java/Scala we use offsetRanges like:

{code}
directKafkaStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // offsetRanges.length = # of Kafka partitions being consumed
  ...
}
{code}

It would be better to keep Python to the same programming way. Your implementation looks like a different way: from my understanding, you return the offsetRange with each record of the KafkaRDD, but an offsetRange is actually related to the RDD, not to the records of the RDD, so it looks a little strange from my point of view, and you have to serialize the offsetRange from the driver to each executor.

Here is what TD suggested, though some details still need to be figured out. I tried a bit but am still blocked on something:

{quote}
I think the way it works is that the Java/Python friendly DStream returned by the Java APIs of KafkaUtils is wrapped in Python's DStream class in dstream.py. The foreachRDD of that class uses another Python class, TransformFunction, to wrap the JavaRDDs into Python's RDD objects and apply the user-defined Python function on them. To allow the wrapped Python RDDs to have a method called offsetRanges, you have to:

1. Create a custom KafkaRDD Python class (extending Python's RDD class) which can wrap a KafkaRDD. This may actually require defining a JavaKafkaRDD class.
2. Create a custom KafkaTransformFunc Python class (extending Python's TransformFunc class) which wraps JavaRDDs into Python's KafkaRDD classes, and applies the user's function on those.
3. Create a custom KafkaDStream Python class (extending Python's DStream class) which overrides transform() and foreachRDD() to use KafkaTransformFunc instead of TransformFunc.

From my cursory look, this may work. Think about it.

TD
{quote}
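The three steps above can be sketched in plain Python. Everything here is a hypothetical stand-in (PlainRDD, kafka_transform and the field names are inventions for illustration, not the pyspark API); it only shows the shape of the wrapping, where a KafkaRDD subclass carries its offsetRanges() alongside the data so the user's function can read them:

```python
from collections import namedtuple

# Stand-in for the OffsetRange metadata attached to each batch.
OffsetRange = namedtuple("OffsetRange",
                         ["topic", "partition", "fromOffset", "untilOffset"])

class PlainRDD(object):
    """Minimal stand-in for pyspark's RDD wrapper."""
    def __init__(self, data):
        self._data = data

    def collect(self):
        return list(self._data)

class KafkaRDD(PlainRDD):
    """Step 1: an RDD subclass that also carries its offset ranges."""
    def __init__(self, data, offset_ranges):
        PlainRDD.__init__(self, data)
        self._offset_ranges = offset_ranges

    def offsetRanges(self):
        return self._offset_ranges

def kafka_transform(rdd, offset_ranges, user_func):
    """Step 2: wrap the incoming RDD as a KafkaRDD before applying the
    user's function, so user_func can call offsetRanges() on it."""
    return user_func(KafkaRDD(rdd.collect(), offset_ranges))

# Usage: a foreachRDD-style user function that reads the offsets.
ranges = [OffsetRange("test", 0, 178, 179)]
result = kafka_transform(PlainRDD([("k", "v")]), ranges,
                         lambda rdd: [r.untilOffset for r in rdd.offsetRanges()])
# result == [179]
```

Step 3 in TD's sketch would then make a DStream subclass route every transform() and foreachRDD() call through this wrapping function instead of the plain one.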
[ https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14604633#comment-14604633 ]

Juan Rodríguez Hortalá commented on SPARK-8337:
-----------------------------------------------

Hi,

I have worked a bit on the OffsetRange way; you can access the code at https://github.com/juanrh/spark/commit/56fbd5c38bd30b825a7818f1c56abb1f8b2beaff. I have added the following method to pyspark's KafkaUtils:

{code}
@staticmethod
def getOffsetRanges(rdd):
    scalaRdd = rdd._jrdd.rdd()
    offsetRangesArray = scalaRdd.offsetRanges()
    return [OffsetRange(topic=offsetRange.topic(),
                        partition=offsetRange.partition(),
                        fromOffset=offsetRange.fromOffset(),
                        untilOffset=offsetRange.untilOffset())
            for offsetRange in offsetRangesArray]
{code}

This method is used in KafkaUtils.createDirectStreamJB, which is based on the original KafkaUtilsPythonHelper.createDirectStream. The main problem I have is that I don't know where to store the OffsetRange objects: the naive trick of adding them to the __dict__ of each Python RDD object doesn't work, because the new field is lost in the pyspark wrappers.

So the new method createDirectStreamJB takes two additional options, one for performing an action on the OffsetRange list, and another for adding it to each record of the DStream:

{code}
def createDirectStreamJB(ssc, topics, kafkaParams, fromOffsets={},
                         keyDecoder=utf8_decoder, valueDecoder=utf8_decoder,
                         offsetRangeForeach=None, addOffsetRange=False):
    """FIXME: temporary working placeholder

    :param offsetRangeForeach: if different to None, this function should be a
        function from a list of OffsetRange to None, and is applied to the
        OffsetRange list of each rdd
    :param addOffsetRange: if False (default) output records are of the shape
        (kafkaKey, kafkaValue); if True output records are of the shape
        (offsetRange, (kafkaKey, kafkaValue)) for offsetRange the OffsetRange
        value for the Spark partition for the record
    """
{code}

This is an example of using createDirectStreamJB:

{code}
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

ssc = StreamingContext(sc, 1)
topics = ["test"]
kafkaParams = {"metadata.broker.list": "localhost:9092"}

def offsetRangeForeach(offsetRangeList):
    print
    for offsetRange in offsetRangeList:
        print offsetRange
    print

kafkaStream = KafkaUtils.createDirectStreamJB(ssc, topics, kafkaParams,
                                              offsetRangeForeach=offsetRangeForeach,
                                              addOffsetRange=True)
# OffsetRange printed as <pyspark.streaming.kafka.OffsetRange object at 0x7f2fdc045950>,
# I guess due to some kind of pyspark proxy
kafkaStrStream = kafkaStream.map(lambda (offRan, (k, v)):
                                 str(offRan._fromOffset) + " " + str(offRan._untilOffset)
                                 + " " + str(k) + " " + str(v))
# kafkaStream.pprint()
kafkaStrStream.pprint()
ssc.start()
ssc.awaitTermination(timeout=5)
{code}

which gets the following output:

{code}
15/06/28 12:36:03 INFO InputInfoTracker: remove old batch metadata: 1435487761000 ms
OffsetRange(topic=test, partition=0, fromOffset=178, untilOffset=179)
15/06/28 12:36:04 INFO JobScheduler: Added jobs for time 1435487764000 ms
...
15/06/28 12:36:04 INFO DAGScheduler: Job 4 finished: runJob at PythonRDD.scala:366, took 0,075387 s
-------------------------------------------
Time: 2015-06-28 12:36:04
-------------------------------------------
178 179 None hola

15/06/28 12:36:04 INFO JobScheduler: Finished job streaming job 1435487764000 ms.0 from job set of time 1435487764000 ms
...
15/06/28 12:36:05 INFO BlockManager: Removing RDD 12
OffsetRange(topic=test, partition=0, fromOffset=179, untilOffset=180)
15/06/28 12:36:06 INFO JobScheduler: Starting job streaming job 1435487766000 ms.0 from job set of time 1435487766000 ms
15/06/28 12:36:06 INFO DAGScheduler: Job 6 finished: start at NativeMethodAccessorImpl.java:-2, took 0,077993 s
-------------------------------------------
Time: 2015-06-28 12:36:06
-------------------------------------------
179 180 None caracola

15/06/28 12:36:06 INFO JobScheduler: Finished job streaming job 1435487766000 ms.0 from job set of time 1435487766000 ms
{code}

Any thoughts on this will be appreciated, in particular about a suitable place to store the list of OffsetRange objects.

Greetings,

Juan
[ https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14600468#comment-14600468 ]

Saisai Shao commented on SPARK-8337:
------------------------------------

OK, well, I'd like to take a crack at it :).
[ https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14600010#comment-14600010 ]

Juan Rodríguez Hortalá commented on SPARK-8337:
-----------------------------------------------

Hi,

As I said above, I don't know much about the internals of pyspark; currently the original RDD from Scala is wrapped by several wrappers for the communication with Python, and so the RDD implementing HasOffsetRanges is hidden by those layers. However, after its merge with SPARK-8389, it looks like this issue has got the attention of several Spark committers, and I'm sure they will be able to come up with a solution that makes OffsetRanges accessible from pyspark.

Greetings,

Juan
[ https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14598681#comment-14598681 ]

Saisai Shao commented on SPARK-8337:
------------------------------------

Hi [~juanrh], will you also address the {{OffsetRange}} problem described in SPARK-8389?
[ https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14592354#comment-14592354 ]

Juan Rodríguez Hortalá commented on SPARK-8337:
-----------------------------------------------

Hi,

I have made some additional experiments:

- I have replaced the dictionary with a named tuple; this is just an aesthetic detail. Regarding your comment, Amit, what you propose could also be a good option.

{code}
_MessageAndMetadata = namedtuple("MessageAndMetadata",
                                 ["key", "value", "topic", "partition", "offset"])
{code}

so we get

{code}
-------------------------------------------
Time: 2015-06-18 20:38:46
-------------------------------------------
MessageAndMetadata(key=None, value=u'hola', topic=u'test', partition=0, offset=104L)
...
{code}

- Regarding the message handler approach, I don't know much about py4j, but from http://py4j.sourceforge.net/advanced_topics.html#implementing-java-interfaces-from-python-callback I understand that the limited support py4j offers for calling Java interfaces implemented in Python cannot be used in this situation. That would be necessary to wrap a Python lambda into an org.apache.spark.api.java.function.Function with something like this:

{code}
class JFunction(object):
    def __init__(self, f):
        self._f = f

    def call(self, v):
        return self._f(v)

    class Java:
        implements = ['org.apache.spark.api.java.function.Function']
{code}

- Another option is returning MessageAndMetadata directly instead of encoding them with tuples and then converting to named tuples. But that leads to "Unexpected element type class kafka.message.MessageAndMetadata":

{code}
15/06/18 20:45:50 INFO DAGScheduler: Job 9 failed: runJob at PythonRDD.scala:366, took 0,034251 s
Traceback (most recent call last):
  File "/home/juanrh/git/spark/python/pyspark/streaming/util.py", line 57, in call
    r = self.func(t, *rdds)
  File "/home/juanrh/git/spark/python/pyspark/streaming/dstream.py", line 171, in takeAndPrint
    taken = rdd.take(num + 1)
  File "/home/juanrh/git/spark/python/pyspark/rdd.py", line 1265, in take
    res = self.context.runJob(self, takeUpToNumLeft, p, True)
  File "/home/juanrh/git/spark/python/pyspark/context.py", line 891, in runJob
    allowLocal)
  File "/home/juanrh/git/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
    self.target_id, self.name)
  File "/home/juanrh/git/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
    format(target_id, '.', name), value)
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 9.0 failed 1 times, most recent failure: Lost task 0.0 in stage 9.0 (TID 9, localhost): org.apache.spark.SparkException: Unexpected element type class kafka.message.MessageAndMetadata
	at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:422)
	at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:425)
	at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:425)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:425)
	at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:248)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1771)
	at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:208)
{code}

I don't know the details of pyspark and the reason why it supports so few data types. On the other hand, an approach based on HasOffsetRanges is complicated by the wrapper objects introduced when passing from Scala to Python, but maybe it could be possible to add an OffsetRange object to the __dict__ of each RDD. Again, as I don't know about the design of pyspark and its serialization mechanism, I don't know whether that information is erased or not. This is as far as I go with my limited knowledge about pyspark, so maybe, as you suggest Cody, it would be better that another person who knows more about the internals of pyspark takes the baton now.
[ https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14592736#comment-14592736 ]

Amit Ramesh commented on SPARK-8337:
------------------------------------

[~ap4dl] can you please chime in here? Thanks!
[ https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14588064#comment-14588064 ] Juan Rodríguez Hortalá commented on SPARK-8337: --- Hi, I've made some advances. Due to the limited support for data types in pyspark and org.apache.spark.api.python.PythonRDD, I think adding a function to createDirectStream from MessageAndMetadata to arbitrary values is not such a good idea. In fact currently pyspark communicates with the Scala API by using JavaPairInputDStream[Array[Byte], Array[Byte]] and then decoding those arrays of bytes in python. So what I propose is adding an argument to choose between returning a dstream of (key, value) like it is done so far, and a dstream of dictionaries with entries for the key, the value (the message), and also the topic, partition and offset. An approximation to that is implemented in https://github.com/juanrh/spark/commit/7a824a814f56f839d2f3fbeda7e9f7467e683c6e as a python static method KafkaUtils.createDirectStreamJ, that uses KafkaUtilsPythonHelper.createDirectStreamJ. 
The following Python code can be used to exercise it:

```python
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

ssc = StreamingContext(sc, 1)
topics = ['test']
kafkaParams = {"metadata.broker.list": "localhost:9092"}
kafkaStream = KafkaUtils.createDirectStreamJ(ssc, topics, kafkaParams)
kafkaStream.pprint()
ssc.start()
ssc.awaitTermination(timeout=5)
```

which produces the following output:

```
15/06/16 15:31:00 INFO TaskSchedulerImpl: Removed TaskSet 8.0, whose tasks have all completed, from pool
15/06/16 15:31:00 INFO DAGScheduler: ResultStage 8 (start at NativeMethodAccessorImpl.java:-2) finished
15/06/16 15:31:00 INFO DAGScheduler: Job 8 finished: start at NativeMethodAccessorImpl.java:-2, took 0,0
-------------------------------------------
Time: 2015-06-16 15:31:00
-------------------------------------------
{'topic': u'test', 'partition': 0, 'value': u'q tal?', 'key': None, 'offset': 87L}
```

I have encoded the dictionary with the following Scala type alias, which uses only types that PythonRDD can understand:

```scala
/** Using this weird type due to the limited set of types
 *  supported by PythonRDD. This corresponds to
 *
 *    ((key, message), (topic, (partition, offset)))
 *
 *  where the key and the message are encoded as Array[Byte],
 *  and topic, partition and offset are encoded as String.
 *  Note we cannot even use triples, because only pairs are
 *  supported (we get an exception "Unexpected element type
 *  class scala.Tuple3"). */
type PyKafkaMsgWrapper = ((Array[Byte], Array[Byte]), (String, (String, String)))
```

If this is enough for you, I can refactor things to merge KafkaUtils.createDirectStreamJ and KafkaUtils.createDirectStream into a single method, with an additional argument specifying whether the meta info is required, defaulting to False so the behaviour stays the same as before. Looking forward to hearing your opinions on this.
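On the Python side, a wrapper in that shape can be unpacked back into the dictionary shown in the output above. The following is a rough sketch of that decoding step; the function name and decoding choices are mine for illustration, not taken from the commit:

```python
def unwrap_kafka_msg(wrapper):
    """Decode a ((key, message), (topic, (partition, offset))) wrapper,
    as produced by the Scala helper, into a Python dict.
    Illustrative sketch only; not part of the actual patch."""
    (key, message), (topic, (partition, offset)) = wrapper
    return {
        'key': key.decode('utf-8') if key is not None else None,
        'value': message.decode('utf-8') if message is not None else None,
        'topic': topic,
        # partition and offset arrive as strings, per the type alias above
        'partition': int(partition),
        'offset': int(offset),
    }

record = unwrap_kafka_msg(((None, b'q tal?'), ('test', ('0', '87'))))
```

This mirrors the shape of the dictionary printed by pprint in the example above.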
Greetings, Juan Rodriguez Hortala
[ https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14588517#comment-14588517 ] Cody Koeninger commented on SPARK-8337:

One thing to keep in mind is that if the Kafka project ends up adding more fields to MessageAndMetadata, the Scala interface will continue to give users access to those fields without changing anything other than the Kafka version. If you go with the approach of building a Python dict, someone will have to remember to manually change the code to give access to the new fields. I don't have enough Python knowledge to comment on whether the approach of passing a messageHandler function is feasible; I can try to get up to speed on it. It may be worth trying to get the attention of Davies Liu after the Spark conference hubbub has died down.
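To make the trade-off concrete: with a messageHandler-style API, the user-supplied function does the field selection, so new metadata fields cost nothing in the bridge code. The sketch below illustrates the idea in plain Python; the record type and helper names are hypothetical stand-ins, not the actual Spark or Kafka API:

```python
from collections import namedtuple

# Hypothetical stand-in for Kafka's MessageAndMetadata; the field names
# mirror the Scala class, but this type is illustrative only.
MessageAndMetadata = namedtuple(
    'MessageAndMetadata', ['key', 'message', 'topic', 'partition', 'offset'])

def apply_message_handler(records, message_handler):
    """Sketch of the messageHandler idea: the user-supplied function maps
    each MessageAndMetadata to an arbitrary value, so exposing a newly
    added field requires no change here, only in the record type."""
    return [message_handler(r) for r in records]

# Example handler that keeps just the message and its offset.
records = [MessageAndMetadata(None, b'hello', 'test', 0, 87)]
out = apply_message_handler(records, lambda m: (m.message, m.offset))
```

A dict-building approach, by contrast, hard-codes the field list at the point where the dict is constructed.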
[ https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14588439#comment-14588439 ] Amit Ramesh commented on SPARK-8337:

[~juanrh] this looks pretty good to me, and from what I can see it shouldn't add much overhead compared to the existing logic. It is exactly what we are in need of :). One stylistic suggestion: you could return (key, value, kafka_offsets), where kafka_offsets is a dict of topic, partition and offset. This would keep things a little more consistent with what is returned when the meta info flag is False. Thanks! Amit
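The suggested return shape can be sketched as a small reshaping of the flat dictionary from the earlier example; the helper name here is illustrative, not from any patch:

```python
def with_offsets(record):
    """Reshape a flat metadata dict into (key, value, kafka_offsets),
    the form suggested above, so the first two elements match the plain
    (key, value) pairs returned when meta info is disabled."""
    kafka_offsets = {'topic': record['topic'],
                     'partition': record['partition'],
                     'offset': record['offset']}
    return (record['key'], record['value'], kafka_offsets)

triple = with_offsets({'key': None, 'value': 'q tal?',
                       'topic': 'test', 'partition': 0, 'offset': 87})
```

Code that only needs the key and value can then unpack the first two elements and ignore the third.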