lianetm commented on code in PR #21457:
URL: https://github.com/apache/kafka/pull/21457#discussion_r2805174124
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -559,7 +565,17 @@ private List<NetworkClientDelegate.UnsentRequest> buildListOffsetsRequests(
Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> timestampsToSearchByNode =
groupListOffsetRequests(timestampsToSearch, Optional.of(listOffsetsRequestState));
if (timestampsToSearchByNode.isEmpty()) {
- throw new StaleMetadataException();
+ if (listOffsetsRequestState.shouldRetry) {
+ // This exception is caught in prepareFetchOffsetsRequests() and signals to retry this set
+ // of partitions. Only do that if the caller wants to retry LIST_OFFSETS calls, of course.
+ throw new StaleMetadataException();
+ } else {
+ // If the caller doesn't want to retry LIST_OFFSETS calls, go ahead and clear the
+ // 'end offsets requested' flag since there won't be another chance. Then return an empty
+ // list to signal that there are no requests to be sent.
+ offsetFetcherUtils.clearPartitionEndOffsetRequests(timestampsToSearch.keySet());
+ return List.of();
+ }
Review Comment:
OK, good point, and I agree on the unbounded-retries gap. But isn't that also a
problem for calls to beginningOffsets/endOffsets/offsetsForTimes? They will
time out correctly at the API level (the event expires), but it seems they may
leave the `OffsetsRequestManager` retrying in the background (or is there a
mechanism in place that I'm missing?).
If so, I agree on addressing the unbounded retries here, but should we
address it consistently, with all calls retrying only while there is time left
(instead of having `shouldRetry` split the path)?
The gap seems to be that this `OffsetsRequestManager.fetchOffsets` call
has no deadline.
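A rough sketch of the consistent, deadline-based alternative follows. All names here (`DeadlineRetrySketch`, `decide`, `Action`) are hypothetical and are not existing Kafka code; the assumption is only that every caller supplies a deadline, so a caller that does not want retries passes one that has already expired:

```java
// Hypothetical illustration only; not part of OffsetsRequestManager.
final class DeadlineRetrySketch {

    /** What to do with a pending LIST_OFFSETS request on this pass. */
    enum Action { SEND, RETRY_LATER, GIVE_UP }

    /**
     * Decides the next step from the deadline alone, with no shouldRetry flag.
     * leaderKnown stands in for "grouping by node produced at least one request".
     */
    static Action decide(boolean leaderKnown, long nowMs, long deadlineMs) {
        if (leaderKnown) {
            return Action.SEND;
        }
        // No known leader: retry only while the caller still has time left.
        return nowMs < deadlineMs ? Action.RETRY_LATER : Action.GIVE_UP;
    }
}
```

With this shape, a zero-timeout caller reaches `GIVE_UP` on the first pass, and a timed caller stops retrying once its deadline passes instead of retrying in the background.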
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java:
##########
@@ -153,11 +162,14 @@ public void onSuccess(ListOffsetResult value) {
remainingToSearch.keySet().retainAll(value.partitionsToRetry);
offsetFetcherUtils.updateSubscriptionState(value.fetchedOffsets, isolationLevel);
+ offsetFetcherUtils.clearPartitionEndOffsetRequests(remainingToSearch.keySet());
Review Comment:
OK, we both agree we need it for currentLag/timerExpired. But the way it's
called now, it applies to all cases, and that's my concern. Isn't this also
going to clear the flag when there is still time left to retry and a
partition didn't have a known leader?
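To make the concern concrete, here is a minimal sketch of clearing the flag only when no retry can follow. The names (`EndOffsetFlagSketch`, `partitionsToClear`, `timeLeftToRetry`) are hypothetical, and partitions are modelled as plain strings rather than `TopicPartition`:

```java
import java.util.Set;

// Hypothetical illustration only; not part of OffsetFetcher.
final class EndOffsetFlagSketch {

    /**
     * Returns the partitions whose 'end offsets requested' flag should be cleared
     * after a response that left partitionsToRetry unresolved.
     */
    static Set<String> partitionsToClear(Set<String> partitionsToRetry, boolean timeLeftToRetry) {
        if (timeLeftToRetry) {
            // A retry is still possible, so keep the flag set for these partitions.
            return Set.of();
        }
        // No further attempt will be made (for example, the timer expired): clear them all.
        return Set.copyOf(partitionsToRetry);
    }
}
```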