kirktrue commented on code in PR #16885:
URL: https://github.com/apache/kafka/pull/16885#discussion_r1722491854


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -256,14 +271,176 @@ private void process(final UnsubscribeEvent event) {
         }
     }
 
-    private void process(final ResetPositionsEvent event) {
-        CompletableFuture<Void> future = 
requestManagers.offsetsRequestManager.resetPositionsIfNeeded();
-        future.whenComplete(complete(event.future()));
+    /**
+     *
+     * Fetch committed offsets and use them to update positions in the 
subscription state. If no
+     * committed offsets available, fetch offsets from the leader.
+     */
+    private void process(final UpdateFetchPositionsEvent 
updateFetchPositionsEvent) {
+        try {
+            // The event could be completed in the app thread before it got to 
be
+            // processed in the background (ex. interrupted)
+            if (updateFetchPositionsEvent.future().isCompletedExceptionally()) 
{
+                log.debug("UpdateFetchPositions event {} was completed 
exceptionally before it " +
+                    "got time to be processed.", updateFetchPositionsEvent);
+                return;
+            }
+
+            // Validate positions using the partition leader end offsets, to 
detect if any partition
+            // has been truncated due to a leader change. This will trigger an 
OffsetForLeaderEpoch
+            // request, retrieve the partition end offsets, and validate the 
current position
+            // against it. It will throw an exception if log truncation is 
detected.
+            requestManagers.offsetsRequestManager.validatePositionsIfNeeded();

Review Comment:
   `validatePositionsIfNeeded()` returns a `CompletableFuture`. We need to wait 
for it to be completed before continuing, right?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UpdateFetchPositionsEvent.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.internals.SubscriptionState;
+
+/**
+ * Event to update the position to fetch from. This will use the committed 
offsets if available.
+ * If no committed offsets exist, it will use the partition offsets.
+ *
+ * <p/>
+ *
+ * The event completes with a boolean value indicating if all assigned 
partitions already had
+ * valid fetch positions (based on {@link 
SubscriptionState#hasAllFetchPositions()}).
+ */
+public class UpdateFetchPositionsEvent extends 
CompletableApplicationEvent<Boolean> {
+
+    /**
+     * Deadline to complete the UpdateFetchPositionsEvent. If the event does 
not complete before
+     * this time, it will be completed exceptionally with a TimeoutException.
+     */
+    private final long deadlineMs;

Review Comment:
   The `CompletableApplicationEvent` already has a `deadlineMs` to control the 
event's lifetime. What's the benefit to reproducing that logic in this subclass?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -52,15 +59,27 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
     private final ConsumerMetadata metadata;
     private final SubscriptionState subscriptions;
     private final RequestManagers requestManagers;
+    private final Time time;
+
+    /**
+     * OffsetFetch request triggered to update fetch positions. The request is 
kept. It will be
+     * cleared every time a response with the committed offsets is received 
and used to update
+     * fetch positions. If the response cannot be used because the 
UpdateFetchPositions expired,
+     * it will be kept to be used on the next attempt to update fetch 
positions if partitions
+     * remain the same.
+     */
+    private FetchCommittedOffsetsEvent pendingOffsetFetchEvent;
 

Review Comment:
   We've introduced a lot of logic and state in this class. I'd really like to 
consider moving the bulk of `process()`, `initWithPartitionOffsetsIfNeeded()`, 
and `initWithPartitionOffsetsIfNeeded()` to a dedicated class. I'd prefer to 
keep `ApplicationEventProcessor` focused on dispatching events to their 
corresponding `RequestManager` method(s).



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -256,14 +271,176 @@ private void process(final UnsubscribeEvent event) {
         }
     }
 
-    private void process(final ResetPositionsEvent event) {
-        CompletableFuture<Void> future = 
requestManagers.offsetsRequestManager.resetPositionsIfNeeded();
-        future.whenComplete(complete(event.future()));
+    /**
+     *
+     * Fetch committed offsets and use them to update positions in the 
subscription state. If no
+     * committed offsets available, fetch offsets from the leader.
+     */
+    private void process(final UpdateFetchPositionsEvent 
updateFetchPositionsEvent) {
+        try {
+            // The event could be completed in the app thread before it got to 
be
+            // processed in the background (ex. interrupted)
+            if (updateFetchPositionsEvent.future().isCompletedExceptionally()) 
{
+                log.debug("UpdateFetchPositions event {} was completed 
exceptionally before it " +
+                    "got time to be processed.", updateFetchPositionsEvent);
+                return;
+            }
+
+            // Validate positions using the partition leader end offsets, to 
detect if any partition
+            // has been truncated due to a leader change. This will trigger an 
OffsetForLeaderEpoch
+            // request, retrieve the partition end offsets, and validate the 
current position
+            // against it. It will throw an exception if log truncation is 
detected.
+            requestManagers.offsetsRequestManager.validatePositionsIfNeeded();
+
+            boolean hasAllFetchPositions = 
subscriptions.hasAllFetchPositions();
+            if (hasAllFetchPositions) {
+                updateFetchPositionsEvent.future().complete(true);
+                return;
+            }
+
+            // Reset positions using committed offsets retrieved from the 
group coordinator, for any
+            // partitions which do not have a valid position and are not 
awaiting reset. This will
+            // trigger an OffsetFetch request and update positions with the 
offsets retrieved. This
+            // will only do a coordinator lookup if there are partitions which 
have missing
+            // positions, so a consumer with manually assigned partitions can 
avoid a coordinator
+            // dependence by always ensuring that assigned partitions have an 
initial position.
+            if (requestManagers.commitRequestManager.isPresent()) {
+                CompletableFuture<Void> initWithCommittedOffsetsResult = 
initWithCommittedOffsetsIfNeeded(updateFetchPositionsEvent);
+                initWithCommittedOffsetsResult.whenComplete((__, error) -> {
+                    if (error == null) {
+                        // Retrieve partition offsets to init positions for 
partitions that still
+                        // don't have a valid position
+                        
initWithPartitionOffsetsIfNeeded(updateFetchPositionsEvent);
+                    } else {
+                        
updateFetchPositionsEvent.future().completeExceptionally(error);
+                    }
+                });
+            } else {
+                initWithPartitionOffsetsIfNeeded(updateFetchPositionsEvent);
+            }
+

Review Comment:
   Super nitโ€”extra newline.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -256,14 +271,176 @@ private void process(final UnsubscribeEvent event) {
         }
     }
 
-    private void process(final ResetPositionsEvent event) {
-        CompletableFuture<Void> future = 
requestManagers.offsetsRequestManager.resetPositionsIfNeeded();
-        future.whenComplete(complete(event.future()));
+    /**
+     *
+     * Fetch committed offsets and use them to update positions in the 
subscription state. If no
+     * committed offsets available, fetch offsets from the leader.
+     */
+    private void process(final UpdateFetchPositionsEvent 
updateFetchPositionsEvent) {
+        try {
+            // The event could be completed in the app thread before it got to 
be
+            // processed in the background (ex. interrupted)
+            if (updateFetchPositionsEvent.future().isCompletedExceptionally()) 
{
+                log.debug("UpdateFetchPositions event {} was completed 
exceptionally before it " +
+                    "got time to be processed.", updateFetchPositionsEvent);
+                return;
+            }
+
+            // Validate positions using the partition leader end offsets, to 
detect if any partition
+            // has been truncated due to a leader change. This will trigger an 
OffsetForLeaderEpoch
+            // request, retrieve the partition end offsets, and validate the 
current position
+            // against it. It will throw an exception if log truncation is 
detected.
+            requestManagers.offsetsRequestManager.validatePositionsIfNeeded();
+
+            boolean hasAllFetchPositions = 
subscriptions.hasAllFetchPositions();
+            if (hasAllFetchPositions) {
+                updateFetchPositionsEvent.future().complete(true);
+                return;
+            }
+
+            // Reset positions using committed offsets retrieved from the 
group coordinator, for any
+            // partitions which do not have a valid position and are not 
awaiting reset. This will
+            // trigger an OffsetFetch request and update positions with the 
offsets retrieved. This
+            // will only do a coordinator lookup if there are partitions which 
have missing
+            // positions, so a consumer with manually assigned partitions can 
avoid a coordinator
+            // dependence by always ensuring that assigned partitions have an 
initial position.
+            if (requestManagers.commitRequestManager.isPresent()) {
+                CompletableFuture<Void> initWithCommittedOffsetsResult = 
initWithCommittedOffsetsIfNeeded(updateFetchPositionsEvent);
+                initWithCommittedOffsetsResult.whenComplete((__, error) -> {
+                    if (error == null) {
+                        // Retrieve partition offsets to init positions for 
partitions that still
+                        // don't have a valid position
+                        
initWithPartitionOffsetsIfNeeded(updateFetchPositionsEvent);
+                    } else {
+                        
updateFetchPositionsEvent.future().completeExceptionally(error);
+                    }
+                });
+            } else {
+                initWithPartitionOffsetsIfNeeded(updateFetchPositionsEvent);
+            }
+
+        } catch (Exception e) {
+            
updateFetchPositionsEvent.future().completeExceptionally(maybeWrapAsKafkaException(e));
+        }
     }
 
-    private void process(final ValidatePositionsEvent event) {
-        CompletableFuture<Void> future = 
requestManagers.offsetsRequestManager.validatePositionsIfNeeded();
-        future.whenComplete(complete(event.future()));
+    private void initWithPartitionOffsetsIfNeeded(final 
UpdateFetchPositionsEvent updateFetchPositionsEvent) {
+        try {
+            // If there are partitions still needing a position and a reset 
policy is defined,
+            // request reset using the default policy. If no reset strategy is 
defined and there
+            // are partitions with a missing position, then we will raise a 
NoOffsetForPartitionException exception.
+            subscriptions.resetInitializingPositions();
+
+            // Reset positions using partition offsets retrieved from the 
leader, for any partitions
+            // which are awaiting reset. This will trigger a ListOffset 
request, retrieve the
+            // partition offsets according to the strategy (ex. earliest, 
latest), and update the
+            // positions.
+            CompletableFuture<Void> resetPositionsFuture = 
requestManagers.offsetsRequestManager.resetPositionsIfNeeded();
+
+            resetPositionsFuture.whenComplete((result, error) -> {
+                if (updateFetchPositionsEvent.future().isDone()) {
+                    log.debug("UpdateFetchPositions event {} had already 
expired when reset " +
+                        "positions completed.", updateFetchPositionsEvent);
+                    return;
+                }
+                if (error == null) {
+                    updateFetchPositionsEvent.future().complete(false);
+                } else {
+                    
updateFetchPositionsEvent.future().completeExceptionally(error);
+                }
+            });
+        } catch (Exception e) {
+            updateFetchPositionsEvent.future().completeExceptionally(e);
+        }
+    }
+
+    /**
+     * Fetch the committed offsets for partitions that require initialization. 
Use them to set
+     * the fetch positions in the subscription state.
+     *
+     * @throws TimeoutException If offsets could not be retrieved within the 
timeout
+     */
+    private CompletableFuture<Void> 
initWithCommittedOffsetsIfNeeded(UpdateFetchPositionsEvent 
updateFetchPositionsEvent) {
+        final Set<TopicPartition> initializingPartitions = 
subscriptions.initializingPartitions();
+
+        if (initializingPartitions.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        log.debug("Refreshing committed offsets for partitions {}", 
initializingPartitions);
+        CompletableFuture<Void> result = new CompletableFuture<>();
+
+        // The shorter the timeout provided to poll(), the more likely the 
offsets fetch will time out. To handle
+        // this case, on the first attempt to fetch the committed offsets, a 
FetchCommittedOffsetsEvent is created
+        // (with potentially a longer timeout) and stored. The event is used 
for the first attempt, but in the
+        // case it times out, subsequent attempts will also use the event in 
order to wait for the results.
+        if (!canReusePendingOffsetFetchEvent(initializingPartitions)) {
+            final long deadlineMs = 
Math.max(updateFetchPositionsEvent.deadlineMs(), 
updateFetchPositionsEvent.fetchOffsetsDeadlineMs());

Review Comment:
   It seems like we could possibly calculate the fetch offsets deadline here 
instead of including two deadlines in the event. Sorry, I'm kind of hung up on 
the two deadlines in the event ๐Ÿ˜ž



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -256,14 +271,176 @@ private void process(final UnsubscribeEvent event) {
         }
     }
 
-    private void process(final ResetPositionsEvent event) {
-        CompletableFuture<Void> future = 
requestManagers.offsetsRequestManager.resetPositionsIfNeeded();
-        future.whenComplete(complete(event.future()));
+    /**
+     *
+     * Fetch committed offsets and use them to update positions in the 
subscription state. If no
+     * committed offsets available, fetch offsets from the leader.
+     */
+    private void process(final UpdateFetchPositionsEvent 
updateFetchPositionsEvent) {
+        try {
+            // The event could be completed in the app thread before it got to 
be
+            // processed in the background (ex. interrupted)
+            if (updateFetchPositionsEvent.future().isCompletedExceptionally()) 
{
+                log.debug("UpdateFetchPositions event {} was completed 
exceptionally before it " +
+                    "got time to be processed.", updateFetchPositionsEvent);
+                return;
+            }

Review Comment:
   Similar to the code in `AsyncKafkaConsumer`, isn't this case true for any of 
the `CompletableApplicationEvent` subclasses? If that's the case, we should 
probably handle it in a consistent way at a higher layer ๐Ÿค”



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1581,48 +1579,37 @@ private Fetch<K, V> collectFetch() {
 
         return fetch;
     }
+
     /**
      * Set the fetch position to the committed position (if there is one)
      * or reset it using the offset reset policy the user has configured.
      *
-     * @throws AuthenticationException If authentication fails. See the 
exception for more details
-     * @throws NoOffsetForPartitionException If no offset is stored for a 
given partition and no offset reset policy is
-     *             defined
      * @return true iff the operation completed without timing out
+     * @throws AuthenticationException       If authentication fails. See the 
exception for more details
+     * @throws NoOffsetForPartitionException If no offset is stored for a 
given partition and no offset reset policy is
+     *                                       defined
      */
     private boolean updateFetchPositions(final Timer timer) {
+        UpdateFetchPositionsEvent updateFetchPositionsEvent = null;
         try {
-            // Validate positions using the partition leader end offsets, to 
detect if any partition
-            // has been truncated due to a leader change. This will trigger an 
OffsetForLeaderEpoch
-            // request, retrieve the partition end offsets, and validate the 
current position against it.
-            applicationEventHandler.addAndGet(new 
ValidatePositionsEvent(calculateDeadlineMs(timer)));
-
-            cachedSubscriptionHasAllFetchPositions = 
subscriptions.hasAllFetchPositions();
-            if (cachedSubscriptionHasAllFetchPositions) return true;
-
-            // Reset positions using committed offsets retrieved from the 
group coordinator, for any
-            // partitions which do not have a valid position and are not 
awaiting reset. This will
-            // trigger an OffsetFetch request and update positions with the 
offsets retrieved. This
-            // will only do a coordinator lookup if there are partitions which 
have missing
-            // positions, so a consumer with manually assigned partitions can 
avoid a coordinator
-            // dependence by always ensuring that assigned partitions have an 
initial position.
-            if (isCommittedOffsetsManagementEnabled() && 
!initWithCommittedOffsetsIfNeeded(timer))
-                return false;
-
-            // If there are partitions still needing a position and a reset 
policy is defined,
-            // request reset using the default policy. If no reset strategy is 
defined and there
-            // are partitions with a missing position, then we will raise a 
NoOffsetForPartitionException exception.
-            subscriptions.resetInitializingPositions();
-
-            // Reset positions using partition offsets retrieved from the 
leader, for any partitions
-            // which are awaiting reset. This will trigger a ListOffset 
request, retrieve the
-            // partition offsets according to the strategy (ex. earliest, 
latest), and update the
-            // positions.
-            applicationEventHandler.addAndGet(new 
ResetPositionsEvent(calculateDeadlineMs(timer)));
-            return true;
+            updateFetchPositionsEvent = new 
UpdateFetchPositionsEvent(calculateDeadlineMs(timer),
+                calculateDeadlineMs(time, defaultApiTimeoutMs));

Review Comment:
   Looking at just this calling code, it's a little hard to understand why 
we're passing in two different timeouts. Perhaps declaring some variables with 
meaningful names, adding some comments, or something might help?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1581,48 +1579,37 @@ private Fetch<K, V> collectFetch() {
 
         return fetch;
     }
+
     /**
      * Set the fetch position to the committed position (if there is one)
      * or reset it using the offset reset policy the user has configured.
      *
-     * @throws AuthenticationException If authentication fails. See the 
exception for more details
-     * @throws NoOffsetForPartitionException If no offset is stored for a 
given partition and no offset reset policy is
-     *             defined
      * @return true iff the operation completed without timing out
+     * @throws AuthenticationException       If authentication fails. See the 
exception for more details
+     * @throws NoOffsetForPartitionException If no offset is stored for a 
given partition and no offset reset policy is
+     *                                       defined
      */
     private boolean updateFetchPositions(final Timer timer) {
+        UpdateFetchPositionsEvent updateFetchPositionsEvent = null;

Review Comment:
   Nit: it doesn't seem like this event needs to be declared outside the `try`, 
does it?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1581,48 +1579,37 @@ private Fetch<K, V> collectFetch() {
 
         return fetch;
     }
+
     /**
      * Set the fetch position to the committed position (if there is one)
      * or reset it using the offset reset policy the user has configured.
      *
-     * @throws AuthenticationException If authentication fails. See the 
exception for more details
-     * @throws NoOffsetForPartitionException If no offset is stored for a 
given partition and no offset reset policy is
-     *             defined
      * @return true iff the operation completed without timing out
+     * @throws AuthenticationException       If authentication fails. See the 
exception for more details
+     * @throws NoOffsetForPartitionException If no offset is stored for a 
given partition and no offset reset policy is
+     *                                       defined
      */
     private boolean updateFetchPositions(final Timer timer) {
+        UpdateFetchPositionsEvent updateFetchPositionsEvent = null;
         try {
-            // Validate positions using the partition leader end offsets, to 
detect if any partition
-            // has been truncated due to a leader change. This will trigger an 
OffsetForLeaderEpoch
-            // request, retrieve the partition end offsets, and validate the 
current position against it.
-            applicationEventHandler.addAndGet(new 
ValidatePositionsEvent(calculateDeadlineMs(timer)));
-
-            cachedSubscriptionHasAllFetchPositions = 
subscriptions.hasAllFetchPositions();
-            if (cachedSubscriptionHasAllFetchPositions) return true;
-
-            // Reset positions using committed offsets retrieved from the 
group coordinator, for any
-            // partitions which do not have a valid position and are not 
awaiting reset. This will
-            // trigger an OffsetFetch request and update positions with the 
offsets retrieved. This
-            // will only do a coordinator lookup if there are partitions which 
have missing
-            // positions, so a consumer with manually assigned partitions can 
avoid a coordinator
-            // dependence by always ensuring that assigned partitions have an 
initial position.
-            if (isCommittedOffsetsManagementEnabled() && 
!initWithCommittedOffsetsIfNeeded(timer))
-                return false;
-
-            // If there are partitions still needing a position and a reset 
policy is defined,
-            // request reset using the default policy. If no reset strategy is 
defined and there
-            // are partitions with a missing position, then we will raise a 
NoOffsetForPartitionException exception.
-            subscriptions.resetInitializingPositions();
-
-            // Reset positions using partition offsets retrieved from the 
leader, for any partitions
-            // which are awaiting reset. This will trigger a ListOffset 
request, retrieve the
-            // partition offsets according to the strategy (ex. earliest, 
latest), and update the
-            // positions.
-            applicationEventHandler.addAndGet(new 
ResetPositionsEvent(calculateDeadlineMs(timer)));
-            return true;
+            updateFetchPositionsEvent = new 
UpdateFetchPositionsEvent(calculateDeadlineMs(timer),
+                calculateDeadlineMs(time, defaultApiTimeoutMs));
+            wakeupTrigger.setActiveTask(updateFetchPositionsEvent.future());
+
+            if (Thread.interrupted()) {
+                // Ensure we propagate the interrupted exception if the thread 
was interrupted
+                // before the updateFetchPositions event is processed. 
Otherwise, this exception
+                // could be swallowed if event is processed fast enough in the 
background after
+                // being added, so that it's already completed when getting 
the result

Review Comment:
   I'm trying to wrap my head around the `Thread.interrupted()` check here. Per 
the comments, it seems like the case we're trying to prevent could happen on 
any of the events, right? ๐Ÿค”
   
   Or is updating fetched positions really a special case compared to 
everything else?
   
   Sorry for being a bit slow ๐Ÿ˜†



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -256,14 +271,176 @@ private void process(final UnsubscribeEvent event) {
         }
     }
 
-    private void process(final ResetPositionsEvent event) {
-        CompletableFuture<Void> future = 
requestManagers.offsetsRequestManager.resetPositionsIfNeeded();
-        future.whenComplete(complete(event.future()));
+    /**
+     *
+     * Fetch committed offsets and use them to update positions in the 
subscription state. If no
+     * committed offsets available, fetch offsets from the leader.
+     */
+    private void process(final UpdateFetchPositionsEvent 
updateFetchPositionsEvent) {
+        try {
+            // The event could be completed in the app thread before it got to 
be
+            // processed in the background (ex. interrupted)
+            if (updateFetchPositionsEvent.future().isCompletedExceptionally()) 
{
+                log.debug("UpdateFetchPositions event {} was completed 
exceptionally before it " +
+                    "got time to be processed.", updateFetchPositionsEvent);
+                return;
+            }
+
+            // Validate positions using the partition leader end offsets, to 
detect if any partition
+            // has been truncated due to a leader change. This will trigger an 
OffsetForLeaderEpoch
+            // request, retrieve the partition end offsets, and validate the 
current position
+            // against it. It will throw an exception if log truncation is 
detected.
+            requestManagers.offsetsRequestManager.validatePositionsIfNeeded();
+
+            boolean hasAllFetchPositions = 
subscriptions.hasAllFetchPositions();
+            if (hasAllFetchPositions) {
+                updateFetchPositionsEvent.future().complete(true);
+                return;
+            }
+
+            // Reset positions using committed offsets retrieved from the 
group coordinator, for any
+            // partitions which do not have a valid position and are not 
awaiting reset. This will
+            // trigger an OffsetFetch request and update positions with the 
offsets retrieved. This
+            // will only do a coordinator lookup if there are partitions which 
have missing
+            // positions, so a consumer with manually assigned partitions can 
avoid a coordinator
+            // dependence by always ensuring that assigned partitions have an 
initial position.
+            if (requestManagers.commitRequestManager.isPresent()) {
+                CompletableFuture<Void> initWithCommittedOffsetsResult = 
initWithCommittedOffsetsIfNeeded(updateFetchPositionsEvent);
+                initWithCommittedOffsetsResult.whenComplete((__, error) -> {
+                    if (error == null) {
+                        // Retrieve partition offsets to init positions for 
partitions that still
+                        // don't have a valid position
+                        
initWithPartitionOffsetsIfNeeded(updateFetchPositionsEvent);
+                    } else {
+                        
updateFetchPositionsEvent.future().completeExceptionally(error);
+                    }
+                });
+            } else {
+                initWithPartitionOffsetsIfNeeded(updateFetchPositionsEvent);
+            }
+
+        } catch (Exception e) {
+            
updateFetchPositionsEvent.future().completeExceptionally(maybeWrapAsKafkaException(e));
+        }
     }
 
-    private void process(final ValidatePositionsEvent event) {
-        CompletableFuture<Void> future = 
requestManagers.offsetsRequestManager.validatePositionsIfNeeded();
-        future.whenComplete(complete(event.future()));
+    private void initWithPartitionOffsetsIfNeeded(final 
UpdateFetchPositionsEvent updateFetchPositionsEvent) {
+        try {
+            // If there are partitions still needing a position and a reset 
policy is defined,
+            // request reset using the default policy. If no reset strategy is 
defined and there
+            // are partitions with a missing position, then we will raise a 
NoOffsetForPartitionException exception.
+            subscriptions.resetInitializingPositions();
+
+            // Reset positions using partition offsets retrieved from the 
leader, for any partitions
+            // which are awaiting reset. This will trigger a ListOffset 
request, retrieve the
+            // partition offsets according to the strategy (ex. earliest, 
latest), and update the
+            // positions.
+            CompletableFuture<Void> resetPositionsFuture = 
requestManagers.offsetsRequestManager.resetPositionsIfNeeded();
+
+            resetPositionsFuture.whenComplete((result, error) -> {
+                if (updateFetchPositionsEvent.future().isDone()) {
+                    log.debug("UpdateFetchPositions event {} had already 
expired when reset " +
+                        "positions completed.", updateFetchPositionsEvent);
+                    return;
+                }
+                if (error == null) {
+                    updateFetchPositionsEvent.future().complete(false);
+                } else {
+                    
updateFetchPositionsEvent.future().completeExceptionally(error);
+                }
+            });
+        } catch (Exception e) {
+            updateFetchPositionsEvent.future().completeExceptionally(e);

Review Comment:
   Do we need to wrap this exception?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to