kirktrue commented on code in PR #16885:
URL: https://github.com/apache/kafka/pull/16885#discussion_r1733075083
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1581,48 +1577,27 @@ private Fetch<K, V> collectFetch() {
         return fetch;
     }
 
+
     /**
      * Set the fetch position to the committed position (if there is one)
      * or reset it using the offset reset policy the user has configured.
      *
-     * @throws AuthenticationException If authentication fails. See the exception for more details
-     * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
-     *                                       defined
      * @return true iff the operation completed without timing out
+     * @throws AuthenticationException If authentication fails. See the exception for more details
+     * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
+     *                                       defined
      */
     private boolean updateFetchPositions(final Timer timer) {
         try {
-            // Validate positions using the partition leader end offsets, to detect if any partition
-            // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch
-            // request, retrieve the partition end offsets, and validate the current position against it.
-            applicationEventHandler.addAndGet(new ValidatePositionsEvent(calculateDeadlineMs(timer)));
-
-            cachedSubscriptionHasAllFetchPositions = subscriptions.hasAllFetchPositions();
-            if (cachedSubscriptionHasAllFetchPositions) return true;
-
-            // Reset positions using committed offsets retrieved from the group coordinator, for any
-            // partitions which do not have a valid position and are not awaiting reset. This will
-            // trigger an OffsetFetch request and update positions with the offsets retrieved. This
-            // will only do a coordinator lookup if there are partitions which have missing
-            // positions, so a consumer with manually assigned partitions can avoid a coordinator
-            // dependence by always ensuring that assigned partitions have an initial position.
-            if (isCommittedOffsetsManagementEnabled() && !initWithCommittedOffsetsIfNeeded(timer))
-                return false;
-
-            // If there are partitions still needing a position and a reset policy is defined,
-            // request reset using the default policy. If no reset strategy is defined and there
-            // are partitions with a missing position, then we will raise a NoOffsetForPartitionException exception.
-            subscriptions.resetInitializingPositions();
-
-            // Reset positions using partition offsets retrieved from the leader, for any partitions
-            // which are awaiting reset. This will trigger a ListOffset request, retrieve the
-            // partition offsets according to the strategy (ex. earliest, latest), and update the
-            // positions.
-            applicationEventHandler.addAndGet(new ResetPositionsEvent(calculateDeadlineMs(timer)));
-            return true;
+            UpdateFetchPositionsEvent updateFetchPositionsEvent = new UpdateFetchPositionsEvent(calculateDeadlineMs(timer));
+            wakeupTrigger.setActiveTask(updateFetchPositionsEvent.future());

Review Comment:
   Why do we need to involve the wakeup trigger for this case?
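On the wakeup question above, some context that may help readers of this thread: registering a future as the wakeup trigger's active task is what lets `consumer.wakeup()` interrupt an application thread blocked on that future. The sketch below shows the general pattern with an illustrative class; it is not Kafka's actual `WakeupTrigger`, whose real implementation handles more states.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative stand-in for the consumer's wakeup bookkeeping; class and method
// names are assumptions for this sketch, not Kafka's actual WakeupTrigger API.
final class WakeupTriggerSketch {
    private final AtomicReference<CompletableFuture<?>> activeTask = new AtomicReference<>();

    // The application thread registers the future it is about to block on.
    void setActiveTask(CompletableFuture<?> task) {
        activeTask.set(task);
    }

    // Cleared once the blocking call returns, woken or not.
    void clearTask() {
        activeTask.set(null);
    }

    // wakeup() fails the registered future, which unblocks whichever thread is
    // waiting on it (the real client surfaces this as a WakeupException).
    void wakeup() {
        CompletableFuture<?> task = activeTask.getAndSet(null);
        if (task != null) {
            task.completeExceptionally(new RuntimeException("woken up"));
        }
    }
}
```

If updating fetch positions does not need to be interruptible via `wakeup()`, the registration would be unnecessary, which seems to be the point of the question.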
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -256,14 +253,14 @@ private void process(final UnsubscribeEvent event) {
         }
     }
 
-    private void process(final ResetPositionsEvent event) {
-        CompletableFuture<Void> future = requestManagers.offsetsRequestManager.resetPositionsIfNeeded();
-        future.whenComplete(complete(event.future()));
-    }
-
-    private void process(final ValidatePositionsEvent event) {
-        CompletableFuture<Void> future = requestManagers.offsetsRequestManager.validatePositionsIfNeeded();
-        future.whenComplete(complete(event.future()));
+    /**
+     * Fetch committed offsets and use them to update positions in the subscription state. If no
+     * committed offsets are available, fetch offsets from the leader.
+     */
+    private void process(final UpdateFetchPositionsEvent updateFetchPositionsEvent) {
+        CompletableFuture<Boolean> future =
+            requestManagers.offsetsRequestManager.updateFetchPositions(updateFetchPositionsEvent.deadlineMs());
+        future.whenComplete(complete(updateFetchPositionsEvent.future()));
     }

Review Comment:
   Super nit: any reason not to name the variable `event`, like in the other methods? That'll reduce visual clutter and make the lines a bit shorter:
   ```suggestion
       private void process(final UpdateFetchPositionsEvent event) {
           CompletableFuture<Boolean> future =
               requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());
           future.whenComplete(complete(event.future()));
   ```
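For readers unfamiliar with the `complete(...)` helper used above and in the suggestion: its body is not shown in this diff, but from its usage it appears to be the usual future-forwarding adapter. A sketch of that assumed shape:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

final class FutureForwarding {
    // Assumed shape of the processor's complete(...) helper, inferred from usage
    // rather than copied from Kafka: it returns a BiConsumer for whenComplete(...)
    // that copies the outcome (value or error) of one future into another.
    static <T> BiConsumer<T, Throwable> complete(CompletableFuture<T> target) {
        return (value, error) -> {
            if (error != null) {
                target.completeExceptionally(error);
            } else {
                target.complete(value);
            }
        };
    }
}
```

With that shape, `future.whenComplete(complete(event.future()))` simply propagates the background thread's result or failure to the event's future, which the application thread is waiting on.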
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -116,15 +138,27 @@ public OffsetsRequestManager(final SubscriptionState subscriptionState,
         this.subscriptionState = subscriptionState;
         this.time = time;
         this.requestTimeoutMs = requestTimeoutMs;
+        this.defaultApiTimeoutMs = defaultApiTimeoutMs;
         this.apiVersions = apiVersions;
         this.networkClientDelegate = networkClientDelegate;
-        this.backgroundEventHandler = backgroundEventHandler;
         this.offsetFetcherUtils = new OffsetFetcherUtils(logContext, metadata, subscriptionState,
                 time, retryBackoffMs, apiVersions);
         // Register the cluster metadata update callback. Note this only relies on the
         // requestsToRetry initialized above, and won't be invoked until all managers are
         // initialized and the network thread started.
         this.metadata.addClusterUpdateListener(this);
+        this.commitRequestManager = commitRequestManager;
+    }
+
+    private static class PendingFetchCommittedRequest {
+        final Set<TopicPartition> requestedPartitions;
+        final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result;
+
+        private PendingFetchCommittedRequest(final Set<TopicPartition> requestedPartitions,
+                                                 final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result) {

Review Comment:
   Super-duper nit: parameter alignment:
   ```suggestion
                                             final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result) {
   ```

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -182,6 +216,180 @@ public CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchO
                         result.fetchedOffsets));
     }
 
+    private boolean maybeCompleteWithPreviousKnownException(CompletableFuture<Boolean> result) {
+        Throwable exception = cachedUpdatePositionsException.getAndSet(null);
+        if (exception != null) {
+            // Return the exception that may have been encountered on a previous attempt to update
+            // positions, after the triggering event had already expired.
+            result.completeExceptionally(exception);
+            return true;
+        }
+        return false;
+    }
+
+    public CompletableFuture<Boolean> updateFetchPositions(long deadlineMs) {
+        CompletableFuture<Boolean> result = new CompletableFuture<>();
+
+        if (maybeCompleteWithPreviousKnownException(result)) {
+            return result;
+        }
+
+        result.whenComplete((__, error) -> {
+            boolean updatePositionsExpired = time.milliseconds() >= deadlineMs;
+            if (error != null && updatePositionsExpired) {
+                // Update fetch positions operations are triggered asynchronously here in the
+                // background thread, so they may complete (with an error) after the triggering
+                // UpdateFetchPositionsEvent has already expired. Keep the exception saved, to be
+                // thrown on the next call to update positions.
+                cachedUpdatePositionsException.set(error);
+            }
+        });
+
+        try {
+

Review Comment:
   Super-nit: extra newline.
   ```suggestion
   ```
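The `whenComplete` callback above implements a subtle idiom worth spelling out: an error that arrives only after the triggering event has expired is parked and surfaced at the start of the next attempt, rather than silently dropped. A self-contained sketch of the idiom, with illustrative names:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;

// Self-contained sketch, not Kafka code: if an async attempt fails after the
// caller's deadline has passed, stash the error and report it at the start of
// the next attempt instead of losing it.
final class LateFailureCache {
    private final AtomicReference<Throwable> cached = new AtomicReference<>();
    private final LongSupplier clock; // e.g. System::currentTimeMillis

    LateFailureCache(LongSupplier clock) {
        this.clock = clock;
    }

    <T> CompletableFuture<T> track(CompletableFuture<T> attempt, long deadlineMs) {
        Throwable previous = cached.getAndSet(null);
        if (previous != null) {
            // A previous attempt failed after its event had expired; report it now.
            CompletableFuture<T> failed = new CompletableFuture<>();
            failed.completeExceptionally(previous);
            return failed;
        }
        attempt.whenComplete((ignored, error) -> {
            // Evaluate expiry at completion time, as the code above does.
            if (error != null && clock.getAsLong() >= deadlineMs) {
                cached.set(error);
            }
        });
        return attempt;
    }
}
```

The completion-time clock read matters: checking expiry when the attempt was started would misclassify failures that arrive late.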
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -42,6 +42,7 @@
 import java.util.function.BiConsumer;
 import java.util.function.Supplier;
 
+

Review Comment:
   Super nit: extra whitespace:
   ```suggestion
   ```

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -182,6 +216,180 @@ public CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchO
                         result.fetchedOffsets));
     }
 
+    private boolean maybeCompleteWithPreviousKnownException(CompletableFuture<Boolean> result) {
+        Throwable exception = cachedUpdatePositionsException.getAndSet(null);
+        if (exception != null) {
+            // Return the exception that may have been encountered on a previous attempt to update
+            // positions, after the triggering event had already expired.
+            result.completeExceptionally(exception);
+            return true;
+        }
+        return false;
+    }
+
+    public CompletableFuture<Boolean> updateFetchPositions(long deadlineMs) {
+        CompletableFuture<Boolean> result = new CompletableFuture<>();
+
+        if (maybeCompleteWithPreviousKnownException(result)) {
+            return result;
+        }
+
+        result.whenComplete((__, error) -> {
+            boolean updatePositionsExpired = time.milliseconds() >= deadlineMs;
+            if (error != null && updatePositionsExpired) {
+                // Update fetch positions operations are triggered asynchronously here in the
+                // background thread, so they may complete (with an error) after the triggering
+                // UpdateFetchPositionsEvent has already expired. Keep the exception saved, to be
+                // thrown on the next call to update positions.
+                cachedUpdatePositionsException.set(error);
+            }
+        });
+
+        try {
+
+            // Validate positions using the partition leader end offsets, to detect if any partition
+            // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch
+            // request, retrieve the partition end offsets, and validate the current position
+            // against it. It will throw an exception if log truncation is detected.
+            validatePositionsIfNeeded();
+
+            boolean hasAllFetchPositions = subscriptionState.hasAllFetchPositions();
+            if (hasAllFetchPositions) {
+                result.complete(true);
+                return result;
+            }
+
+            // Reset positions using committed offsets retrieved from the group coordinator, for any
+            // partitions which do not have a valid position and are not awaiting reset. This will
+            // trigger an OffsetFetch request and update positions with the offsets retrieved.
+            if (commitRequestManager != null) {
+                CompletableFuture<Void> initWithCommittedOffsetsResult = initWithCommittedOffsetsIfNeeded(deadlineMs);
+                initWithCommittedOffsetsResult.whenComplete((__, error) -> {
+                    if (error == null) {
+                        initWithPartitionOffsetsIfNeeded(result);
+                    } else {
+                        result.completeExceptionally(error);
+                    }
+                });
+            } else {
+                initWithPartitionOffsetsIfNeeded(result);
+            }
+        } catch (Exception e) {
+            result.completeExceptionally(maybeWrapAsKafkaException(e));
+        }
+        return result;
+    }
+
+    /**
+     * If there are partitions still needing a position and a reset policy is defined, request
+     * reset using the default policy.
+     *
+     * @param result Future that will complete when the reset operation completes.
+     * @throws NoOffsetForPartitionException If no reset strategy is configured
+     */
+    private void initWithPartitionOffsetsIfNeeded(CompletableFuture<Boolean> result) {
+        try {
+            // Mark partitions that need reset, using the configured reset strategy. If no
+            // strategy is defined, this will raise a NoOffsetForPartitionException.
+            subscriptionState.resetInitializingPositions();
+        } catch (Exception e) {
+            result.completeExceptionally(e);
+            return;
+        }
+
+        // For partitions awaiting reset, generate a ListOffset request to retrieve the partition
+        // offsets according to the strategy (ex. earliest, latest), and update the positions.
+        resetPositionsIfNeeded().whenComplete((resetResult, error) -> {
+            if (error == null) {
+                result.complete(false);
+            } else {
+                result.completeExceptionally(error);
+            }
+        });
+    }
+
+    // Visible for testing
+    boolean hasPendingOffsetFetchEvent() {
+        return pendingOffsetFetchEvent != null;
+    }
+
+    /**
+     * Fetch the committed offsets for partitions that require initialization. Use them to set
+     * the fetch positions in the subscription state.
+     *
+     * @throws TimeoutException If offsets could not be retrieved within the timeout
+     */
+    private CompletableFuture<Void> initWithCommittedOffsetsIfNeeded(long deadlineMs) {
+        final Set<TopicPartition> initializingPartitions = subscriptionState.initializingPartitions();
+
+        if (initializingPartitions.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        log.debug("Refreshing committed offsets for partitions {}", initializingPartitions);
+        CompletableFuture<Void> result = new CompletableFuture<>();
+
+        // The shorter the timeout provided to poll(), the more likely the offsets fetch will time out. To handle
+        // this case, on the first attempt to fetch the committed offsets, a FetchCommittedOffsetsEvent is created
+        // (with potentially a longer timeout) and stored. The event is used for the first attempt, but in the
+        // case it times out, subsequent attempts will also use the event in order to wait for the results.
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchCommittedFuture;
+        if (!canReusePendingOffsetFetchEvent(initializingPartitions)) {
+            if (pendingOffsetFetchEvent != null) {
+                // This will be the case where we were waiting for a fetch committed offsets request
+                // to update positions, but the set of initializing partitions changed. We need to
+                // cancel the pending future, to ensure that its results are not used to update positions.
+                pendingOffsetFetchEvent.result.cancel(true);
+            }
+            // Need to generate a new request to fetch committed offsets
+            final long fetchCommittedDeadlineMs = Math.max(deadlineMs, time.milliseconds() + defaultApiTimeoutMs);
+            fetchCommittedFuture = commitRequestManager.fetchOffsets(initializingPartitions, fetchCommittedDeadlineMs);
+            pendingOffsetFetchEvent = new PendingFetchCommittedRequest(initializingPartitions,
+                    fetchCommittedFuture);
+        } else {
+            fetchCommittedFuture = pendingOffsetFetchEvent.result;
+        }
+
+        // When the ongoing OffsetFetch completes, carry on with updating positions and
+        // completing the result future for the current attempt to initWithCommittedOffsetsIfNeeded.
+        fetchCommittedFuture.whenComplete((offsets, error) -> {
+            pendingOffsetFetchEvent = null;
+            if (error instanceof CancellationException) {
+                // Abort updating positions
+                return;
+            }
+            // If an offset fetch triggered to update positions finishes without being
+            // cancelled, we update positions even if the original event expired. The event
+            // is cancelled whenever the set of partitions to initialize changes.
+            if (error == null) {
+                refreshCommittedOffsets(offsets, metadata, subscriptionState);
+                result.complete(null);
+            } else {
+                log.error("Error fetching committed offsets to update positions", error);
+                result.completeExceptionally(error);
+            }
+        });
+
+        return result;
+    }
+
+    /**
+     * This determines if the {@link #pendingOffsetFetchEvent pending offset fetch event} can be reused. Reuse
+     * is only possible if all the following conditions are true:
+     *
+     * <ul>
+     *     <li>A pending offset fetch event exists</li>
+     *     <li>The partition set of the pending offset fetch event is the same as the given partition set</li>
+     * </ul>
+     */
+    private boolean canReusePendingOffsetFetchEvent(Set<TopicPartition> partitions) {
+        if (pendingOffsetFetchEvent == null) {
+            return false;
+        }
+
+        return pendingOffsetFetchEvent.requestedPartitions.equals(partitions);
+    }

Review Comment:
   The corresponding code in `AsyncKafkaConsumer.canReusePendingOffsetFetchEvent` also checked that the `pendingOffsetFetchEvent`’s deadline hadn't passed. Is that no longer a concern after the refactor?
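On the deadline question: a reuse check that also rejects an expired pending fetch might look like the sketch below. The `deadlineMs` field is hypothetical, since the refactored `PendingFetchCommittedRequest` shown earlier stores only the partition set and the future:

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch, not the PR's code: a pending-fetch holder that also
// carries its own deadline, so reuse can be refused once that deadline passes,
// as the old AsyncKafkaConsumer.canReusePendingOffsetFetchEvent reportedly did.
final class PendingFetchSketch<P> {
    final Set<P> requestedPartitions;
    final long deadlineMs; // hypothetical: not present in the refactored holder
    final CompletableFuture<?> result;

    PendingFetchSketch(Set<P> requestedPartitions, long deadlineMs, CompletableFuture<?> result) {
        this.requestedPartitions = requestedPartitions;
        this.deadlineMs = deadlineMs;
        this.result = result;
    }

    // Reuse only while the pending fetch is still live and covers the same partitions.
    boolean canReuseFor(Set<P> partitions, long nowMs) {
        return nowMs <= deadlineMs && requestedPartitions.equals(partitions);
    }
}
```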
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java:
##########
@@ -90,9 +88,9 @@ public class OffsetsRequestManagerTest {
     private OffsetsRequestManager requestManager;
     private ConsumerMetadata metadata;
     private SubscriptionState subscriptionState;
-    private MockTime time;
+    private final Time time = mock(Time.class);

Review Comment:
   Out of curiosity, what prompted this change?
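Context for the question: `MockTime` is a functioning fake clock whose time tests can advance deterministically, while `mock(Time.class)` is a Mockito stub that returns default or stubbed values and never advances on its own. A small sketch of the difference, assuming Kafka's `org.apache.kafka.common.utils.MockTime` test utility and Mockito:

```java
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;

public class TimeStubExample {
    public static void main(String[] args) {
        // MockTime behaves like a real clock under test control.
        MockTime mockTime = new MockTime();
        long before = mockTime.milliseconds();
        mockTime.sleep(1_000);                                // advances the fake clock
        System.out.println(mockTime.milliseconds() - before); // prints 1000

        // A Mockito stub returns whatever it was told, forever.
        Time stub = mock(Time.class);
        when(stub.milliseconds()).thenReturn(5_000L);
        System.out.println(stub.milliseconds());              // always 5000
    }
}
```

Switching to a stub gives each test explicit control over every clock read, at the cost of having to re-stub `milliseconds()` to simulate time passing.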
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -182,6 +216,180 @@ public CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchO
                         result.fetchedOffsets));
     }
 
+    private boolean maybeCompleteWithPreviousKnownException(CompletableFuture<Boolean> result) {
+        Throwable exception = cachedUpdatePositionsException.getAndSet(null);
+        if (exception != null) {
+            // Return the exception that may have been encountered on a previous attempt to update
+            // positions, after the triggering event had already expired.
+            result.completeExceptionally(exception);
+            return true;
+        }
+        return false;
+    }
+
+    public CompletableFuture<Boolean> updateFetchPositions(long deadlineMs) {
+        CompletableFuture<Boolean> result = new CompletableFuture<>();
+
+        if (maybeCompleteWithPreviousKnownException(result)) {
+            return result;
+        }
+
+        result.whenComplete((__, error) -> {
+            boolean updatePositionsExpired = time.milliseconds() >= deadlineMs;
+            if (error != null && updatePositionsExpired) {
+                // Update fetch positions operations are triggered asynchronously here in the
+                // background thread, so they may complete (with an error) after the triggering
+                // UpdateFetchPositionsEvent has already expired. Keep the exception saved, to be
+                // thrown on the next call to update positions.
+                cachedUpdatePositionsException.set(error);
+            }
+        });
+
+        try {
+
+            // Validate positions using the partition leader end offsets, to detect if any partition
+            // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch
+            // request, retrieve the partition end offsets, and validate the current position
+            // against it. It will throw an exception if log truncation is detected.
+            validatePositionsIfNeeded();
+
+            boolean hasAllFetchPositions = subscriptionState.hasAllFetchPositions();
+            if (hasAllFetchPositions) {
+                result.complete(true);
+                return result;
+            }

Review Comment:
   I don't think we need the temporary variable with the refactored code.
   ```suggestion
               if (subscriptionState.hasAllFetchPositions()) {
                   result.complete(true);
                   return result;
               }
   ```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org