AndrewJSchofield commented on code in PR #16885:
URL: https://github.com/apache/kafka/pull/16885#discussion_r1723189652


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -256,14 +271,176 @@ private void process(final UnsubscribeEvent event) {
         }
     }
 
-    private void process(final ResetPositionsEvent event) {
-        CompletableFuture<Void> future = requestManagers.offsetsRequestManager.resetPositionsIfNeeded();
-        future.whenComplete(complete(event.future()));
+    /**
+     *
+     * Fetch committed offsets and use them to update positions in the subscription state. If no
+     * committed offsets available, fetch offsets from the leader.
+     */
+    private void process(final UpdateFetchPositionsEvent updateFetchPositionsEvent) {
+        try {
+            // The event could be completed in the app thread before it got to be
+            // processed in the background (ex. interrupted)
+            if (updateFetchPositionsEvent.future().isCompletedExceptionally()) {
+                log.debug("UpdateFetchPositions event {} was completed exceptionally before it " +
+                    "got time to be processed.", updateFetchPositionsEvent);
+                return;
+            }
+
+            // Validate positions using the partition leader end offsets, to detect if any partition
+            // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch
+            // request, retrieve the partition end offsets, and validate the current position
+            // against it. It will throw an exception if log truncation is detected.
+            requestManagers.offsetsRequestManager.validatePositionsIfNeeded();

Review Comment:
   Yes, I think it should be chained. Also, I don't think it does throw an exception if log truncation is detected. Completing the future exceptionally is not quite the same thing.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -256,14 +271,176 @@ private void process(final UnsubscribeEvent event) {
         }
     }
 
-    private void process(final ResetPositionsEvent event) {
-        CompletableFuture<Void> future = requestManagers.offsetsRequestManager.resetPositionsIfNeeded();
-        future.whenComplete(complete(event.future()));
+    /**
+     *

Review Comment:
   nit: Unnecessary empty line.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -118,7 +116,6 @@ public OffsetsRequestManager(final SubscriptionState subscriptionState,
         this.requestTimeoutMs = requestTimeoutMs;
         this.apiVersions = apiVersions;
         this.networkClientDelegate = networkClientDelegate;
-        this.backgroundEventHandler = backgroundEventHandler;

Review Comment:
   I don't think you need the BackgroundEventHandler parameter any longer either.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -256,14 +271,176 @@ private void process(final UnsubscribeEvent event) {
         }
     }
 
-    private void process(final ResetPositionsEvent event) {
-        CompletableFuture<Void> future = requestManagers.offsetsRequestManager.resetPositionsIfNeeded();
-        future.whenComplete(complete(event.future()));
+    /**
+     *
+     * Fetch committed offsets and use them to update positions in the subscription state. If no
+     * committed offsets available, fetch offsets from the leader.
+     */
+    private void process(final UpdateFetchPositionsEvent updateFetchPositionsEvent) {
+        try {
+            // The event could be completed in the app thread before it got to be
+            // processed in the background (ex. interrupted)
+            if (updateFetchPositionsEvent.future().isCompletedExceptionally()) {
+                log.debug("UpdateFetchPositions event {} was completed exceptionally before it " +
+                    "got time to be processed.", updateFetchPositionsEvent);
+                return;
+            }

Review Comment:
   Agreed. This sounds like either a non-problem or a problem for all of the event types.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1581,48 +1579,37 @@ private Fetch<K, V> collectFetch() {
 
         return fetch;
     }
+
     /**
      * Set the fetch position to the committed position (if there is one)
      * or reset it using the offset reset policy the user has configured.
      *
-     * @throws AuthenticationException If authentication fails. See the exception for more details
-     * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
-     *             defined
      * @return true iff the operation completed without timing out
+     * @throws AuthenticationException       If authentication fails. See the exception for more details
+     * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
+     *                                       defined
      */
     private boolean updateFetchPositions(final Timer timer) {
+        UpdateFetchPositionsEvent updateFetchPositionsEvent = null;
         try {
-            // Validate positions using the partition leader end offsets, to detect if any partition
-            // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch
-            // request, retrieve the partition end offsets, and validate the current position against it.
-            applicationEventHandler.addAndGet(new ValidatePositionsEvent(calculateDeadlineMs(timer)));
-
-            cachedSubscriptionHasAllFetchPositions = subscriptions.hasAllFetchPositions();
-            if (cachedSubscriptionHasAllFetchPositions) return true;
-
-            // Reset positions using committed offsets retrieved from the group coordinator, for any
-            // partitions which do not have a valid position and are not awaiting reset. This will
-            // trigger an OffsetFetch request and update positions with the offsets retrieved. This
-            // will only do a coordinator lookup if there are partitions which have missing
-            // positions, so a consumer with manually assigned partitions can avoid a coordinator
-            // dependence by always ensuring that assigned partitions have an initial position.
-            if (isCommittedOffsetsManagementEnabled() && !initWithCommittedOffsetsIfNeeded(timer))
-                return false;
-
-            // If there are partitions still needing a position and a reset policy is defined,
-            // request reset using the default policy. If no reset strategy is defined and there
-            // are partitions with a missing position, then we will raise a NoOffsetForPartitionException exception.
-            subscriptions.resetInitializingPositions();
-
-            // Reset positions using partition offsets retrieved from the leader, for any partitions
-            // which are awaiting reset. This will trigger a ListOffset request, retrieve the
-            // partition offsets according to the strategy (ex. earliest, latest), and update the
-            // positions.
-            applicationEventHandler.addAndGet(new ResetPositionsEvent(calculateDeadlineMs(timer)));
-            return true;
+            updateFetchPositionsEvent = new UpdateFetchPositionsEvent(calculateDeadlineMs(timer),
+                calculateDeadlineMs(time, defaultApiTimeoutMs));
+            wakeupTrigger.setActiveTask(updateFetchPositionsEvent.future());
+
+            if (Thread.interrupted()) {
+                // Ensure we propagate the interrupted exception if the thread was interrupted
+                // before the updateFetchPositions event is processed. Otherwise, this exception
+                // could be swallowed if event is processed fast enough in the background after
+                // being added, so that it's already completed when getting the result

Review Comment:
   I agree. This doesn't make sense to me. If we want to check for thread interruption, it should be within the `addAndGet` below so that it applies to all of the timed waits for the background thread to process events.
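A small sketch of what the reviewer describes: moving the `Thread.interrupted()` check into the shared add-and-wait helper, so every timed wait on the background thread gets the same interruption behavior instead of each call site repeating it. The `Event` class and `addAndGet` signature here are hypothetical simplifications, not the real `ApplicationEventHandler` API:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class AddAndGetSketch {
    // Hypothetical stand-in for an application event carrying a result future.
    static class Event<T> {
        final CompletableFuture<T> future = new CompletableFuture<>();
    }

    // Checking interruption here, before blocking, means the signal is honored
    // even when the background thread completes the future so quickly that a
    // per-call-site check after the wait would never see it.
    static <T> T addAndGet(Event<T> event) throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException("interrupted before waiting on event result");
        }
        try {
            // Simplified: the real helper would enqueue the event and wait with
            // a deadline rather than blocking indefinitely.
            return event.future.get();
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Event<String> event = new Event<>();
        event.future.complete("done");
        System.out.println(addAndGet(event));
    }
}
```

Centralizing the check keeps the interruption contract uniform across all event types, which is the reviewer's point about it being "a problem for all of the event types" if it is a problem at all.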


