Github user koeninger commented on the issue:

    https://github.com/apache/spark/pull/11863
  
    Regarding the possible arguments to preferredHosts, I'm pretty sure you're
    misunderstanding what happens.
    
    There are 3 use cases here (sketched in code below):
    
    - I don't care where things run.  I pass preferConsistent, and the
    definition of KafkaRDD.getPreferredLocations does the right thing.
    - I know my executors are on the same nodes as my brokers, and I want
    locality.  I pass preferBrokers, and getPreferredHosts figures out the
    brokers for me.
    - I want to specify the mapping, because of skewed topics or whatever.  I
    pass my own map.
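
    To make those concrete, here's a rough sketch of what the preferredHosts
    argument amounts to in each case (written as if the constants hang off
    KafkaUtils; treat the exact types and the topic/host strings as
    illustrative, not gospel):

        import java.{ util => ju }
        import org.apache.kafka.common.TopicPartition

        // Case 1: don't care where things run.  preferConsistent is effectively
        // "pass an empty map"; KafkaRDD.getPreferredLocations then spreads
        // partitions evenly across the available executors.
        val dontCare: ju.Map[TopicPartition, String] = KafkaUtils.preferConsistent

        // Case 2: executors are co-located with brokers.  preferBrokers is
        // effectively "pass null"; getPreferredHosts looks up the leader for
        // each partition and prefers that host.
        val coLocated: ju.Map[TopicPartition, String] = KafkaUtils.preferBrokers

        // Case 3: skewed topics, so I control the mapping myself.
        val skewed: ju.Map[TopicPartition, String] =
          new ju.HashMap[TopicPartition, String]()
        skewed.put(new TopicPartition("skewed-topic", 0), "host-with-headroom-1")
        skewed.put(new TopicPartition("skewed-topic", 1), "host-with-headroom-2")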
    
    If you think this would be less confusing if the static constants didn't
    exist and the docs said "pass null" instead of "pass preferBrokers" and
    "pass an empty map" instead of "pass preferConsistent", sure, we can do
    that. If we want a full-fledged interface here, ok, I think it's overkill,
    but I'll do it.  But your description of what the code is currently doing
    is not accurate, and your recommendation does not meet the use cases.
    
    Regarding DirectKafkaInputDStream and KafkaRDD being visible vs being
    hidden in KafkaUtils, that's fine for consistency's sake, no problem.
    
    Regarding your other API concerns... this has been discussed pretty heavily
    in the JIRA for months.  Have you read any of that discussion?
    
    Bottom line, I guarantee you that I cannot do my job with this connector
    unless I have unfettered ability to create the Consumer on the driver side.
    
    If we want to provide convenience methods to create a function setting up a
    Consumer given just a list of topics or a list of offsets, great, we can do
    that.  But without being able to arbitrarily configure the consumer,
    getting things like dynamic topic subscriptions working without auto offset
    reset is just not going to be possible... to say nothing of allowing people
    to create their own custom offset generation policies without having to
    recompile Spark.
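
    Concretely, the kind of convenience helpers I have in mind would look
    roughly like this, assuming driverConsumer boils down to a function that
    hands back a configured Consumer (the names and signatures here are mine,
    for illustration, not what's in the PR):

        import java.{ util => ju }
        import scala.collection.JavaConverters._
        import org.apache.kafka.clients.consumer.{ Consumer, KafkaConsumer }
        import org.apache.kafka.common.TopicPartition

        // Convenience: a driver-side consumer that just subscribes to topics.
        def fromTopics(
            topics: Seq[String],
            kafkaParams: ju.Map[String, Object]): () => Consumer[Array[Byte], Array[Byte]] =
          () => {
            val c = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
            c.subscribe(topics.asJava)
            c
          }

        // Convenience: a driver-side consumer pinned to explicit starting offsets.
        def fromOffsets(
            offsets: Map[TopicPartition, Long],
            kafkaParams: ju.Map[String, Object]): () => Consumer[Array[Byte], Array[Byte]] =
          () => {
            val c = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
            c.assign(offsets.keys.toList.asJava)
            offsets.foreach { case (tp, off) => c.seek(tp, off) }
            c
          }

    The point is that neither of those is baked in: someone who instead wants
    pattern subscription, or their own offset generation policy, just writes a
    different function, without recompiling Spark.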
    
    The problem here is not that we made too much stuff public in 0.8.  The
    problem here is that the semantics and abilities of the underlying consumer
    have changed - things like messageHandler in 0.8 flat out wouldn't work any
    more.  Hiding more of the Kafka classes would not have helped, at all.
    Keep in mind that we are not requiring a concrete Kafka class, we're
    requiring an implementation of an interface (Consumer, not KafkaConsumer).
    If the Kafka project drastically adds to that interface for 0.11, this
    current code isn't going to have to change much (if at all).
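
    To illustrate that with a throwaway example (not code from this PR):
    anything written against the Consumer interface keeps working no matter
    which concrete implementation shows up, whether that's KafkaConsumer,
    MockConsumer, or whatever a later Kafka release provides.

        import java.{ util => ju }
        import org.apache.kafka.clients.consumer.Consumer
        import org.apache.kafka.common.TopicPartition

        // Depends only on the Consumer interface, not on KafkaConsumer.
        def latestOffset(consumer: Consumer[_, _], tp: TopicPartition): Long = {
          consumer.seekToEnd(ju.Arrays.asList(tp))
          consumer.position(tp)
        }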
    
    If instead, we wrap everything with Spark reimplementations of all the
    Kafka interfaces, it's going to have to change quite a bit, regardless.
    
    
    
    On Wed, Jun 22, 2016 at 9:01 PM, Tathagata Das <notificati...@github.com>
    wrote:
    
    > @koeninger <https://github.com/koeninger> Okay, I think I understand the
    > code at a high level. And I have a number of architecture and API
    > concerns. Let me start with the highest level architectural concern to
    > keep the discussion focused.
    >
    > - This code expects the user to figure out the preferred hosts for each
    >   Kafka partition, and then just keeps setting that as the preferred
    >   location for the RDD partition. So if the user sets that to the default
    >   Map.empty, then what happens? Will each RDD partition get randomly
    >   assigned to the executors? Then every executor will have one cached
    >   consumer for each partition. Would that not lead to scalability
    >   problems for the Kafka cluster?
    >
    > - I think the right way here is to automatically record the executors
    >   that read each topic+partition in the previous batch, and use those
    >   executors as the preferred ones in the next batch. And ideally,
    >   periodically adjust if the executor-to-partition assignment is too
    >   unbalanced. The last one may be done later, but we should at least do
    >   the first one. In other words, "consistent" scheduling should be out
    >   of the box.
    >
    > Next, let me explain the API concerns. Note that this is the 2.0 release,
    > and we are extremely conservative about the new APIs that we are
    > exposing, as we will ideally have to support them for the 2.x.y line. So
    > we have to be absolutely sure about the public API we expose, and it's
    > better not to expose it if we are not sure about being able to support
    > it for the next few years (even if a 3rd-party library breaks
    > compatibility and all). Also we have learnt a bunch of things from
    > mistakes in the past, mistakes like exposing 3rd-party library classes
    > in the Spark API, which is forcing us to make kafka-0-8 and kafka-0-10.
    > All that said, I find the current DirectKafkaInputStream API very weird.
    > It took me and @zsxwing <https://github.com/zsxwing> a couple of hours
    > to really understand how to use it. Here are the concerns.
    >
    > - The driverConsumer function is the most confusing. Looking at it, it's
    >   not clear how I create the function such that I want to start from a
    >   particular offset or the latest offset. Why not reciprocate the
    >   KafkaUtils.createDirectStream() API which takes fromOffsets? I believe
    >   that is a much simpler API for the user, as the user does not have to
    >   learn how to write the driverConsumer such that it starts from a
    >   particular offset. And that is a much narrower API+functionality that
    >   can be supported no matter how things change in Kafka.
    >
    > - There does not seem to be a real need for having the public vars
    >   preferBrokers and preferConsistent. All the user should have to say is
    >   "optionally, here are the preferred locations." If the preferred
    >   location is empty, then the system ensures consistent scheduling.
    >   That's all.
    >
    > - We MUST NOT expose any Kafka classes in the API. That includes
    >   TopicAndPartition. I would create a mirror class for that.
    >
    > - DirectKafkaInputStream and KafkaRDD should not be public at all. They
    >   were not public in 0.8, they should not be public in 0.10 either.


