lianetm commented on code in PR #21457:
URL: https://github.com/apache/kafka/pull/21457#discussion_r2800175968


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -656,21 +659,33 @@ private void process(final CurrentLagEvent event) {
 
             final OptionalLong lagOpt;
             if (lag == null) {
-                if (subscriptions.partitionEndOffset(topicPartition, isolationLevel) == null &&
-                    !subscriptions.partitionEndOffsetRequested(topicPartition)) {
-                    // If the log end offset is unknown and there isn't already an in-flight list offset
-                    // request, issue one with the goal that the lag will be available the next time the
-                    // user calls currentLag().
-                    log.info("Requesting the log end offset for {} in order to compute lag", topicPartition);
-                    subscriptions.requestPartitionEndOffset(topicPartition);
-
-                    // Emulates the Consumer.endOffsets() logic...
-                    Map<TopicPartition, Long> timestampToSearch = Collections.singletonMap(
-                        topicPartition,
-                        ListOffsetsRequest.LATEST_TIMESTAMP
-                    );
-
-                    requestManagers.offsetsRequestManager.fetchOffsets(timestampToSearch, false);
+                if (subscriptions.partitionEndOffset(topicPartition, isolationLevel) == null) {
+                    // The LIST_OFFSETS lag lookup is serialized, so if there's an inflight request it must
+                    // finish before another request can be issued. This serialization mechanism is controlled
+                    // by the 'end offset requested' flag in SubscriptionState.
+                    if (subscriptions.partitionEndOffsetRequested(topicPartition)) {
+                        log.info("Not requesting the log end offset for {} to compute lag as an outstanding request already exists", topicPartition);
+                    } else {
+                        // If the log end offset is unknown and there isn't already an in-flight list offset
+                        // request, issue one with the goal that the lag will be available the next time the
+                        // user calls currentLag().
+                        log.info("Requesting the log end offset for {} in order to compute lag", topicPartition);
+                        subscriptions.requestPartitionEndOffset(topicPartition);
+
+                        // Emulates the Consumer.endOffsets() logic...
+                        Map<TopicPartition, Long> timestampToSearch = Collections.singletonMap(
+                            topicPartition,
+                            ListOffsetsRequest.LATEST_TIMESTAMP
+                        );
+
+                        // The currentLag() API is a "best effort" attempt at calling the LIST_OFFSETS RPC. If it
+                        // fails, don't retry the attempt internally, but let the user attempt it again.

Review Comment:
   Here is where I think we may be getting something wrong. I agree we don't want to retry the ListOffsets request triggered from currentLag if it fails. But we do want to retry discovering a leader in order to be able to send the request (which is what the internal retries I pointed to in the comment above do).
   
   So if this retry=false is only used to skip that leader re-discovery retry, I think we should remove it (no parameter needed, probably?)
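   
   To make the distinction concrete, here is a minimal, self-contained sketch of the pattern being discussed, not the actual Kafka client internals; the names `LagLookup`, `Cluster`, `leaderKnown`, `refreshMetadata`, and `sendListOffsets` are hypothetical. It retries only the leader-discovery step (which must succeed before anything can be sent) while attempting the LIST_OFFSETS-style call itself exactly once, leaving any further retry to the next user call:
   
   ```java
   import java.util.OptionalLong;
   
   // Hypothetical sketch: retry leader discovery, but treat the list-offsets call as best effort.
   public class LagLookup {
   
       interface Cluster {
           boolean leaderKnown(String topicPartition);           // backed by metadata in a real client
           void refreshMetadata();                               // triggers a metadata update
           OptionalLong sendListOffsets(String topicPartition);  // empty if the RPC fails
       }
   
       private final Cluster cluster;
   
       LagLookup(Cluster cluster) {
           this.cluster = cluster;
       }
   
       // Leader discovery is retried a few times because without a leader the request cannot even
       // be sent; the list-offsets call itself is attempted only once per user-triggered lookup.
       OptionalLong endOffsetBestEffort(String topicPartition, int leaderDiscoveryRetries) {
           for (int attempt = 0; attempt <= leaderDiscoveryRetries; attempt++) {
               if (cluster.leaderKnown(topicPartition)) {
                   return cluster.sendListOffsets(topicPartition); // no retry on RPC failure
               }
               cluster.refreshMetadata(); // retry only the "who is the leader?" step
           }
           return OptionalLong.empty(); // leader never discovered; the caller may try again later
       }
   }
   ```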



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -559,7 +565,17 @@ private List<NetworkClientDelegate.UnsentRequest> buildListOffsetsRequests(
         Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> timestampsToSearchByNode =
                 groupListOffsetRequests(timestampsToSearch, Optional.of(listOffsetsRequestState));
         if (timestampsToSearchByNode.isEmpty()) {
-            throw new StaleMetadataException();
+            if (listOffsetsRequestState.shouldRetry) {
+                // This exception is caught in prepareFetchOffsetsRequests() and signals to retry this set
+                // of partitions. Only do that if the caller wants to retry LIST_OFFSETS calls, of course.
+                throw new StaleMetadataException();
+            } else {
+                // If the caller doesn't want to retry LIST_OFFSETS calls, go ahead and clear the
+                // 'end offsets requested' flag since there won't be another chance. Then return an empty
+                // list to signal that there are no requests to be sent.
+                offsetFetcherUtils.clearPartitionEndOffsetRequests(timestampsToSearch.keySet());
+                return List.of();
+            }

Review Comment:
   Uhm, I would say we shouldn't change this retry logic. This internal retry is for the case where we don't know the leader for a partition when fetching offsets, so we really couldn't even send the request. In that case we want to refresh metadata and try again once we know the leader. Is there a reason why we want to change that in relation to the issue of the flag not being cleared?
    
   With this change, I'm afraid that if the leader is not known when we call `currentLag`, it would require a second call to currentLag to issue another request, and a third call to get the result, versus the current behaviour, where we would retry here to discover the leader and fetch, so the next call to currentLag would find the result already.
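   
   As a rough illustration of the internal retry being described, here is a hedged, self-contained sketch, not the real OffsetsRequestManager; `ListOffsetsScheduler`, `Metadata`, `partitionsWithKnownLeader`, and `requestUpdate` are hypothetical names. The idea is that when no leader is known, the request manager signals stale metadata, asks for a refresh, and simply tries again on a later poll, so a single user-triggered lookup still completes once the leader becomes known:
   
   ```java
   import java.util.List;
   import java.util.Set;
   
   // Hypothetical sketch of the "refresh metadata and retry later" behaviour described above.
   public class ListOffsetsScheduler {
   
       static class StaleMetadataException extends RuntimeException { }
   
       interface Metadata {
           // Partitions for which a leader node is currently known (assumption for this sketch).
           Set<String> partitionsWithKnownLeader(Set<String> partitions);
           void requestUpdate(); // ask for a metadata refresh; completes asynchronously
       }
   
       private final Metadata metadata;
   
       ListOffsetsScheduler(Metadata metadata) {
           this.metadata = metadata;
       }
   
       // Group partitions by known leader; if none have a known leader, nothing can be sent yet.
       List<String> buildRequests(Set<String> partitions) {
           Set<String> sendable = metadata.partitionsWithKnownLeader(partitions);
           if (sendable.isEmpty()) {
               throw new StaleMetadataException();
           }
           return List.copyOf(sendable);
       }
   
       // On stale metadata, request a refresh and return nothing for this poll; the same set of
       // partitions is retried on a later poll, so the original lookup is not abandoned.
       List<String> prepareRequests(Set<String> partitions) {
           try {
               return buildRequests(partitions);
           } catch (StaleMetadataException e) {
               metadata.requestUpdate();
               return List.of(); // nothing to send this poll; retried once the leader is known
           }
       }
   }
   ```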



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
