lianetm commented on PR #16241:
URL: https://github.com/apache/kafka/pull/16241#issuecomment-2163819417
Hey @kirktrue, I don't think the code snippets you shared in attempt1 would
work, because in the end they create a new event every time.
The approach I had in mind is closer to what the legacy consumer does by
keeping a `PendingCommittedOffsetRequest`. We should keep a `pendingFetchEvent`
that we reuse across polls as long as the set of partitions is the same. Then
we simply need to update `initWithCommittedOffsetsIfNeeded`. Instead of:
> final FetchCommittedOffsetsEvent event =
>     new FetchCommittedOffsetsEvent(
>         initializingPartitions,
>         calculateDeadlineMs(timer));
> wakeupTrigger.setActiveTask(event.future());
> final Map<TopicPartition, OffsetAndMetadata> offsets =
>     applicationEventHandler.addAndGet(event);
We would:
> // TODO: also consider that if the set of initializingPartitions is
> // not the same, we would need a new request
> if (pendingFetchEvent == null) {
>     pendingFetchEvent = new FetchCommittedOffsetsEvent(
>         initializingPartitions,
>         Long.MAX_VALUE);
>     wakeupTrigger.setActiveTask(pendingFetchEvent.future());
>     applicationEventHandler.add(pendingFetchEvent);
> }
>
> // final Map<TopicPartition, OffsetAndMetadata> offsets =
> //     applicationEventHandler.addAndGet(event);
> final Map<TopicPartition, OffsetAndMetadata> offsets =
>     ConsumerUtils.getResult(pendingFetchEvent.future(), timer);
> pendingFetchEvent = null;
With that the new integration test passes! This is only a rough attempt: it
still needs the TODO addressed, and I guess we should consider how to expire
that fetch event, since with a `Long.MAX_VALUE` timeout it may stay in the
background forever.
With this we would have a really simple approach, very similar to the legacy
one, which feels safe, but I could be missing some reason why it wouldn't work.
Just sharing in case it helps, but I'm happy to review the current PR approach
and align on it if you think it's best at this point.
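
To make the TODO more concrete, here is a minimal, self-contained sketch of the
reuse-across-polls idea. It is not the consumer code: `PendingFetchSketch`,
`fetchCommitted`, `requestsSent` and the `sender` function (a stand-in for
`applicationEventHandler.add`) are hypothetical names, partitions are plain
strings and offsets are plain longs. It only illustrates sending a new request
when nothing is pending or when the partition set has changed.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Hypothetical stand-in for the consumer logic, not the real Kafka classes.
class PendingFetchSketch {
    // Assumption: plays the role of applicationEventHandler.add(event),
    // i.e. it starts the fetch and hands back the future for its result.
    private final Function<Set<String>, CompletableFuture<Map<String, Long>>> sender;

    private Set<String> pendingPartitions;
    private CompletableFuture<Map<String, Long>> pendingFuture;
    int requestsSent = 0;

    PendingFetchSketch(Function<Set<String>, CompletableFuture<Map<String, Long>>> sender) {
        this.sender = sender;
    }

    // Returns the offsets if they arrive within timeoutMs, or null if this
    // poll timed out (the request then stays pending for the next poll).
    Map<String, Long> fetchCommitted(Set<String> partitions, long timeoutMs) {
        // Send a new request only if none is pending or the partitions changed.
        if (pendingFuture == null || !partitions.equals(pendingPartitions)) {
            pendingPartitions = Set.copyOf(partitions);
            pendingFuture = sender.apply(pendingPartitions);
            requestsSent++;
        }
        try {
            Map<String, Long> offsets = pendingFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
            clearPending();
            return offsets;
        } catch (TimeoutException e) {
            return null; // keep the pending request so the next poll reuses it
        } catch (ExecutionException e) {
            clearPending(); // a failed request should not be reused
            throw new IllegalStateException(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    private void clearPending() {
        pendingFuture = null;
        pendingPartitions = null;
    }
}
```

In this sketch a poll that times out leaves the request in place, so a later
poll with the same partitions waits on the same future instead of starting
over. Expiring a request that never completes is deliberately left out.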