kirktrue commented on code in PR #21457:
URL: https://github.com/apache/kafka/pull/21457#discussion_r2830691810
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java:
##########
@@ -153,11 +155,17 @@ public void onSuccess(ListOffsetResult value) {
remainingToSearch.keySet().retainAll(value.partitionsToRetry);
offsetFetcherUtils.updateSubscriptionState(value.fetchedOffsets, isolationLevel);
+
+ if (updatePartitionEndOffsetsFlag)
+     offsetFetcherUtils.clearPartitionEndOffsetRequests(remainingToSearch.keySet());
Review Comment:
There are two steps to clear the flags for all the partitions. In the
`onSuccess()` handler, the original set of partitions is split into subsets in
`fetchedOffsets` and `partitionsToRetry`:
- The successful partitions in `fetchedOffsets` have their flag cleared via
`OffsetFetcherUtils.updateSubscriptionState()`
- The failed partitions in `partitionsToRetry` have their flag cleared via
`OffsetFetcherUtils.clearPartitionEndOffsetRequests()`
So by the end of `onSuccess()`, every partition will have had its flag
cleared by one of these two paths.
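
To make the two paths concrete, here is a minimal, self-contained sketch. It is not the actual Kafka code: `EndOffsetFlagSketch`, its string partition names, and the `pendingEndOffsetRequests` set are hypothetical stand-ins for the real subscription state, and the `updatePartitionEndOffsetsFlag` check is left out for brevity. Only the method names `updateSubscriptionState()` and `clearPartitionEndOffsetRequests()` mirror the ones in the diff, and their bodies here are assumptions about the behavior described above.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the flag handling; not Kafka's SubscriptionState or OffsetFetcherUtils.
public class EndOffsetFlagSketch {
    // Partitions whose "end offset requested" flag is currently set.
    private final Set<String> pendingEndOffsetRequests = new HashSet<>();
    private final Map<String, Long> endOffsets = new HashMap<>();

    void requestEndOffsets(Set<String> partitions) {
        pendingEndOffsetRequests.addAll(partitions);
    }

    // Path 1: successful partitions store their offset and have their flag cleared.
    void updateSubscriptionState(Map<String, Long> fetchedOffsets) {
        for (Map.Entry<String, Long> entry : fetchedOffsets.entrySet()) {
            endOffsets.put(entry.getKey(), entry.getValue());
            pendingEndOffsetRequests.remove(entry.getKey());
        }
    }

    // Path 2: partitions that must be retried only have their flag cleared.
    void clearPartitionEndOffsetRequests(Set<String> partitions) {
        pendingEndOffsetRequests.removeAll(partitions);
    }

    // Same order of operations as the onSuccess() hunk above.
    // Assumption: remainingToSearch is modeled as a mutable Set rather than a Map's key set.
    void onSuccess(Set<String> remainingToSearch,
                   Map<String, Long> fetchedOffsets,
                   Set<String> partitionsToRetry) {
        remainingToSearch.retainAll(partitionsToRetry);
        updateSubscriptionState(fetchedOffsets);
        clearPartitionEndOffsetRequests(remainingToSearch);
    }

    boolean hasPendingRequests() {
        return !pendingEndOffsetRequests.isEmpty();
    }

    public static void main(String[] args) {
        EndOffsetFlagSketch sketch = new EndOffsetFlagSketch();
        Set<String> remaining = new HashSet<>(Set.of("t-0", "t-1", "t-2"));
        sketch.requestEndOffsets(remaining);

        // t-0 and t-1 succeed; t-2 must be retried.
        sketch.onSuccess(remaining, Map.of("t-0", 10L, "t-1", 20L), Set.of("t-2"));

        System.out.println("pending after onSuccess: " + sketch.hasPendingRequests());
    }
}
```

In this model, `fetchedOffsets` and `partitionsToRetry` together cover the original set, so no flag remains set once `onSuccess()` returns.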