lianetm commented on code in PR #16885: URL: https://github.com/apache/kafka/pull/16885#discussion_r1725091254
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -1581,48 +1579,37 @@ private Fetch<K, V> collectFetch() { return fetch; } + /** * Set the fetch position to the committed position (if there is one) * or reset it using the offset reset policy the user has configured. * - * @throws AuthenticationException If authentication fails. See the exception for more details - * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is - * defined * @return true iff the operation completed without timing out + * @throws AuthenticationException If authentication fails. See the exception for more details + * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is + * defined */ private boolean updateFetchPositions(final Timer timer) { + UpdateFetchPositionsEvent updateFetchPositionsEvent = null; try { - // Validate positions using the partition leader end offsets, to detect if any partition - // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch - // request, retrieve the partition end offsets, and validate the current position against it. - applicationEventHandler.addAndGet(new ValidatePositionsEvent(calculateDeadlineMs(timer))); - - cachedSubscriptionHasAllFetchPositions = subscriptions.hasAllFetchPositions(); - if (cachedSubscriptionHasAllFetchPositions) return true; - - // Reset positions using committed offsets retrieved from the group coordinator, for any - // partitions which do not have a valid position and are not awaiting reset. This will - // trigger an OffsetFetch request and update positions with the offsets retrieved. 
This - // will only do a coordinator lookup if there are partitions which have missing - // positions, so a consumer with manually assigned partitions can avoid a coordinator - // dependence by always ensuring that assigned partitions have an initial position. - if (isCommittedOffsetsManagementEnabled() && !initWithCommittedOffsetsIfNeeded(timer)) - return false; - - // If there are partitions still needing a position and a reset policy is defined, - // request reset using the default policy. If no reset strategy is defined and there - // are partitions with a missing position, then we will raise a NoOffsetForPartitionException exception. - subscriptions.resetInitializingPositions(); - - // Reset positions using partition offsets retrieved from the leader, for any partitions - // which are awaiting reset. This will trigger a ListOffset request, retrieve the - // partition offsets according to the strategy (ex. earliest, latest), and update the - // positions. - applicationEventHandler.addAndGet(new ResetPositionsEvent(calculateDeadlineMs(timer))); - return true; + updateFetchPositionsEvent = new UpdateFetchPositionsEvent(calculateDeadlineMs(timer), + calculateDeadlineMs(time, defaultApiTimeoutMs)); + wakeupTrigger.setActiveTask(updateFetchPositionsEvent.future()); + + if (Thread.interrupted()) { + // Ensure we propagate the interrupted exception if the thread was interrupted + // before the updateFetchPositions event is processed. Otherwise, this exception + // could be swallowed if event is processed fast enough in the background after + // being added, so that it's already completed when getting the result Review Comment: Agreed, moved. This is a general case: if the thread is interrupted, we need to ensure that we propagate the exception. This covers the case where we don't have to wait because the event gets processed between the two steps of addAndGet (add -> event completes -> get), so the blocking get never observes the interrupt. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org