Github user tdas commented on the issue:
https://github.com/apache/spark/pull/11863
*1+2. Architecture*
I had to double check the code to understand the use of a sorted list of
executors to get consistent topic-to-preferred-location assignments. PLEASE
WRITE MORE SCALA DOCS on how the logic in the method works and what is
returned! Otherwise it's really tedious to understand all the nuances of the
logic (in this case, how the preferred location is chosen). Also, this does not
completely eliminate my original concern, because a preferred location DOES NOT
guarantee that the task will be scheduled on that executor. So tasks could get
scheduled on other executors with some probability (especially if there is
contention in the cluster, etc.), and there is a small chance that multiple
executors will have a cached consumer for the same Kafka partition. But this
chance is small, and we can fix it later if this indeed becomes a problem.
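To make sure we are talking about the same logic, here is a minimal sketch of
what I had to reverse-engineer from the code (illustrative names only, not the
PR's actual method): hash each TopicPartition into a sorted executor list so
that every batch computes the same assignment.
```
import org.apache.kafka.common.TopicPartition

// Illustrative sketch only (not the code in this PR): pick a preferred
// executor for each TopicPartition by hashing into a sorted executor
// list, so every batch computes the same partition-to-host assignment.
object PreferredLocationSketch {
  def preferredHost(tp: TopicPartition, executors: Seq[String]): Option[String] = {
    if (executors.isEmpty) None
    else {
      // Sorting makes the choice deterministic for a given executor set,
      // regardless of the order in which executors were discovered.
      val sorted = executors.sorted
      // Normalize the hash so the index is non-negative even for
      // negative hashCodes.
      val index = ((tp.hashCode % sorted.size) + sorted.size) % sorted.size
      Some(sorted(index))
    }
  }
}
```
Note that this only stays consistent as long as the executor set itself does
not change, which is exactly why it is a preference and not a guarantee.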
*3+4. API*
My concern was with the Consumer class itself, in case they change its
packaging and such. However, my colleagues convinced me that since this is a
narrow package focused on kafka-0-10, if it breaks we can always release a
kafka-x-y. So I will concede on that. However, I still think we should provide
a simpler API for common use cases, similar to the older KafkaUtils, so that
people trying to migrate know what to look for. Here is my proposed API for the
direct stream (we can discuss the feasibility/pros/cons of this). It assumes
that the common case is a fixed set of topics, where the user wants to start
from the earliest/latest/specific offsets.
```
object KafkaUtils {

  // Simpler API for common use cases
  def createDirectStream[K: ClassTag, V: ClassTag](
      ssc: StreamingContext,
      // defines the topics and starting offsets, see classes below
      topicAndPosition: TopicPartitionOffsets,
      kafkaParams: Map[String, String],
      preferredHosts: Map[KafkaTopicPartition, String] = Map.empty
    ): InputDStream[(K, V)] = ???

  // Advanced API for everything else
  def createDirectStream[K: ClassTag, V: ClassTag](
      ssc: StreamingContext,
      driverConsumer: () => Consumer[K, V],
      executorKafkaParams: ju.Map[String, Object],
      // this goes last as it is least used, will be mostly empty
      preferredHosts: ju.Map[TopicPartition, String]
    ): InputDStream[(K, V)] = ???
}

/** Helper object to define the topics to subscribe to and the starting
 *  offsets. This should also have a Java API. */
object TopicPartitionOffsets {
  def latestOffsets(topics: Set[String]) = Latest(topics)
  def earliestOffsets(topics: Set[String]) = Earliest(topics)
  def customOffsets(startingOffsets: Map[KafkaTopicPartition, Long]) =
    Custom(startingOffsets)
}

sealed abstract class TopicPartitionOffsets

private[kafka] case class Latest(topics: Set[String])
  extends TopicPartitionOffsets

private[kafka] case class Earliest(topics: Set[String])
  extends TopicPartitionOffsets

private[kafka] case class Custom(startingOffsets: Map[KafkaTopicPartition, Long])
  extends TopicPartitionOffsets
```
The reason I am proposing TopicPartitionOffsets is to eliminate one case of
overloading (see the two KafkaUtils.createDirectStream methods for Scala in
0.8) AND also eliminate the weirdness of reusing the kafka param
"auto.offset.reset" for the purpose of specifying latest/earliest. The API for
the Java createDirectStream would be similar. For RDD, KafkaUtils.createRDD
would be similar to how it is currently in the KafkaRDD object.
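For illustration, the common case with the proposed API would look something
like this (hypothetical, since the API above is only a proposal; ssc is an
existing StreamingContext and the topic names are made up):
```
// Hypothetical usage of the proposed simple API: fixed topic set,
// start from the latest offsets, no preferred hosts.
val kafkaParams = Map(
  "bootstrap.servers" -> "broker1:9092,broker2:9092",
  "group.id" -> "example-group"
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, // an existing StreamingContext
  TopicPartitionOffsets.latestOffsets(Set("pageviews", "clicks")),
  kafkaParams
) // preferredHosts is left at its default of Map.empty
```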
*Summary*
To summarize, in terms of API here are the proposed changes:
- Add KafkaUtils with the above proposed API
- Remove the DirectKafkaInputDStream and KafkaRDD objects. There is no need
for them after KafkaUtils.
What do you think?