frankvicky commented on code in PR #18737:
URL: https://github.com/apache/kafka/pull/18737#discussion_r1950777170


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -220,8 +221,10 @@ public void testPollEnsureAutocommitSent() {
         assertPoll(0, commitRequestManager);
 
         commitRequestManager.updateAutoCommitTimer(time.milliseconds());
+        commitRequestManager.maybeAutoCommitAsync();

Review Comment:
   Another question:
   Do we need a helper method in `commitRequestManager` to combine the 
invocation of `updateAutoCommitTimer` and `maybeAutoCommitAsync`?
   Something like `updateTimerAndMaybeCommit`
   
   



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:
##########
@@ -446,14 +447,17 @@ public void 
testR2JPatternSubscriptionEventFailureWithMixedSubscriptionType() {
     @MethodSource("offsetsGenerator")
     public void testSyncCommitEvent(Optional<Map<TopicPartition, 
OffsetAndMetadata>> offsets) {
         SyncCommitEvent event = new SyncCommitEvent(offsets, 12345);
+        Map<TopicPartition, OffsetAndMetadata> actualOffsets = 
offsets.orElse(Collections.emptyMap());
 
         setupProcessor(true);
-        
doReturn(CompletableFuture.completedFuture(offsets.orElse(Map.of()))).when(commitRequestManager).commitSync(offsets,
 12345);
+        
doReturn(CompletableFuture.completedFuture(offsets.orElse(Map.of()))).when(commitRequestManager).commitSync(actualOffsets,
 12345);
+        doReturn(Collections.emptyMap()).when(subscriptionState).allConsumed();
 
         processor.process(event);
-        verify(commitRequestManager).commitSync(offsets, 12345);
+        verify(commitRequestManager).commitSync(actualOffsets, 12345);
         Map<TopicPartition, OffsetAndMetadata> committedOffsets = 
assertDoesNotThrow(() -> event.future().get());
-        assertEquals(offsets.orElse(Map.of()), committedOffsets);
+        assertTrue(event.offsetsReady.isDone());

Review Comment:
   This is to make sure `offsetsReady` has been completed.
   I am not sure if it gives us enough protection or if we need an independent 
test.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -186,7 +186,6 @@ public NetworkClientDelegate.PollResult poll(final long 
currentTimeMs) {
             return drainPendingOffsetCommitRequests();
         }
 
-        maybeAutoCommitAsync();

Review Comment:
   If I understand correctly, we no longer allow the background thread to 
trigger auto-commit freely. Therefore, I removed this line.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -220,8 +221,10 @@ public void testPollEnsureAutocommitSent() {
         assertPoll(0, commitRequestManager);
 
         commitRequestManager.updateAutoCommitTimer(time.milliseconds());
+        commitRequestManager.maybeAutoCommitAsync();

Review Comment:
   We no longer invoke `maybeAutoCommitAsync` during poll, so we must manually 
invoke it here to ensure a pending request has been generated.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -417,15 +415,14 @@ public CompletableFuture<Map<TopicPartition, 
OffsetAndMetadata>> commitAsync(fin
      *                   an expected retriable error.
      * @return Future that will complete when a successful response
      */
-    public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
commitSync(final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets,
+    public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets,
                                                                                
 final long deadlineMs) {
-        Map<TopicPartition, OffsetAndMetadata> commitOffsets = 
offsets.orElseGet(subscriptions::allConsumed);
-        if (commitOffsets.isEmpty()) {
+        if (offsets.isEmpty()) {
             return CompletableFuture.completedFuture(Map.of());
         }

Review Comment:
   Same question with `commitAsync`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -388,22 +387,21 @@ private void 
autoCommitSyncBeforeRevocationWithRetries(OffsetCommitRequestState
      * exceptionally depending on the response. If the request fails with a 
retriable error, the
      * future will be completed with a {@link RetriableCommitFailedException}.
      */
-    public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
commitAsync(final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets) {
-        Map<TopicPartition, OffsetAndMetadata> commitOffsets = 
offsets.orElseGet(subscriptions::allConsumed);
-        if (commitOffsets.isEmpty()) {
+    public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
+        if (offsets.isEmpty()) {
             log.debug("Skipping commit of empty offsets");
             return CompletableFuture.completedFuture(Map.of());
         }

Review Comment:
   I'm not sure `Optional.empty` has the same meaning as an empty map.
   c.c @lianetm 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -206,8 +207,12 @@ public void process(ApplicationEvent event) {
     }
 
     private void process(final PollEvent event) {
+        // In order to ensure certain positions before reconciliation, we only 
trigger full process of reconcile by PollEvent
+        
requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager ->
+            consumerMembershipManager.maybeReconcile(true));

Review Comment:
   We have full control over when to commit, so we no longer need a snapshot.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to