Github user davies commented on a diff in the pull request:
https://github.com/apache/spark/pull/4723#discussion_r25306398
--- Diff: python/pyspark/streaming/kafka.py ---
@@ -81,3 +79,128 @@ def getClassByName(name):
ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
stream = DStream(jstream, ssc, ser)
return stream.map(lambda (k, v): (keyDecoder(k), valueDecoder(v)))
+
+ @staticmethod
+ def createDirectStream(ssc, brokerList, topics, kafkaParams={},
+ keyDecoder=utf8_decoder,
valueDecoder=utf8_decoder):
+ """
+ ..note:: experimental
+
+ Create an input stream that directly pulls messages from a Kafka
Broker.
+
+ This is not a receiver based Kafka input stream, it directly pulls
the message from Kafka
+ in each batch duration and processed without storing.
+
+ This does not use Zookeeper to store offsets. The consumed offsets
are tracked
+ by the stream itself. For interoperability with Kafka monitoring
tools that depend on
+ Zookeeper, you have to update Kafka/Zookeeper yourself from the
streaming application.
+ You can access the offsets used in each batch from the generated
RDDs (see
+
+ To recover from driver failures, you have to enable checkpointing
in the StreamingContext.
+ The information on consumed offset can be recovered from the
checkpoint.
+ See the programming guide for details (constraints, etc.).
+
+ :param ssc: StreamingContext object
+ :param brokerList: A String representing a list of seed Kafka
brokers (hostname:port,...)
+ :param topics: list of topic_name to consume.
+ :param kafkaParams: Additional params for Kafka
+ :param keyDecoder: A function used to decode key (default is
utf8_decoder)
+ :param valueDecoder: A function used to decode value (default is
utf8_decoder)
+ :return: A DStream object
+ """
+ java_import(ssc._jvm,
"org.apache.spark.streaming.kafka.KafkaUtils")
+
+ kafkaParams.update({"metadata.broker.list": brokerList})
+
+ if not isinstance(topics, list):
+ raise TypeError("topics should be list")
+ jtopics = SetConverter().convert(topics,
ssc.sparkContext._gateway._gateway_client)
+ jparam = MapConverter().convert(kafkaParams,
ssc.sparkContext._gateway._gateway_client)
+
+ try:
+ array = KafkaUtils._getClassByName(ssc._jvm, "[B")
+ decoder = KafkaUtils._getClassByName(ssc._jvm,
"kafka.serializer.DefaultDecoder")
+ jstream = ssc._jvm.KafkaUtils.createDirectStream(ssc._jssc,
array, array, decoder,
+ decoder,
jparam, jtopics)
+ except Py4JError, e:
+ # TODO: use --jar once it also work on driver
+ if not e.message or 'call a package' in e.message:
+ print "No kafka package, please put the assembly jar into
classpath:"
+ print " $ bin/spark-submit --driver-class-path
external/kafka-assembly/target/" + \
+ "scala-*/spark-streaming-kafka-assembly-*.jar"
+ raise e
+ ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
+ stream = DStream(jstream, ssc, ser)
+ return stream.map(lambda (k, v): (keyDecoder(k), valueDecoder(v)))
+
+ @staticmethod
+ def createRDD(sc, brokerList, offsetRanges, kafkaParams={},
+ keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
+ """
+ ..note:: experimental
+
+ Create a RDD from Kafka using offset ranges for each topic and
partition.
+ :param sc: SparkContext object
+ :param brokerList: A String representing a list of seed Kafka
brokers (hostname:port,...)
+ :param offsetRanges: list of offsetRange to specify
topic:partition [start,
+ end) to consume.
+ :param kafkaParams: Additional params for Kafka
+ :param keyDecoder: A function used to decode key (default is
utf8_decoder)
+ :param valueDecoder: A function used to decode value (default is
utf8_decoder)
+ :return: A RDD object
+ """
+ java_import(sc._jvm, "org.apache.spark.streaming.kafka.KafkaUtils")
+ java_import(sc._jvm,
"org.apache.spark.streaming.kafka.OffsetRange")
+
+ kafkaParams.update({"metadata.broker.list": brokerList})
+
+ if not isinstance(offsetRanges, list):
+ raise TypeError("offsetRanges should be list")
+ jparam = MapConverter().convert(kafkaParams,
sc._gateway._gateway_client)
+
+ try:
+ array = KafkaUtils._getClassByName(sc._jvm, "[B")
+ decoder = KafkaUtils._getClassByName(sc._jvm,
"kafka.serializer.DefaultDecoder")
+ joffsetRanges = sc._gateway.new_array(sc._jvm.OffsetRange,
len(offsetRanges))
+ for idx, o in enumerate(offsetRanges):
+ joffsetRanges[idx] = o._joffsetRange(sc)
+ jrdd = sc._jvm.KafkaUtils.createRDD(sc._jsc, array, array,
decoder, decoder,
+ jparam, joffsetRanges)
+ except Py4JError, e:
+ # TODO: use --jar once it also work on driver
+ if not e.message or 'call a package' in e.message:
+ print "No kafka package, please put the assembly jar into
classpath:"
+ print " $ bin/spark-submit --driver-class-path
external/kafka-assembly/target/" + \
+ "scala-*/spark-streaming-kafka-assembly-*.jar"
+ raise e
+ ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
--- End diff --
There are lots of duplicated code here, could you figure out a way to
re-use them? (it's OK to go as this, if it's not easy)
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]