junrao commented on code in PR #18737:
URL: https://github.com/apache/kafka/pull/18737#discussion_r1953524086


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -818,6 +826,8 @@ void maybeReconcile() {
             return;
         }
 
+        if (autoCommitEnabled && !canCommit) return;

Review Comment:
   Hmm, this still seems problematic. The application thread adds a `PollEvent` 
without waiting for the processing to complete. When we get here with 
canCommit=true, we haven't called `markPendingRevocationToPauseFetching()` yet. 
This means that the application thread could still fetch a chunk of records 
from the to be revoked partitions without processing the records yet. Later, we 
will call `signalReconciliationStarted()`, which will commit with 
`subscriptions.allConsumed()`. However, we are not sure if those offsets are 
actually consumed.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -637,6 +634,11 @@ private void maybeUpdateLastSeenEpochIfNewer(final 
Map<TopicPartition, OffsetAnd
         });
     }
 
+    public void updateTimerAndMaybeCommit(final long currentTimeMs) {
+        updateAutoCommitTimer(currentTimeMs);
+        maybeAutoCommitAsync();

Review Comment:
   Hmm, still not sure if this fixes the auto offset commit issue. The 
application thread adds the PollEvent without waiting for the processing of the 
event to complete. So, we are not sure if the application is still polling new 
records when `subscriptions.allConsumed()` is called.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitEvent.java:
##########
@@ -29,6 +30,6 @@
 public class SyncCommitEvent extends CommitEvent {
 
     public SyncCommitEvent(final Optional<Map<TopicPartition, 
OffsetAndMetadata>> offsets, final long deadlineMs) {
-        super(Type.COMMIT_SYNC, offsets, deadlineMs);
+        super(Type.COMMIT_SYNC, offsets, deadlineMs, new 
CompletableFuture<>());

Review Comment:
   Could we create the future in the super class in stead of passing it in?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -792,7 +796,11 @@ private void transitionToStale() {
      *  - There are topics that haven't been added to the current assignment 
yet, but all their topic IDs
      *    are missing from the target assignment.
      */
-    void maybeReconcile() {
+    public void maybeReconcile(boolean canCommit) {

Review Comment:
   Could we add a javadoc for `canCommit`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to