Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/4723#discussion_r28302015
--- Diff: python/pyspark/streaming/kafka.py ---
@@ -70,7 +71,103 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={},
except Py4JJavaError, e:
# TODO: use --jar once it also work on driver
if 'ClassNotFoundException' in str(e.java_exception):
- print """
+ KafkaUtils._printErrorMsg(ssc.sparkContext)
+ raise e
+ ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
+ stream = DStream(jstream, ssc, ser)
+ return stream.map(lambda (k, v): (keyDecoder(k), valueDecoder(v)))
+
+ @staticmethod
+ def createDirectStream(ssc, brokerList, topics, kafkaParams={},
--- End diff --
I agree that, as such, it is not complicated. It is just that this way of
doing things is non-standard practice (unless you can show me examples of it
being done in other Python Kafka libraries), so it should be added only if
there is a good use case or demand for it.
On Mon, Apr 13, 2015 at 10:48 PM, Saisai Shao wrote:
> In python/pyspark/streaming/kafka.py
> <https://github.com/apache/spark/pull/4723#discussion_r28301930>:
>
> > @@ -70,7 +71,103 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={},
> > except Py4JJavaError, e:
> > # TODO: use --jar once it also work on driver
> > if 'ClassNotFoundException' in str(e.java_exception):
> > - print """
> > + KafkaUtils._printErrorMsg(ssc.sparkContext)
> > + raise e
> > + ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
> > + stream = DStream(jstream, ssc, ser)
> > + return stream.map(lambda (k, v): (keyDecoder(k), valueDecoder(v)))
> > +
> > + @staticmethod
> > + def createDirectStream(ssc, brokerList, topics, kafkaParams={},
>
> OK, I will leave this to another PR.
>
> Basically, this solution is not very complicated: just write (topic,
> partition, key, message, offset) into a byte array in a controlled
> format. The Python code then unpacks this byte array with the same format
> back into the original data and constructs an object for the message
> handler function.
>
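A minimal sketch of the byte-array approach described in the quoted reply, assuming a length-prefixed, big-endian layout. The field order (topic, partition, key, message, offset) follows the comment; the exact layout and the names `pack_record`, `unpack_record` and `MessageAndMetadata` are hypothetical illustrations, not part of the PySpark API. In the proposal the packing would happen on the JVM side; `pack_record` only stands in for it here so the round trip can be checked in Python.

```python
import struct
from collections import namedtuple

# Hypothetical record type that would be handed to a user-supplied message handler.
MessageAndMetadata = namedtuple(
    "MessageAndMetadata", ["topic", "partition", "key", "message", "offset"])


def _write_bytes(parts, data):
    # Length-prefixed field: 4-byte big-endian signed length; -1 encodes None
    # (Kafka message keys may be null).
    if data is None:
        parts.append(struct.pack(">i", -1))
    else:
        parts.append(struct.pack(">i", len(data)))
        parts.append(data)


def pack_record(topic, partition, key, message, offset):
    """Hypothetical stand-in for the JVM side: encode one record into bytes."""
    parts = []
    _write_bytes(parts, topic.encode("utf-8"))
    parts.append(struct.pack(">i", partition))   # 4-byte partition id
    _write_bytes(parts, key)
    _write_bytes(parts, message)
    parts.append(struct.pack(">q", offset))      # 8-byte offset
    return b"".join(parts)


def _read_bytes(data, pos):
    # Read one length-prefixed field; return (value or None, next position).
    (length,) = struct.unpack_from(">i", data, pos)
    pos += 4
    if length < 0:
        return None, pos
    return data[pos:pos + length], pos + length


def unpack_record(data):
    """Python side: decode bytes written by pack_record using the same layout."""
    topic, pos = _read_bytes(data, 0)
    (partition,) = struct.unpack_from(">i", data, pos)
    pos += 4
    key, pos = _read_bytes(data, pos)
    message, pos = _read_bytes(data, pos)
    (offset,) = struct.unpack_from(">q", data, pos)
    return MessageAndMetadata(topic.decode("utf-8"), partition, key, message, offset)


if __name__ == "__main__":
    raw = pack_record("events", 3, None, b"hello", 42)
    record = unpack_record(raw)
    # A user-defined message handler would receive the decoded record.
    handler = lambda m: (m.topic, m.partition, m.offset, m.message)
    print(handler(record))
```

Using a fixed, explicitly specified layout on both sides is what keeps the encoding "controlled": the Python decoder never has to guess field boundaries, and a length of -1 distinguishes a missing key from an empty one.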