Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/11863
  
    *1+2. Architecture*
    I had to double check the code to understand the use of sorted list of 
executors to get consistent topic-to-preferred locations. PLEASE WRITE MORE 
SCALA DOCS on how the logic in the method work and what is returned! Otherwise 
its really tedious to understand all the nuances of the logic (in this case, 
when preferred location). Also, this does not completely eliminate my original 
concern because preferred location DOES NOT guarantee that the task will be 
scheduled on that executor. So tasks could get scheduled on other executors 
with some probability (especially if there is contention in the cluster, etc.) 
and small chance that multiple executors will have a cached consumer for the 
same Kafka partition. But this chance is small, and we can fix it later if this 
indeed becomes a problem.
    
    *3+4. API*
    My concern was with the Consumer class itself, it they change packaging and 
all. However, my colleagues convinced me that since this is a narrowly package 
focused on kafka-0-10 so if it breaks we can always release kafka-x-y. So I 
will concede on that. However, I still think we should provide simpler API for 
common use cases, and similar to the older KafkaUtils so that people trying 
migrate know what to look for. Here is my proposed API for direct stream and we 
can discuss feasibilty/pros/cons on this). This assumes that the common case is 
a fixed set of topics, user wants to start from earliest/latest/specific 
offsets. 
    ```
    object KafkaUtils {
      // Simpler API for common use cases
      def createDirectStream[K: ClassTag, V: ClassTag](
          ssc: StreamingContext,
          topicAndPosition: TopicPartitionOffset,    // defines the topics and 
starting offset, see classes below
          kafkaParams: Map[String, String],
          preferredHosts: Map[KafkaTopicPartition, String] = Map.empty): 
InputDStream[(K, V)] = ???
    
      // Advanced API for everything else
      def createDirectStream[K: ClassTag, V: ClassTag](
          ssc: StreamingContext,
          driverConsumer: () => Consumer[K, V],
          executorKafkaParams: ju.Map[String, Object],
          preferredHosts: ju.Map[TopicPartition, String] // this at last as 
this is least used, will be mostly empty
        ): InputDStream[(K, V)] = ???
    }
    
    /** Helper class to define the topics to subscribe and starting offsets. 
This should be JavaAPI */
    object TopicPartitionOffsets {
        def latestOffsets(topics: Set[String]) = Latest(topics)
        def earliestOffsets(topics: Set[String) = Earliest(topics)
        def customOffsets(startingOffsets: Map[KafkaTopicPartition, Long]) = 
Custom(startingOffsets)
    }
    
    sealed abstract class TopicPartitionOffsets
    
    Private[kafka] case class Latest(topics: Set[String]) extends 
TopicPartitionOffsets
    Private[kafka] case class Earliest(topics: Set[String) extends 
TopicPartitionOffsets
    Private[kafka] case class Custom(startingOffsets: Map[KafkaTopicPartition, 
Long]) 
    extends TopicPartitionOffsets
    ```
    The reason I am proposing TopicPartitionOffsets is to eliminate one case of 
overloading (see the two KafkaUtils.createDirectStream for scala in 0.8) AND 
also eliminate the weirdness of reusing kafka param "auto.offset.reset" for the 
purpose specifying latest/earliest. The API for Java createDirectStream would 
be similar. For RDD, KafkaUtils.createRDD would be similar to how it is 
currently in the KafkaRDD object.
    
    *Summary*
    To summarize in terms of API here are proposed changes
    - Add KafkaUtils with the above proposed API
    - Remove the DirectKafkaInputDStream and KafkaRDD objects. There is no need 
for them after KafkaUtils.
    
    What do you think?
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to