Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/4723#discussion_r28301651
--- Diff: python/pyspark/streaming/kafka.py ---
@@ -70,7 +71,103 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={},
except Py4JJavaError, e:
# TODO: use --jar once it also work on driver
if 'ClassNotFoundException' in str(e.java_exception):
- print """
+ KafkaUtils._printErrorMsg(ssc.sparkContext)
+ raise e
+ ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
+ stream = DStream(jstream, ssc, ser)
+ return stream.map(lambda (k, v): (keyDecoder(k), valueDecoder(v)))
+
+ @staticmethod
+ def createDirectStream(ssc, brokerList, topics, kafkaParams={},
--- End diff --
Let's leave that for another PR. That sounds complicated and needs careful
thought.
On Mon, Apr 13, 2015 at 10:32 PM, Saisai Shao <[email protected]> wrote:
> In python/pyspark/streaming/kafka.py
> <https://github.com/apache/spark/pull/4723#discussion_r28301541>:
>
> > @@ -70,7 +71,103 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={},
> > except Py4JJavaError, e:
> > # TODO: use --jar once it also work on driver
> > if 'ClassNotFoundException' in str(e.java_exception):
> > - print """
> > + KafkaUtils._printErrorMsg(ssc.sparkContext)
> > + raise e
> > + ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
> > + stream = DStream(jstream, ssc, ser)
> > + return stream.map(lambda (k, v): (keyDecoder(k), valueDecoder(v)))
> > +
> > + @staticmethod
> > + def createDirectStream(ssc, brokerList, topics, kafkaParams={},
>
> Currently I have a solution, serialize the content of MessageAndMetadata
> into byte array in Scala code, and unpack, reconstruct the python object of
> MessageAndMetadata in python code. So the handler can be supported. What
> do you think of this solution?
>
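
For illustration only, the approach quoted above (serialize MessageAndMetadata to a byte array on the JVM side, then unpack and rebuild it in Python) might look roughly like the sketch below. Everything here is an assumption and not code from this PR: the wire layout, the `pack`/`unpack` helper names, and the Python `MessageAndMetadata` namedtuple are all hypothetical. `pack` only stands in for what the Scala side would emit, so that the round trip can be checked.

```python
import struct
from collections import namedtuple

# Hypothetical Python-side stand-in for Kafka's MessageAndMetadata.
MessageAndMetadata = namedtuple(
    "MessageAndMetadata", ["topic", "partition", "offset", "key", "message"])

# Assumed fixed header: partition (int32) and offset (int64), big-endian.
_HEADER = struct.Struct(">iq")


def _pack_bytes(b):
    # Length-prefixed field; a length of -1 marks a missing (None) value.
    if b is None:
        return struct.pack(">i", -1)
    return struct.pack(">i", len(b)) + b


def _unpack_bytes(buf, pos):
    # Read one length-prefixed field and return (value, next position).
    (n,) = struct.unpack_from(">i", buf, pos)
    pos += 4
    if n < 0:
        return None, pos
    return bytes(buf[pos:pos + n]), pos + n


def pack(mm):
    # Stand-in for the Scala side: flatten the record into one byte array.
    return (_HEADER.pack(mm.partition, mm.offset)
            + _pack_bytes(mm.topic.encode("utf-8"))
            + _pack_bytes(mm.key)
            + _pack_bytes(mm.message))


def unpack(buf):
    # Python side: rebuild the record from the byte array.
    partition, offset = _HEADER.unpack_from(buf, 0)
    pos = _HEADER.size
    topic, pos = _unpack_bytes(buf, pos)
    key, pos = _unpack_bytes(buf, pos)
    message, pos = _unpack_bytes(buf, pos)
    return MessageAndMetadata(
        topic.decode("utf-8"), partition, offset, key, message)
```

With something like this, a user-supplied handler could be applied to the result of `unpack` for each record, while key and message stay raw bytes until the existing decoders run.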