Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4723#discussion_r25306398
  
    --- Diff: python/pyspark/streaming/kafka.py ---
    @@ -81,3 +79,128 @@ def getClassByName(name):
             ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
             stream = DStream(jstream, ssc, ser)
             return stream.map(lambda (k, v): (keyDecoder(k), valueDecoder(v)))
    +
    +    @staticmethod
    +    def createDirectStream(ssc, brokerList, topics, kafkaParams={},
    +                           keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
    +        """
    +        .. note:: Experimental
    +
    +        Create an input stream that directly pulls messages from a Kafka Broker.
    +
    +        This is not a receiver-based Kafka input stream; it directly pulls messages from Kafka
    +        in each batch duration and processes them without storing them.
    +
    +        This does not use Zookeeper to store offsets. The consumed offsets are tracked
    +        by the stream itself. For interoperability with Kafka monitoring tools that depend on
    +        Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
    +        You can access the offsets used in each batch from the generated RDDs.
    +
    +        To recover from driver failures, you have to enable checkpointing in the StreamingContext.
    +        The information on consumed offsets can be recovered from the checkpoint.
    +        See the programming guide for details (constraints, etc.).
    +
    +        :param ssc:  StreamingContext object
    +        :param brokerList: A String representing a list of seed Kafka brokers (hostname:port,...)
    +        :param topics:  A list of topic names to consume.
    +        :param kafkaParams: Additional params for Kafka
    +        :param keyDecoder:  A function used to decode key (default is utf8_decoder)
    +        :param valueDecoder:  A function used to decode value (default is utf8_decoder)
    +        :return: A DStream object
    +        """
    +        java_import(ssc._jvm, "org.apache.spark.streaming.kafka.KafkaUtils")
    +
    +        kafkaParams.update({"metadata.broker.list": brokerList})
    +
    +        if not isinstance(topics, list):
    +            raise TypeError("topics should be list")
    +        jtopics = SetConverter().convert(topics, ssc.sparkContext._gateway._gateway_client)
    +        jparam = MapConverter().convert(kafkaParams, ssc.sparkContext._gateway._gateway_client)
    +
    +        try:
    +            array = KafkaUtils._getClassByName(ssc._jvm, "[B")
    +            decoder = KafkaUtils._getClassByName(ssc._jvm, "kafka.serializer.DefaultDecoder")
    +            jstream = ssc._jvm.KafkaUtils.createDirectStream(ssc._jssc, array, array, decoder,
    +                                                             decoder, jparam, jtopics)
    +        except Py4JError, e:
    +            # TODO: use --jars once it also works on the driver
    +            if not e.message or 'call a package' in e.message:
    +                print "No kafka package, please put the assembly jar into classpath:"
    +                print " $ bin/spark-submit --driver-class-path external/kafka-assembly/target/" + \
    +                      "scala-*/spark-streaming-kafka-assembly-*.jar"
    +            raise e
    +        ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
    +        stream = DStream(jstream, ssc, ser)
    +        return stream.map(lambda (k, v): (keyDecoder(k), valueDecoder(v)))
    +
    +    @staticmethod
    +    def createRDD(sc, brokerList, offsetRanges, kafkaParams={},
    +                  keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
    +        """
    +        .. note:: Experimental
    +
    +        Create an RDD from Kafka using offset ranges for each topic and partition.
    +        :param sc:  SparkContext object
    +        :param brokerList: A String representing a list of seed Kafka brokers (hostname:port,...)
    +        :param offsetRanges:  A list of OffsetRange objects specifying topic:partition [start,
    +        end) ranges to consume.
    +        :param kafkaParams: Additional params for Kafka
    +        :param keyDecoder:  A function used to decode key (default is utf8_decoder)
    +        :param valueDecoder:  A function used to decode value (default is utf8_decoder)
    +        :return: An RDD object
    +        """
    +        java_import(sc._jvm, "org.apache.spark.streaming.kafka.KafkaUtils")
    +        java_import(sc._jvm, "org.apache.spark.streaming.kafka.OffsetRange")
    +
    +        kafkaParams.update({"metadata.broker.list": brokerList})
    +
    +        if not isinstance(offsetRanges, list):
    +            raise TypeError("offsetRanges should be list")
    +        jparam = MapConverter().convert(kafkaParams, sc._gateway._gateway_client)
    +
    +        try:
    +            array = KafkaUtils._getClassByName(sc._jvm, "[B")
    +            decoder = KafkaUtils._getClassByName(sc._jvm, "kafka.serializer.DefaultDecoder")
    +            joffsetRanges = sc._gateway.new_array(sc._jvm.OffsetRange, len(offsetRanges))
    +            for idx, o in enumerate(offsetRanges):
    +                joffsetRanges[idx] = o._joffsetRange(sc)
    +            jrdd = sc._jvm.KafkaUtils.createRDD(sc._jsc, array, array, decoder, decoder,
    +                                                jparam, joffsetRanges)
    +        except Py4JError, e:
    +            # TODO: use --jars once it also works on the driver
    +            if not e.message or 'call a package' in e.message:
    +                print "No kafka package, please put the assembly jar into classpath:"
    +                print " $ bin/spark-submit --driver-class-path external/kafka-assembly/target/" + \
    +                      "scala-*/spark-streaming-kafka-assembly-*.jar"
    +            raise e
    +        ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
    --- End diff --
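
    For context (not part of the diff), here is a minimal sketch of how the
    two new APIs might be called from a PySpark application. It assumes a
    broker at localhost:9092, a topic named "test", the OffsetRange wrapper
    added elsewhere in this PR with an OffsetRange(topic, partition,
    fromOffset, untilOffset) constructor, and the kafka assembly jar on the
    driver classpath as the error hint above describes; all names and
    offsets are illustrative only:

        from pyspark import SparkContext
        from pyspark.streaming import StreamingContext
        from pyspark.streaming.kafka import KafkaUtils, OffsetRange

        sc = SparkContext(appName="KafkaDirectSketch")
        ssc = StreamingContext(sc, 2)  # 2-second batch duration

        # Direct stream: pulls messages from the topic in every batch, no receiver.
        stream = KafkaUtils.createDirectStream(ssc, "localhost:9092", ["test"])
        stream.pprint()

        # Batch RDD: read offsets [0, 100) of partition 0 of the same topic.
        rdd = KafkaUtils.createRDD(sc, "localhost:9092",
                                   [OffsetRange("test", 0, 0, 100)])
        print rdd.count()  # number of messages in the requested offset range

        ssc.start()
        ssc.awaitTermination()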
    
    There is a lot of duplicated code here; could you figure out a way to
    reuse it? (It's OK to keep it as is, if that's not easy.)
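
    For example (a sketch only; the helper names are hypothetical, not from
    the PR), the class/decoder lookup and the missing-jar hint could be
    pulled into private helpers on KafkaUtils, keeping the Python 2 print
    statements used in the diff:

        # Inside the KafkaUtils class:
        @staticmethod
        def _get_byte_array_and_decoder(jvm):
            # Shared lookup of the byte-array class and Kafka's DefaultDecoder.
            array = KafkaUtils._getClassByName(jvm, "[B")
            decoder = KafkaUtils._getClassByName(jvm, "kafka.serializer.DefaultDecoder")
            return array, decoder

        @staticmethod
        def _print_missing_jar_msg():
            # Shared hint shown when the kafka assembly jar is not on the classpath.
            print "No kafka package, please put the assembly jar into classpath:"
            print " $ bin/spark-submit --driver-class-path external/kafka-assembly/target/" + \
                  "scala-*/spark-streaming-kafka-assembly-*.jar"

    Both createDirectStream and createRDD could then call these inside their
    try/except blocks, leaving only the stream/RDD construction itself
    method-specific.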

