lianetm commented on code in PR #16885:
URL: https://github.com/apache/kafka/pull/16885#discussion_r1744318841


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -182,6 +215,222 @@ public CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchO
                         result.fetchedOffsets));
     }
 
+    /**
+     * Update fetch positions for assigned partitions that do not have a position. This will:
+     * <ul>
+     *     <li>check if all assigned partitions already have fetch positions and return right away if that's the case</li>
+     *     <li>trigger an async request to validate positions (detect log truncation)</li>
+     *     <li>fetch committed offsets if enabled, and use the response to update the positions</li>
+     *     <li>fetch partition offsets for partitions that may still require a position, and use the response to
+     *     update the positions</li>
+     * </ul>
+     *
+     * @param deadlineMs Time in milliseconds when the triggering application event expires. Any error received after
+     *                   this will be saved, and used to complete the result exceptionally on the next call to this
+     *                   function.
+     * @return Future that will complete with a boolean indicating if all assigned partitions have positions (based
+     * on {@link SubscriptionState#hasAllFetchPositions()}). It will complete immediately, with true, if all positions
+     * are already available. If some positions are missing, the future will complete once the offsets are retrieved and positions are updated.
+     */
+    public CompletableFuture<Boolean> updateFetchPositions(long deadlineMs) {
+        CompletableFuture<Boolean> result = new CompletableFuture<>();
+
+        try {
+            if (maybeCompleteWithPreviousException(result)) {
+                return result;
+            }
+
+            validatePositionsIfNeeded();
+
+            if (subscriptionState.hasAllFetchPositions()) {
+                // All positions are already available
+                result.complete(true);
+                return result;
+            }
+
+            // Some positions are missing, so trigger requests to fetch offsets and update them.
+            updatePositionsWithOffsets(deadlineMs).whenComplete((__, error) -> {
+                if (error != null) {
+                    result.completeExceptionally(error);
+                } else {
+                    result.complete(subscriptionState.hasAllFetchPositions());
+                }
+            });
+
+        } catch (Exception e) {
+            result.completeExceptionally(maybeWrapAsKafkaException(e));
+        }
+        return result;
+    }
+
+    private boolean maybeCompleteWithPreviousException(CompletableFuture<Boolean> result) {
+        Throwable cachedException = cachedUpdatePositionsException.getAndSet(null);
+        if (cachedException != null) {
+            result.completeExceptionally(cachedException);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Generate requests to fetch offsets, and update positions once a response is received.
+     */
+    private CompletableFuture<Void> updatePositionsWithOffsets(long deadlineMs) {
+        CompletableFuture<Void> result = new CompletableFuture<>();
+
+        cacheExceptionIfEventExpired(result, deadlineMs);
+
+        // Reset positions using committed offsets retrieved from the group coordinator, for any
+        // partitions which do not have a valid position and are not awaiting reset. This will
+        // trigger an OffsetFetch request and update positions with the offsets retrieved.
+        if (commitRequestManager != null) {
+            CompletableFuture<Void> initWithCommittedOffsetsResult = initWithCommittedOffsetsIfNeeded(deadlineMs);
+            initWithCommittedOffsetsResult.whenComplete((__, error) -> {
+                if (error == null) {
+                    initWithPartitionOffsetsIfNeeded(result);
+                } else {
+                    result.completeExceptionally(error);
+                }
+            });
+        } else {
+            initWithPartitionOffsetsIfNeeded(result);
+        }
+        return result;
+    }
+
+    /**
+     * Save an exception that may occur while updating fetch positions. Note that, since updating fetch positions
+     * is triggered asynchronously, errors may be found when the triggering UpdateFetchPositionsEvent has already
+     * expired. In that case, the exception is saved in memory, to be thrown when processing the following
+     * UpdateFetchPositionsEvent.
+     *
+     * @param result     Update fetch positions future to get the exception from (if any)
+     * @param deadlineMs Deadline of the triggering application event, used to identify if the event has already
+     *                   expired when the error in the result future occurs.
+     */
+    private void cacheExceptionIfEventExpired(CompletableFuture<Void> result, long deadlineMs) {
+        result.whenComplete((__, error) -> {
+            boolean updatePositionsExpired = time.milliseconds() >= deadlineMs;
+            if (error != null && updatePositionsExpired) {
+                cachedUpdatePositionsException.set(error);
+            }
+        });
+    }
+
+    /**
+     * If there are partitions still needing a position and a reset policy is defined, request
+     * reset using the default policy.
+     *
+     * @param result Future that will complete when the reset operation completes.
+     * @throws NoOffsetForPartitionException If no reset strategy is configured
+     */
+    private void initWithPartitionOffsetsIfNeeded(CompletableFuture<Void> result) {
+        try {
+            // Mark partitions that need reset, using the configured reset strategy. If no
+            // strategy is defined, this will raise a NoOffsetForPartitionException.
+            subscriptionState.resetInitializingPositions();
+        } catch (Exception e) {
+            result.completeExceptionally(e);
+            return;
+        }
+
+        // For partitions awaiting reset, generate a ListOffset request to retrieve the partition
+        // offsets according to the strategy (ex. earliest, latest), and update the positions.
+        resetPositionsIfNeeded().whenComplete((resetResult, error) -> {
+            if (error == null) {
+                result.complete(null);
+            } else {
+                result.completeExceptionally(error);
+            }
+        });
+    }
+
+    /**
+     * Fetch the committed offsets for partitions that require initialization. Use them to set
+     * the fetch positions in the subscription state.
+     *
+     * @throws TimeoutException If offsets could not be retrieved within the timeout
+     */
+    private CompletableFuture<Void> initWithCommittedOffsetsIfNeeded(long deadlineMs) {
+        final Set<TopicPartition> initializingPartitions = subscriptionState.initializingPartitions();
+
+        if (initializingPartitions.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        log.debug("Refreshing committed offsets for partitions {}", 
initializingPartitions);
+        CompletableFuture<Void> result = new CompletableFuture<>();
+
+        // The shorter the timeout provided to poll(), the more likely the offsets fetch will time out. To handle
+        // this case, on the first attempt to fetch the committed offsets, a FetchCommittedOffsetsEvent is created
+        // (with potentially a longer timeout) and stored. The event is used for the first attempt, but in the
+        // case it times out, subsequent attempts will also use the event in order to wait for the results.
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchCommittedFuture;
+        if (!canReusePendingOffsetFetchEvent(initializingPartitions)) {
+            // Need to generate a new request to fetch committed offsets
+            final long fetchCommittedDeadlineMs = Math.max(deadlineMs, time.milliseconds() + defaultApiTimeoutMs);
+            fetchCommittedFuture = commitRequestManager.fetchOffsets(initializingPartitions, fetchCommittedDeadlineMs);
+            pendingOffsetFetchEvent = new PendingFetchCommittedRequest(initializingPartitions, fetchCommittedFuture);
+        } else {
+            fetchCommittedFuture = pendingOffsetFetchEvent.result;
+        }
+
+        // when the ongoing OffsetFetch completes, carry on with updating positions and
+        // completing the result future for the current attempt to initWithCommittedOffsetsIfNeeded
+        fetchCommittedFuture.whenComplete((offsets, error) -> {
+            pendingOffsetFetchEvent = null;
+
+            // Ensure we only set positions for the partitions that still require one (ex. some partitions may have
+            // been assigned a position manually)
+            if (error == null) {
+                Map<TopicPartition, OffsetAndMetadata> offsetsToApply = 
offsetsForInitializingPartitions(offsets);
+                refreshCommittedOffsets(offsetsToApply, metadata, 
subscriptionState);
+                result.complete(null);
+            } else {
+                log.error("Error fetching committed offsets to update 
positions", error);
+                result.completeExceptionally(error);
+            }
+        });
+
+        return result;
+    }
+
+    /**
+     * Get the offsets, from the given collection, that belong to partitions that still require a position (partitions
+     * that are initializing). This is expected to be used to filter out offsets that were retrieved for partitions
+     * that do not need a position anymore.
+     *
+     * @param offsets Offsets per partition
+     * @return Subset of the offsets associated with partitions that are still initializing
+     */
+    private Map<TopicPartition, OffsetAndMetadata> 
offsetsForInitializingPartitions(Map<TopicPartition, OffsetAndMetadata> 
offsets) {
+        Set<TopicPartition> currentlyInitializingPartitions = 
subscriptionState.initializingPartitions();
+        Map<TopicPartition, OffsetAndMetadata> result = new HashMap<>();
+        offsets.forEach((key, value) -> {
+            if (currentlyInitializingPartitions.contains(key)) {
+                result.put(key, value);
+            }
+        });
+        return result;
+    }
+
+    /**
+     * This determines if the {@link #pendingOffsetFetchEvent pending offset fetch event} can be reused. Reuse
+     * is only possible if all the following conditions are true:
+     *
+     * <ul>
+     *     <li>A pending offset fetch event exists</li>
+     *     <li>The partition set of the pending offset fetch event is the same as the given partition set</li>
+     * </ul>
+     */
+    private boolean canReusePendingOffsetFetchEvent(Set<TopicPartition> partitions) {
+        if (pendingOffsetFetchEvent == null) {
+            return false;
+        }
+
+        return pendingOffsetFetchEvent.requestedPartitions.equals(partitions);

Review Comment:
   it makes sense to me. Just for the record, when the changes for caching the request were initially implemented (in the app thread), a separate Jira was created for this (https://issues.apache.org/jira/browse/KAFKA-16966). Still, I'm ok with changing to `containsAll` here, and keeping that other Jira maybe just to review this in the classic consumer (I won't get there on this PR).
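
   As an illustration of the `containsAll` idea discussed above, here is a minimal sketch of what the relaxed reuse check could look like, assuming the `pendingOffsetFetchEvent.requestedPartitions` field shown in the diff. This is only a sketch of the suggestion, not necessarily what the PR ends up with:
   
   ```java
   // Sketch: reuse the pending offset fetch as long as it covers all partitions that
   // still need a position, instead of requiring an exact match of the two sets.
   private boolean canReusePendingOffsetFetchEvent(Set<TopicPartition> partitions) {
       if (pendingOffsetFetchEvent == null) {
           return false;
       }
       // containsAll allows reuse when the pending request was issued for a superset of the
       // partitions currently initializing (e.g. some of them were assigned a position in the
       // meantime); offsets for partitions that no longer need a position are filtered out
       // afterwards by offsetsForInitializingPartitions.
       return pendingOffsetFetchEvent.requestedPartitions.containsAll(partitions);
   }
   ```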


