lianetm commented on code in PR #18737: URL: https://github.com/apache/kafka/pull/18737#discussion_r1956606014
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ########## @@ -386,38 +360,10 @@ public void testCommitAsync() { @Test public void testCommitAsyncWithEmptyOffsets() { subscriptionState = mock(SubscriptionState.class); - when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode)); - TopicPartition tp = new TopicPartition("topic", 1); - OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0, Optional.of(1), ""); - Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(tp, offsetAndMetadata); - doReturn(offsets).when(subscriptionState).allConsumed(); - - CommitRequestManager commitRequestManager = create(true, 100); - CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = commitRequestManager.commitAsync(Optional.empty()); - assertEquals(1, commitRequestManager.unsentOffsetCommitRequests().size()); - List<NetworkClientDelegate.FutureCompletionHandler> pollResults = assertPoll(1, commitRequestManager); - pollResults.forEach(v -> v.onComplete(mockOffsetCommitResponse( - "topic", - 1, - (short) 1, - Errors.NONE))); - - verify(subscriptionState).allConsumed(); - verify(metadata).updateLastSeenEpochIfNewer(tp, 1); - assertTrue(future.isDone()); - Map<TopicPartition, OffsetAndMetadata> commitOffsets = assertDoesNotThrow(() -> future.get()); - assertEquals(offsets, commitOffsets); - } - - @Test - public void testCommitAsyncWithEmptyAllConsumedOffsets() { - subscriptionState = mock(SubscriptionState.class); - doReturn(Map.of()).when(subscriptionState).allConsumed(); CommitRequestManager commitRequestManager = create(true, 100); - CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = commitRequestManager.commitAsync(Optional.empty()); + CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = commitRequestManager.commitAsync(Collections.emptyMap()); - verify(subscriptionState).allConsumed(); assertTrue(future.isDone()); Review Comment: should we add the check to make sure that no request is added to the queue if empty offsets? (could happen even is the future isDone) `assertPoll(0, commitRequestManager);` ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -637,6 +634,24 @@ private void maybeUpdateLastSeenEpochIfNewer(final Map<TopicPartition, OffsetAnd }); } + /** + * This is a non-blocking method to update timer and trigger async auto-commit. + * <p> + * This method performs two main tasks: + * <ol> + * <li>Updates the internal timer with the current time.</li> + * <li>Initiate an asynchronous auto-commit operation for all consumed messages if needed.</li> Review Comment: ```suggestion * <li>Initiate an asynchronous auto-commit operation for all consumed positions if needed.</li> ``` ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ########## @@ -1515,6 +1522,8 @@ public void testGroupRemoteAssignorUsedInConsumerProtocol() { consumer = newConsumer(config); assertFalse(config.unused().contains(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG)); + // To unblock commitSyncAllConsumed on close + markOffsetsReadyForCommitEvent(); Review Comment: ditto ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ########## @@ -1539,6 +1548,8 @@ public void testGroupIdNotNullAndValid() { assertTrue(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)); assertTrue(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED)); + // To unblock commitSyncAllConsumed on close + markOffsetsReadyForCommitEvent(); Review Comment: makes sense, but given that this test is unrelated to commits, maybe a cleaner approach is to simply disable set ENABLE_AUTO_COMMIT_CONFIG=false? ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ########## @@ -595,18 +540,20 @@ public void testAutocommitEnsureOnlyOneInflightRequest() { CommitRequestManager commitRequestManager = create(true, 100); time.sleep(100); - commitRequestManager.updateAutoCommitTimer(time.milliseconds()); + commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds()); List<NetworkClientDelegate.FutureCompletionHandler> futures = assertPoll(1, commitRequestManager); time.sleep(100); - commitRequestManager.updateAutoCommitTimer(time.milliseconds()); + commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds()); // We want to make sure we don't resend autocommit if the previous request has not been // completed, even if the interval expired assertPoll(0, commitRequestManager); assertEmptyPendingRequests(commitRequestManager); // complete the unsent request and re-poll futures.get(0).onComplete(buildOffsetCommitClientResponse(new OffsetCommitResponse(0, new HashMap<>()))); + time.sleep(100); Review Comment: why do we need to wait for the interval here again? It expired on ln 546 above, so the next one should be generated as soon as we receive a response and have a pollEvent I expect, no more waiting. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java: ########## @@ -1383,9 +1398,7 @@ public void registerStateListener(MemberStateListener listener) { * time-sensitive operations should be performed */ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { - if (state == MemberState.RECONCILING) { - maybeReconcile(); - } + maybeReconcile(false); Review Comment: I couldn't find coverage for this and it would be a nasty regression (messing committed offsets). Let's pls add a simple test to check that this mananger.poll calls reconcile with canCommit false. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java: ########## @@ -206,13 +206,25 @@ public void process(ApplicationEvent event) { } private void process(final PollEvent event) { + // Trigger a reconciliation that can safely commit offsets if needed to revoke partitions, + // as we're processing before any new fetching starts in the app thread + requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager -> + consumerMembershipManager.maybeReconcile(true)); if (requestManagers.commitRequestManager.isPresent()) { - requestManagers.commitRequestManager.ifPresent(m -> m.updateAutoCommitTimer(event.pollTimeMs())); + CommitRequestManager commitRequestManager = requestManagers.commitRequestManager.get(); + commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs()); + // all commit request generation points have been passed, + // so it's safe to notify the app thread could start the next poll cycle Review Comment: ```suggestion // so it's safe to notify the app thread could proceed and start fetching ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org