kirktrue commented on code in PR #16885: URL: https://github.com/apache/kafka/pull/16885#discussion_r1722491854
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java: ########## @@ -256,14 +271,176 @@ private void process(final UnsubscribeEvent event) { } } - private void process(final ResetPositionsEvent event) { - CompletableFuture<Void> future = requestManagers.offsetsRequestManager.resetPositionsIfNeeded(); - future.whenComplete(complete(event.future())); + /** + * + * Fetch committed offsets and use them to update positions in the subscription state. If no + * committed offsets available, fetch offsets from the leader. + */ + private void process(final UpdateFetchPositionsEvent updateFetchPositionsEvent) { + try { + // The event could be completed in the app thread before it got to be + // processed in the background (ex. interrupted) + if (updateFetchPositionsEvent.future().isCompletedExceptionally()) { + log.debug("UpdateFetchPositions event {} was completed exceptionally before it " + + "got time to be processed.", updateFetchPositionsEvent); + return; + } + + // Validate positions using the partition leader end offsets, to detect if any partition + // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch + // request, retrieve the partition end offsets, and validate the current position + // against it. It will throw an exception if log truncation is detected. + requestManagers.offsetsRequestManager.validatePositionsIfNeeded(); Review Comment: `validatePositionsIfNeeded()` returns a `CompletableFuture`. We need to wait for it to be completed before continuing, right? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UpdateFetchPositionsEvent.java: ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.consumer.internals.events; + +import org.apache.kafka.clients.consumer.internals.SubscriptionState; + +/** + * Event to update the position to fetch from. This will use the committed offsets if available. + * If no committed offsets exist, it will use the partition offsets. + * + * <p/> + * + * The event completes with a boolean value indicating if all assigned partitions already had + * valid fetch positions (based on {@link SubscriptionState#hasAllFetchPositions()}). + */ +public class UpdateFetchPositionsEvent extends CompletableApplicationEvent<Boolean> { + + /** + * Deadline to complete the UpdateFetchPositionsEvent. If the event does not complete before + * this time, it will be completed exceptionally with a TimeoutException. + */ + private final long deadlineMs; Review Comment: The `CompletableApplicationEvent` already has a `deadlineMs` to control the event's lifetime. What's the benefit to reproducing that logic in this subclass? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java: ########## @@ -52,15 +59,27 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven private final ConsumerMetadata metadata; private final SubscriptionState subscriptions; private final RequestManagers requestManagers; + private final Time time; + + /** + * OffsetFetch request triggered to update fetch positions. The request is kept. It will be + * cleared every time a response with the committed offsets is received and used to update + * fetch positions. If the response cannot be used because the UpdateFetchPositions expired, + * it will be kept to be used on the next attempt to update fetch positions if partitions + * remain the same. + */ + private FetchCommittedOffsetsEvent pendingOffsetFetchEvent; Review Comment: We've introduced a lot of logic and state in this class. I'd really like to consider moving the bulk of `process()`, `initWithPartitionOffsetsIfNeeded()`, and `initWithPartitionOffsetsIfNeeded()` to a dedicated class. I'd prefer to keep `ApplicationEventProcessor` focused on dispatching events to their corresponding `RequestManager` method(s). ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java: ########## @@ -256,14 +271,176 @@ private void process(final UnsubscribeEvent event) { } } - private void process(final ResetPositionsEvent event) { - CompletableFuture<Void> future = requestManagers.offsetsRequestManager.resetPositionsIfNeeded(); - future.whenComplete(complete(event.future())); + /** + * + * Fetch committed offsets and use them to update positions in the subscription state. If no + * committed offsets available, fetch offsets from the leader. + */ + private void process(final UpdateFetchPositionsEvent updateFetchPositionsEvent) { + try { + // The event could be completed in the app thread before it got to be + // processed in the background (ex. interrupted) + if (updateFetchPositionsEvent.future().isCompletedExceptionally()) { + log.debug("UpdateFetchPositions event {} was completed exceptionally before it " + + "got time to be processed.", updateFetchPositionsEvent); + return; + } + + // Validate positions using the partition leader end offsets, to detect if any partition + // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch + // request, retrieve the partition end offsets, and validate the current position + // against it. It will throw an exception if log truncation is detected. + requestManagers.offsetsRequestManager.validatePositionsIfNeeded(); + + boolean hasAllFetchPositions = subscriptions.hasAllFetchPositions(); + if (hasAllFetchPositions) { + updateFetchPositionsEvent.future().complete(true); + return; + } + + // Reset positions using committed offsets retrieved from the group coordinator, for any + // partitions which do not have a valid position and are not awaiting reset. This will + // trigger an OffsetFetch request and update positions with the offsets retrieved. This + // will only do a coordinator lookup if there are partitions which have missing + // positions, so a consumer with manually assigned partitions can avoid a coordinator + // dependence by always ensuring that assigned partitions have an initial position. + if (requestManagers.commitRequestManager.isPresent()) { + CompletableFuture<Void> initWithCommittedOffsetsResult = initWithCommittedOffsetsIfNeeded(updateFetchPositionsEvent); + initWithCommittedOffsetsResult.whenComplete((__, error) -> { + if (error == null) { + // Retrieve partition offsets to init positions for partitions that still + // don't have a valid position + initWithPartitionOffsetsIfNeeded(updateFetchPositionsEvent); + } else { + updateFetchPositionsEvent.future().completeExceptionally(error); + } + }); + } else { + initWithPartitionOffsetsIfNeeded(updateFetchPositionsEvent); + } + Review Comment: Super nitโextra newline. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java: ########## @@ -256,14 +271,176 @@ private void process(final UnsubscribeEvent event) { } } - private void process(final ResetPositionsEvent event) { - CompletableFuture<Void> future = requestManagers.offsetsRequestManager.resetPositionsIfNeeded(); - future.whenComplete(complete(event.future())); + /** + * + * Fetch committed offsets and use them to update positions in the subscription state. If no + * committed offsets available, fetch offsets from the leader. + */ + private void process(final UpdateFetchPositionsEvent updateFetchPositionsEvent) { + try { + // The event could be completed in the app thread before it got to be + // processed in the background (ex. interrupted) + if (updateFetchPositionsEvent.future().isCompletedExceptionally()) { + log.debug("UpdateFetchPositions event {} was completed exceptionally before it " + + "got time to be processed.", updateFetchPositionsEvent); + return; + } + + // Validate positions using the partition leader end offsets, to detect if any partition + // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch + // request, retrieve the partition end offsets, and validate the current position + // against it. It will throw an exception if log truncation is detected. + requestManagers.offsetsRequestManager.validatePositionsIfNeeded(); + + boolean hasAllFetchPositions = subscriptions.hasAllFetchPositions(); + if (hasAllFetchPositions) { + updateFetchPositionsEvent.future().complete(true); + return; + } + + // Reset positions using committed offsets retrieved from the group coordinator, for any + // partitions which do not have a valid position and are not awaiting reset. This will + // trigger an OffsetFetch request and update positions with the offsets retrieved. This + // will only do a coordinator lookup if there are partitions which have missing + // positions, so a consumer with manually assigned partitions can avoid a coordinator + // dependence by always ensuring that assigned partitions have an initial position. + if (requestManagers.commitRequestManager.isPresent()) { + CompletableFuture<Void> initWithCommittedOffsetsResult = initWithCommittedOffsetsIfNeeded(updateFetchPositionsEvent); + initWithCommittedOffsetsResult.whenComplete((__, error) -> { + if (error == null) { + // Retrieve partition offsets to init positions for partitions that still + // don't have a valid position + initWithPartitionOffsetsIfNeeded(updateFetchPositionsEvent); + } else { + updateFetchPositionsEvent.future().completeExceptionally(error); + } + }); + } else { + initWithPartitionOffsetsIfNeeded(updateFetchPositionsEvent); + } + + } catch (Exception e) { + updateFetchPositionsEvent.future().completeExceptionally(maybeWrapAsKafkaException(e)); + } } - private void process(final ValidatePositionsEvent event) { - CompletableFuture<Void> future = requestManagers.offsetsRequestManager.validatePositionsIfNeeded(); - future.whenComplete(complete(event.future())); + private void initWithPartitionOffsetsIfNeeded(final UpdateFetchPositionsEvent updateFetchPositionsEvent) { + try { + // If there are partitions still needing a position and a reset policy is defined, + // request reset using the default policy. If no reset strategy is defined and there + // are partitions with a missing position, then we will raise a NoOffsetForPartitionException exception. + subscriptions.resetInitializingPositions(); + + // Reset positions using partition offsets retrieved from the leader, for any partitions + // which are awaiting reset. This will trigger a ListOffset request, retrieve the + // partition offsets according to the strategy (ex. earliest, latest), and update the + // positions. + CompletableFuture<Void> resetPositionsFuture = requestManagers.offsetsRequestManager.resetPositionsIfNeeded(); + + resetPositionsFuture.whenComplete((result, error) -> { + if (updateFetchPositionsEvent.future().isDone()) { + log.debug("UpdateFetchPositions event {} had already expired when reset " + + "positions completed.", updateFetchPositionsEvent); + return; + } + if (error == null) { + updateFetchPositionsEvent.future().complete(false); + } else { + updateFetchPositionsEvent.future().completeExceptionally(error); + } + }); + } catch (Exception e) { + updateFetchPositionsEvent.future().completeExceptionally(e); + } + } + + /** + * Fetch the committed offsets for partitions that require initialization. Use them to set + * the fetch positions in the subscription state. + * + * @throws TimeoutException If offsets could not be retrieved within the timeout + */ + private CompletableFuture<Void> initWithCommittedOffsetsIfNeeded(UpdateFetchPositionsEvent updateFetchPositionsEvent) { + final Set<TopicPartition> initializingPartitions = subscriptions.initializingPartitions(); + + if (initializingPartitions.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + + log.debug("Refreshing committed offsets for partitions {}", initializingPartitions); + CompletableFuture<Void> result = new CompletableFuture<>(); + + // The shorter the timeout provided to poll(), the more likely the offsets fetch will time out. To handle + // this case, on the first attempt to fetch the committed offsets, a FetchCommittedOffsetsEvent is created + // (with potentially a longer timeout) and stored. The event is used for the first attempt, but in the + // case it times out, subsequent attempts will also use the event in order to wait for the results. + if (!canReusePendingOffsetFetchEvent(initializingPartitions)) { + final long deadlineMs = Math.max(updateFetchPositionsEvent.deadlineMs(), updateFetchPositionsEvent.fetchOffsetsDeadlineMs()); Review Comment: It seems like we could possibly calculate the fetch offsets deadline here instead of including two deadlines in the event. Sorry, I'm kind of hung up on the two deadlines in the event ๐ ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java: ########## @@ -256,14 +271,176 @@ private void process(final UnsubscribeEvent event) { } } - private void process(final ResetPositionsEvent event) { - CompletableFuture<Void> future = requestManagers.offsetsRequestManager.resetPositionsIfNeeded(); - future.whenComplete(complete(event.future())); + /** + * + * Fetch committed offsets and use them to update positions in the subscription state. If no + * committed offsets available, fetch offsets from the leader. + */ + private void process(final UpdateFetchPositionsEvent updateFetchPositionsEvent) { + try { + // The event could be completed in the app thread before it got to be + // processed in the background (ex. interrupted) + if (updateFetchPositionsEvent.future().isCompletedExceptionally()) { + log.debug("UpdateFetchPositions event {} was completed exceptionally before it " + + "got time to be processed.", updateFetchPositionsEvent); + return; + } Review Comment: Similar to the code in `AsyncKafkaConsumer`, isn't this case true for any of the `CompletableApplicationEvent` subclasses? If that's the case, we should probably handle it in a consistent way at a higher layer ๐ค ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -1581,48 +1579,37 @@ private Fetch<K, V> collectFetch() { return fetch; } + /** * Set the fetch position to the committed position (if there is one) * or reset it using the offset reset policy the user has configured. * - * @throws AuthenticationException If authentication fails. See the exception for more details - * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is - * defined * @return true iff the operation completed without timing out + * @throws AuthenticationException If authentication fails. See the exception for more details + * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is + * defined */ private boolean updateFetchPositions(final Timer timer) { + UpdateFetchPositionsEvent updateFetchPositionsEvent = null; try { - // Validate positions using the partition leader end offsets, to detect if any partition - // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch - // request, retrieve the partition end offsets, and validate the current position against it. - applicationEventHandler.addAndGet(new ValidatePositionsEvent(calculateDeadlineMs(timer))); - - cachedSubscriptionHasAllFetchPositions = subscriptions.hasAllFetchPositions(); - if (cachedSubscriptionHasAllFetchPositions) return true; - - // Reset positions using committed offsets retrieved from the group coordinator, for any - // partitions which do not have a valid position and are not awaiting reset. This will - // trigger an OffsetFetch request and update positions with the offsets retrieved. This - // will only do a coordinator lookup if there are partitions which have missing - // positions, so a consumer with manually assigned partitions can avoid a coordinator - // dependence by always ensuring that assigned partitions have an initial position. - if (isCommittedOffsetsManagementEnabled() && !initWithCommittedOffsetsIfNeeded(timer)) - return false; - - // If there are partitions still needing a position and a reset policy is defined, - // request reset using the default policy. If no reset strategy is defined and there - // are partitions with a missing position, then we will raise a NoOffsetForPartitionException exception. - subscriptions.resetInitializingPositions(); - - // Reset positions using partition offsets retrieved from the leader, for any partitions - // which are awaiting reset. This will trigger a ListOffset request, retrieve the - // partition offsets according to the strategy (ex. earliest, latest), and update the - // positions. - applicationEventHandler.addAndGet(new ResetPositionsEvent(calculateDeadlineMs(timer))); - return true; + updateFetchPositionsEvent = new UpdateFetchPositionsEvent(calculateDeadlineMs(timer), + calculateDeadlineMs(time, defaultApiTimeoutMs)); Review Comment: Looking at just this calling code, it's a little hard to understand why we're passing in two different timeouts. Perhaps declaring some variables with meaningful names, adding some comments, or something might help? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -1581,48 +1579,37 @@ private Fetch<K, V> collectFetch() { return fetch; } + /** * Set the fetch position to the committed position (if there is one) * or reset it using the offset reset policy the user has configured. * - * @throws AuthenticationException If authentication fails. See the exception for more details - * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is - * defined * @return true iff the operation completed without timing out + * @throws AuthenticationException If authentication fails. See the exception for more details + * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is + * defined */ private boolean updateFetchPositions(final Timer timer) { + UpdateFetchPositionsEvent updateFetchPositionsEvent = null; Review Comment: Nit: it doesn't seem like this event needs to be declared outside the `try`, does it? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -1581,48 +1579,37 @@ private Fetch<K, V> collectFetch() { return fetch; } + /** * Set the fetch position to the committed position (if there is one) * or reset it using the offset reset policy the user has configured. * - * @throws AuthenticationException If authentication fails. See the exception for more details - * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is - * defined * @return true iff the operation completed without timing out + * @throws AuthenticationException If authentication fails. See the exception for more details + * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is + * defined */ private boolean updateFetchPositions(final Timer timer) { + UpdateFetchPositionsEvent updateFetchPositionsEvent = null; try { - // Validate positions using the partition leader end offsets, to detect if any partition - // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch - // request, retrieve the partition end offsets, and validate the current position against it. - applicationEventHandler.addAndGet(new ValidatePositionsEvent(calculateDeadlineMs(timer))); - - cachedSubscriptionHasAllFetchPositions = subscriptions.hasAllFetchPositions(); - if (cachedSubscriptionHasAllFetchPositions) return true; - - // Reset positions using committed offsets retrieved from the group coordinator, for any - // partitions which do not have a valid position and are not awaiting reset. This will - // trigger an OffsetFetch request and update positions with the offsets retrieved. This - // will only do a coordinator lookup if there are partitions which have missing - // positions, so a consumer with manually assigned partitions can avoid a coordinator - // dependence by always ensuring that assigned partitions have an initial position. - if (isCommittedOffsetsManagementEnabled() && !initWithCommittedOffsetsIfNeeded(timer)) - return false; - - // If there are partitions still needing a position and a reset policy is defined, - // request reset using the default policy. If no reset strategy is defined and there - // are partitions with a missing position, then we will raise a NoOffsetForPartitionException exception. - subscriptions.resetInitializingPositions(); - - // Reset positions using partition offsets retrieved from the leader, for any partitions - // which are awaiting reset. This will trigger a ListOffset request, retrieve the - // partition offsets according to the strategy (ex. earliest, latest), and update the - // positions. - applicationEventHandler.addAndGet(new ResetPositionsEvent(calculateDeadlineMs(timer))); - return true; + updateFetchPositionsEvent = new UpdateFetchPositionsEvent(calculateDeadlineMs(timer), + calculateDeadlineMs(time, defaultApiTimeoutMs)); + wakeupTrigger.setActiveTask(updateFetchPositionsEvent.future()); + + if (Thread.interrupted()) { + // Ensure we propagate the interrupted exception if the thread was interrupted + // before the updateFetchPositions event is processed. Otherwise, this exception + // could be swallowed if event is processed fast enough in the background after + // being added, so that it's already completed when getting the result Review Comment: I'm trying to wrap my head around the `Thread.interrupted()` check here. Per the comments, it seems like the case we're trying to prevent could happen on any of the events, right? ๐ค Or is updating fetched positions really a special case compared to everything else? Sorry for being a bit slow ๐ ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java: ########## @@ -256,14 +271,176 @@ private void process(final UnsubscribeEvent event) { } } - private void process(final ResetPositionsEvent event) { - CompletableFuture<Void> future = requestManagers.offsetsRequestManager.resetPositionsIfNeeded(); - future.whenComplete(complete(event.future())); + /** + * + * Fetch committed offsets and use them to update positions in the subscription state. If no + * committed offsets available, fetch offsets from the leader. + */ + private void process(final UpdateFetchPositionsEvent updateFetchPositionsEvent) { + try { + // The event could be completed in the app thread before it got to be + // processed in the background (ex. interrupted) + if (updateFetchPositionsEvent.future().isCompletedExceptionally()) { + log.debug("UpdateFetchPositions event {} was completed exceptionally before it " + + "got time to be processed.", updateFetchPositionsEvent); + return; + } + + // Validate positions using the partition leader end offsets, to detect if any partition + // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch + // request, retrieve the partition end offsets, and validate the current position + // against it. It will throw an exception if log truncation is detected. + requestManagers.offsetsRequestManager.validatePositionsIfNeeded(); + + boolean hasAllFetchPositions = subscriptions.hasAllFetchPositions(); + if (hasAllFetchPositions) { + updateFetchPositionsEvent.future().complete(true); + return; + } + + // Reset positions using committed offsets retrieved from the group coordinator, for any + // partitions which do not have a valid position and are not awaiting reset. This will + // trigger an OffsetFetch request and update positions with the offsets retrieved. This + // will only do a coordinator lookup if there are partitions which have missing + // positions, so a consumer with manually assigned partitions can avoid a coordinator + // dependence by always ensuring that assigned partitions have an initial position. + if (requestManagers.commitRequestManager.isPresent()) { + CompletableFuture<Void> initWithCommittedOffsetsResult = initWithCommittedOffsetsIfNeeded(updateFetchPositionsEvent); + initWithCommittedOffsetsResult.whenComplete((__, error) -> { + if (error == null) { + // Retrieve partition offsets to init positions for partitions that still + // don't have a valid position + initWithPartitionOffsetsIfNeeded(updateFetchPositionsEvent); + } else { + updateFetchPositionsEvent.future().completeExceptionally(error); + } + }); + } else { + initWithPartitionOffsetsIfNeeded(updateFetchPositionsEvent); + } + + } catch (Exception e) { + updateFetchPositionsEvent.future().completeExceptionally(maybeWrapAsKafkaException(e)); + } } - private void process(final ValidatePositionsEvent event) { - CompletableFuture<Void> future = requestManagers.offsetsRequestManager.validatePositionsIfNeeded(); - future.whenComplete(complete(event.future())); + private void initWithPartitionOffsetsIfNeeded(final UpdateFetchPositionsEvent updateFetchPositionsEvent) { + try { + // If there are partitions still needing a position and a reset policy is defined, + // request reset using the default policy. If no reset strategy is defined and there + // are partitions with a missing position, then we will raise a NoOffsetForPartitionException exception. + subscriptions.resetInitializingPositions(); + + // Reset positions using partition offsets retrieved from the leader, for any partitions + // which are awaiting reset. This will trigger a ListOffset request, retrieve the + // partition offsets according to the strategy (ex. earliest, latest), and update the + // positions. + CompletableFuture<Void> resetPositionsFuture = requestManagers.offsetsRequestManager.resetPositionsIfNeeded(); + + resetPositionsFuture.whenComplete((result, error) -> { + if (updateFetchPositionsEvent.future().isDone()) { + log.debug("UpdateFetchPositions event {} had already expired when reset " + + "positions completed.", updateFetchPositionsEvent); + return; + } + if (error == null) { + updateFetchPositionsEvent.future().complete(false); + } else { + updateFetchPositionsEvent.future().completeExceptionally(error); + } + }); + } catch (Exception e) { + updateFetchPositionsEvent.future().completeExceptionally(e); Review Comment: Do we need to wrap this exception? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org