kirktrue commented on code in PR #17035: URL: https://github.com/apache/kafka/pull/17035#discussion_r1819493993
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1606,6 +1618,73 @@ private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAnd
         offsetAndMetadata.leaderEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
     }

+    /**
+     * This method signals the background thread to {@link CreateFetchRequestsEvent create fetch requests}.
+     *
+     * <p/>
+     *
+     * This method takes the following steps to maintain compatibility with the {@link ClassicKafkaConsumer} method
+     * of the same name:
+     *
+     * <ul>
+     *     <li>
+     *         The method will wait for confirmation of the request creation before continuing.
+     *     </li>
+     *     <li>
+     *         The method will throw exceptions encountered during request creation to the user <b>immediately</b>.
+     *     </li>
+     *     <li>
+     *         The method will suppress {@link TimeoutException}s that occur while waiting for the confirmation.
+     *         Timeouts during request creation are a byproduct of this consumer's thread communication mechanisms.
+     *         That exception type isn't thrown in the request creation step of the {@link ClassicKafkaConsumer}.
+     *         Additionally, timeouts will not impact the logic of {@link #pollForFetches(Timer) blocking requests}
+     *         as it can handle requests that are created after the timeout.
+     *     </li>
+     * </ul>
+     *
+     * @param timer Timer used to bound how long the consumer waits for the requests to be created, which in practice
+     *              is used to avoid using {@link Long#MAX_VALUE} to wait "forever"
+     */
+    private void sendFetches(Timer timer) {
+        try {
+            applicationEventHandler.addAndGet(new CreateFetchRequestsEvent(calculateDeadlineMs(timer)));
+        } catch (TimeoutException e) {
+            // Can be ignored, per above comments.
+        }
+    }
+
+    /**
+     * This method signals the background thread to {@link CreateFetchRequestsEvent create fetch requests} for the
+     * prefetch case, i.e. right before {@link #poll(Duration)} exits.
+     *
+     * <p/>
+     *
+     * This method takes the following steps to maintain compatibility with the {@link ClassicKafkaConsumer}
+     * {@code sendFetches()} method that appears at the end of {@link ClassicKafkaConsumer#poll(Duration)}:
+     *
+     * <ul>
+     *     <li>
+     *         The method will wait for confirmation of the request creation before continuing.

Review Comment:
Good catch. Reworded to state that it will not wait for confirmation.
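
For reference, the difference between the blocking behavior documented for `sendFetches(Timer)` and the non-waiting prefetch behavior described in the reply can be sketched outside of Kafka. This is a minimal, standalone illustration under stated assumptions, not the PR's code: `FetchSignalSketch`, `signalAndWait`, and `signalWithoutWaiting` are hypothetical names, a `CompletableFuture` stands in for the background thread's confirmation, a plain queue stands in for the event handoff, and `java.util.concurrent.TimeoutException` stands in for Kafka's `TimeoutException`.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Standalone sketch; none of these names are Kafka APIs.
class FetchSignalSketch {

    // Blocking variant: wait up to timeoutMs for the confirmation.
    // Returns true if confirmed in time, false if the wait timed out.
    static boolean signalAndWait(CompletableFuture<Void> confirmation, long timeoutMs) {
        try {
            confirmation.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            // Suppressed: the request may still be created after the timeout.
            return false;
        } catch (ExecutionException e) {
            // Failures during request creation surface to the caller immediately.
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new IllegalStateException(cause);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Prefetch variant: hand the event off and return without waiting.
    static void signalWithoutWaiting(Queue<String> eventQueue) {
        eventQueue.add("create-fetch-requests");
    }

    public static void main(String[] args) {
        // Confirmation already available: the wait succeeds.
        System.out.println(signalAndWait(CompletableFuture.completedFuture(null), 10));

        // Confirmation never arrives: the timeout is swallowed.
        System.out.println(signalAndWait(new CompletableFuture<>(), 10));

        // Request creation failed: the exception reaches the caller.
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalArgumentException("bad request"));
        try {
            signalAndWait(failed, 10);
        } catch (IllegalArgumentException e) {
            System.out.println("propagated: " + e.getMessage());
        }

        // Prefetch: the event is queued and the caller moves on.
        Queue<String> queue = new ArrayDeque<>();
        signalWithoutWaiting(queue);
        System.out.println(queue.size());
    }
}
```

In this sketch only the timeout is swallowed; any other failure still reaches the caller, which mirrors the three bullet points in the first Javadoc block. The prefetch variant has nothing to wait on, so its Javadoc should not promise a confirmation.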