lianetm commented on code in PR #17035:
URL: https://github.com/apache/kafka/pull/17035#discussion_r1819038665


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1606,6 +1618,73 @@ private void updateLastSeenEpochIfNewer(TopicPartition 
topicPartition, OffsetAnd
             offsetAndMetadata.leaderEpoch().ifPresent(epoch -> 
metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
     }
 
+    /**
+     * This method signals the background thread to {@link 
CreateFetchRequestsEvent create fetch requests}.
+     *
+     * <p/>
+     *
+     * This method takes the following steps to maintain compatibility with 
the {@link ClassicKafkaConsumer} method
+     * of the same name:
+     *
+     * <ul>
+     *     <li>
+     *         The method will wait for confirmation of the request creation 
before continuing.
+     *     </li>
+     *     <li>
+     *         The method will throw exceptions encountered during request 
creation to the user <b>immediately</b>.
+     *     </li>
+     *     <li>
+     *         The method will suppress {@link TimeoutException}s that occur 
while waiting for the confirmation.
+     *         Timeouts during request creation are a byproduct of this 
consumer's thread communication mechanisms.
+     *         That exception type isn't thrown in the request creation step 
of the {@link ClassicKafkaConsumer}.
+     *         Additionally, timeouts will not impact the logic of {@link 
#pollForFetches(Timer) blocking requests}
+     *         as it can handle requests that are created after the timeout.
+     *     </li>
+     * </ul>
+     *
+     * @param timer Timer used to bound how long the consumer waits for the 
requests to be created, which in practice
+     *              is used to avoid using {@link Long#MAX_VALUE} to wait 
"forever"
+     */
+    private void sendFetches(Timer timer) {
+        try {
+            applicationEventHandler.addAndGet(new 
CreateFetchRequestsEvent(calculateDeadlineMs(timer)));
+        } catch (TimeoutException e) {
+            // Can be ignored, per above comments.
+        }
+    }
+
+    /**
+     * This method signals the background thread to {@link 
CreateFetchRequestsEvent create fetch requests} for the
+     * prefetch case, i.e. right before {@link #poll(Duration)} exits.
+     *
+     * <p/>
+     *
+     * This method takes the following steps to maintain compatibility with 
the {@link ClassicKafkaConsumer}
+     * {@code sendFetches()} method that appears at the end of {@link 
ClassicKafkaConsumer#poll(Duration)}:
+     *
+     * <ul>
+     *     <li>
+     *         The method will wait for confirmation of the request creation 
before continuing.

Review Comment:
   This is not true now for prefetching that uses `.add` instead of 
`.addAndGet`, should we remove this line?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##########
@@ -3386,6 +3394,71 @@ public void 
testWhenFetchResponseReturnsALeaderShipChangeErrorAndNewLeaderInform
         assertTrue(subscriptions.isFetchable(tp1));
     }
 
+    @Test
+    public void testPollWithoutCreateFetchRequests() {
+        buildFetcher();
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);

Review Comment:
   thinking more I say it's better to keep them even if they are setting up a 
condition that it's truly not used in the test. The reason is really that if 
ever there's a regression and we wrongfully send a "ghost" request in the 
sendFetches(false), we don't want to miss catching it just because some other 
conditions are missing. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to