rhauch opened a new pull request #9780:
URL: https://github.com/apache/kafka/pull/9780


   The existing `Kafka*BackingStore` classes used by Connect all use
`KafkaBasedLog`, which frequently needs to get the end offsets of the internal
topic to know whether it is caught up. `KafkaBasedLog` uses its consumer both
to get the end offsets and to consume the records from the topic.
   
   However, the Connect internal topics are often written very infrequently.
This means that when the `KafkaBasedLog` used in a `Kafka*BackingStore`
class is already caught up and its last consumer poll is waiting for new
records to appear, the call to the consumer to fetch end offsets will block
until the consumer returns. That happens either because a new record is written
(unlikely) or because the consumer's `fetch.max.wait.ms` setting (default 500 ms)
elapses and the consumer returns no more records. In other words, a call to
`KafkaBasedLog.readToEnd()` may block for some period of time even though the
log is already caught up to the end.
   
   Instead, we want `KafkaBasedLog.readToEnd()` to always return quickly
when the log is already caught up. The best way to do this is to have the
`KafkaBasedLog` in each `Kafka*BackingStore` use the admin client (rather than
the consumer) to fetch end offsets for the internal topic. The consumer and the
admin client both use the same `ListOffsets` broker API, so the functionality is
ultimately the same, but we don't have to block on any ongoing consumer activity.
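
   A minimal sketch of the catch-up check follows. It assumes the end offsets
come from a source that is independent of the consumer (for example, the admin
client's `listOffsets` call with `OffsetSpec.latest()`). `ReadToEndSketch`,
`advance`, and `caughtUp` are hypothetical names used only for illustration.
Partitions are modeled as plain integers rather than `TopicPartition` objects,
and this is not `KafkaBasedLog`'s actual code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical, simplified model of the catch-up check: partitions are ints and
// offsets are longs. Not KafkaBasedLog's real code.
class ReadToEndSketch {
    // End offsets fetched WITHOUT the consumer, e.g. from an admin client.
    private final Supplier<Map<Integer, Long>> endOffsetSource;
    // Next offset to be consumed per partition; written by the consumer thread.
    private final Map<Integer, Long> positions = new ConcurrentHashMap<>();

    ReadToEndSketch(Supplier<Map<Integer, Long>> endOffsetSource) {
        this.endOffsetSource = endOffsetSource;
    }

    // Called by the consumer thread after it has processed records up to nextOffset - 1.
    void advance(int partition, long nextOffset) {
        positions.put(partition, nextOffset);
    }

    // True when every partition has been consumed up to its end offset. In that
    // case readToEnd() could return at once instead of waiting on a consumer poll.
    boolean caughtUp() {
        for (Map.Entry<Integer, Long> end : endOffsetSource.get().entrySet()) {
            long position = positions.getOrDefault(end.getKey(), 0L);
            if (position < end.getValue()) {
                return false;
            }
        }
        return true;
    }
}
```

Because `caughtUp()` never touches the consumer, it can answer right away even
while the consumer thread is parked in a poll.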
   
   Each Connect distributed runtime includes three instances of the
`Kafka*BackingStore` classes, which means we have three instances of
`KafkaBasedLog`. We don't want three instances of the admin client; instead, all
three `KafkaBasedLog` instances should share a single admin client instance. In
fact, each `Kafka*BackingStore` instance currently creates, uses, and closes its
own admin client instance when it checks and initializes that store's internal
topic. If we change the `Kafka*BackingStore` classes to share one admin client
instance, we can change that initialization logic to reuse the supplied admin
client instance as well.
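
   One way to share a single admin client among the three stores is a lazily
initialized, closeable supplier. The sketch below uses a hypothetical
`FakeAdmin` stand-in for the real admin client, and `LazySharedAdmin` is an
illustrative name only. It shows the idea (create the admin on the first
`get()`, hand every caller the same instance, close it once) and is not the
PR's actual `SharedTopicAdmin`.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for a real admin client; counts how many instances exist.
class FakeAdmin implements AutoCloseable {
    static final AtomicInteger CREATED = new AtomicInteger();
    volatile boolean closed;

    FakeAdmin() {
        CREATED.incrementAndGet();
    }

    @Override
    public void close() {
        closed = true;
    }
}

// Lazily creates one admin on first use, returns that same instance to every
// caller, and closes it (if it was ever created) when this object is closed.
class LazySharedAdmin implements Supplier<FakeAdmin>, AutoCloseable {
    private final Supplier<FakeAdmin> factory;
    private FakeAdmin admin;   // guarded by "this"
    private boolean closed;    // guarded by "this"

    LazySharedAdmin(Supplier<FakeAdmin> factory) {
        this.factory = factory;
    }

    @Override
    public synchronized FakeAdmin get() {
        if (closed) {
            throw new IllegalStateException("shared admin is already closed");
        }
        if (admin == null) {
            admin = factory.get();   // created only when first needed
        }
        return admin;
    }

    @Override
    public synchronized void close() {
        closed = true;
        if (admin != null) {
            admin.close();
            admin = null;
        }
    }
}
```

If each store receives the same `LazySharedAdmin` as its supplier, three
`get()` calls still create only one admin.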
   
   The final challenge is that `KafkaBasedLog` has been used by projects
outside of Apache Kafka. While `KafkaBasedLog` is definitely not part of
Connect's public API, we can make these changes in a backward-compatible way:
add new constructors and deprecate the old ones. Connect can be changed to use
only the new constructors, which gives any downstream users time to make changes.
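
   The deprecate-and-delegate pattern can be sketched as follows. `CompatLog`
and `AdminHandle` are hypothetical, trimmed-down stand-ins (not the real
`KafkaBasedLog` or `TopicAdmin`). Modeling the legacy path as a supplier that
returns `null` is an assumption made for illustration. The old constructor
adapts its no-arg initializer and falls back to the consumer for end offsets.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical placeholder for the admin type; not a real Kafka class.
class AdminHandle { }

// Hypothetical, trimmed-down log showing the "add new constructor, deprecate old" pattern.
class CompatLog {
    // May supply null, meaning "no admin available": fall back to the consumer.
    private final Supplier<AdminHandle> adminSupplier;
    // Init function that can reuse the shared admin instance (may be null).
    private final Consumer<AdminHandle> initializer;

    /**
     * @deprecated use {@link #CompatLog(Supplier, Consumer)} so an admin can be shared.
     */
    @Deprecated
    CompatLog(Runnable initializer) {
        // Old callers have no admin supplier, and their no-arg initializer ignores the admin.
        this(() -> null, initializer != null ? admin -> initializer.run() : null);
    }

    CompatLog(Supplier<AdminHandle> adminSupplier, Consumer<AdminHandle> initializer) {
        this.adminSupplier = adminSupplier;
        this.initializer = initializer;
    }

    // Runs the initializer and reports which component would fetch end offsets.
    String start() {
        AdminHandle admin = adminSupplier.get();
        if (initializer != null) {
            initializer.accept(admin);
        }
        return admin != null ? "admin" : "consumer";
    }
}
```

Delegating from the old constructor to the new one keeps a single code path.
Downstream callers still compile and behave as before, apart from a deprecation
warning.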
   
   These changes are implemented as follows:
   1. Add a `KafkaBasedLog` constructor that accepts a supplier from which it
can get an admin instance, and deprecate the old constructor. We need a supplier
rather than just an instance because `KafkaBasedLog` is instantiated before
Connect starts up, so the admin instance must be created only when needed. At
the same time, change the existing init function parameter from a no-arg
function to one that accepts an admin instance as an argument. This allows that
init function to reuse the shared admin instance used by the `KafkaBasedLog`.
Note: if no admin supplier is provided (via the deprecated constructor, which is
no longer used in AK), the consumer is still used to get the latest offsets.
   2. Add to each `Kafka*BackingStore` class a new constructor with the same
parameters plus an admin supplier, and deprecate the old constructor. When each
class instantiates its `KafkaBasedLog` instance, it passes the admin supplier
and an init function that takes an admin instance.
   3. Create a new `SharedTopicAdmin` that lazily creates the `TopicAdmin` (and 
underlying Admin client) when required, and closes the admin objects when the 
`SharedTopicAdmin` is closed.
   4. Modify the existing `TopicAdmin` (used only in Connect) to encapsulate 
the logic of fetching end offsets using the admin client, simplifying the logic 
in `KafkaBasedLog` mentioned in #1 above. Doing this also makes it easier to 
test that logic.
   5. Change `ConnectDistributed` to create a `SharedTopicAdmin` instance (which
is `AutoCloseable`) before creating the `Kafka*BackingStore` instances, pass the
`SharedTopicAdmin` (which is an admin supplier) to all three `Kafka*BackingStore`
objects, and always close the `SharedTopicAdmin` upon termination. (Shutdown of
the worker occurs outside of the `ConnectDistributed` code. So, modify
`DistributedHerder` to accept in its constructor additional `AutoCloseable`
objects that should be closed when the herder is closed, and then modify
`ConnectDistributed` to pass the `SharedTopicAdmin` as one of those
`AutoCloseable` instances.)
   6. Change `MirrorMaker` similarly to `ConnectDistributed`.
   7. Change existing unit tests to no longer use deprecated constructors.
   8. Add unit tests for new functionality.
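
   The shutdown handling in step 5 might look roughly like the sketch below.
`HerderSketch` is a hypothetical, stripped-down class rather than the real
`DistributedHerder`. It assumes the extra `AutoCloseable` objects are passed as
varargs and closed in order when the herder stops, logging and continuing if one
of them fails.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, stripped-down herder: only the shutdown of extra resources is modeled.
class HerderSketch {
    private final List<AutoCloseable> uponShutdown;

    HerderSketch(AutoCloseable... uponShutdown) {
        this.uponShutdown = new ArrayList<>(Arrays.asList(uponShutdown));
    }

    void stop() {
        // (Stopping the worker, group membership, etc. is omitted from this sketch.)
        for (AutoCloseable closeable : uponShutdown) {
            try {
                closeable.close();
            } catch (Exception e) {
                // Log and keep going so one failure does not leak the remaining resources.
                System.err.println("Failed to close " + closeable + ": " + e);
            }
        }
    }
}
```

Continuing past a failed `close()` keeps one misbehaving resource from leaking
the others, such as the shared admin client.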
   
   This change should be backported to fix the bug in recent releases.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
