Github user tdas commented on the issue:
https://github.com/apache/spark/pull/11863
@koeninger Okay, I think I understand the code at a high level, and I have
a number of architectural and API concerns. Let me start with the highest-level
architectural concern to keep the discussion focused.
- This code expects the user to figure out the preferred hosts for each
Kafka partition, and then keeps setting that as the preferred location for each
RDD partition. So if the user leaves it at the default Map.empty, what
happens? Will each RDD partition get randomly assigned to the executors? Then
every executor will end up with one cached consumer per partition. Would that not
lead to scalability problems for the Kafka cluster?
- I think the right way here is to automatically record the executors that
read each topic+partition in the previous batch, and use those executors as the
preferred ones in the next batch. Ideally, the assignment would also be adjusted
periodically if the executor-to-partition mapping becomes too unbalanced. The
rebalancing can be done later, but we should at least do the first part. In other
words, "consistent" scheduling should work out of the box (see the sketch after
this list).
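To make that concrete, here is a minimal sketch of the "consistent scheduling" idea, assuming a hypothetical tracker object inside the DStream. The names below (KafkaTopicPartition, ConsistentLocationTracker, preferredHost, record) are illustrative only and are not part of the PR's actual code.

```scala
import scala.collection.mutable

// Spark-owned stand-in for a Kafka topic+partition, used only for this sketch.
case class KafkaTopicPartition(topic: String, partition: Int)

class ConsistentLocationTracker {
  // Executor host that read each topic+partition in the previous batch.
  private val lastAssignment = mutable.Map.empty[KafkaTopicPartition, String]

  /** Preferred host for the next batch: reuse the previous batch's executor if known,
    * otherwise hash the partition onto the currently known executors so that the
    * assignment stays stable across batches. */
  def preferredHost(tp: KafkaTopicPartition, executors: IndexedSeq[String]): Option[String] =
    lastAssignment.get(tp).orElse {
      if (executors.isEmpty) None
      else Some(executors((tp.hashCode & Int.MaxValue) % executors.size))
    }

  /** Called after a batch completes, with the executor that actually read each partition. */
  def record(assignments: Map[KafkaTopicPartition, String]): Unit =
    lastAssignment ++= assignments
}
```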
Next, let me explain the API concerns. Note that this is the 2.0 release,
and we are extremely conservative about the new APIs we expose, as we will
have to support them for the 2.x.y line. So we have to be absolutely sure about
any public API we expose, and it's better not to expose it if we are not sure we
can support it for the next few years (even if the 3rd-party library breaks
compatibility along the way). We have also learnt a number of things from past
mistakes - mistakes like exposing 3rd-party library classes in the Spark API,
which is what is forcing us to maintain separate kafka-0-8 and kafka-0-10 modules.
All that said, I find the current DirectKafkaInputStream API very confusing. It
took @zsxwing and me a couple of hours to really understand how to use it. Here
are the concerns.
- The driverConsumer function is the most confusing part. Looking at it, it's not
clear how I should write the function so that the stream starts from a particular
offset, or from the latest offset. Why not mirror the existing
KafkaUtils.createDirectStream() API, which takes fromOffsets? I believe that is
a much simpler API, since the user does not have to learn how to write a
driverConsumer that starts from a particular offset. It is also a much narrower
API and piece of functionality, one that can be supported no matter how things
change in Kafka (a rough sketch of this shape follows this list of concerns).
- There does not seem to be a real need for the public vars
preferBrokers and preferConsistent. All the user should have to say is,
optionally, "here are the preferred locations". If the preferred locations are
empty, then the system ensures consistent scheduling. That's all.
- We MUST NOT expose any Kafka classes in the API. That includes
TopicAndPartition. I would create a mirror class for it.
- DirectKafkaInputStream and KafkaRDD should not be public at all. They
were not public in 0.8, and they should not be public in 0.10 either.
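Putting the last few points together, here is a rough sketch, with illustrative names only, of the narrower public surface I have in mind: a Spark-owned mirror of Kafka's TopicAndPartition, explicit fromOffsets as in the 0.8 createDirectStream, and preferred locations as a plain optional map that defaults to consistent scheduling. This is not the PR's code, just the shape of the API.

```scala
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream

// Mirror class so that no Kafka class leaks into the public API.
case class SparkTopicPartition(topic: String, partition: Int)

object IllustrativeKafkaUtils {
  def createDirectStream[K, V](
      ssc: StreamingContext,
      kafkaParams: Map[String, Object],
      fromOffsets: Map[SparkTopicPartition, Long],                  // explicit start offsets
      preferredHosts: Map[SparkTopicPartition, String] = Map.empty  // empty => consistent scheduling
  ): InputDStream[(K, V)] = {
    // Body intentionally omitted; only the shape of the API is being illustrated.
    ???
  }
}
```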