frankvicky commented on code in PR #18737: URL: https://github.com/apache/kafka/pull/18737#discussion_r1950777170
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ########## @@ -220,8 +221,10 @@ public void testPollEnsureAutocommitSent() { assertPoll(0, commitRequestManager); commitRequestManager.updateAutoCommitTimer(time.milliseconds()); + commitRequestManager.maybeAutoCommitAsync(); Review Comment: Another question: Do we need a helper method in `commitRequestManager` to combine the invocation of `updateAutoCommitTimer` and `maybeAutoCommitAsync`? Something like `updateTimerAndMaybeCommit` ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java: ########## @@ -446,14 +447,17 @@ public void testR2JPatternSubscriptionEventFailureWithMixedSubscriptionType() { @MethodSource("offsetsGenerator") public void testSyncCommitEvent(Optional<Map<TopicPartition, OffsetAndMetadata>> offsets) { SyncCommitEvent event = new SyncCommitEvent(offsets, 12345); + Map<TopicPartition, OffsetAndMetadata> actualOffsets = offsets.orElse(Collections.emptyMap()); setupProcessor(true); - doReturn(CompletableFuture.completedFuture(offsets.orElse(Map.of()))).when(commitRequestManager).commitSync(offsets, 12345); + doReturn(CompletableFuture.completedFuture(offsets.orElse(Map.of()))).when(commitRequestManager).commitSync(actualOffsets, 12345); + doReturn(Collections.emptyMap()).when(subscriptionState).allConsumed(); processor.process(event); - verify(commitRequestManager).commitSync(offsets, 12345); + verify(commitRequestManager).commitSync(actualOffsets, 12345); Map<TopicPartition, OffsetAndMetadata> committedOffsets = assertDoesNotThrow(() -> event.future().get()); - assertEquals(offsets.orElse(Map.of()), committedOffsets); + assertTrue(event.offsetsReady.isDone()); Review Comment: This is to make sure `offsetsReady` has been completed. I am not sure if it gives us enough protection or if we need an independent test. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -186,7 +186,6 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { return drainPendingOffsetCommitRequests(); } - maybeAutoCommitAsync(); Review Comment: If I understand correctly, we no longer allow the background thread to trigger auto-commit freely. Therefore, I removed this line. ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ########## @@ -220,8 +221,10 @@ public void testPollEnsureAutocommitSent() { assertPoll(0, commitRequestManager); commitRequestManager.updateAutoCommitTimer(time.milliseconds()); + commitRequestManager.maybeAutoCommitAsync(); Review Comment: We no longer invoke `maybeAutoCommitAsync` during poll, so we must manually invoke it here to ensure a pending request has been generated. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -417,15 +415,14 @@ public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitAsync(fin * an expected retriable error. * @return Future that will complete when a successful response */ - public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitSync(final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets, + public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final long deadlineMs) { - Map<TopicPartition, OffsetAndMetadata> commitOffsets = offsets.orElseGet(subscriptions::allConsumed); - if (commitOffsets.isEmpty()) { + if (offsets.isEmpty()) { return CompletableFuture.completedFuture(Map.of()); } Review Comment: Same question with `commitAsync`. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -388,22 +387,21 @@ private void autoCommitSyncBeforeRevocationWithRetries(OffsetCommitRequestState * exceptionally depending on the response. If the request fails with a retriable error, the * future will be completed with a {@link RetriableCommitFailedException}. */ - public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitAsync(final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets) { - Map<TopicPartition, OffsetAndMetadata> commitOffsets = offsets.orElseGet(subscriptions::allConsumed); - if (commitOffsets.isEmpty()) { + public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets) { + if (offsets.isEmpty()) { log.debug("Skipping commit of empty offsets"); return CompletableFuture.completedFuture(Map.of()); } Review Comment: I'm not sure `Optional.empty` has the same meaning as an empty map. c.c @lianetm ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java: ########## @@ -206,8 +207,12 @@ public void process(ApplicationEvent event) { } private void process(final PollEvent event) { + // In order to ensure certain positions before reconciliation, we only trigger full process of reconcile by PollEvent + requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager -> + consumerMembershipManager.maybeReconcile(true)); Review Comment: We have full control over when to commit, so we no longer need a snapshot. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org