Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/4723#discussion_r29031149
--- Diff: python/pyspark/streaming/kafka.py ---
@@ -70,7 +71,195 @@ def createStream(ssc, zkQuorum, groupId, topics,
kafkaParams={},
except Py4JJavaError, e:
# TODO: use --jar once it also work on driver
if 'ClassNotFoundException' in str(e.java_exception):
- print """
+ KafkaUtils._printErrorMsg(ssc.sparkContext)
+ raise e
+ ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
+ stream = DStream(jstream, ssc, ser)
+ return stream.map(lambda (k, v): (keyDecoder(k), valueDecoder(v)))
+
+ @staticmethod
+ def createDirectStream(ssc, topics, kafkaParams,
+ keyDecoder=utf8_decoder,
valueDecoder=utf8_decoder):
+ """
+ .. note:: Experimental
+
+ Create an input stream that directly pulls messages from a Kafka
Broker.
+
+ This is not a receiver based Kafka input stream, it directly pulls
the message from Kafka
+ in each batch duration and processed without storing.
+
+ This does not use Zookeeper to store offsets. The consumed offsets
are tracked
+ by the stream itself. For interoperability with Kafka monitoring
tools that depend on
+ Zookeeper, you have to update Kafka/Zookeeper yourself from the
streaming application.
+ You can access the offsets used in each batch from the generated
RDDs (see
+
+ To recover from driver failures, you have to enable checkpointing
in the StreamingContext.
+ The information on consumed offset can be recovered from the
checkpoint.
+ See the programming guide for details (constraints, etc.).
+
+ :param ssc: StreamingContext object
+ :param topics: list of topic_name to consume.
+ :param kafkaParams: Additional params for Kafka
+ :param keyDecoder: A function used to decode key (default is
utf8_decoder)
+ :param valueDecoder: A function used to decode value (default is
utf8_decoder)
+ :return: A DStream object
+ """
+ if not isinstance(topics, list):
+ raise TypeError("topics should be list")
+ if not isinstance(kafkaParams, dict):
+ raise TypeError("kafkaParams should be dict")
+
+ jtopics = SetConverter().convert(topics,
ssc.sparkContext._gateway._gateway_client)
+ jparam = MapConverter().convert(kafkaParams,
ssc.sparkContext._gateway._gateway_client)
+
+ try:
+ helperClass =
ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
+
.loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
+ helper = helperClass.newInstance()
+ jstream = helper.createDirectStream(ssc._jssc, jparam, jtopics)
+ except Py4JJavaError, e:
+ if 'ClassNotFoundException' in str(e.java_exception):
+ KafkaUtils._printErrorMsg(ssc.sparkContext)
+ raise e
+
+ ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
+ stream = DStream(jstream, ssc, ser)
+ return stream.map(lambda (k, v): (keyDecoder(k), valueDecoder(v)))
+
+ @staticmethod
+ def createDirectStreamFromOffset(ssc, kafkaParams, fromOffsets,
+ keyDecoder=utf8_decoder,
valueDecoder=utf8_decoder):
+ """
+ .. note:: Experimental
+
+ Create an input stream that directly pulls messages from a Kafka
Broker and specific offset.
+
+ This is not a receiver based Kafka input stream, it directly pulls
the message from Kafka
+ in each batch duration and processed without storing.
+
+ This does not use Zookeeper to store offsets. The consumed offsets
are tracked
+ by the stream itself. For interoperability with Kafka monitoring
tools that depend on
+ Zookeeper, you have to update Kafka/Zookeeper yourself from the
streaming application.
+ You can access the offsets used in each batch from the generated
RDDs (see
+
+ To recover from driver failures, you have to enable checkpointing
in the StreamingContext.
+ The information on consumed offset can be recovered from the
checkpoint.
+ See the programming guide for details (constraints, etc.).
+
+ :param ssc: StreamingContext object.
+ :param kafkaParams: Additional params for Kafka.
+ :param fromOffsets: Per-topic/partition Kafka offsets defining the
(inclusive) starting
+ point of the stream.
+ :param keyDecoder: A function used to decode key (default is
utf8_decoder).
+ :param valueDecoder: A function used to decode value (default is
utf8_decoder).
+ :return: A DStream object
+ """
+ if not isinstance(kafkaParams, dict):
+ raise TypeError("kafkaParams should be dict")
+ if not isinstance(fromOffsets, dict):
+ raise TypeError("fromOffsets should be dict")
+
+ jparam = MapConverter().convert(kafkaParams,
ssc.sparkContext._gateway._gateway_client)
+ try:
+ helperClass =
ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
+
.loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
+ helper = helperClass.newInstance()
+ jfromOffsets = MapConverter().convert(
+ dict([(k._jTopicAndPartition(helper), v) for (k, v) in
fromOffsets.items()]),
+ ssc.sparkContext._gateway._gateway_client)
+ jstream = helper.createDirectStream(ssc._jssc, jparam,
jfromOffsets)
+ except Py4JJavaError, e:
+ if 'ClassNotFoundException' in str(e.java_exception):
+ KafkaUtils._printErrorMsg(ssc.sparkContext)
+ raise e
+
+ ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
+ stream = DStream(jstream, ssc, ser)
+ return stream.map(lambda (k, v): (keyDecoder(k), valueDecoder(v)))
+
+ @staticmethod
+ def createRDD(sc, kafkaParams, offsetRanges,
+ keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
+ """
+ .. note:: Experimental
+
+ Create a RDD from Kafka using offset ranges for each topic and
partition.
+ :param sc: SparkContext object
+ :param kafkaParams: Additional params for Kafka
+ :param offsetRanges: list of offsetRange to specify
topic:partition:[start, end) to consume
+ :param keyDecoder: A function used to decode key (default is
utf8_decoder)
+ :param valueDecoder: A function used to decode value (default is
utf8_decoder)
+ :return: A RDD object
+ """
+ if not isinstance(kafkaParams, dict):
+ raise TypeError("kafkaParams should be a dict")
+ if not isinstance(offsetRanges, list):
+ raise TypeError("offsetRanges should be list")
+
+ jparam = MapConverter().convert(kafkaParams,
sc._gateway._gateway_client)
+ try:
+ helperClass =
sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
+
.loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
+ helper = helperClass.newInstance()
+ joffsetRanges =
ListConverter().convert([o._jOffsetRange(helper) for o in offsetRanges],
+
sc._gateway._gateway_client)
+ jrdd = helper.createRDD(sc._jsc, jparam, joffsetRanges)
+ except Py4JJavaError, e:
+ if 'ClassNotFoundException' in str(e.java_exception):
+ KafkaUtils._printErrorMsg(sc)
+ raise e
+
+ ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
+ rdd = RDD(jrdd, sc, ser)
+ return rdd.map(lambda (k, v): (keyDecoder(k), valueDecoder(v)))
+
+ @staticmethod
+ def createRDDWithLeaders(sc, kafkaParams, offsetRanges, leaders,
--- End diff --
Similar comment to the one on createDirectStream: let's have only one `createRDD`,
with `leaders` as an optional parameter.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]