[jira] [Commented] (SPARK-8337) KafkaUtils.createDirectStream for python is lacking API/feature parity with the Scala/Java version

2017-09-07 Thread Saurabh Bidwai (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16156559#comment-16156559
 ] 

Saurabh Bidwai commented on SPARK-8337:
---

For this I'm getting the following error:

{code}
kstream = KafkaUtils.createDirectStream(ssc, topics=['twitterstream'],
    kafkaParams={"metadata.broker.list":
                 ["dn1001:6667", "dn2001:6667", "dn3001:6667", "dn4001:6667"]})
{code}

---
Py4JJavaError Traceback (most recent call last)
 in ()
> 1 streamer(sc)

 in streamer(sc)
  5 pwords = 
load_wordlist("/home/daasuser/twitter/kafkatweets/Twitter-Sentiment-Analysis-Using-Spark-Streaming-And-Kafka/Dataset/positive.txt")
  6 nwords = 
load_wordlist("/home/daasuser/twitter/kafkatweets/Twitter-Sentiment-Analysis-Using-Spark-Streaming-And-Kafka/Dataset/negative.txt")
> 7 counts = stream(ssc, pwords, nwords, 600)
  8 make_plot(counts)

 in stream(ssc, pwords, nwords, duration)
  1 def stream(ssc, pwords, nwords, duration):
> 2 kstream = KafkaUtils.createDirectStream(ssc, topics = 
['twitterstream'], kafkaParams = {"metadata.broker.list": 
["dn1001:6667","dn2001:6667","dn3001:6667","dn4001:6667"]})
  3 tweets = kstream.map(lambda x: x[1].encode("utf-8", "ignore"))
  4 
  5 # Each element of tweets will be the text of a tweet.

/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/streaming/kafka.py 
in createDirectStream(ssc, topics, kafkaParams, fromOffsets, keyDecoder, 
valueDecoder, messageHandler)
150 if 'ClassNotFoundException' in str(e.java_exception):
151 KafkaUtils._printErrorMsg(ssc.sparkContext)
--> 152 raise e
153 
154 stream = DStream(jstream, ssc, ser).map(func)

Py4JJavaError: An error occurred while calling o40.loadClass.
: java.lang.ClassNotFoundException: 
org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
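For reference, this {{ClassNotFoundException}} for {{org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper}} usually means the external spark-streaming-kafka package is not on the classpath (it is not bundled with Spark itself), rather than anything specific to this JIRA. Below is a minimal sketch of one common way to pull it in from Python; the artifact coordinates are an assumption and must match your Spark/Scala build. Note also that {{metadata.broker.list}} is normally a single comma-separated string rather than a Python list.

{code}
import os

# Must be set before the SparkContext / JVM gateway is created.
# The package coordinates below are an assumption; match them to your Spark build.
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages org.apache.spark:spark-streaming-kafka_2.10:1.6.3 pyspark-shell"
)

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="kafka-direct-check")
ssc = StreamingContext(sc, 10)
kstream = KafkaUtils.createDirectStream(
    ssc,
    topics=["twitterstream"],
    kafkaParams={"metadata.broker.list":
                 "dn1001:6667,dn2001:6667,dn3001:6667,dn4001:6667"})
{code}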

> KafkaUtils.createDirectStream for python is lacking API/feature parity with 
> the Scala/Java version
> --
>
> Key: SPARK-8337
> URL: https://issues.apache.org/jira/browse/SPARK-8337
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams, PySpark
>Affects Versions: 1.4.0
>Reporter: Amit Ramesh
>Priority: Critical
>
> See the following thread for context.
> http://apache-spark-developers-list.1001551.n3.nabble.com/Re-Spark-1-4-Python-API-for-getting-Kafka-offsets-in-direct-mode-tt12714.html





[jira] [Commented] (SPARK-8337) KafkaUtils.createDirectStream for python is lacking API/feature parity with the Scala/Java version

2016-10-12 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15570186#comment-15570186
 ] 

Cody Koeninger commented on SPARK-8337:
---

Can this be closed, given that the subtasks are resolved and any future 
discussion of Python DStream Kafka support seems to be happening in SPARK-16534?



[jira] [Commented] (SPARK-8337) KafkaUtils.createDirectStream for python is lacking API/feature parity with the Scala/Java version

2015-07-15 Thread Saisai Shao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14627728#comment-14627728
 ] 

Saisai Shao commented on SPARK-8337:


OK, thanks TD.



[jira] [Commented] (SPARK-8337) KafkaUtils.createDirectStream for python is lacking API/feature parity with the Scala/Java version

2015-07-15 Thread Tathagata Das (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14627725#comment-14627725
 ] 

Tathagata Das commented on SPARK-8337:
--

[~jerryshao] Never mind, I made it myself.



[jira] [Commented] (SPARK-8337) KafkaUtils.createDirectStream for python is lacking API/feature parity with the Scala/Java version

2015-07-15 Thread Tathagata Das (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14627722#comment-14627722
 ] 

Tathagata Das commented on SPARK-8337:
--

[~jerryshao] Could you make a separate sub-task of this JIRA for the message 
handler API fix (to maintain consistency with the other parity-related subtasks), 
and update the PR with that JIRA number?



[jira] [Commented] (SPARK-8337) KafkaUtils.createDirectStream for python is lacking API/feature parity with the Scala/Java version

2015-07-14 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14627451#comment-14627451
 ] 

Apache Spark commented on SPARK-8337:
-

User 'jerryshao' has created a pull request for this issue:
https://github.com/apache/spark/pull/7410



[jira] [Commented] (SPARK-8337) KafkaUtils.createDirectStream for python is lacking API/feature parity with the Scala/Java version

2015-07-09 Thread Tathagata Das (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14621253#comment-14621253
 ] 

Tathagata Das commented on SPARK-8337:
--

Now that SPARK-8389 has been fixed, I am open to discussing the messageHandler 
function.
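For readers following the thread, here is a rough illustration of what a Python-side messageHandler could look like, mirroring the Scala {{MessageAndMetadata => T}} handler. The attribute names on the wrapper object are assumptions for illustration, not an API that exists at this point:

{code}
# Illustrative sketch only: assumes some Python wrapper exposing the usual
# MessageAndMetadata fields (topic, partition, offset, key, message).
def message_handler(mmd):
    # keep the Kafka metadata alongside the decoded payload
    return (mmd.topic, mmd.partition, mmd.offset, mmd.key, mmd.message)
{code}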



[jira] [Commented] (SPARK-8337) KafkaUtils.createDirectStream for python is lacking API/feature parity with the Scala/Java version

2015-06-30 Thread Tathagata Das (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14609102#comment-14609102
 ] 

Tathagata Das commented on SPARK-8337:
--

I will let you guys figure out the implementation among yourselves. Let me 
chime in and say that I am more concerned about exposing offset ranges than 
supporting access to MessageAndMetadata. So it would be great if you could make 
a PR for the former, and then we can focus on the latter in a later iteration.

In that respect, if you are making a PR, please use the other JIRA - 
SPARK-8389, which was specifically about the offset ranges. I am reopening that 
JIRA and marking it as a sub-task of this one.



[jira] [Commented] (SPARK-8337) KafkaUtils.createDirectStream for python is lacking API/feature parity with the Scala/Java version

2015-06-29 Thread JIRA

[ 
https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14605588#comment-14605588
 ] 

Juan Rodríguez Hortalá commented on SPARK-8337:
---

Hi [~jerryshao], 

That is a good idea; I should have paid more attention to the discussion in the 
duplicate issue. I will try it that way and let you know how it goes.

Greetings, 

Juan



[jira] [Commented] (SPARK-8337) KafkaUtils.createDirectStream for python is lacking API/feature parity with the Scala/Java version

2015-06-29 Thread Saisai Shao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14605241#comment-14605241
 ] 

Saisai Shao commented on SPARK-8337:


Hi [~juanrh], I think the best choice is to keep the Python programming model 
similar to Scala/Java. In Java/Scala we use offsetRanges like this:

{code}
directKafkaStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // offsetRanges.length = # of Kafka partitions being consumed
  ...
}
{code}

It would be better to keep the Python API consistent with that style. Your 
implementation looks like a different approach: from my understanding, you return 
the offsetRange with each record of the KafkaRDD. Since an offsetRange relates to 
the RDD as a whole, not to individual records, that seems a little strange to me, 
and it also means you have to serialize the offsetRange from the driver to each 
executor.

Here is what TD suggested, though some details still need to be figured out. I 
tried it a bit but am still blocked on a few things.

{quote}
I think the way it works is that the Java/Python-friendly DStream returned
by the Java APIs of KafkaUtils is wrapped in Python's DStream class in
dstream.py. The foreachRDD of that class uses another Python class
TransformFunction to wrap the JavaRDDs into Python's RDD objects and
applies the user defined python function on them. To allow the wrapped
Python RDDs to have a method called offsetRanges, you have to
1. Create a custom KafkaRDD Python class (extending Python's RDD class)
which can wrap a KafkaRDD class. This may actually require defining a
JavaKafkaRDD class
2. Create a custom KafkaTransformFunc Python class (extending Python's
TransformFunc class) which wraps JavaRDDs into Python's KafkaRDD classes,
and applies user's function on those.
3. Create a custom KafkaDStream Python class (extending Python's DStream
class) which overrides transform() and foreachRDD() to use
KafkaTransformFunc instead of TransformFunc.
From my cursory look, this may work. Think about it.
TD
{quote}
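To make the target concrete, here is a rough Python sketch of the driver-side pattern the Scala snippet above uses, assuming the RDD handed to {{transform()}} / {{foreachRDD()}} eventually exposes an {{offsetRanges()}} method (which is what the design quoted above works toward). This is a sketch of the intended shape, not code that runs against Spark 1.4:

{code}
offset_ranges = []

def store_offset_ranges(rdd):
    # runs on the driver for each batch; remember this batch's ranges
    global offset_ranges
    offset_ranges = rdd.offsetRanges()
    return rdd

def print_offset_ranges(rdd):
    for o in offset_ranges:
        print "%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset)

directKafkaStream \
    .transform(store_offset_ranges) \
    .foreachRDD(print_offset_ranges)
{code}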




[jira] [Commented] (SPARK-8337) KafkaUtils.createDirectStream for python is lacking API/feature parity with the Scala/Java version

2015-06-28 Thread JIRA

[ 
https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14604633#comment-14604633
 ] 

Juan Rodríguez Hortalá commented on SPARK-8337:
---

Hi, 

I have worked a bit on the OffsetRange approach; you can access the code at 
https://github.com/juanrh/spark/commit/56fbd5c38bd30b825a7818f1c56abb1f8b2beaff.
I have added the following method to PySpark's KafkaUtils:

@staticmethod
def getOffsetRanges(rdd):
    scalaRdd = rdd._jrdd.rdd()
    offsetRangesArray = scalaRdd.offsetRanges()
    return [OffsetRange(topic=offsetRange.topic(),
                        partition=offsetRange.partition(),
                        fromOffset=offsetRange.fromOffset(),
                        untilOffset=offsetRange.untilOffset())
            for offsetRange in offsetRangesArray]


This method is used in KafkaUtils.createDirectStreamJB, which is based on the 
original KafkaUtilsPythonHelper.createDirectStream. The main problem I have is 
that I don't know where to store the OffsetRange objects. The naive trick of 
adding them to the __dict__ of each Python RDD object doesn't work: the new 
field is lost in the PySpark wrappers. So the new method createDirectStreamJB 
takes two additional options, one for performing an action on the OffsetRange 
list, and another for adding it to each record of the DStream:

def createDirectStreamJB(ssc, topics, kafkaParams, fromOffsets={},
                         keyDecoder=utf8_decoder, valueDecoder=utf8_decoder,
                         offsetRangeForeach=None, addOffsetRange=False):
    """
    FIXME: temporary working placeholder

    :param offsetRangeForeach: if different to None, this function should
        be a function from a list of OffsetRange to None, and is applied to
        the OffsetRange list of each rdd
    :param addOffsetRange: if False (default) output records are of the
        shape (kafkaKey, kafkaValue); if True output records are of the shape
        (offsetRange, (kafkaKey, kafkaValue)), where offsetRange is the
        OffsetRange value for the Spark partition of the record
    """


This is an example of using createDirectStreamJB:

from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

ssc = StreamingContext(sc, 1)
topics = ["test"]
kafkaParams = {"metadata.broker.list": "localhost:9092"}

def offsetRangeForeach(offsetRangeList):
    print
    print
    for offsetRange in offsetRangeList:
        print offsetRange
    print
    print

kafkaStream = KafkaUtils.createDirectStreamJB(ssc, topics, kafkaParams,
                                              offsetRangeForeach=offsetRangeForeach,
                                              addOffsetRange=True)

# OffsetRange printed as <pyspark.streaming.kafka.OffsetRange object at
# 0x7f2fdc045950>, I guess due to some kind of pyspark proxy
kafkaStrStream = kafkaStream.map(lambda (offRan, (k, v)):
    str(offRan._fromOffset) + " " + str(offRan._untilOffset) + " " +
    str(k) + " " + str(v))
# kafkaStream.pprint()
kafkaStrStream.pprint()
ssc.start()
ssc.awaitTermination(timeout=5)

which gets the following output

15/06/28 12:36:03 INFO InputInfoTracker: remove old batch metadata: 
1435487761000 ms


OffsetRange(topic=test, partition=0, fromOffset=178, untilOffset=179)


15/06/28 12:36:04 INFO JobScheduler: Added jobs for time 1435487764000 ms
...
15/06/28 12:36:04 INFO DAGScheduler: Job 4 finished: runJob at 
PythonRDD.scala:366, took 0,075387 s
---
Time: 2015-06-28 12:36:04
---
178 179 None hola
()
15/06/28 12:36:04 INFO JobScheduler: Finished job streaming job 1435487764000 
ms.0 from job set of time 1435487764000 ms
...
15/06/28 12:36:05 INFO BlockManager: Removing RDD 12


OffsetRange(topic=test, partition=0, fromOffset=179, untilOffset=180)


15/06/28 12:36:06 INFO JobScheduler: Starting job streaming job 1435487766000 
ms.0 from job set of time 1435487766000 ms

15/06/28 12:36:06 INFO DAGScheduler: Job 6 finished: start at 
NativeMethodAccessorImpl.java:-2, took 0,077993 s
---
Time: 2015-06-28 12:36:06
---
179 180 None caracola
()
15/06/28 12:36:06 INFO JobScheduler: Finished job streaming job 1435487766000 
ms.0 from job set of time 1435487766000 ms

Any thoughts on this would be appreciated, in particular about a suitable place 
to store the list of OffsetRange objects.

Greetings, 

Juan




[jira] [Commented] (SPARK-8337) KafkaUtils.createDirectStream for python is lacking API/feature parity with the Scala/Java version

2015-06-24 Thread Saisai Shao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14600468#comment-14600468
 ] 

Saisai Shao commented on SPARK-8337:


OK, well, I'd like to take a crack at it :).







[jira] [Commented] (SPARK-8337) KafkaUtils.createDirectStream for python is lacking API/feature parity with the Scala/Java version

2015-06-24 Thread JIRA

[ 
https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14600010#comment-14600010
 ] 

Juan Rodríguez Hortalá commented on SPARK-8337:
---

Hi, 

As I said above, I don't know much about the internals of PySpark; currently the 
original RDD from Scala is wrapped by several wrappers for communication with 
Python, so the RDD implementing HasOffsetRanges is hidden by those layers. 
However, after its merge with SPARK-8389, it looks like this issue has gotten the 
attention of several Spark committers, and I'm sure they will be able to come up 
with a solution that makes OffsetRanges accessible from PySpark.

Greetings, 

Juan



[jira] [Commented] (SPARK-8337) KafkaUtils.createDirectStream for python is lacking API/feature parity with the Scala/Java version

2015-06-23 Thread Saisai Shao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14598681#comment-14598681
 ] 

Saisai Shao commented on SPARK-8337:


Hi [~juanrh], will you also address the {{OffsetRange}} problem described in 
SPARK-8389?



[jira] [Commented] (SPARK-8337) KafkaUtils.createDirectStream for python is lacking API/feature parity with the Scala/Java version

2015-06-18 Thread JIRA

[ 
https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14592354#comment-14592354
 ] 

Juan Rodríguez Hortalá commented on SPARK-8337:
---

Hi, 

I have made some additional experiments:

- I have replaced the dictionary with a named tuple; this is just an aesthetic 
detail. Regarding your comment, Amit, what you propose could also be a good 
option:

_MessageAndMetadata = namedtuple("MessageAndMetadata",
                                 ["key", "value", "topic", "partition", "offset"])

so we get

---
Time: 2015-06-18 20:38:46
---
MessageAndMetadata(key=None, value=u'hola', topic=u'test', partition=0, 
offset=104L)
()
...

- Regarding the message handler approach, I don't know much about py4j, but from 
http://py4j.sourceforge.net/advanced_topics.html#implementing-java-interfaces-from-python-callback
I understand that the limited support py4j offers for calling Java interfaces 
implemented in Python cannot be used in this situation. That support would be 
needed to wrap a Python lambda into an org.apache.spark.api.java.function.Function 
with something like this:

class JFunction(object):
    def __init__(self, f):
        self._f = f

    def call(self, v):
        return self._f(v)

    class Java:
        implements = ['org.apache.spark.api.java.function.Function']


- Another option is returning MessageAndMetadata directly, instead of encoding 
the values with tuples and then converting to named tuples. But that leads to 
"Unexpected element type class kafka.message.MessageAndMetadata":


15/06/18 20:45:50 INFO DAGScheduler: Job 9 failed: runJob at 
PythonRDD.scala:366, took 0,034251 s
Traceback (most recent call last):
  File /home/juanrh/git/spark/python/pyspark/streaming/util.py, line 57, in 
call
r = self.func(t, *rdds)
  File /home/juanrh/git/spark/python/pyspark/streaming/dstream.py, line 171, 
in takeAndPrint
taken = rdd.take(num + 1)
  File /home/juanrh/git/spark/python/pyspark/rdd.py, line 1265, in take
res = self.context.runJob(self, takeUpToNumLeft, p, True)
  File /home/juanrh/git/spark/python/pyspark/context.py, line 891, in runJob
allowLocal)
  File 
/home/juanrh/git/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, 
line 538, in __call__
self.target_id, self.name)
  File 
/home/juanrh/git/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line 
300, in get_return_value
format(target_id, '.', name), value)
Py4JJavaError: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 9.0 failed 1 times, most recent failure: Lost task 0.0 in stage 9.0 (TID 
9, localhost): org.apache.spark.SparkException: Unexpected element type class 
kafka.message.MessageAndMetadata
at 
org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:422)
at 
org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:425)
at 
org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:425)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
at 
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:425)
at 
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:248)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1771)
at 
org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:208)

I don't know the details of PySpark, or the reason why it supports so few data 
types. On the other hand, an approach based on HasOffsetRanges is complicated by 
the wrapper objects introduced when passing from Scala to Python, but maybe it 
would be possible to add an OffsetRange object to the __dict__ of each RDD. 
Again, as I don't know the design of PySpark and its serialization mechanism, I 
don't know whether that information is erased or not.

This is as far as I can go with my limited knowledge of PySpark. So maybe, as 
you suggest Cody, it would be better if another person who knows more about the 
internals of PySpark takes the baton now.



[jira] [Commented] (SPARK-8337) KafkaUtils.createDirectStream for python is lacking API/feature parity with the Scala/Java version

2015-06-18 Thread Amit Ramesh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14592736#comment-14592736
 ] 

Amit Ramesh commented on SPARK-8337:


[~ap4dl] can you please chime in here? Thanks!



[jira] [Commented] (SPARK-8337) KafkaUtils.createDirectStream for python is lacking API/feature parity with the Scala/Java version

2015-06-16 Thread JIRA

[ 
https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14588064#comment-14588064
 ] 

Juan Rodríguez Hortalá commented on SPARK-8337:
---

Hi, 

I've made some progress. Due to the limited support for data types in PySpark 
and org.apache.spark.api.python.PythonRDD, I think adding a function argument to 
createDirectStream that maps MessageAndMetadata to arbitrary values is not such 
a good idea. In fact, currently PySpark communicates with the Scala API using 
JavaPairInputDStream[Array[Byte], Array[Byte]] and then decoding those arrays 
of bytes in Python. So what I propose is adding an argument to choose between 
returning a DStream of (key, value) pairs, as is done so far, and a DStream of 
dictionaries with entries for the key, the value (the message), and also the 
topic, partition and offset. An approximation to that is implemented in 
https://github.com/juanrh/spark/commit/7a824a814f56f839d2f3fbeda7e9f7467e683c6e 
as a Python static method KafkaUtils.createDirectStreamJ, which uses 
KafkaUtilsPythonHelper.createDirectStreamJ. The following Python code can be 
used to try it:

from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

ssc = StreamingContext(sc, 1)
topics = ["test"]
kafkaParams = {"metadata.broker.list": "localhost:9092"}
kafkaStream = KafkaUtils.createDirectStreamJ(ssc, topics, kafkaParams)
kafkaStream.pprint()
ssc.start()
ssc.awaitTermination(timeout=5)

which gets the following output

15/06/16 15:31:00 INFO TaskSchedulerImpl: Removed TaskSet 8.0, whose tasks have 
all completed, from pool
15/06/16 15:31:00 INFO DAGScheduler: ResultStage 8 (start at 
NativeMethodAccessorImpl.java:-2) finished
15/06/16 15:31:00 INFO DAGScheduler: Job 8 finished: start at 
NativeMethodAccessorImpl.java:-2, took 0,0
---
Time: 2015-06-16 15:31:00
---
{'topic': u'test', 'partition': 0, 'value': u'q tal?', 'key': None, 'offset': 
87L}
()
15/06/16 15:31:00 

I have encoded the dictionary with the following Scala type alias, which uses 
types that PythonRDD can understand:

/** Using this weird type due to the limited set of types
  * supported by PythonRDD. This corresponds to
  *
  * ((key, message), (topic, (partition, offset)))
  *
  * where the key and the message are encoded as Array[Byte],
  * and topic, partition and offset are encoded as String.
  * Note we cannot even use triples because only pairs are supported
  * (we get an exception "Unexpected element type class scala.Tuple3")
  */
type PyKafkaMsgWrapper = ((Array[Byte], Array[Byte]), (String, (String, String)))
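For clarity, a hypothetical sketch of how such a wrapped tuple could be unpacked on the Python side into the dictionary shown in the output above (the helper name is made up; keyDecoder/valueDecoder stand for the usual decoders):

{code}
def _unwrap_kafka_msg(wrapped, keyDecoder, valueDecoder):
    # ((key, message), (topic, (partition, offset))) -> dict, per the alias above
    (key, message), (topic, (partition, offset)) = wrapped
    return {"key": keyDecoder(key), "value": valueDecoder(message),
            "topic": topic, "partition": int(partition), "offset": long(offset)}
{code}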

If this is enough for you, I can refactor things to merge 
KafkaUtils.createDirectStreamJ and KafkaUtils.createDirectStream into a single 
method, with an additional argument to specify whether the meta info is required, 
defaulting to False so the behaviour stays the same as before (roughly as in the 
sketch below).
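A hypothetical sketch of that merged signature (the withMetadata parameter name is made up for illustration):

{code}
@staticmethod
def createDirectStream(ssc, topics, kafkaParams, fromOffsets={},
                       keyDecoder=utf8_decoder, valueDecoder=utf8_decoder,
                       withMetadata=False):
    # withMetadata=False keeps today's (key, value) records;
    # withMetadata=True would return the metadata dictionaries shown above
    ...
{code}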

Looking forward to hearing your opinions on this.

Greetings, 

Juan Rodriguez Hortala




[jira] [Commented] (SPARK-8337) KafkaUtils.createDirectStream for python is lacking API/feature parity with the Scala/Java version

2015-06-16 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14588517#comment-14588517
 ] 

Cody Koeninger commented on SPARK-8337:
---

So one thing to keep in mind is that if the Kafka project ends up adding more 
fields to MessageAndMetadata, the Scala interface is going to continue to give 
users access to those fields without changing anything other than the Kafka 
version.

If you go with the approach of building a Python dict, someone's going to have 
to remember to manually change the code to give access to the new fields.

I don't have enough Python knowledge to comment on whether the approach of 
passing a messageHandler function is feasible... I can try to get up to speed 
on it. It may be worth trying to get the attention of Davies Liu after the 
Spark conference hubbub has died down.



[jira] [Commented] (SPARK-8337) KafkaUtils.createDirectStream for python is lacking API/feature parity with the Scala/Java version

2015-06-16 Thread Amit Ramesh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14588439#comment-14588439
 ] 

Amit Ramesh commented on SPARK-8337:


[~juanrh] this looks pretty good to me, and from what I can see it shouldn't add 
much overhead compared to the existing logic. It is perfect in terms of what we 
are in need of :). One stylistic suggestion is that you could return (key, 
value, kafka_offsets), where kafka_offsets is a dict of topic, partition and 
offset (roughly as sketched below). This would keep things a little more 
consistent with what is returned when meta info is False.
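Concretely, the suggested record shape would be something like this (values are illustrative, taken from the earlier example output):

{code}
# (key, value, kafka_offsets), with kafka_offsets a small dict of metadata
record = (None, u"q tal?", {"topic": "test", "partition": 0, "offset": 87})
{code}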

Thanks!
Amit

