AndrewJSchofield commented on code in PR #16885:
URL: https://github.com/apache/kafka/pull/16885#discussion_r1733481236


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -182,6 +216,180 @@ public CompletableFuture<Map<TopicPartition, 
OffsetAndTimestampInternal>> fetchO
                         result.fetchedOffsets));
     }
 
+    private boolean 
maybeCompleteWithPreviousKnownException(CompletableFuture<Boolean> result) {
+        Throwable exception = cachedUpdatePositionsException.getAndSet(null);
+        if (exception != null) {
+            // Return exception that may have been encountered on a previous 
attempt to update
+            // positions, after the triggering event had already expired.
+            result.completeExceptionally(exception);
+            return true;
+        }
+        return false;
+    }
+
+    public CompletableFuture<Boolean> updateFetchPositions(long deadlineMs) {
+        CompletableFuture<Boolean> result = new CompletableFuture<>();
+
+        if (maybeCompleteWithPreviousKnownException(result)) {
+            return result;
+        }
+
+        result.whenComplete((__, error) -> {
+            boolean updatePositionsExpired = time.milliseconds() >= deadlineMs;
+            if (error != null && updatePositionsExpired) {
+                // Update fetch positions operations are triggered 
asynchronously here in the
+                // background thread, so they may complete (with error)
+                // when the triggering UpdateFetchPositionsEvent has been 
already expired. Keep
+                // exception saved to be thrown on the next call to update 
positions.
+                cachedUpdatePositionsException.set(error);
+            }
+        });
+
+        try {
+
+            // Validate positions using the partition leader end offsets, to 
detect if any partition
+            // has been truncated due to a leader change. This will trigger an 
OffsetForLeaderEpoch
+            // request, retrieve the partition end offsets, and validate the 
current position
+            // against it. It will throw an exception if log truncation is 
detected.
+            validatePositionsIfNeeded();
+
+            boolean hasAllFetchPositions = 
subscriptionState.hasAllFetchPositions();
+            if (hasAllFetchPositions) {
+                result.complete(true);
+                return result;
+            }
+
+            // Reset positions using committed offsets retrieved from the 
group coordinator, for any
+            // partitions which do not have a valid position and are not 
awaiting reset. This will
+            // trigger an OffsetFetch request and update positions with the 
offsets retrieved.
+            if (commitRequestManager != null) {
+                CompletableFuture<Void> initWithCommittedOffsetsResult = 
initWithCommittedOffsetsIfNeeded(deadlineMs);
+                initWithCommittedOffsetsResult.whenComplete((__, error) -> {
+                    if (error == null) {
+                        initWithPartitionOffsetsIfNeeded(result);
+                    } else {
+                        result.completeExceptionally(error);
+                    }
+                });
+            } else {
+                initWithPartitionOffsetsIfNeeded(result);
+            }
+        } catch (Exception e) {
+            result.completeExceptionally(maybeWrapAsKafkaException(e));
+        }
+        return result;
+    }
+
+    /**
+     * If there are partitions still needing a position and a reset policy is 
defined, request
+     * reset using the default policy.
+     *
+     * @param result Future that will complete when the reset operation 
completes.
+     * @throws NoOffsetForPartitionException If no reset strategy is configured
+     */
+    private void initWithPartitionOffsetsIfNeeded(CompletableFuture<Boolean> 
result) {
+        try {
+            // Mark partitions that need reset, using the configured reset 
strategy. If no
+            // strategy is defined, this will raise a 
NoOffsetForPartitionException exception.
+            subscriptionState.resetInitializingPositions();
+        } catch (Exception e) {
+            result.completeExceptionally(e);
+            return;
+        }
+
+        // For partitions awaiting reset, generate a ListOffset request to 
retrieve the partition
+        // offsets according to the strategy (ex. earliest, latest), and 
update the positions.
+        resetPositionsIfNeeded().whenComplete((resetResult, error) -> {
+            if (error == null) {
+                result.complete(false);
+            } else {
+                result.completeExceptionally(error);
+            }
+        });
+    }
+
+    // Visible for testing
+    boolean hasPendingOffsetFetchEvent() {
+        return pendingOffsetFetchEvent != null;
+    }
+
+    /**
+     * Fetch the committed offsets for partitions that require initialization. 
Use them to set
+     * the fetch positions in the subscription state.
+     *
+     * @throws TimeoutException If offsets could not be retrieved within the 
timeout
+     */
+    private CompletableFuture<Void> initWithCommittedOffsetsIfNeeded(long 
deadlineMs) {
+        final Set<TopicPartition> initializingPartitions = 
subscriptionState.initializingPartitions();
+
+        if (initializingPartitions.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        log.debug("Refreshing committed offsets for partitions {}", 
initializingPartitions);
+        CompletableFuture<Void> result = new CompletableFuture<>();
+
+        // The shorter the timeout provided to poll(), the more likely the 
offsets fetch will time out. To handle
+        // this case, on the first attempt to fetch the committed offsets, a 
FetchCommittedOffsetsEvent is created
+        // (with potentially a longer timeout) and stored. The event is used 
for the first attempt, but in the
+        // case it times out, subsequent attempts will also use the event in 
order to wait for the results.
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
fetchCommittedFuture;
+        if (!canReusePendingOffsetFetchEvent(initializingPartitions)) {
+            if (pendingOffsetFetchEvent != null) {
+                // This will be the case where we were waiting for a fetch 
committed offsets request
+                // to update positions, but the set of initializing partitions 
changed. We need to
+                // cancel the pending future, to ensure that it's results are 
not used to update
+                pendingOffsetFetchEvent.result.cancel(true);
+            }
+            // Need to generate a new request to fetch committed offsets
+            final long fetchCommittedDeadlineMs = Math.max(deadlineMs, 
time.milliseconds() + defaultApiTimeoutMs);
+            fetchCommittedFuture = 
commitRequestManager.fetchOffsets(initializingPartitions, 
fetchCommittedDeadlineMs);
+            pendingOffsetFetchEvent = new 
PendingFetchCommittedRequest(initializingPartitions,

Review Comment:
   nit: This would be more legible as a single line.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to