kirktrue commented on PR #16241:
URL: https://github.com/apache/kafka/pull/16241#issuecomment-2163677488

   @lianetm, thanks for your suggestion to handle it at the 
`AsyncKafkaConsumer` layer.
   
   > But what about just moving the timeout to the get result?
   > 
   > 1. issue fetch (max_value as timeout) -> will run across poll iterations 
for the same set of partitions
   > 2. get result with poll timeout -> this ensures that we respect the poll 
iteration timeout, while leaving fetch request running to be used on the next 
poll.
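
The two steps quoted above can be sketched outside of Kafka with a plain `CompletableFuture`. This is only an illustration of the idea, not the `AsyncKafkaConsumer` code: the class `ReusableFetchSketch` and its `poll()` method are hypothetical names, the result is assumed to be non-null, and the check that the partitions are unchanged between polls is left out.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical sketch: keep one request in flight across poll iterations
// and bound only the wait for its result by the per-poll timeout.
public class ReusableFetchSketch<T> {

    // The request still pending from an earlier poll, if any.
    private CompletableFuture<T> inFlight;

    public Optional<T> poll(Supplier<CompletableFuture<T>> issueRequest, long pollTimeoutMs)
            throws InterruptedException, ExecutionException {
        // Step 1: issue the request only when none is pending. The request
        // itself carries no deadline, so it can outlive this poll.
        if (inFlight == null) {
            inFlight = issueRequest.get();
        }
        try {
            // Step 2: wait no longer than the poll timeout for the result.
            T result = inFlight.get(pollTimeoutMs, TimeUnit.MILLISECONDS);
            inFlight = null; // result consumed; the next poll issues a new request
            return Optional.of(result);
        } catch (TimeoutException e) {
            // Leave the request running so the next poll can pick it up.
            return Optional.empty();
        } catch (ExecutionException e) {
            inFlight = null; // a failed request is not reused
            throw e;
        }
    }
}
```

A caller would invoke `poll()` once per poll iteration; an empty `Optional` means "not ready yet, try again on the next poll" rather than a failure.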
   
   Here's the version of `initWithCommittedOffsetsIfNeeded()` in `trunk`:
   
   ```java
   final FetchCommittedOffsetsEvent event =
       new FetchCommittedOffsetsEvent(
           initializingPartitions,
           calculateDeadlineMs(timer));
   wakeupTrigger.setActiveTask(event.future());
   final Map<TopicPartition, OffsetAndMetadata> offsets =
       applicationEventHandler.addAndGet(event);
   refreshCommittedOffsets(offsets, metadata, subscriptions);
   return true;
   ```
   
   Attempt #1: change to use a "timed" version of `Future.get()` with the 
user's `Timer`:
   
   ```java
   final FetchCommittedOffsetsEvent event =
       new FetchCommittedOffsetsEvent(
           initializingPartitions,
           Long.MAX_VALUE);
   wakeupTrigger.setActiveTask(event.future());
   applicationEventHandler.add(event);
   final Map<TopicPartition, OffsetAndMetadata> offsets =
       ConsumerUtils.getResult(event.future(), timer);
   refreshCommittedOffsets(offsets, metadata, subscriptions);
   return true;
   ```
   
   When I run the new integration test, it prints 'couldn't refresh committed 
offsets' hundreds of times and eventually fails.
   
   Attempt #2: change to use a "timed" version of `Future.get()` with a `Timer` 
of 1 millisecond:
   
   ```java
   final FetchCommittedOffsetsEvent event =
       new FetchCommittedOffsetsEvent(
           initializingPartitions,
           Long.MAX_VALUE);
   wakeupTrigger.setActiveTask(event.future());
   applicationEventHandler.add(event);
   final Map<TopicPartition, OffsetAndMetadata> offsets =
       ConsumerUtils.getResult(event.future(), time.timer(1));
   refreshCommittedOffsets(offsets, metadata, subscriptions);
   return true;
   ```
   
   That passes the new integration test 🤷‍♂️
   
   I haven't worked out why attempt #1 fails when the user's `Timer` is 0 
milliseconds, but I believe it's because the code considers a timeout of 0 to 
be an automatic hard fail, even if we have the result on hand 😢
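
To make the zero-timeout hypothesis concrete, here is a minimal sketch using only the JDK. Both helpers are hypothetical and are not the actual `ConsumerUtils.getResult()` implementation, which I have not confirmed behaves this way: `strictGet()` models a helper that rejects an expired timeout up front, and `lenientGet()` models one that first returns a result that is already available.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ZeroTimeoutSketch {

    // Hypothetical helper: treats an expired (zero) timeout as a failure
    // without ever looking at the future.
    static <T> T strictGet(CompletableFuture<T> future, long timeoutMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        if (timeoutMs <= 0) {
            throw new TimeoutException("timer already expired");
        }
        return future.get(timeoutMs, TimeUnit.MILLISECONDS);
    }

    // Hypothetical alternative: hand back a result that is already on hand
    // before consulting the timeout.
    static <T> T lenientGet(CompletableFuture<T> future, long timeoutMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        if (future.isDone()) {
            return future.get(); // does not block: the future is complete
        }
        if (timeoutMs <= 0) {
            throw new TimeoutException("timer already expired");
        }
        return future.get(timeoutMs, TimeUnit.MILLISECONDS);
    }
}
```

With an already-completed future and a timeout of 0, `strictGet()` throws `TimeoutException` while `lenientGet()` returns the value. If the real code path resembles `strictGet()`, that would explain why a 1 millisecond `Timer` passes where a 0 millisecond one does not.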

