lianetm commented on code in PR #16885:
URL: https://github.com/apache/kafka/pull/16885#discussion_r1724967393
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -256,14 +271,176 @@ private void process(final UnsubscribeEvent event) {
         }
     }

-    private void process(final ResetPositionsEvent event) {
-        CompletableFuture<Void> future = requestManagers.offsetsRequestManager.resetPositionsIfNeeded();
-        future.whenComplete(complete(event.future()));
+    /**
+     * Fetch committed offsets and use them to update positions in the subscription state. If no
+     * committed offsets are available, fetch offsets from the leader.
+     */
+    private void process(final UpdateFetchPositionsEvent updateFetchPositionsEvent) {
+        try {
+            // The event could be completed in the app thread before it got to be
+            // processed in the background (ex. interrupted)
+            if (updateFetchPositionsEvent.future().isCompletedExceptionally()) {
+                log.debug("UpdateFetchPositions event {} was completed exceptionally before it " +
+                        "got time to be processed.", updateFetchPositionsEvent);
+                return;
+            }
+
+            // Validate positions using the partition leader end offsets, to detect if any partition
+            // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch
+            // request, retrieve the partition end offsets, and validate the current position
+            // against it. It will throw an exception if log truncation is detected.
+            requestManagers.offsetsRequestManager.validatePositionsIfNeeded();

Review Comment:
   It throws an exception ([here](https://github.com/apache/kafka/blob/0eaaff88cf68bc2c24d4874ff9bc1cc2b493c24b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java#L194)) when it checks for the partitions it needs to validate. Even though it does return a future, this is one of the parts I was trying to simplify with this refactoring. This is my reasoning:

   1. Validating positions is an operation whose only purpose is to detect log truncation. It fires a request and, when it gets a response, looks for truncation. If truncation is detected, it saves the exception to be thrown on the next call to validate. (This concept and behaviour are common to both consumers up to here.)
   2. Based on the conceptual definition above, the classic consumer triggers validation as an async operation and does not wait for a response before moving on to reset positions with committed offsets or partition offsets.

   So, with the async consumer now doing all the updates in the background, it seemed easy to simplify and do the same: trigger validation asynchronously (without waiting for the result future to complete), carry on with the reset, and throw the log truncation exception, if any, on the next call.

   Note that one fair concern with not chaining the validate request is how to ensure it won't storm the broker with requests. That does not happen because the request manager already paces the requests it sends based on the subscriptionState allowedRetries (see [here](https://github.com/apache/kafka/blob/0eaaff88cf68bc2c24d4874ff9bc1cc2b493c24b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java#L269)). This ensures that whenever a validation request is sent, it waits up to the requestTimeout before sending a new one.

   Does that make sense? I could be missing something, but it seems to me we already get the behaviour we want without having to play with the futures here.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
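For readers following along, the fire-and-forget pattern discussed in the comment can be sketched as below. This is a minimal, illustrative model only: `PositionValidator`, `LogTruncationException`, and the injected `brokerResponse` future are hypothetical stand-ins, not the actual `OffsetsRequestManager` API. It shows the two properties the comment relies on: a truncation error found by one async round is cached and thrown on the *next* call, and a pacing timestamp prevents a new request from being sent before `requestTimeout` elapses.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the exception thrown when truncation is detected.
class LogTruncationException extends RuntimeException {
    LogTruncationException(String message) { super(message); }
}

// Hypothetical, simplified model of fire-and-forget position validation.
class PositionValidator {
    // Error detected by a previous async validation round, thrown on the next call.
    private final AtomicReference<RuntimeException> cachedException = new AtomicReference<>();
    // Earliest time (ms) at which another validation request may be sent.
    private long nextAllowedRequestMs = 0L;
    private final long requestTimeoutMs;

    PositionValidator(long requestTimeoutMs) {
        this.requestTimeoutMs = requestTimeoutMs;
    }

    /**
     * Fire-and-forget validation: first surface any truncation found by a
     * previous round, then (if the pacing window allows) register a callback
     * on a new async request. The caller never waits on the response future.
     *
     * @param nowMs          current time in milliseconds
     * @param brokerResponse injected async response; completes with true if
     *                       the broker reports log truncation
     */
    void validatePositionsIfNeeded(long nowMs, CompletableFuture<Boolean> brokerResponse) {
        RuntimeException previous = cachedException.getAndSet(null);
        if (previous != null) {
            throw previous; // truncation surfaces on the call *after* detection
        }
        if (nowMs < nextAllowedRequestMs) {
            return; // a request is in flight; wait up to requestTimeout before retrying
        }
        nextAllowedRequestMs = nowMs + requestTimeoutMs;
        brokerResponse.whenComplete((truncated, error) -> {
            if (error == null && Boolean.TRUE.equals(truncated)) {
                // Only cache the error; it is thrown on the next validate call.
                cachedException.set(new LogTruncationException("log truncation detected"));
            }
        });
    }
}
```

Under these assumptions, a caller fires validation, moves straight on to resetting positions, and only sees a `LogTruncationException` on a later call, while the `nextAllowedRequestMs` check keeps the broker from being stormed with requests.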