kirktrue commented on code in PR #21457:
URL: https://github.com/apache/kafka/pull/21457#discussion_r2802291014
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -559,7 +565,17 @@ private List<NetworkClientDelegate.UnsentRequest> buildListOffsetsRequests(
         Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> timestampsToSearchByNode =
                 groupListOffsetRequests(timestampsToSearch, Optional.of(listOffsetsRequestState));
         if (timestampsToSearchByNode.isEmpty()) {
-            throw new StaleMetadataException();
+            if (listOffsetsRequestState.shouldRetry) {
+                // This exception is caught in prepareFetchOffsetsRequests() and signals to retry this set
+                // of partitions. Only do that if the caller wants to retry LIST_OFFSETS calls, of course.
+                throw new StaleMetadataException();
+            } else {
+                // If the caller doesn't want to retry LIST_OFFSETS calls, go ahead and clear the
+                // 'end offsets requested' flag since there won't be another chance. Then return an empty
+                // list to signal that there are no requests to be sent.
+                offsetFetcherUtils.clearPartitionEndOffsetRequests(timestampsToSearch.keySet());
+                return List.of();
+            }
Review Comment:
> In this case we want to refresh metadata and try again once we know the
> leader. Is there a reason why we want to change that in relationship with the
> issue of the flag not being cleared?

The existing behavior of `ClassicKafkaConsumer.currentLag()` is to make a
single `LIST_OFFSETS` request. I'm trying to mimic that in the
`AsyncKafkaConsumer` implementation.
I added a test to `OffsetsRequestManager` that approximates a call to
`currentLag()` that fails due to a missing leader. From looking at the
code, it seems as though the `OffsetsRequestManager` will detect the missing
leader and retry the `LIST_OFFSETS` request repeatedly, without a timeout or limit.
I'm not 100% convinced that the fire-and-forget style approach of
`ClassicKafkaConsumer` is correct or optimal either, but it's the approach I'm
trying to copy.
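To make the two behaviors concrete, here is a minimal, self-contained sketch of the branch in the diff above. It is not the real `OffsetsRequestManager` code: `ListOffsetsRetrySketch`, `StaleMetadataSignal`, `RequestState`, `pendingEndOffsetRequests`, and `buildRequests` are all hypothetical names, partitions and leaders are plain strings, and the exception only stands in for `StaleMetadataException`.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical model of the retry / no-retry branch; not the Kafka implementation.
class ListOffsetsRetrySketch {

    // Stand-in for StaleMetadataException: tells the caller to refresh metadata and retry.
    static class StaleMetadataSignal extends RuntimeException { }

    // Stand-in for the per-call request state; only the retry flag is modelled.
    static final class RequestState {
        final boolean shouldRetry;

        RequestState(boolean shouldRetry) {
            this.shouldRetry = shouldRetry;
        }
    }

    // Partitions with an outstanding 'end offsets requested' flag (assumed bookkeeping).
    final Set<String> pendingEndOffsetRequests = new HashSet<>();

    // Builds one request description per partition whose leader is known.
    List<String> buildRequests(Map<String, Optional<String>> leaderByPartition, RequestState state) {
        List<String> requests = leaderByPartition.entrySet().stream()
                .filter(e -> e.getValue().isPresent())
                .map(e -> e.getKey() + "@" + e.getValue().get())
                .sorted()
                .collect(Collectors.toList());
        if (!requests.isEmpty()) {
            return requests;
        }
        if (state.shouldRetry) {
            // Retrying caller: signal that metadata is stale so the partitions are retried.
            throw new StaleMetadataSignal();
        }
        // Single-shot caller: there is no later attempt, so clear the flags now
        // and report that nothing needs to be sent.
        pendingEndOffsetRequests.removeAll(leaderByPartition.keySet());
        return List.of();
    }
}
```

With `shouldRetry` set to `false` and no known leader, the sketch returns an empty list and clears the pending flag. With `shouldRetry` set to `true`, it throws, and the flag stays set for the retry.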
Thoughts?