kirktrue commented on code in PR #21457:
URL: https://github.com/apache/kafka/pull/21457#discussion_r2802300415
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -656,21 +659,33 @@ private void process(final CurrentLagEvent event) {
         final OptionalLong lagOpt;
         if (lag == null) {
-            if (subscriptions.partitionEndOffset(topicPartition, isolationLevel) == null &&
-                !subscriptions.partitionEndOffsetRequested(topicPartition)) {
-                // If the log end offset is unknown and there isn't already an in-flight list offset
-                // request, issue one with the goal that the lag will be available the next time the
-                // user calls currentLag().
-                log.info("Requesting the log end offset for {} in order to compute lag", topicPartition);
-                subscriptions.requestPartitionEndOffset(topicPartition);
-
-                // Emulates the Consumer.endOffsets() logic...
-                Map<TopicPartition, Long> timestampToSearch = Collections.singletonMap(
-                    topicPartition,
-                    ListOffsetsRequest.LATEST_TIMESTAMP
-                );
-
-                requestManagers.offsetsRequestManager.fetchOffsets(timestampToSearch, false);
+            if (subscriptions.partitionEndOffset(topicPartition, isolationLevel) == null) {
+                // The LIST_OFFSETS lag lookup is serialized, so if there's an inflight request it must
+                // finish before another request can be issued. This serialization mechanism is controlled
+                // by the 'end offset requested' flag in SubscriptionState.
+                if (subscriptions.partitionEndOffsetRequested(topicPartition)) {
+                    log.info("Not requesting the log end offset for {} to compute lag as an outstanding request already exists", topicPartition);
+                } else {
+                    // If the log end offset is unknown and there isn't already an in-flight list offset
+                    // request, issue one with the goal that the lag will be available the next time the
+                    // user calls currentLag().
+                    log.info("Requesting the log end offset for {} in order to compute lag", topicPartition);
+                    subscriptions.requestPartitionEndOffset(topicPartition);
+
+                    // Emulates the Consumer.endOffsets() logic...
+                    Map<TopicPartition, Long> timestampToSearch = Collections.singletonMap(
+                        topicPartition,
+                        ListOffsetsRequest.LATEST_TIMESTAMP
+                    );
+
+                    // The currentLag() API is a "best effort" attempt at calling the LIST_OFFSETS RPC. If it
+                    // fails, don't retry the attempt internally, but let the user attempt it again.
Review Comment:
The leader rediscovery comes from the call to `Metadata.requestUpdate()`
that's made in `OffsetsRequestManager.groupListOffsetRequests()`, right? As far
as I could see, throwing the `StaleMetadataException` doesn't do anything besides
re-enqueue the `ListOffsetsRequestState`, which would then send a follow-up
`LIST_OFFSETS` on the next call to `OffsetsRequestManager.poll()`, right?
My apologies if I'm missing something obvious 😄
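
To make the flow I'm describing concrete, here is a rough, self-contained sketch of that reading. Every class and method name in it (`FakeMetadata`, `RequestState`, `FakeOffsetsManager`, and so on) is made up for illustration; it is not the actual `OffsetsRequestManager` code, and it assumes my understanding above is correct: the metadata update is requested separately, and the stale-metadata path only puts the request state back on the queue for the next `poll()`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical, simplified model of the flow described in the comment above.
// None of these types are Kafka's real classes.
class StaleMetadataRetrySketch {

    /** Stand-in for client metadata; only tracks whether the leader is known. */
    static final class FakeMetadata {
        boolean leaderKnown = false;
        int updateRequests = 0;

        // Assumed to be the step that drives leader rediscovery.
        void requestUpdate() {
            updateRequests++;
        }
    }

    /** Stand-in for a pending list-offsets request for a single partition. */
    static final class RequestState {
        final String partition;
        int attempts = 0;

        RequestState(String partition) {
            this.partition = partition;
        }
    }

    /** Stand-in for the manager that groups and sends list-offsets requests. */
    static final class FakeOffsetsManager {
        private final FakeMetadata metadata;
        private final Queue<RequestState> pending = new ArrayDeque<>();
        final List<String> sent = new ArrayList<>();

        FakeOffsetsManager(FakeMetadata metadata) {
            this.metadata = metadata;
        }

        void enqueue(RequestState state) {
            pending.add(state);
        }

        /** One pass over the requests that were pending when poll() was called. */
        void poll() {
            int count = pending.size();
            for (int i = 0; i < count; i++) {
                RequestState state = pending.remove();
                state.attempts++;
                if (!metadata.leaderKnown) {
                    // Leader unknown: ask for a metadata update, then re-enqueue
                    // the state so a later poll() can try again.
                    metadata.requestUpdate();
                    pending.add(state);
                } else {
                    // Leader known: the follow-up request can be sent.
                    sent.add(state.partition);
                }
            }
        }

        int pendingCount() {
            return pending.size();
        }
    }

    public static void main(String[] args) {
        FakeMetadata metadata = new FakeMetadata();
        FakeOffsetsManager manager = new FakeOffsetsManager(metadata);
        manager.enqueue(new RequestState("topic-0"));

        manager.poll(); // leader unknown: update requested, state re-enqueued
        System.out.println("pending=" + manager.pendingCount() + " sent=" + manager.sent);

        metadata.leaderKnown = true; // pretend the metadata update completed
        manager.poll(); // follow-up request goes out on this poll
        System.out.println("pending=" + manager.pendingCount() + " sent=" + manager.sent);
    }
}
```

If the real code behaves like this sketch, the retry is driven entirely by the re-enqueued state and the next `poll()`, not by the exception itself.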