lianetm commented on code in PR #16885:
URL: https://github.com/apache/kafka/pull/16885#discussion_r1724967393


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -256,14 +271,176 @@ private void process(final UnsubscribeEvent event) {
         }
     }
 
-    private void process(final ResetPositionsEvent event) {
-        CompletableFuture<Void> future = requestManagers.offsetsRequestManager.resetPositionsIfNeeded();
-        future.whenComplete(complete(event.future()));
+    /**
+     * Fetch committed offsets and use them to update positions in the subscription state.
+     * If no committed offsets are available, fetch offsets from the leader.
+     */
+    private void process(final UpdateFetchPositionsEvent updateFetchPositionsEvent) {
+        try {
+            // The event could be completed in the app thread before it got to be
+            // processed in the background (e.g. interrupted)
+            if (updateFetchPositionsEvent.future().isCompletedExceptionally()) {
+                log.debug("UpdateFetchPositions event {} was completed exceptionally before it " +
+                    "got time to be processed.", updateFetchPositionsEvent);
+                return;
+            }
+
+            // Validate positions using the partition leader end offsets, to detect if any partition
+            // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch
+            // request, retrieve the partition end offsets, and validate the current position
+            // against it. It will throw an exception if log truncation is detected.
+            requestManagers.offsetsRequestManager.validatePositionsIfNeeded();

Review Comment:
   It throws an exception ([here](https://github.com/apache/kafka/blob/0eaaff88cf68bc2c24d4874ff9bc1cc2b493c24b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java#L194)) when it checks for the partitions it needs to validate. Even though it does return a future, this is one of the parts I was trying to simplify with this refactoring. My reasoning:
   1. Validating positions is an operation whose only purpose is to detect log truncation. It fires a request and, when it gets a response, looks for truncation. If it detects any, it saves the exception to be thrown on the next call to validate (a concept and behaviour common to both consumers up to this point).
   2. Based on that conceptual definition, the classic consumer triggers it as an async operation and does not wait for a response before moving on to attempt to reset positions with committed offsets or partition offsets.
   So, with the async consumer now doing all the updates in the background, it seemed easy to simplify and do the same: trigger validation asynchronously (without waiting for the result future to complete), carry on with the reset, and throw a log truncation error, if any, on the next call.
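   To make the "save now, throw on the next call" idea concrete, here is a rough, self-contained sketch of that pattern. All names (`PositionValidator`, `recordTruncation`) are hypothetical and only illustrate the mechanism, not the actual Kafka classes:

   ```java
   import java.util.concurrent.atomic.AtomicReference;

   // Illustrative sketch: validation is fire-and-forget, and a truncation
   // error found by a previous response is surfaced on the next invocation.
   class PositionValidator {
       // Error detected by an earlier async validation response, if any.
       private final AtomicReference<RuntimeException> cachedError = new AtomicReference<>();

       /** Called from the response handler when log truncation is detected. */
       void recordTruncation(RuntimeException e) {
           cachedError.compareAndSet(null, e);
       }

       /** Entry point: first throws any previously detected error, then fires a new request. */
       void validatePositionsIfNeeded() {
           RuntimeException e = cachedError.getAndSet(null);
           if (e != null) {
               throw e; // truncation found by an earlier request
           }
           // ...otherwise enqueue a new OffsetForLeaderEpoch-style request (omitted)
       }
   }

   public class Demo {
       public static void main(String[] args) {
           PositionValidator v = new PositionValidator();
           v.validatePositionsIfNeeded(); // nothing cached: fires a request and returns
           v.recordTruncation(new RuntimeException("log truncated"));
           try {
               v.validatePositionsIfNeeded(); // throws the error saved earlier
           } catch (RuntimeException e) {
               System.out.println("caught: " + e.getMessage());
           }
       }
   }
   ```

   The caller never blocks on the validation future; it only pays for truncation detection on its next pass, which is the behaviour described above for both consumers.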
   
   Note that one fair concern with not chaining the validate request is how to ensure it won't storm the broker with requests. That does not happen because it already sets the pace of requests based on the subscriptionState allowedRetries (see [here](https://github.com/apache/kafka/blob/0eaaff88cf68bc2c24d4874ff9bc1cc2b493c24b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java#L269)). This ensures that whenever a validation request is sent, it waits up to the requestTimeout before sending a new one.
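   A minimal sketch of that pacing idea, assuming a per-partition "next allowed send time" gate (names like `ValidationPacer` and `maySend` are made up for illustration; the real code tracks this through the subscription state):

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Illustrative sketch: a new validation request for a partition is only
   // allowed once the previous one has been outstanding for requestTimeoutMs.
   class ValidationPacer {
       private final long requestTimeoutMs;
       private final Map<String, Long> nextAllowedSendMs = new HashMap<>();

       ValidationPacer(long requestTimeoutMs) {
           this.requestTimeoutMs = requestTimeoutMs;
       }

       /** Returns true (and starts the backoff window) if a request may be sent now. */
       boolean maySend(String partition, long nowMs) {
           long nextAllowed = nextAllowedSendMs.getOrDefault(partition, 0L);
           if (nowMs < nextAllowed) {
               return false; // previous request still within its timeout window
           }
           nextAllowedSendMs.put(partition, nowMs + requestTimeoutMs);
           return true;
       }
   }

   public class PacerDemo {
       public static void main(String[] args) {
           ValidationPacer pacer = new ValidationPacer(30_000);
           System.out.println(pacer.maySend("topic-0", 0));       // first request goes out
           System.out.println(pacer.maySend("topic-0", 10_000));  // suppressed: within timeout
           System.out.println(pacer.maySend("topic-0", 30_000));  // timeout elapsed, send again
       }
   }
   ```

   With a gate like this, leaving the validation future unchained cannot flood the broker: at most one validation request per partition is in flight per timeout window.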
   
   Does that make sense? I could be missing something, but it seems to me we already get the behaviour we want without having to play with the futures here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
