Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/11863
  
    @koeninger Okay, I think I understand the code at a high level, and I have 
a number of architectural and API concerns. Let me start with the highest-level 
architectural concern, to keep the discussion focused.
    
    - This code expects the user to figure out the preferred hosts for each 
Kafka partition, and then keeps setting those as the preferred locations for 
the corresponding RDD partitions. So if the user leaves that at the default 
Map.empty, what happens? Will each RDD partition get randomly assigned to an 
executor? Then every executor could end up with a cached consumer for every 
partition. Would that not lead to scalability problems for the Kafka cluster?
    
    - I think the right way here is to automatically record which executors 
read each topic+partition in the previous batch, and use those executors as 
the preferred locations in the next batch. Ideally we would also periodically 
rebalance if the executor-to-partition assignment becomes too skewed. The 
rebalancing can come later, but we should at least do the first part. In other 
words, "consistent" scheduling should work out of the box (a rough sketch 
follows below).
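
    To make the consistency point concrete, here is a rough sketch of one 
simple way to get it, by hashing each topic+partition onto a stable, sorted 
executor list rather than tracking the previous batch. All names here are 
illustrative, not code from this PR.

```scala
// Rough sketch: derive a deterministic preferred executor for each
// topic+partition, so the same partition prefers the same executor in every
// batch and each executor only caches consumers for its own partitions.
def consistentPreferredLocation(
    topic: String,
    partition: Int,
    sortedExecutors: IndexedSeq[String]): Option[String] = {
  if (sortedExecutors.isEmpty) {
    None  // no executors known yet; let the scheduler place the task anywhere
  } else {
    val h = topic.hashCode * 31 + partition
    val n = sortedExecutors.size
    val idx = ((h % n) + n) % n  // non-negative modulo for negative hashes
    Some(sortedExecutors(idx))
  }
}
```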
    
    Next, let me explain the API concerns. Note that this is the 2.0 release, 
and we are extremely conservative about the new APIs we expose, since we will 
ideally have to support them for the whole 2.x.y line. So we have to be 
absolutely sure about any public API we add, and it is better not to expose 
something if we are not sure we can support it for the next few years (even if 
the third-party library breaks compatibility). We have also learned from past 
mistakes, such as exposing third-party library classes in the Spark API, which 
is exactly what is forcing us to maintain separate kafka-0-8 and kafka-0-10 
modules. All that said, I find the current DirectKafkaInputStream API very 
confusing. It took @zsxwing and me a couple of hours to really understand how 
to use it. Here are the concerns.
    
    - The driverConsumer function is the most confusing part. Looking at it, 
it is not clear how I should write the function so that consumption starts 
from a particular offset, or from the latest offset. Why not mirror the 
existing KafkaUtils.createDirectStream() API, which takes fromOffsets (see the 
example after this list)? I believe that is a much simpler API for the user, 
since the user does not have to learn how to write a driverConsumer that 
starts from a particular offset. It is also a much narrower API and piece of 
functionality, one that can be supported no matter how things change in Kafka.
    
    - There does not seem to be a real need for the public vars preferBrokers 
and preferConsistent. All the user should have to provide, optionally, is the 
set of preferred locations. If the preferred locations are empty, the system 
should ensure consistent scheduling. That's all.
    
    - We MUST NOT expose any Kafka classes in the public API. That includes 
TopicAndPartition. I would create a mirror class for it (a minimal sketch 
follows below).
    
    - DirectKafkaInputStream and KafkaRDD should not be public at all. They 
were not public in 0.8, and they should not be public in 0.10 either.
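
    For reference on the fromOffsets point above, this is roughly how the 
existing 0.8 direct API lets the caller state the starting position 
explicitly. This is sketched against the 0.8 API with a made-up topic and 
broker list, and assumes ssc is an existing StreamingContext.

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Start each partition from an explicit offset; there is no driver-side
// consumer function for the user to reason about.
val fromOffsets = Map(
  TopicAndPartition("events", 0) -> 1234L,
  TopicAndPartition("events", 1) -> 5678L)

val stream = KafkaUtils.createDirectStream[
    String, String, StringDecoder, StringDecoder, (String, String)](
  ssc,
  Map("metadata.broker.list" -> "broker1:9092,broker2:9092"),
  fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))
```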
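
    And on the TopicAndPartition point, the mirror class itself can be tiny, 
something along these lines. The name and package are placeholders, not a 
proposal for the final API.

```scala
package org.apache.spark.streaming.kafka010

// Spark-owned stand-in for kafka.common.TopicAndPartition, so that no Kafka
// class leaks into the public Spark API.
final case class SparkTopicAndPartition(topic: String, partition: Int)
```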
    