lianetm commented on code in PR #21457:
URL: https://github.com/apache/kafka/pull/21457#discussion_r2800175968
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -656,21 +659,33 @@ private void process(final CurrentLagEvent event) {
         final OptionalLong lagOpt;
         if (lag == null) {
-            if (subscriptions.partitionEndOffset(topicPartition, isolationLevel) == null &&
-                    !subscriptions.partitionEndOffsetRequested(topicPartition)) {
-                // If the log end offset is unknown and there isn't already an in-flight list offset
-                // request, issue one with the goal that the lag will be available the next time the
-                // user calls currentLag().
-                log.info("Requesting the log end offset for {} in order to compute lag", topicPartition);
-                subscriptions.requestPartitionEndOffset(topicPartition);
-
-                // Emulates the Consumer.endOffsets() logic...
-                Map<TopicPartition, Long> timestampToSearch = Collections.singletonMap(
-                    topicPartition,
-                    ListOffsetsRequest.LATEST_TIMESTAMP
-                );
-
-                requestManagers.offsetsRequestManager.fetchOffsets(timestampToSearch, false);
+            if (subscriptions.partitionEndOffset(topicPartition, isolationLevel) == null) {
+                // The LIST_OFFSETS lag lookup is serialized, so if there's an inflight request it must
+                // finish before another request can be issued. This serialization mechanism is controlled
+                // by the 'end offset requested' flag in SubscriptionState.
+                if (subscriptions.partitionEndOffsetRequested(topicPartition)) {
+                    log.info("Not requesting the log end offset for {} to compute lag as an outstanding request already exists", topicPartition);
+                } else {
+                    // If the log end offset is unknown and there isn't already an in-flight list offset
+                    // request, issue one with the goal that the lag will be available the next time the
+                    // user calls currentLag().
+                    log.info("Requesting the log end offset for {} in order to compute lag", topicPartition);
+                    subscriptions.requestPartitionEndOffset(topicPartition);
+
+                    // Emulates the Consumer.endOffsets() logic...
+                    Map<TopicPartition, Long> timestampToSearch = Collections.singletonMap(
+                        topicPartition,
+                        ListOffsetsRequest.LATEST_TIMESTAMP
+                    );
+
+                    // The currentLag() API is a "best effort" attempt at calling the LIST_OFFSETS RPC. If it
+                    // fails, don't retry the attempt internally, but let the user attempt it again.
Review Comment:
Here is where I think we may be getting something wrong. I agree we don't want
to retry the ListOffsets request triggered from currentLag if it fails. But we
do want to retry discovering a leader in order to be able to send the request
(which is what the internal retries I pointed to in the comment above do).
So if this retry=false is only used to skip that leader re-discovery retry,
I think we should remove it (probably no param needed?)
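For illustration only, here is a minimal self-contained sketch (hypothetical class and method names, not the actual OffsetsRequestManager code) of the two kinds of retry being distinguished above: an unknown leader triggers a metadata refresh and a retried send, while a failure of the LIST_OFFSETS call itself is left for the user to retry via another currentLag() call:

```java
import java.util.Optional;

// Hypothetical sketch of the two retry classes discussed above; names are
// illustrative and do not match the real OffsetsRequestManager.
final class LagLookupSketch {
    // Stand-in for cluster metadata: empty means the partition leader is unknown.
    private Optional<String> leaderNode = Optional.empty();

    void onMetadataUpdate(String discoveredLeader) {
        leaderNode = Optional.of(discoveredLeader);
    }

    // Returns true once the LIST_OFFSETS request could actually be sent.
    boolean trySend() {
        if (leaderNode.isEmpty()) {
            // Unknown leader: this is the retry we want to keep. Refresh
            // metadata and re-attempt the send once the leader is discovered.
            requestMetadataRefresh();
            return false;
        }
        // Leader known: send exactly one LIST_OFFSETS. If the RPC itself fails,
        // don't retry internally; currentLag() is best effort and the user can
        // call it again.
        sendListOffsets(leaderNode.get());
        return true;
    }

    private void requestMetadataRefresh() { /* no-op in this sketch */ }

    private void sendListOffsets(String node) { /* no-op in this sketch */ }
}
```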
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -559,7 +565,17 @@ private List<NetworkClientDelegate.UnsentRequest> buildListOffsetsRequests(
         Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> timestampsToSearchByNode =
                 groupListOffsetRequests(timestampsToSearch, Optional.of(listOffsetsRequestState));
         if (timestampsToSearchByNode.isEmpty()) {
-            throw new StaleMetadataException();
+            if (listOffsetsRequestState.shouldRetry) {
+                // This exception is caught in prepareFetchOffsetsRequests() and signals to retry this set
+                // of partitions. Only do that if the caller wants to retry LIST_OFFSETS calls, of course.
+                throw new StaleMetadataException();
+            } else {
+                // If the caller doesn't want to retry LIST_OFFSETS calls, go ahead and clear the
+                // 'end offsets requested' flag since there won't be another chance. Then return an empty
+                // list to signal that there are no requests to be sent.
+                offsetFetcherUtils.clearPartitionEndOffsetRequests(timestampsToSearch.keySet());
+                return List.of();
+            }
Review Comment:
Hmm, I would say we shouldn't change this retry logic. This internal retry is
for the case where we don't know the leader for a partition when fetching
offsets, so we couldn't even send the request. In that case we want to refresh
metadata and try again once we know the leader. Is there a reason why we want
to change that in relation to the issue of the flag not being cleared?
With this change, I'm afraid that if the leader is not known when we call
`currentLag`, it would require a second call to `currentLag` to issue another
request, and a third call to get the result, vs the current behaviour, where we
would retry here to discover the leader and fetch, so the next call to
`currentLag` would find the results already.
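To make that concrete, here is a hedged sketch of the caller-visible difference, using the public Consumer.currentLag(TopicPartition) API, which returns OptionalLong.empty() while the end offset is still unknown. The consumer setup is assumed (assigned to the partition and positioned), and the poll is just to give the background thread time to complete the request:

```java
import java.time.Duration;
import java.util.OptionalLong;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

// Sketch only: 'consumer' is assumed to be assigned to 'tp' and positioned.
final class LagProbeSketch {
    static OptionalLong probe(Consumer<byte[], byte[]> consumer, TopicPartition tp) {
        consumer.currentLag(tp);               // call 1: likely empty; triggers a LIST_OFFSETS request
        consumer.poll(Duration.ofMillis(100)); // let the background thread work

        // Current behaviour: an unknown leader is retried internally (metadata
        // refresh, then re-send), so this second call typically finds the cached
        // end offset. With the proposed retry=false, this call might only
        // re-issue the request, and a third call would be needed to see the lag.
        return consumer.currentLag(tp);
    }
}
```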
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]