lianetm commented on code in PR #17035: URL: https://github.com/apache/kafka/pull/17035#discussion_r1819038665
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1606,6 +1618,73 @@ private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAnd
         offsetAndMetadata.leaderEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
     }
 
+    /**
+     * This method signals the background thread to {@link CreateFetchRequestsEvent create fetch requests}.
+     *
+     * <p/>
+     *
+     * This method takes the following steps to maintain compatibility with the {@link ClassicKafkaConsumer} method
+     * of the same name:
+     *
+     * <ul>
+     *     <li>
+     *         The method will wait for confirmation of the request creation before continuing.
+     *     </li>
+     *     <li>
+     *         The method will throw exceptions encountered during request creation to the user <b>immediately</b>.
+     *     </li>
+     *     <li>
+     *         The method will suppress {@link TimeoutException}s that occur while waiting for the confirmation.
+     *         Timeouts during request creation are a byproduct of this consumer's thread communication mechanisms.
+     *         That exception type isn't thrown in the request creation step of the {@link ClassicKafkaConsumer}.
+     *         Additionally, timeouts will not impact the logic of {@link #pollForFetches(Timer) blocking requests}
+     *         as it can handle requests that are created after the timeout.
+     *     </li>
+     * </ul>
+     *
+     * @param timer Timer used to bound how long the consumer waits for the requests to be created, which in practice
+     *              is used to avoid using {@link Long#MAX_VALUE} to wait "forever"
+     */
+    private void sendFetches(Timer timer) {
+        try {
+            applicationEventHandler.addAndGet(new CreateFetchRequestsEvent(calculateDeadlineMs(timer)));
+        } catch (TimeoutException e) {
+            // Can be ignored, per above comments.
+        }
+    }
+
+    /**
+     * This method signals the background thread to {@link CreateFetchRequestsEvent create fetch requests} for the
+     * prefetch case, i.e. right before {@link #poll(Duration)} exits.
+     *
+     * <p/>
+     *
+     * This method takes the following steps to maintain compatibility with the {@link ClassicKafkaConsumer}
+     * {@code sendFetches()} method that appears at the end of {@link ClassicKafkaConsumer#poll(Duration)}:
+     *
+     * <ul>
+     *     <li>
+     *         The method will wait for confirmation of the request creation before continuing.

Review Comment:
   This is no longer true for prefetching, which uses `.add` instead of `.addAndGet`. Should we remove this line?

##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##########
@@ -3386,6 +3394,71 @@ public void testWhenFetchResponseReturnsALeaderShipChangeErrorAndNewLeaderInform
         assertTrue(subscriptions.isFetchable(tp1));
     }
 
+    @Test
+    public void testPollWithoutCreateFetchRequests() {
+        buildFetcher();
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);

Review Comment:
   Thinking about it more, I'd say it's better to keep them, even though they set up a condition that isn't actually used in the test. The reason is that if there's ever a regression and we wrongly send a "ghost" request in `sendFetches(false)`, we don't want to miss catching it just because some other conditions are missing.
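
For context on the first review comment above, here is a minimal sketch of the difference between a fire-and-forget `.add` and a blocking `.addAndGet` with the timeout suppressed. It is not the Kafka implementation: `EventHandlerSketch`, `Event`, `prefetch`, `sendFetchesBlocking`, and `runBackgroundOnce` are hypothetical names invented for illustration, and only the `add`/`addAndGet` naming is taken from the diff.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for an application event handler; not Kafka's class.
class EventHandlerSketch {

    // Hypothetical event that the background thread completes once handled.
    static final class Event {
        final CompletableFuture<Void> done = new CompletableFuture<>();
    }

    private final BlockingQueue<Event> queue = new LinkedBlockingQueue<>();

    // Fire-and-forget: enqueue the event and return without waiting.
    void add(Event event) {
        queue.add(event);
    }

    // Enqueue the event, then block until it completes or the timeout expires.
    void addAndGet(Event event, long timeoutMs) throws TimeoutException {
        queue.add(event);
        try {
            event.done.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            // Failures during request creation propagate to the caller.
            throw new IllegalStateException(e.getCause());
        }
    }

    // Prefetch case: there is no wait, so no confirmation is ever observed.
    void prefetch() {
        add(new Event());
    }

    // Blocking case: waits for confirmation but swallows the timeout.
    // Returns true only if the confirmation arrived in time.
    boolean sendFetchesBlocking(long timeoutMs) {
        try {
            addAndGet(new Event(), timeoutMs);
            return true;
        } catch (TimeoutException e) {
            return false; // the request may still be created after the timeout
        }
    }

    // Simulates one iteration of the background thread.
    void runBackgroundOnce() throws InterruptedException {
        queue.take().done.complete(null);
    }

    int pending() {
        return queue.size();
    }
}
```

In this sketch the prefetch path returns as soon as the event is enqueued, which is why a Javadoc line promising to "wait for confirmation" would not describe it.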