kirktrue commented on code in PR #16885:
URL: https://github.com/apache/kafka/pull/16885#discussion_r1733075083


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1581,48 +1577,27 @@ private Fetch<K, V> collectFetch() {
 
         return fetch;
     }
+
     /**
      * Set the fetch position to the committed position (if there is one)
      * or reset it using the offset reset policy the user has configured.
      *
-     * @throws AuthenticationException If authentication fails. See the 
exception for more details
-     * @throws NoOffsetForPartitionException If no offset is stored for a 
given partition and no offset reset policy is
-     *             defined
      * @return true iff the operation completed without timing out
+     * @throws AuthenticationException       If authentication fails. See the 
exception for more details
+     * @throws NoOffsetForPartitionException If no offset is stored for a 
given partition and no offset reset policy is
+     *                                       defined
      */
     private boolean updateFetchPositions(final Timer timer) {
         try {
-            // Validate positions using the partition leader end offsets, to 
detect if any partition
-            // has been truncated due to a leader change. This will trigger an 
OffsetForLeaderEpoch
-            // request, retrieve the partition end offsets, and validate the 
current position against it.
-            applicationEventHandler.addAndGet(new 
ValidatePositionsEvent(calculateDeadlineMs(timer)));
-
-            cachedSubscriptionHasAllFetchPositions = 
subscriptions.hasAllFetchPositions();
-            if (cachedSubscriptionHasAllFetchPositions) return true;
-
-            // Reset positions using committed offsets retrieved from the 
group coordinator, for any
-            // partitions which do not have a valid position and are not 
awaiting reset. This will
-            // trigger an OffsetFetch request and update positions with the 
offsets retrieved. This
-            // will only do a coordinator lookup if there are partitions which 
have missing
-            // positions, so a consumer with manually assigned partitions can 
avoid a coordinator
-            // dependence by always ensuring that assigned partitions have an 
initial position.
-            if (isCommittedOffsetsManagementEnabled() && 
!initWithCommittedOffsetsIfNeeded(timer))
-                return false;
-
-            // If there are partitions still needing a position and a reset 
policy is defined,
-            // request reset using the default policy. If no reset strategy is 
defined and there
-            // are partitions with a missing position, then we will raise a 
NoOffsetForPartitionException exception.
-            subscriptions.resetInitializingPositions();
-
-            // Reset positions using partition offsets retrieved from the 
leader, for any partitions
-            // which are awaiting reset. This will trigger a ListOffset 
request, retrieve the
-            // partition offsets according to the strategy (ex. earliest, 
latest), and update the
-            // positions.
-            applicationEventHandler.addAndGet(new 
ResetPositionsEvent(calculateDeadlineMs(timer)));
-            return true;
+            UpdateFetchPositionsEvent updateFetchPositionsEvent = new 
UpdateFetchPositionsEvent(calculateDeadlineMs(timer));
+            wakeupTrigger.setActiveTask(updateFetchPositionsEvent.future());

Review Comment:
   Why do we need to involve the wakeup trigger for this case?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -256,14 +253,14 @@ private void process(final UnsubscribeEvent event) {
         }
     }
 
-    private void process(final ResetPositionsEvent event) {
-        CompletableFuture<Void> future = 
requestManagers.offsetsRequestManager.resetPositionsIfNeeded();
-        future.whenComplete(complete(event.future()));
-    }
-
-    private void process(final ValidatePositionsEvent event) {
-        CompletableFuture<Void> future = 
requestManagers.offsetsRequestManager.validatePositionsIfNeeded();
-        future.whenComplete(complete(event.future()));
+    /**
+     * Fetch committed offsets and use them to update positions in the 
subscription state. If no
+     * committed offsets available, fetch offsets from the leader.
+     */
+    private void process(final UpdateFetchPositionsEvent 
updateFetchPositionsEvent) {
+        CompletableFuture<Boolean> future =
+            
requestManagers.offsetsRequestManager.updateFetchPositions(updateFetchPositionsEvent.deadlineMs());
+        future.whenComplete(complete(updateFetchPositionsEvent.future()));

Review Comment:
   Super nit: any reason not to name the variable `event`, as in the other 
`process` methods? That would reduce visual clutter and make the lines a bit shorter:
   
   ```suggestion
       private void process(final UpdateFetchPositionsEvent event) {
           CompletableFuture<Boolean> future =
               requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());
           future.whenComplete(complete(event.future()));
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -116,15 +138,27 @@ public OffsetsRequestManager(final SubscriptionState 
subscriptionState,
         this.subscriptionState = subscriptionState;
         this.time = time;
         this.requestTimeoutMs = requestTimeoutMs;
+        this.defaultApiTimeoutMs = defaultApiTimeoutMs;
         this.apiVersions = apiVersions;
         this.networkClientDelegate = networkClientDelegate;
-        this.backgroundEventHandler = backgroundEventHandler;
         this.offsetFetcherUtils = new OffsetFetcherUtils(logContext, metadata, 
subscriptionState,
                 time, retryBackoffMs, apiVersions);
         // Register the cluster metadata update callback. Note this only 
relies on the
         // requestsToRetry initialized above, and won't be invoked until all 
managers are
         // initialized and the network thread started.
         this.metadata.addClusterUpdateListener(this);
+        this.commitRequestManager = commitRequestManager;
+    }
+
+    private static class PendingFetchCommittedRequest {
+        final Set<TopicPartition> requestedPartitions;
+        final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result;
+
+        private PendingFetchCommittedRequest(final Set<TopicPartition> 
requestedPartitions,
+                                              final 
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result) {

Review Comment:
   Super-duper nit: parameter alignment:
   
   ```suggestion
                                                final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result) {
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -182,6 +216,180 @@ public CompletableFuture<Map<TopicPartition, 
OffsetAndTimestampInternal>> fetchO
                         result.fetchedOffsets));
     }
 
+    private boolean 
maybeCompleteWithPreviousKnownException(CompletableFuture<Boolean> result) {
+        Throwable exception = cachedUpdatePositionsException.getAndSet(null);
+        if (exception != null) {
+            // Return exception that may have been encountered on a previous 
attempt to update
+            // positions, after the triggering event had already expired.
+            result.completeExceptionally(exception);
+            return true;
+        }
+        return false;
+    }
+
+    public CompletableFuture<Boolean> updateFetchPositions(long deadlineMs) {
+        CompletableFuture<Boolean> result = new CompletableFuture<>();
+
+        if (maybeCompleteWithPreviousKnownException(result)) {
+            return result;
+        }
+
+        result.whenComplete((__, error) -> {
+            boolean updatePositionsExpired = time.milliseconds() >= deadlineMs;
+            if (error != null && updatePositionsExpired) {
+                // Update fetch positions operations are triggered 
asynchronously here in the
+                // background thread, so they may complete (with error)
+                // when the triggering UpdateFetchPositionsEvent has been 
already expired. Keep
+                // exception saved to be thrown on the next call to update 
positions.
+                cachedUpdatePositionsException.set(error);
+            }
+        });
+
+        try {
+

Review Comment:
   Super-nit: extra newline.
   
   ```suggestion
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -42,6 +42,7 @@
 import java.util.function.BiConsumer;
 import java.util.function.Supplier;
 
+

Review Comment:
   Super nit: extra whitespace:
   
   ```suggestion
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -182,6 +216,180 @@ public CompletableFuture<Map<TopicPartition, 
OffsetAndTimestampInternal>> fetchO
                         result.fetchedOffsets));
     }
 
+    private boolean 
maybeCompleteWithPreviousKnownException(CompletableFuture<Boolean> result) {
+        Throwable exception = cachedUpdatePositionsException.getAndSet(null);
+        if (exception != null) {
+            // Return exception that may have been encountered on a previous 
attempt to update
+            // positions, after the triggering event had already expired.
+            result.completeExceptionally(exception);
+            return true;
+        }
+        return false;
+    }
+
+    public CompletableFuture<Boolean> updateFetchPositions(long deadlineMs) {
+        CompletableFuture<Boolean> result = new CompletableFuture<>();
+
+        if (maybeCompleteWithPreviousKnownException(result)) {
+            return result;
+        }
+
+        result.whenComplete((__, error) -> {
+            boolean updatePositionsExpired = time.milliseconds() >= deadlineMs;
+            if (error != null && updatePositionsExpired) {
+                // Update fetch positions operations are triggered 
asynchronously here in the
+                // background thread, so they may complete (with error)
+                // when the triggering UpdateFetchPositionsEvent has been 
already expired. Keep
+                // exception saved to be thrown on the next call to update 
positions.
+                cachedUpdatePositionsException.set(error);
+            }
+        });
+
+        try {
+
+            // Validate positions using the partition leader end offsets, to 
detect if any partition
+            // has been truncated due to a leader change. This will trigger an 
OffsetForLeaderEpoch
+            // request, retrieve the partition end offsets, and validate the 
current position
+            // against it. It will throw an exception if log truncation is 
detected.
+            validatePositionsIfNeeded();
+
+            boolean hasAllFetchPositions = 
subscriptionState.hasAllFetchPositions();
+            if (hasAllFetchPositions) {
+                result.complete(true);
+                return result;
+            }
+
+            // Reset positions using committed offsets retrieved from the 
group coordinator, for any
+            // partitions which do not have a valid position and are not 
awaiting reset. This will
+            // trigger an OffsetFetch request and update positions with the 
offsets retrieved.
+            if (commitRequestManager != null) {
+                CompletableFuture<Void> initWithCommittedOffsetsResult = 
initWithCommittedOffsetsIfNeeded(deadlineMs);
+                initWithCommittedOffsetsResult.whenComplete((__, error) -> {
+                    if (error == null) {
+                        initWithPartitionOffsetsIfNeeded(result);
+                    } else {
+                        result.completeExceptionally(error);
+                    }
+                });
+            } else {
+                initWithPartitionOffsetsIfNeeded(result);
+            }
+        } catch (Exception e) {
+            result.completeExceptionally(maybeWrapAsKafkaException(e));
+        }
+        return result;
+    }
+
+    /**
+     * If there are partitions still needing a position and a reset policy is 
defined, request
+     * reset using the default policy.
+     *
+     * @param result Future that will complete when the reset operation 
completes.
+     * @throws NoOffsetForPartitionException If no reset strategy is configured
+     */
+    private void initWithPartitionOffsetsIfNeeded(CompletableFuture<Boolean> 
result) {
+        try {
+            // Mark partitions that need reset, using the configured reset 
strategy. If no
+            // strategy is defined, this will raise a 
NoOffsetForPartitionException exception.
+            subscriptionState.resetInitializingPositions();
+        } catch (Exception e) {
+            result.completeExceptionally(e);
+            return;
+        }
+
+        // For partitions awaiting reset, generate a ListOffset request to 
retrieve the partition
+        // offsets according to the strategy (ex. earliest, latest), and 
update the positions.
+        resetPositionsIfNeeded().whenComplete((resetResult, error) -> {
+            if (error == null) {
+                result.complete(false);
+            } else {
+                result.completeExceptionally(error);
+            }
+        });
+    }
+
+    // Visible for testing
+    boolean hasPendingOffsetFetchEvent() {
+        return pendingOffsetFetchEvent != null;
+    }
+
+    /**
+     * Fetch the committed offsets for partitions that require initialization. 
Use them to set
+     * the fetch positions in the subscription state.
+     *
+     * @throws TimeoutException If offsets could not be retrieved within the 
timeout
+     */
+    private CompletableFuture<Void> initWithCommittedOffsetsIfNeeded(long 
deadlineMs) {
+        final Set<TopicPartition> initializingPartitions = 
subscriptionState.initializingPartitions();
+
+        if (initializingPartitions.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        log.debug("Refreshing committed offsets for partitions {}", 
initializingPartitions);
+        CompletableFuture<Void> result = new CompletableFuture<>();
+
+        // The shorter the timeout provided to poll(), the more likely the 
offsets fetch will time out. To handle
+        // this case, on the first attempt to fetch the committed offsets, a 
FetchCommittedOffsetsEvent is created
+        // (with potentially a longer timeout) and stored. The event is used 
for the first attempt, but in the
+        // case it times out, subsequent attempts will also use the event in 
order to wait for the results.
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
fetchCommittedFuture;
+        if (!canReusePendingOffsetFetchEvent(initializingPartitions)) {
+            if (pendingOffsetFetchEvent != null) {
+                // This will be the case where we were waiting for a fetch committed offsets request
+                // to update positions, but the set of initializing partitions changed. We need to
+                // cancel the pending future, to ensure that its results are not used to update positions.
+                pendingOffsetFetchEvent.result.cancel(true);
+            }
+            // Need to generate a new request to fetch committed offsets
+            final long fetchCommittedDeadlineMs = Math.max(deadlineMs, 
time.milliseconds() + defaultApiTimeoutMs);
+            fetchCommittedFuture = 
commitRequestManager.fetchOffsets(initializingPartitions, 
fetchCommittedDeadlineMs);
+            pendingOffsetFetchEvent = new 
PendingFetchCommittedRequest(initializingPartitions,
+                fetchCommittedFuture);
+        } else {
+            fetchCommittedFuture = pendingOffsetFetchEvent.result;
+        }
+
+        // when the ongoing OffsetFetch completes, carry on with updating 
positions and
+        // completing the result future for the current attempt to 
initWithCommittedOffsetsIfNeeded
+        fetchCommittedFuture.whenComplete((offsets, error) -> {
+            pendingOffsetFetchEvent = null;
+            if (error instanceof CancellationException) {
+                // Abort updating positions
+                return;
+            }
+            // If an offset fetch triggered to update positions finishes 
without being
+            // cancelled, we update positions even if the original event 
expired. The event
+            // is cancelled whenever the set of partitions to initialize 
changes
+            if (error == null) {
+                refreshCommittedOffsets(offsets, metadata, subscriptionState);
+                result.complete(null);
+            } else {
+                log.error("Error fetching committed offsets to update 
positions", error);
+                result.completeExceptionally(error);
+            }
+        });
+
+        return result;
+    }
+
+    /**
+     * This determines if the {@link #pendingOffsetFetchEvent pending offset 
fetch event} can be reused. Reuse
+     * is only possible if all the following conditions are true:
+     *
+     * <ul>
+     *     <li>A pending offset fetch event exists</li>
+     *     <li>The partition set of the pending offset fetch event is the same 
as the given partition set</li>
+     * </ul>
+     */
+    private boolean canReusePendingOffsetFetchEvent(Set<TopicPartition> 
partitions) {
+        if (pendingOffsetFetchEvent == null) {
+            return false;
+        }
+
+        return pendingOffsetFetchEvent.requestedPartitions.equals(partitions);
+    }

Review Comment:
   The corresponding code in 
`AsyncKafkaConsumer.canReusePendingOffsetFetchEvent` also checked that the 
`pendingOffsetFetchEvent`’s deadline hadn't passed. Is that no longer a concern 
after the refactor?
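   
   To make the question concrete, here is a minimal sketch of what a 
deadline-aware reuse check could look like. It is not the code from either class: 
`PendingFetch`, its `deadlineMs` field, `ReuseCheck`, and the `nowMs` parameter 
are hypothetical stand-ins, and partitions are plain strings to keep it 
self-contained. The `PendingFetchCommittedRequest` in this PR does not carry a deadline.
   
   ```java
   import java.util.Set;

   // Hypothetical stand-in for the pending request; deadlineMs is an assumed field.
   final class PendingFetch {
       final Set<String> requestedPartitions;
       final long deadlineMs;

       PendingFetch(Set<String> requestedPartitions, long deadlineMs) {
           this.requestedPartitions = requestedPartitions;
           this.deadlineMs = deadlineMs;
       }
   }

   final class ReuseCheck {
       // Reuse only if a request exists, has not expired, and covers the same partitions.
       static boolean canReuse(PendingFetch pending, Set<String> partitions, long nowMs) {
           if (pending == null) {
               return false;
           }
           if (nowMs >= pending.deadlineMs) {
               return false;
           }
           return pending.requestedPartitions.equals(partitions);
       }
   }
   ```
   
   If the expiration check is intentionally gone because the request now lives 
in the background thread, a short comment saying so would help.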



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java:
##########
@@ -90,9 +88,9 @@ public class OffsetsRequestManagerTest {
     private OffsetsRequestManager requestManager;
     private ConsumerMetadata metadata;
     private SubscriptionState subscriptionState;
-    private MockTime time;
+    private final Time time = mock(Time.class);

Review Comment:
   Out of curiosity, what prompted this change?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -182,6 +216,180 @@ public CompletableFuture<Map<TopicPartition, 
OffsetAndTimestampInternal>> fetchO
                         result.fetchedOffsets));
     }
 
+    private boolean 
maybeCompleteWithPreviousKnownException(CompletableFuture<Boolean> result) {
+        Throwable exception = cachedUpdatePositionsException.getAndSet(null);
+        if (exception != null) {
+            // Return exception that may have been encountered on a previous 
attempt to update
+            // positions, after the triggering event had already expired.
+            result.completeExceptionally(exception);
+            return true;
+        }
+        return false;
+    }
+
+    public CompletableFuture<Boolean> updateFetchPositions(long deadlineMs) {
+        CompletableFuture<Boolean> result = new CompletableFuture<>();
+
+        if (maybeCompleteWithPreviousKnownException(result)) {
+            return result;
+        }
+
+        result.whenComplete((__, error) -> {
+            boolean updatePositionsExpired = time.milliseconds() >= deadlineMs;
+            if (error != null && updatePositionsExpired) {
+                // Update fetch positions operations are triggered 
asynchronously here in the
+                // background thread, so they may complete (with error)
+                // when the triggering UpdateFetchPositionsEvent has been 
already expired. Keep
+                // exception saved to be thrown on the next call to update 
positions.
+                cachedUpdatePositionsException.set(error);
+            }
+        });
+
+        try {
+
+            // Validate positions using the partition leader end offsets, to 
detect if any partition
+            // has been truncated due to a leader change. This will trigger an 
OffsetForLeaderEpoch
+            // request, retrieve the partition end offsets, and validate the 
current position
+            // against it. It will throw an exception if log truncation is 
detected.
+            validatePositionsIfNeeded();
+
+            boolean hasAllFetchPositions = 
subscriptionState.hasAllFetchPositions();
+            if (hasAllFetchPositions) {
+                result.complete(true);
+                return result;
+            }

Review Comment:
   I don't think we need the temporary variable with the refactored code.
   
   ```suggestion
               if (subscriptionState.hasAllFetchPositions()) {
                   result.complete(true);
                   return result;
               }
   ```
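   
   Separately, the cached-exception handling at the top of this hunk can be 
reduced to the following sketch, which may help when reasoning about it in 
isolation. It only assumes what the comments in the diff describe: an error that 
arrives after the deadline is stored and surfaced on the next call. 
`CachedErrorSketch`, the `clock` supplier (standing in for `Time`), and the 
`work` future (standing in for the asynchronous position update) are 
hypothetical names, not part of the PR.
   
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.atomic.AtomicReference;
   import java.util.function.LongSupplier;

   final class CachedErrorSketch {
       private final AtomicReference<Throwable> cachedError = new AtomicReference<>();
       private final LongSupplier clock; // hypothetical stand-in for Time.milliseconds()

       CachedErrorSketch(LongSupplier clock) {
           this.clock = clock;
       }

       // `work` is an assumed placeholder for the asynchronous update itself.
       CompletableFuture<Boolean> update(long deadlineMs, CompletableFuture<Boolean> work) {
           CompletableFuture<Boolean> result = new CompletableFuture<>();

           // Surface an error left over from an earlier attempt that expired.
           Throwable previous = cachedError.getAndSet(null);
           if (previous != null) {
               result.completeExceptionally(previous);
               return result;
           }

           // If this attempt fails after its deadline, keep the error for the next call.
           result.whenComplete((ignored, error) -> {
               if (error != null && clock.getAsLong() >= deadlineMs) {
                   cachedError.set(error);
               }
           });

           // Propagate the outcome of the asynchronous work to the result.
           work.whenComplete((value, error) -> {
               if (error == null) {
                   result.complete(value);
               } else {
                   result.completeExceptionally(error);
               }
           });
           return result;
       }
   }
   ```
   
   In this sketch a late failure is reported exactly once, on the next call, 
and the call after that starts from a clean state.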



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
