Github user koeninger commented on the issue: https://github.com/apache/spark/pull/11863

Regarding the possible arguments to preferredHosts, I'm pretty sure you're misunderstanding what happens. There are three use cases here:

- I don't care where things run. I pass preferConsistent, and the definition of KafkaRDD.getPreferredLocations does the right thing.
- I know my executors are on the same nodes as my brokers, and I want locality. I pass preferBrokers, and getPreferredHosts figures out the brokers for me.
- I want to specify the mapping myself, because of skewed topics or whatever. I pass my own map.

If you think this would be less confusing if the static constants didn't exist and the docs said "pass null" instead of "pass preferBrokers" and "pass an empty map" instead of "pass preferConsistent", sure, we can do that. If we want a full-fledged interface here, OK, I think it's overkill, but I'll do it. But your description of what the code is currently doing is not accurate, and your recommendation does not meet the use cases.

Regarding DirectKafkaInputDStream and KafkaRDD being visible vs. being hidden in KafkaUtils, that's fine for consistency's sake, no problem.

Regarding your other API concerns... this has been discussed pretty heavily in the JIRA for months. Have you read any of that discussion? Bottom line, I guarantee you that I cannot do my job with this connector unless I have the unfettered ability to create the Consumer on the driver side. If we want to provide convenience methods that create a function setting up a Consumer given just a list of topics or a list of offsets, great, we can do that. But without being able to arbitrarily configure the consumer, getting things like dynamic topic subscriptions working without auto offset reset is just not going to be possible... to say nothing of allowing people to create their own custom offset generation policies without having to recompile Spark.

The problem here is not that we made too much stuff public in 0.8.
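To make the three cases above concrete, here is a minimal, self-contained Scala sketch of the preferred-location logic being described. This is not the actual Spark code: TopicPartition, PreferConsistent, PreferBrokers, and preferredHost are stand-ins for the names in the PR, and the consistent-hash fallback is an illustrative assumption.

```scala
// Hypothetical stand-in for Kafka's topic+partition identifier.
case class TopicPartition(topic: String, partition: Int)

// Sentinels mirroring the PR's static constants: an empty map means
// "consistent" scheduling, null means "prefer the broker's host".
val PreferConsistent: Map[TopicPartition, String] = Map.empty
val PreferBrokers: Map[TopicPartition, String] = null

// Choose a preferred host for one partition, per the three cases:
// 1. preferConsistent: deterministic hash onto the executor list, so the
//    same partition lands on the same executor each batch and its cached
//    consumer stays warm.
// 2. preferBrokers: ask where the partition's leader broker lives.
// 3. user-supplied map: honor it, falling back to the consistent
//    assignment for any partition not listed.
def preferredHost(
    tp: TopicPartition,
    preferredHosts: Map[TopicPartition, String],
    brokerFor: TopicPartition => String,
    executors: IndexedSeq[String]): String =
  if (preferredHosts eq PreferBrokers) brokerFor(tp)
  else preferredHosts.getOrElse(
    tp, executors(math.abs(tp.hashCode) % executors.size))

val execs = IndexedSeq("exec-a", "exec-b", "exec-c")
val tp = TopicPartition("skewed-topic", 0)

// Case 1: no preference expressed, but the assignment is stable per batch.
assert(preferredHost(tp, PreferConsistent, _ => "?", execs) ==
       preferredHost(tp, PreferConsistent, _ => "?", execs))
// Case 2: executors co-located with brokers.
assert(preferredHost(tp, PreferBrokers, _ => "broker-1", execs) == "broker-1")
// Case 3: an explicit mapping wins.
assert(preferredHost(tp, Map(tp -> "exec-c"), _ => "?", execs) == "exec-c")
println("ok")
```

The point of the sentinels is that the user never has to compute anything for the common cases; only the third case (skew) requires supplying a map at all.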
The problem here is that the semantics and abilities of the underlying consumer have changed - things like messageHandler in 0.8 flat out wouldn't work any more. Hiding more of the Kafka classes would not have helped, at all. Keep in mind that we are not requiring a concrete Kafka class; we're requiring an implementation of an interface (Consumer, not KafkaConsumer). If the Kafka project drastically adds to that interface for 0.11, this current code isn't going to have to change much (if at all). If instead we wrap everything with Spark reimplementations of all the Kafka interfaces, it's going to have to change quite a bit, regardless.

On Wed, Jun 22, 2016 at 9:01 PM, Tathagata Das <notificati...@github.com> wrote:

> @koeninger <https://github.com/koeninger> Okay, I think I understand the
> code at a high level. And I have a number of architecture and API concerns.
> Let me start with the highest-level architectural concern to keep the
> discussion focused.
>
> - This code expects the user to figure out the preferred hosts for each
>   Kafka partition, and then just keeps setting that as the preferred
>   location for each RDD partition. So if the user sets that to the default
>   Map.empty, then what happens? Will each RDD partition get randomly
>   assigned to the executors? Then every executor will have one cached
>   consumer for each partition. Would that not lead to scalability problems
>   for the Kafka cluster?
>
> - I think the right way here is to automatically record the executors
>   that read each topic+partition in the previous batch, and use those
>   executors as the preferred ones in the next batch. And ideally,
>   periodically adjust if the executor-to-partition assignment is too
>   unbalanced. The last one may be done later, but we should at least do the
>   first one. In other words, "consistent" scheduling should be out of the box.
>
> Next, let me explain the API concerns.
> Note that this is the 2.0 release,
> and we are extremely conservative about the new APIs that we are exposing,
> as we will ideally have to support them for the 2.x.y line. So we have to
> be absolutely sure about the public API we expose, and it's better not to
> expose it if we are not sure about being able to support it for the next
> few years (even if a 3rd-party library breaks compatibility and all). Also
> we have learnt a bunch of things from mistakes in the past - mistakes like
> exposing 3rd-party library classes in the Spark API, which is forcing us to
> make kafka-0-8 and kafka-0-10. All that said, I find the current
> DirectKafkaInputStream API very weird. It took me and @zsxwing
> <https://github.com/zsxwing> a couple of hours to really understand how to
> use it. Here are the concerns.
>
> - The driverConsumer function is the most confusing. Looking at it, it's
>   not clear how I create the function such that I start from a particular
>   offset or the latest offset. Why not reciprocate the
>   KafkaUtils.createDirectStream() API, which takes fromOffsets? I believe
>   that is a much simpler API for the user, as the user does not have to
>   learn how to write the driverConsumer such that it starts from a
>   particular offset. And that is a much narrower API+functionality that can
>   be supported no matter how things change in Kafka.
>
> - There does not seem to be a real need for having the public vars
>   preferBrokers and preferConsistent. All the user should have to say is:
>   "optionally, here are the preferred locations". If the preferred-location
>   map is empty, then the system ensures consistent scheduling. That's all.
>
> - We MUST NOT expose any Kafka classes in the API. That includes
>   TopicAndPartition. I would create a mirror class for that.
>
> - DirectKafkaInputStream and KafkaRDD should not be public at all. They
>   were not public in 0.8, and they should not be public in 0.10 either.
>
> You are receiving this because you were mentioned.
> Reply to this email directly or view it on GitHub:
> <https://github.com/apache/spark/pull/11863#issuecomment-227929834>