kirktrue commented on code in PR #21457:
URL: https://github.com/apache/kafka/pull/21457#discussion_r2806625149
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -559,7 +565,17 @@ private List<NetworkClientDelegate.UnsentRequest> buildListOffsetsRequests(
Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> timestampsToSearchByNode =
groupListOffsetRequests(timestampsToSearch, Optional.of(listOffsetsRequestState));
if (timestampsToSearchByNode.isEmpty()) {
- throw new StaleMetadataException();
+ if (listOffsetsRequestState.shouldRetry) {
+ // This exception is caught in prepareFetchOffsetsRequests() and signals to retry this set
+ // of partitions. Only do that if the caller wants to retry LIST_OFFSETS calls, of course.
+ throw new StaleMetadataException();
+ } else {
+ // If the caller doesn't want to retry LIST_OFFSETS calls, go ahead and clear the
+ // 'end offsets requested' flag since there won't be another chance. Then return an empty
+ // list to signal that there are no requests to be sent.
+ offsetFetcherUtils.clearPartitionEndOffsetRequests(timestampsToSearch.keySet());
+ return List.of();
+ }
Review Comment:
I've created KAFKA-20188 to track adding a timeout to
`OffsetsRequestManager.fetchOffsets()`, so we can handle that in a separate
PR.
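
For reference, the branch in the hunk above can be sketched in isolation roughly as follows. This is a simplified illustration, not the actual Kafka code: `ListOffsetsRetrySketch`, `StaleMetadataSketchException`, the `endOffsetsRequested` set, and the string-based partition and node names are all hypothetical stand-ins for the real `OffsetsRequestManager`, `StaleMetadataException`, and `OffsetFetcherUtils` types.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in for the branch in buildListOffsetsRequests().
// None of these types are Kafka's real classes.
class ListOffsetsRetrySketch {

    // Stand-in for Kafka's StaleMetadataException.
    static class StaleMetadataSketchException extends RuntimeException { }

    // Stand-in for the per-partition 'end offsets requested' flags.
    static final Set<String> endOffsetsRequested = new HashSet<>();

    // Returns one placeholder "request" per node that has partitions to look up.
    static List<String> buildRequests(Map<String, Set<String>> partitionsByNode,
                                      Set<String> requestedPartitions,
                                      boolean shouldRetry) {
        if (partitionsByNode.isEmpty()) {
            if (shouldRetry) {
                // The caller is expected to catch this and retry the same partitions.
                throw new StaleMetadataSketchException();
            }
            // No retry will happen, so clear the flags now and report nothing to send.
            endOffsetsRequested.removeAll(requestedPartitions);
            return List.of();
        }
        return List.copyOf(partitionsByNode.keySet());
    }

    public static void main(String[] args) {
        Set<String> partitions = Set.of("topic-0");
        endOffsetsRequested.add("topic-0");

        // Non-retrying caller: flags are cleared and an empty list comes back.
        List<String> requests = buildRequests(Map.of(), partitions, false);
        System.out.println("requests=" + requests + ", flags=" + endOffsetsRequested);

        // Retrying caller: the exception signals that a retry is needed.
        try {
            buildRequests(Map.of(), partitions, true);
        } catch (StaleMetadataSketchException e) {
            System.out.println("retry signalled");
        }
    }
}
```

The point of the sketch is only the control flow: a retrying caller gets an exception it can act on, while a non-retrying caller gets an empty result and no stale flags left behind.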