lianetm commented on code in PR #16885:
URL: https://github.com/apache/kafka/pull/16885#discussion_r1724977673


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -256,14 +271,176 @@ private void process(final UnsubscribeEvent event) {
         }
     }
 
-    private void process(final ResetPositionsEvent event) {
-        CompletableFuture<Void> future = requestManagers.offsetsRequestManager.resetPositionsIfNeeded();
-        future.whenComplete(complete(event.future()));
+    /**
+     * Fetch committed offsets and use them to update positions in the subscription state. If no
+     * committed offsets are available, fetch offsets from the leader.
+     */
+    private void process(final UpdateFetchPositionsEvent updateFetchPositionsEvent) {
+        try {
+            // The event could be completed in the app thread before it got to be
+            // processed in the background (e.g. interrupted)
+            if (updateFetchPositionsEvent.future().isCompletedExceptionally()) {
+                log.debug("UpdateFetchPositions event {} was completed 
exceptionally before it " +
+                    "got time to be processed.", updateFetchPositionsEvent);
+                return;
+            }
+
+            // Validate positions using the partition leader end offsets, to detect if any partition
+            // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch
+            // request, retrieve the partition end offsets, and validate the current position
+            // against it. It will throw an exception if log truncation is detected.
+            requestManagers.offsetsRequestManager.validatePositionsIfNeeded();
+
+            boolean hasAllFetchPositions = subscriptions.hasAllFetchPositions();
+            if (hasAllFetchPositions) {
+                updateFetchPositionsEvent.future().complete(true);
+                return;
+            }
+
+            // Reset positions using committed offsets retrieved from the group coordinator, for any
+            // partitions which do not have a valid position and are not awaiting reset. This will
+            // trigger an OffsetFetch request and update positions with the offsets retrieved. This
+            // will only do a coordinator lookup if there are partitions which have missing
+            // positions, so a consumer with manually assigned partitions can avoid a coordinator
+            // dependence by always ensuring that assigned partitions have an initial position.
+            if (requestManagers.commitRequestManager.isPresent()) {
+                CompletableFuture<Void> initWithCommittedOffsetsResult = initWithCommittedOffsetsIfNeeded(updateFetchPositionsEvent);
+                initWithCommittedOffsetsResult.whenComplete((__, error) -> {
+                    if (error == null) {
+                        // Retrieve partition offsets to init positions for partitions that still
+                        // don't have a valid position
+                        initWithPartitionOffsetsIfNeeded(updateFetchPositionsEvent);
+                    } else {
+                        updateFetchPositionsEvent.future().completeExceptionally(error);
+                    }
+                });
+            } else {
+                initWithPartitionOffsetsIfNeeded(updateFetchPositionsEvent);
+            }
+
+        } catch (Exception e) {
+            updateFetchPositionsEvent.future().completeExceptionally(maybeWrapAsKafkaException(e));
+        }
     }
 
-    private void process(final ValidatePositionsEvent event) {
-        CompletableFuture<Void> future = requestManagers.offsetsRequestManager.validatePositionsIfNeeded();
-        future.whenComplete(complete(event.future()));
+    private void initWithPartitionOffsetsIfNeeded(final UpdateFetchPositionsEvent updateFetchPositionsEvent) {
+        try {
+            // If there are partitions still needing a position and a reset policy is defined,
+            // request reset using the default policy. If no reset strategy is defined and there
+            // are partitions with a missing position, then we will raise a NoOffsetForPartitionException.
+            subscriptions.resetInitializingPositions();
+
+            // Reset positions using partition offsets retrieved from the leader, for any partitions
+            // which are awaiting reset. This will trigger a ListOffset request, retrieve the
+            // partition offsets according to the strategy (e.g. earliest, latest), and update the
+            // positions.
+            CompletableFuture<Void> resetPositionsFuture = requestManagers.offsetsRequestManager.resetPositionsIfNeeded();
+
+            resetPositionsFuture.whenComplete((result, error) -> {
+                if (updateFetchPositionsEvent.future().isDone()) {
+                    log.debug("UpdateFetchPositions event {} had already 
expired when reset " +
+                        "positions completed.", updateFetchPositionsEvent);
+                    return;
+                }
+                if (error == null) {
+                    updateFetchPositionsEvent.future().complete(false);
+                } else {
+                    updateFetchPositionsEvent.future().completeExceptionally(error);
+                }
+            });
+        } catch (Exception e) {
+            updateFetchPositionsEvent.future().completeExceptionally(e);
+        }
+    }
+
+    /**
+     * Fetch the committed offsets for partitions that require initialization. Use them to set
+     * the fetch positions in the subscription state.
+     *
+     * @throws TimeoutException If offsets could not be retrieved within the timeout
+     */
+    private CompletableFuture<Void> initWithCommittedOffsetsIfNeeded(UpdateFetchPositionsEvent updateFetchPositionsEvent) {
+        final Set<TopicPartition> initializingPartitions = subscriptions.initializingPartitions();
+
+        if (initializingPartitions.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        log.debug("Refreshing committed offsets for partitions {}", 
initializingPartitions);
+        CompletableFuture<Void> result = new CompletableFuture<>();
+
+        // The shorter the timeout provided to poll(), the more likely the offsets fetch will time out. To handle
+        // this case, on the first attempt to fetch the committed offsets, a FetchCommittedOffsetsEvent is created
+        // (with potentially a longer timeout) and stored. The event is used for the first attempt, but in the
+        // case it times out, subsequent attempts will also use the event in order to wait for the results.
+        if (!canReusePendingOffsetFetchEvent(initializingPartitions)) {
+            final long deadlineMs = Math.max(updateFetchPositionsEvent.deadlineMs(), updateFetchPositionsEvent.fetchOffsetsDeadlineMs());

Review Comment:
   I totally get your point. The alternative was to pass the "defaultApiTimeout" config into the UpdateFetchPositionsEvent (that's what we need for the 2nd deadline). This is changing now anyway, because of the changes to address other comments, so we won't have it anymore.
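
   To make that alternative concrete, a minimal sketch (hypothetical names and constructor, not the PR's actual code): the event itself would carry a second deadline derived from "defaultApiTimeout", so the background thread could take the max of the two deadlines, as the diff above does, without looking up the config.

   ```java
   import java.util.concurrent.CompletableFuture;

   // Hypothetical sketch only: an event that carries both deadlines up front,
   // so the processor never needs to read default.api.timeout.ms itself.
   final class UpdateFetchPositionsEventSketch {
       private final CompletableFuture<Boolean> future = new CompletableFuture<>();
       private final long deadlineMs;             // bounded by the timeout passed to poll()
       private final long fetchOffsetsDeadlineMs; // bounded by default.api.timeout.ms

       UpdateFetchPositionsEventSketch(long nowMs, long pollTimeoutMs, long defaultApiTimeoutMs) {
           this.deadlineMs = nowMs + pollTimeoutMs;
           this.fetchOffsetsDeadlineMs = nowMs + defaultApiTimeoutMs;
       }

       CompletableFuture<Boolean> future() { return future; }
       long deadlineMs() { return deadlineMs; }
       long fetchOffsetsDeadlineMs() { return fetchOffsetsDeadlineMs; }
   }
   ```

   With that shape, the offsets fetch could outlive a short poll() timeout, and later poll() calls would re-attach to the same pending fetch.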


