Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/3798#discussion_r24019631 --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala --- @@ -144,4 +150,174 @@ object KafkaUtils { createStream[K, V, U, T]( jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel) } + + /** A batch-oriented interface for consuming from Kafka. + * Starting and ending offsets are specified in advance, + * so that you can control exactly-once semantics. + * @param sc SparkContext object + * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration"> + * configuration parameters</a>. + * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s), + * NOT zookeeper servers, specified in host1:port1,host2:port2 form. + * @param offsetRanges Each OffsetRange in the batch corresponds to a + * range of offsets for a given Kafka topic/partition + */ + @Experimental + def createRDD[ + K: ClassTag, + V: ClassTag, + U <: Decoder[_]: ClassTag, + T <: Decoder[_]: ClassTag] ( + sc: SparkContext, + kafkaParams: Map[String, String], + offsetRanges: Array[OffsetRange] + ): RDD[(K, V)] with HasOffsetRanges = { + val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message) + val kc = new KafkaCluster(kafkaParams) + val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet + val leaders = kc.findLeaders(topics).fold( + errs => throw new SparkException(errs.mkString("\n")), + ok => ok + ) + new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler) + } + + /** A batch-oriented interface for consuming from Kafka. + * Starting and ending offsets are specified in advance, + * so that you can control exactly-once semantics. + * @param sc SparkContext object + * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration"> + * configuration parameters</a>. + * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s), + * NOT zookeeper servers, specified in host1:port1,host2:port2 form. + * @param offsetRanges Each OffsetRange in the batch corresponds to a + * range of offsets for a given Kafka topic/partition + * @param leaders Kafka leaders for each offset range in batch + * @param messageHandler function for translating each message into the desired type + */ + @Experimental + def createRDD[ + K: ClassTag, + V: ClassTag, + U <: Decoder[_]: ClassTag, + T <: Decoder[_]: ClassTag, + R: ClassTag] ( + sc: SparkContext, + kafkaParams: Map[String, String], + offsetRanges: Array[OffsetRange], + leaders: Array[Leader], --- End diff -- That's correct on both counts. If you don't provide a way for clients to supply offset ranges and leaders at the same, you're forcing twice the number of remote calls (because the usual way to get the end of the offset range is to talk to the leader). Yes, there's no way for people to actually use this currently unless they have their own copy of the functionality provided by KafkaCluster. In my case, I'm just going to remove SparkException from KafkaCluster, since it's the only spark dependency, and distribute it as a separate jar under a different namespace.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org