ableegoldman commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r441092703



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -409,32 +407,28 @@ public void resume() {
         return committableOffsets;
     }
 
+    /**
+     * This should only be called if the attempted commit succeeded for this task
+     */
     @Override
     public void postCommit() {
         commitRequested = false;
         commitNeeded = false;
 
         switch (state()) {
             case RESTORING:
-                writeCheckpointIfNeed();
+            case SUSPENDED:
+                maybeWriteCheckpoint();
 
                 break;
 
             case RUNNING:
-                if (!eosEnabled) { // if RUNNING, checkpoint only for non-eos
-                    writeCheckpointIfNeed();
+                if (!eosEnabled) {
+                    maybeWriteCheckpoint();
                 }
 
                 break;
 
-            case SUSPENDED:
-                writeCheckpointIfNeed();
-                // we cannot `clear()` the `PartitionGroup` in `suspend()` already, but only after committing,
-                // because otherwise we loose the partition-time information
-                partitionGroup.clear();

Review comment:
   I'm not 100% certain that the Consumer _does_ clear its internal buffer on revocation. At least, I couldn't find it in the code, but maybe I'm looking in the wrong place.

   I'm not arguing that we shouldn't clear the partition group here; I was just wondering about this for my own sake.



