lianetm commented on code in PR #18737:
URL: https://github.com/apache/kafka/pull/18737#discussion_r1956606014


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -386,38 +360,10 @@ public void testCommitAsync() {
     @Test
     public void testCommitAsyncWithEmptyOffsets() {
         subscriptionState = mock(SubscriptionState.class);
-        
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
-        TopicPartition tp = new TopicPartition("topic", 1);
-        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0, 
Optional.of(1), "");
-        Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(tp, 
offsetAndMetadata);
-        doReturn(offsets).when(subscriptionState).allConsumed();
-
-        CommitRequestManager commitRequestManager = create(true, 100);
-        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = 
commitRequestManager.commitAsync(Optional.empty());
-        assertEquals(1, 
commitRequestManager.unsentOffsetCommitRequests().size());
-        List<NetworkClientDelegate.FutureCompletionHandler> pollResults = 
assertPoll(1, commitRequestManager);
-        pollResults.forEach(v -> v.onComplete(mockOffsetCommitResponse(
-            "topic",
-            1,
-            (short) 1,
-            Errors.NONE)));
-
-        verify(subscriptionState).allConsumed();
-        verify(metadata).updateLastSeenEpochIfNewer(tp, 1);
-        assertTrue(future.isDone());
-        Map<TopicPartition, OffsetAndMetadata> commitOffsets = 
assertDoesNotThrow(() -> future.get());
-        assertEquals(offsets, commitOffsets);
-    }
-
-    @Test
-    public void testCommitAsyncWithEmptyAllConsumedOffsets() {
-        subscriptionState = mock(SubscriptionState.class);
-        doReturn(Map.of()).when(subscriptionState).allConsumed();
 
         CommitRequestManager commitRequestManager = create(true, 100);
-        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = 
commitRequestManager.commitAsync(Optional.empty());
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = 
commitRequestManager.commitAsync(Collections.emptyMap());
 
-        verify(subscriptionState).allConsumed();
         assertTrue(future.isDone());

Review Comment:
   should we add the check to make sure that no request is added to the queue 
if empty offsets? (could happen even is the future isDone) 
   `assertPoll(0, commitRequestManager);`



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -637,6 +634,24 @@ private void maybeUpdateLastSeenEpochIfNewer(final 
Map<TopicPartition, OffsetAnd
         });
     }
 
+    /**
+     * This is a non-blocking method to update timer and trigger async 
auto-commit.
+     * <p>
+     * This method performs two main tasks:
+     * <ol>
+     * <li>Updates the internal timer with the current time.</li>
+     * <li>Initiate an asynchronous auto-commit operation for all consumed 
messages if needed.</li>

Review Comment:
   ```suggestion
        * <li>Initiate an asynchronous auto-commit operation for all consumed 
positions if needed.</li>
   ```



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -1515,6 +1522,8 @@ public void 
testGroupRemoteAssignorUsedInConsumerProtocol() {
         consumer = newConsumer(config);
 
         
assertFalse(config.unused().contains(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG));
+        // To unblock commitSyncAllConsumed on close
+        markOffsetsReadyForCommitEvent();

Review Comment:
   ditto



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -1539,6 +1548,8 @@ public void testGroupIdNotNullAndValid() {
 
         
assertTrue(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
         
assertTrue(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
+        // To unblock commitSyncAllConsumed on close
+        markOffsetsReadyForCommitEvent();

Review Comment:
   makes sense, but given that this test is unrelated to commits, maybe a 
cleaner approach is to simply disable set ENABLE_AUTO_COMMIT_CONFIG=false?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -595,18 +540,20 @@ public void testAutocommitEnsureOnlyOneInflightRequest() {
 
         CommitRequestManager commitRequestManager = create(true, 100);
         time.sleep(100);
-        commitRequestManager.updateAutoCommitTimer(time.milliseconds());
+        commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
         List<NetworkClientDelegate.FutureCompletionHandler> futures = 
assertPoll(1, commitRequestManager);
 
         time.sleep(100);
-        commitRequestManager.updateAutoCommitTimer(time.milliseconds());
+        commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
         // We want to make sure we don't resend autocommit if the previous 
request has not been
         // completed, even if the interval expired
         assertPoll(0, commitRequestManager);
         assertEmptyPendingRequests(commitRequestManager);
 
         // complete the unsent request and re-poll
         futures.get(0).onComplete(buildOffsetCommitClientResponse(new 
OffsetCommitResponse(0, new HashMap<>())));
+        time.sleep(100);

Review Comment:
   why do we need to wait for the interval here again? It expired on ln 546 
above, so the next one should be generated as soon as we receive a response and 
have a pollEvent I expect, no more waiting. 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -1383,9 +1398,7 @@ public void registerStateListener(MemberStateListener 
listener) {
      *                      time-sensitive operations should be performed
      */
     public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
-        if (state == MemberState.RECONCILING) {
-            maybeReconcile();
-        }
+        maybeReconcile(false);

Review Comment:
   I couldn't find coverage for this and it would be a nasty regression 
(messing committed offsets). Let's pls add a simple test to check that this 
mananger.poll calls reconcile with canCommit false.  



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -206,13 +206,25 @@ public void process(ApplicationEvent event) {
     }
 
     private void process(final PollEvent event) {
+        // Trigger a reconciliation that can safely commit offsets if needed 
to revoke partitions,
+        // as we're processing before any new fetching starts in the app thread
+        
requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager ->
+            consumerMembershipManager.maybeReconcile(true));
         if (requestManagers.commitRequestManager.isPresent()) {
-            requestManagers.commitRequestManager.ifPresent(m -> 
m.updateAutoCommitTimer(event.pollTimeMs()));
+            CommitRequestManager commitRequestManager = 
requestManagers.commitRequestManager.get();
+            commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs());
+            // all commit request generation points have been passed,
+            // so it's safe to notify the app thread could start the next poll 
cycle

Review Comment:
   ```suggestion
               // so it's safe to notify the app thread could proceed and start 
fetching
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to