Github user tdas commented on the issue:

https://github.com/apache/spark/pull/11863

*1+2. Architecture*

I had to double check the code to understand how the sorted list of executors is used to get consistent topic-to-preferred-location mappings. PLEASE WRITE MORE SCALA DOCS on how the logic in the method works and what is returned! Otherwise it's really tedious to understand all the nuances of the logic (in this case, around preferred locations).

Also, this does not completely eliminate my original concern, because a preferred location DOES NOT guarantee that the task will be scheduled on that executor. Tasks could get scheduled on other executors with some probability (especially if there is contention in the cluster, etc.), so there is a small chance that multiple executors will have a cached consumer for the same Kafka partition. But this chance is small, and we can fix it later if it does become a problem.

*3+4. API*

My concern was with the Consumer class itself, if they change packaging and all. However, my colleagues convinced me that since this is a narrowly focused package for kafka-0-10, if it breaks we can always release kafka-x-y. So I will concede on that.

However, I still think we should provide a simpler API for common use cases, similar to the older KafkaUtils, so that people trying to migrate know what to look for. Here is my proposed API for the direct stream (we can discuss feasibility/pros/cons of this). It assumes that the common case is a fixed set of topics, where the user wants to start from the earliest, latest, or specific offsets.

```
import java.{util => ju}
import scala.reflect.ClassTag

import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream

object KafkaUtils {

  // Simpler API for common use cases
  def createDirectStream[K: ClassTag, V: ClassTag](
      ssc: StreamingContext,
      topicAndPosition: TopicPartitionOffsets,  // defines the topics and starting offset, see classes below
      kafkaParams: Map[String, String],
      preferredHosts: Map[KafkaTopicPartition, String] = Map.empty): InputDStream[(K, V)] = ???

  // Advanced API for everything else
  def createDirectStream[K: ClassTag, V: ClassTag](
      ssc: StreamingContext,
      driverConsumer: () => Consumer[K, V],
      executorKafkaParams: ju.Map[String, Object],
      preferredHosts: ju.Map[TopicPartition, String]  // last, as this is least used and will be mostly empty
    ): InputDStream[(K, V)] = ???
}

/** Helper class to define the topics to subscribe to and the starting offsets. This should be Java API. */
object TopicPartitionOffsets {
  def latestOffsets(topics: Set[String]) = Latest(topics)
  def earliestOffsets(topics: Set[String]) = Earliest(topics)
  def customOffsets(startingOffsets: Map[KafkaTopicPartition, Long]) = Custom(startingOffsets)
}

sealed abstract class TopicPartitionOffsets
private[kafka] case class Latest(topics: Set[String]) extends TopicPartitionOffsets
private[kafka] case class Earliest(topics: Set[String]) extends TopicPartitionOffsets
private[kafka] case class Custom(startingOffsets: Map[KafkaTopicPartition, Long]) extends TopicPartitionOffsets
```

The reason I am proposing TopicPartitionOffsets is to eliminate one case of overloading (see the two KafkaUtils.createDirectStream methods for Scala in 0.8) AND also to eliminate the weirdness of reusing the Kafka param "auto.offset.reset" for the purpose of specifying latest/earliest.

The API for the Java createDirectStream would be similar. For RDDs, KafkaUtils.createRDD would be similar to how it currently is in the KafkaRDD object.

*Summary*

To summarize, in terms of API here are the proposed changes:

- Add KafkaUtils with the above proposed API.
- Remove the DirectKafkaInputDStream and KafkaRDD objects. There is no need for them after KafkaUtils.

What do you think?
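To make the TopicPartitionOffsets idea concrete, here is a minimal, self-contained sketch of how an implementation might consume it. It is only an illustration under assumptions, not the actual connector code: KafkaTopicPartition is a hypothetical stand-in case class, and StartingPositionSketch with its topicsOf and describe helpers are names invented for this example. The point is that a sealed hierarchy lets the stream pattern match on the starting position instead of inspecting "auto.offset.reset".

```scala
// Hypothetical stand-in for the KafkaTopicPartition type used in the proposal.
final case class KafkaTopicPartition(topic: String, partition: Int)

sealed abstract class TopicPartitionOffsets
final case class Latest(topics: Set[String]) extends TopicPartitionOffsets
final case class Earliest(topics: Set[String]) extends TopicPartitionOffsets
final case class Custom(startingOffsets: Map[KafkaTopicPartition, Long])
  extends TopicPartitionOffsets

object TopicPartitionOffsets {
  def latestOffsets(topics: Set[String]): TopicPartitionOffsets = Latest(topics)
  def earliestOffsets(topics: Set[String]): TopicPartitionOffsets = Earliest(topics)
  def customOffsets(startingOffsets: Map[KafkaTopicPartition, Long]): TopicPartitionOffsets =
    Custom(startingOffsets)
}

// Hypothetical helpers showing how a stream could interpret the starting position.
object StartingPositionSketch {

  // Topics the stream would cover, derived from whichever case was supplied.
  def topicsOf(position: TopicPartitionOffsets): Set[String] = position match {
    case Latest(topics)   => topics
    case Earliest(topics) => topics
    case Custom(offsets)  => offsets.keySet.map(_.topic)
  }

  // Human-readable description of where consumption would start.
  def describe(position: TopicPartitionOffsets): String = position match {
    case Latest(_)       => "start from latest offsets"
    case Earliest(_)     => "start from earliest offsets"
    case Custom(offsets) => s"start from ${offsets.size} explicit offset(s)"
  }
}
```

Because the class is sealed, the compiler can warn about a non-exhaustive match if another starting mode is added later, which is one argument for this shape over a string-valued Kafka parameter.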