mjsax commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r466056481



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
##########
@@ -379,9 +379,18 @@ private static void verifyReceivedAllRecords(final 
Map<TopicPartition, List<Cons
         final IntegerDeserializer integerDeserializer = new 
IntegerDeserializer();
         for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], 
byte[]>>> partitionRecords : receivedRecords.entrySet()) {
             final TopicPartition inputTopicPartition = new 
TopicPartition("data", partitionRecords.getKey().partition());
-            final Iterator<ConsumerRecord<byte[], byte[]>> expectedRecord = 
expectedRecords.get(inputTopicPartition).iterator();
+            final List<ConsumerRecord<byte[], byte[]>> 
receivedRecordsForPartition = partitionRecords.getValue();
+            final List<ConsumerRecord<byte[], byte[]>> 
expectedRecordsForPartition = expectedRecords.get(inputTopicPartition);
+
+            System.out.println(partitionRecords.getKey() + " with " + 
receivedRecordsForPartition.size() + ", " +
+                    inputTopicPartition + " with " + 
expectedRecordsForPartition.size());
+
+            final Iterator<ConsumerRecord<byte[], byte[]>> expectedRecord = 
expectedRecordsForPartition.iterator();
+            RuntimeException exception = null;
+            for (final ConsumerRecord<byte[], byte[]> receivedRecord : 
receivedRecordsForPartition) {
+                if (!expectedRecord.hasNext())

Review comment:
       Nit: add `{ }`

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -461,6 +463,42 @@ public void flush() {
         }
     }
 
+    public void flushCache() {
+        RuntimeException firstException = null;
+        // attempting to flush the stores
+        if (!stores.isEmpty()) {
+            log.debug("Flushing all store caches registered in the state 
manager: {}", stores);
+            for (final StateStoreMetadata metadata : stores.values()) {
+                final StateStore store = metadata.stateStore;
+
+                try {
+                    // buffer should be flushed to send all records to 
changelog
+                    if (store instanceof TimeOrderedKeyValueBuffer) {
+                        store.flush();
+                    } else if (store instanceof CachedStateStore) {
+                        ((CachedStateStore) store).flushCache();
+                    }
+                    log.trace("Flushed cache or buffer {}", store.name());
+                } catch (final RuntimeException exception) {
+                    if (firstException == null) {
+                        // do NOT wrap the error if it is actually caused by 
Streams itself
+                        if (exception instanceof StreamsException) {
+                            firstException = exception;
+                        } else {
+                            firstException = new ProcessorStateException(
+                                    format("%sFailed to flush cache of store 
%s", logPrefix, store.name()), exception);

Review comment:
       nit: formatting:
   ```
   firstException = new ProcessorStateException(
       format("%sFailed to flush cache of store %s", logPrefix, store.name()),
       exception
   );
   ```

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##########
@@ -49,6 +61,30 @@
         this.stateDirectory = stateDirectory;
     }
 
+    protected void initializeCheckpoint() {
+        // we will delete the local checkpoint file after registering the 
state stores and loading them into the

Review comment:
       Why do we actually delete the checkpoint file after registering? For 
non-eos, it seems we only need to delete the checkopint file if we wipe out the 
whole store?
   
   Or is it a "simplification" do not distinguish between eos/non-eos for this 
case?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -267,80 +266,19 @@ public void handleAssignment(final Map<TaskId, 
Set<TopicPartition>> activeTasks,
                 // check for tasks that were owned previously but have changed 
active/standby status
                 tasksToRecycle.add(task);
             } else {
-                tasksToClose.add(task);
-            }
-        }
-
-        for (final Task task : tasksToClose) {
-            try {
-                if (task.isActive()) {
-                    // Active tasks are revoked and suspended/committed during 
#handleRevocation
-                    if (!task.state().equals(State.SUSPENDED)) {
-                        log.error("Active task {} should be suspended prior to 
attempting to close but was in {}",
-                                  task.id(), task.state());
-                        throw new IllegalStateException("Active task " + 
task.id() + " should have been suspended");
-                    }
-                } else {
-                    task.suspend();
-                    task.prepareCommit();
-                    task.postCommit();
-                }
-                completeTaskCloseClean(task);
-                cleanUpTaskProducer(task, taskCloseExceptions);
-                tasks.remove(task.id());
-            } catch (final RuntimeException e) {
-                final String uncleanMessage = String.format(
-                    "Failed to close task %s cleanly. Attempting to close 
remaining tasks before re-throwing:",
-                    task.id());
-                log.error(uncleanMessage, e);
-                taskCloseExceptions.put(task.id(), e);
-                // We've already recorded the exception (which is the point of 
clean).
-                // Now, we should go ahead and complete the close because a 
half-closed task is no good to anyone.
-                dirtyTasks.add(task);
-            }
-        }
-
-        for (final Task oldTask : tasksToRecycle) {
-            final Task newTask;
-            try {
-                if (oldTask.isActive()) {
-                    if (!oldTask.state().equals(State.SUSPENDED)) {
-                        // Active tasks are revoked and suspended/committed 
during #handleRevocation
-                        log.error("Active task {} should be suspended prior to 
attempting to close but was in {}",
-                                  oldTask.id(), oldTask.state());
-                        throw new IllegalStateException("Active task " + 
oldTask.id() + " should have been suspended");
-                    }
-                    final Set<TopicPartition> partitions = 
standbyTasksToCreate.remove(oldTask.id());
-                    newTask = 
standbyTaskCreator.createStandbyTaskFromActive((StreamTask) oldTask, 
partitions);
-                    cleanUpTaskProducer(oldTask, taskCloseExceptions);
-                } else {
-                    oldTask.suspend();
-                    oldTask.prepareCommit();
-                    oldTask.postCommit();
-                    final Set<TopicPartition> partitions = 
activeTasksToCreate.remove(oldTask.id());
-                    newTask = 
activeTaskCreator.createActiveTaskFromStandby((StandbyTask) oldTask, 
partitions, mainConsumer);
-                }
-                tasks.remove(oldTask.id());
-                addNewTask(newTask);
-            } catch (final RuntimeException e) {
-                final String uncleanMessage = String.format("Failed to recycle 
task %s cleanly. Attempting to close remaining tasks before re-throwing:", 
oldTask.id());
-                log.error(uncleanMessage, e);
-                taskCloseExceptions.put(oldTask.id(), e);
-                dirtyTasks.add(oldTask);
+                tasksToCloseClean.add(task);
             }
         }
 
-        for (final Task task : dirtyTasks) {
-            closeTaskDirty(task);
-            cleanUpTaskProducer(task, taskCloseExceptions);
-            tasks.remove(task.id());
-        }
+        // close and recycle those tasks
+        handleCloseAndRecycle(tasksToRecycle, tasksToCloseClean, 
tasksToCloseDirty, activeTasksToCreate, standbyTasksToCreate, 
taskCloseExceptions);
 
         if (!taskCloseExceptions.isEmpty()) {
+            log.error("Hit exceptions while closing / recycling tasks: {}", 
taskCloseExceptions);

Review comment:
       Seems like double logging? We have a `log.error` each time before 
`taskCloseExceptions.put()` is called in `handleCloseAndRecycle`

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -368,7 +306,102 @@ public void handleAssignment(final Map<TaskId, 
Set<TopicPartition>> activeTasks,
                 addNewTask(task);
             }
         }
+    }
+
+    private void handleCloseAndRecycle(final List<Task> tasksToRecycle,
+                                       final List<Task> tasksToCloseClean,
+                                       final List<Task> tasksToCloseDirty,
+                                       final Map<TaskId, Set<TopicPartition>> 
activeTasksToCreate,
+                                       final Map<TaskId, Set<TopicPartition>> 
standbyTasksToCreate,
+                                       final LinkedHashMap<TaskId, 
RuntimeException> taskCloseExceptions) {
+        if (!tasksToCloseDirty.isEmpty()) {
+            throw new IllegalArgumentException("Tasks to close-dirty should be 
empty");
+        }
+
+        // for all tasks to close or recycle, we should first right a 
checkpoint as in post-commit
+        final List<Task> tasksToCheckpoint = new 
ArrayList<>(tasksToCloseClean);
+        tasksToCheckpoint.addAll(tasksToRecycle);
+        for (final Task task : tasksToCheckpoint) {
+            try {
+                // Note that we are not actually committing here but just 
check if we need to write checkpoint file:
+                // 1) for active tasks prepareCommit should return empty if it 
has committed during suspension successfully,
+                //    and their changelog positions should not change at all 
postCommit would not write the checkpoint again.
+                // 2) for standby tasks prepareCommit should always return 
empty, and then in postCommit we would probably
+                //    write the checkpoint file.
+                final Map<TopicPartition, OffsetAndMetadata> offsets = 
task.prepareCommit();
+                if (!offsets.isEmpty()) {
+                    log.error("Task {} should has been committed when it was 
suspended, but it reports non-empty " +
+                                    "offsets {} to commit; it means it fails 
during last commit and hence should be closed dirty",
+                            task.id(), offsets);
+
+                    tasksToCloseDirty.add(task);
+                } else if (!task.isActive()) {
+                    // For standby tasks, always try to first suspend before 
committing (checkpointing) it;
+                    // Since standby tasks do not actually need to commit 
offsets but only need to
+                    // flush / checkpoint state stores, so we only need to 
call postCommit here.
+                    task.suspend();
+
+                    task.postCommit(true);
+                }
+            } catch (final RuntimeException e) {
+                final String uncleanMessage = String.format(
+                        "Failed to checkpoint task %s. Attempting to close 
remaining tasks before re-throwing:",
+                        task.id());
+                log.error(uncleanMessage, e);
+                taskCloseExceptions.put(task.id(), e);
+                // We've already recorded the exception (which is the point of 
clean).
+                // Now, we should go ahead and complete the close because a 
half-closed task is no good to anyone.
+                tasksToCloseDirty.add(task);
+            }
+        }
 
+        for (final Task task : tasksToCloseClean) {
+            try {
+                if (!tasksToCloseDirty.contains(task)) {

Review comment:
       This seems to be a "hack" -- IMHO, as task should be only in either one 
set/list, but never in both... Can we change the first loop to use an explicit 
iterator and remove a task from `tasksToCloseClean` when we add it to 
`tasksToCloseDirty`

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -344,40 +343,41 @@ public void resume() {
     }
 
     /**
+     * @throws StreamsException fatal error that should cause the thread to die
+     * @throws TaskMigratedException recoverable error that would cause the 
task to be removed

Review comment:
       It seems like almost every method might throw `StreamsException` and/or 
`TaskMigratedExcetpion` -- is it really worth to have those comments all over 
the place?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -368,7 +306,102 @@ public void handleAssignment(final Map<TaskId, 
Set<TopicPartition>> activeTasks,
                 addNewTask(task);
             }
         }
+    }
+
+    private void handleCloseAndRecycle(final List<Task> tasksToRecycle,
+                                       final List<Task> tasksToCloseClean,
+                                       final List<Task> tasksToCloseDirty,
+                                       final Map<TaskId, Set<TopicPartition>> 
activeTasksToCreate,
+                                       final Map<TaskId, Set<TopicPartition>> 
standbyTasksToCreate,
+                                       final LinkedHashMap<TaskId, 
RuntimeException> taskCloseExceptions) {
+        if (!tasksToCloseDirty.isEmpty()) {
+            throw new IllegalArgumentException("Tasks to close-dirty should be 
empty");
+        }
+
+        // for all tasks to close or recycle, we should first right a 
checkpoint as in post-commit
+        final List<Task> tasksToCheckpoint = new 
ArrayList<>(tasksToCloseClean);
+        tasksToCheckpoint.addAll(tasksToRecycle);
+        for (final Task task : tasksToCheckpoint) {
+            try {
+                // Note that we are not actually committing here but just 
check if we need to write checkpoint file:
+                // 1) for active tasks prepareCommit should return empty if it 
has committed during suspension successfully,
+                //    and their changelog positions should not change at all 
postCommit would not write the checkpoint again.
+                // 2) for standby tasks prepareCommit should always return 
empty, and then in postCommit we would probably
+                //    write the checkpoint file.
+                final Map<TopicPartition, OffsetAndMetadata> offsets = 
task.prepareCommit();
+                if (!offsets.isEmpty()) {
+                    log.error("Task {} should has been committed when it was 
suspended, but it reports non-empty " +
+                                    "offsets {} to commit; it means it fails 
during last commit and hence should be closed dirty",
+                            task.id(), offsets);
+
+                    tasksToCloseDirty.add(task);
+                } else if (!task.isActive()) {
+                    // For standby tasks, always try to first suspend before 
committing (checkpointing) it;
+                    // Since standby tasks do not actually need to commit 
offsets but only need to
+                    // flush / checkpoint state stores, so we only need to 
call postCommit here.
+                    task.suspend();
+
+                    task.postCommit(true);
+                }
+            } catch (final RuntimeException e) {
+                final String uncleanMessage = String.format(
+                        "Failed to checkpoint task %s. Attempting to close 
remaining tasks before re-throwing:",
+                        task.id());
+                log.error(uncleanMessage, e);
+                taskCloseExceptions.put(task.id(), e);
+                // We've already recorded the exception (which is the point of 
clean).
+                // Now, we should go ahead and complete the close because a 
half-closed task is no good to anyone.
+                tasksToCloseDirty.add(task);
+            }
+        }
 
+        for (final Task task : tasksToCloseClean) {
+            try {
+                if (!tasksToCloseDirty.contains(task)) {
+                    completeTaskCloseClean(task);
+                    cleanUpTaskProducer(task, taskCloseExceptions);
+                    tasks.remove(task.id());
+                }
+            } catch (final RuntimeException e) {
+                final String uncleanMessage = String.format(
+                        "Failed to close task %s cleanly. Attempting to close 
remaining tasks before re-throwing:",
+                        task.id());
+                log.error(uncleanMessage, e);
+                taskCloseExceptions.put(task.id(), e);
+                tasksToCloseDirty.add(task);
+            }
+        }
+
+        for (final Task task : tasksToRecycle) {
+            final Task newTask;
+            try {
+                if (!tasksToCloseDirty.contains(task)) {
+                    if (task.isActive()) {
+                        final Set<TopicPartition> partitions = 
standbyTasksToCreate.remove(task.id());
+                        newTask = 
standbyTaskCreator.createStandbyTaskFromActive((StreamTask) task, partitions);
+                        cleanUpTaskProducer(task, taskCloseExceptions);
+                    } else {
+                        final Set<TopicPartition> partitions = 
activeTasksToCreate.remove(task.id());
+                        newTask = 
activeTaskCreator.createActiveTaskFromStandby((StandbyTask) task, partitions, 
mainConsumer);
+                    }
+                    tasks.remove(task.id());
+                    addNewTask(newTask);
+                }
+            } catch (final RuntimeException e) {
+                final String uncleanMessage = String.format("Failed to recycle 
task %s cleanly. Attempting to close remaining tasks before re-throwing:", 
task.id());
+                log.error(uncleanMessage, e);
+                taskCloseExceptions.put(task.id(), e);

Review comment:
       should it be `putIfAbsent` (cf `cleanUpTaskProducer()`)

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -368,7 +306,102 @@ public void handleAssignment(final Map<TaskId, 
Set<TopicPartition>> activeTasks,
                 addNewTask(task);
             }
         }
+    }
+
+    private void handleCloseAndRecycle(final List<Task> tasksToRecycle,
+                                       final List<Task> tasksToCloseClean,
+                                       final List<Task> tasksToCloseDirty,
+                                       final Map<TaskId, Set<TopicPartition>> 
activeTasksToCreate,
+                                       final Map<TaskId, Set<TopicPartition>> 
standbyTasksToCreate,
+                                       final LinkedHashMap<TaskId, 
RuntimeException> taskCloseExceptions) {
+        if (!tasksToCloseDirty.isEmpty()) {
+            throw new IllegalArgumentException("Tasks to close-dirty should be 
empty");
+        }
+
+        // for all tasks to close or recycle, we should first right a 
checkpoint as in post-commit
+        final List<Task> tasksToCheckpoint = new 
ArrayList<>(tasksToCloseClean);
+        tasksToCheckpoint.addAll(tasksToRecycle);
+        for (final Task task : tasksToCheckpoint) {
+            try {
+                // Note that we are not actually committing here but just 
check if we need to write checkpoint file:
+                // 1) for active tasks prepareCommit should return empty if it 
has committed during suspension successfully,
+                //    and their changelog positions should not change at all 
postCommit would not write the checkpoint again.
+                // 2) for standby tasks prepareCommit should always return 
empty, and then in postCommit we would probably
+                //    write the checkpoint file.
+                final Map<TopicPartition, OffsetAndMetadata> offsets = 
task.prepareCommit();
+                if (!offsets.isEmpty()) {
+                    log.error("Task {} should has been committed when it was 
suspended, but it reports non-empty " +
+                                    "offsets {} to commit; it means it fails 
during last commit and hence should be closed dirty",
+                            task.id(), offsets);
+
+                    tasksToCloseDirty.add(task);
+                } else if (!task.isActive()) {
+                    // For standby tasks, always try to first suspend before 
committing (checkpointing) it;
+                    // Since standby tasks do not actually need to commit 
offsets but only need to
+                    // flush / checkpoint state stores, so we only need to 
call postCommit here.
+                    task.suspend();
+
+                    task.postCommit(true);
+                }
+            } catch (final RuntimeException e) {
+                final String uncleanMessage = String.format(
+                        "Failed to checkpoint task %s. Attempting to close 
remaining tasks before re-throwing:",
+                        task.id());
+                log.error(uncleanMessage, e);
+                taskCloseExceptions.put(task.id(), e);
+                // We've already recorded the exception (which is the point of 
clean).
+                // Now, we should go ahead and complete the close because a 
half-closed task is no good to anyone.
+                tasksToCloseDirty.add(task);
+            }
+        }
 
+        for (final Task task : tasksToCloseClean) {
+            try {
+                if (!tasksToCloseDirty.contains(task)) {
+                    completeTaskCloseClean(task);
+                    cleanUpTaskProducer(task, taskCloseExceptions);
+                    tasks.remove(task.id());
+                }
+            } catch (final RuntimeException e) {
+                final String uncleanMessage = String.format(
+                        "Failed to close task %s cleanly. Attempting to close 
remaining tasks before re-throwing:",
+                        task.id());
+                log.error(uncleanMessage, e);
+                taskCloseExceptions.put(task.id(), e);
+                tasksToCloseDirty.add(task);

Review comment:
       as above

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
##########
@@ -379,9 +379,18 @@ private static void verifyReceivedAllRecords(final 
Map<TopicPartition, List<Cons
         final IntegerDeserializer integerDeserializer = new 
IntegerDeserializer();
         for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], 
byte[]>>> partitionRecords : receivedRecords.entrySet()) {
             final TopicPartition inputTopicPartition = new 
TopicPartition("data", partitionRecords.getKey().partition());
-            final Iterator<ConsumerRecord<byte[], byte[]>> expectedRecord = 
expectedRecords.get(inputTopicPartition).iterator();
+            final List<ConsumerRecord<byte[], byte[]>> 
receivedRecordsForPartition = partitionRecords.getValue();
+            final List<ConsumerRecord<byte[], byte[]>> 
expectedRecordsForPartition = expectedRecords.get(inputTopicPartition);
+
+            System.out.println(partitionRecords.getKey() + " with " + 
receivedRecordsForPartition.size() + ", " +
+                    inputTopicPartition + " with " + 
expectedRecordsForPartition.size());
+
+            final Iterator<ConsumerRecord<byte[], byte[]>> expectedRecord = 
expectedRecordsForPartition.iterator();
+            RuntimeException exception = null;
+            for (final ConsumerRecord<byte[], byte[]> receivedRecord : 
receivedRecordsForPartition) {
+                if (!expectedRecord.hasNext())
+                    exception = new RuntimeException("Result verification 
failed for " + receivedRecord + " since there's no more expected record");

Review comment:
       Should be `break` for this case?

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
##########
@@ -390,9 +399,12 @@ private static void verifyReceivedAllRecords(final 
Map<TopicPartition, List<Cons
                 final int expectedValue = 
integerDeserializer.deserialize(expected.topic(), expected.value());
 
                 if (!receivedKey.equals(expectedKey) || receivedValue != 
expectedValue) {
-                    throw new RuntimeException("Result verification failed for 
" + receivedRecord + " expected <" + expectedKey + "," + expectedValue + "> but 
was <" + receivedKey + "," + receivedValue + ">");
+                    exception = new RuntimeException("Result verification 
failed for " + receivedRecord + " expected <" + expectedKey + "," + 
expectedValue + "> but was <" + receivedKey + "," + receivedValue + ">");
                 }
             }
+
+            if (exception != null)

Review comment:
       Nit: add `{ }`

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
##########
@@ -390,9 +399,12 @@ private static void verifyReceivedAllRecords(final 
Map<TopicPartition, List<Cons
                 final int expectedValue = 
integerDeserializer.deserialize(expected.topic(), expected.value());
 
                 if (!receivedKey.equals(expectedKey) || receivedValue != 
expectedValue) {
-                    throw new RuntimeException("Result verification failed for 
" + receivedRecord + " expected <" + expectedKey + "," + expectedValue + "> but 
was <" + receivedKey + "," + receivedValue + ">");
+                    exception = new RuntimeException("Result verification 
failed for " + receivedRecord + " expected <" + expectedKey + "," + 
expectedValue + "> but was <" + receivedKey + "," + receivedValue + ">");

Review comment:
       Should be `break` here?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##########
@@ -38,13 +41,39 @@
  */
 final class StateManagerUtil {
     static final String CHECKPOINT_FILE_NAME = ".checkpoint";
+    static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;
 
     private StateManagerUtil() {}
 
     static RecordConverter converterForStore(final StateStore store) {
         return isTimestamped(store) ? rawValueToTimestampedValue() : 
identity();
     }
 
+    static boolean checkpointNeeded(final boolean enforceCheckpoint,
+                                    final Map<TopicPartition, Long> 
oldOffsetSnapshot,
+                                    final Map<TopicPartition, Long> 
newOffsetSnapshot) {
+        // we should always have the old snapshot post completing the register 
state stores;
+        // if it is null it means the registration is not done and hence we 
should not overwrite the checkpoint
+        if (oldOffsetSnapshot == null)
+            return false;
+
+        // if the previous snapshot is empty while the current snapshot is not 
then we should always checkpoint;
+        // note if the task is stateless or stateful but no stores logged, the 
snapshot would also be empty
+        // and hence it's okay to not checkpoint
+        if (oldOffsetSnapshot.isEmpty() && !newOffsetSnapshot.isEmpty())

Review comment:
       nit: add `{ }`

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##########
@@ -38,13 +41,39 @@
  */
 final class StateManagerUtil {
     static final String CHECKPOINT_FILE_NAME = ".checkpoint";
+    static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;
 
     private StateManagerUtil() {}
 
     static RecordConverter converterForStore(final StateStore store) {
         return isTimestamped(store) ? rawValueToTimestampedValue() : 
identity();
     }
 
+    static boolean checkpointNeeded(final boolean enforceCheckpoint,
+                                    final Map<TopicPartition, Long> 
oldOffsetSnapshot,
+                                    final Map<TopicPartition, Long> 
newOffsetSnapshot) {
+        // we should always have the old snapshot post completing the register 
state stores;
+        // if it is null it means the registration is not done and hence we 
should not overwrite the checkpoint
+        if (oldOffsetSnapshot == null)
+            return false;
+
+        // if the previous snapshot is empty while the current snapshot is not 
then we should always checkpoint;

Review comment:
       This line just says what the code say itself. The question seem to be, 
"why" and this question is not answered in the comment.
   
   From my understanding, because we delete the checkpoint file when the task 
is initialized, we want to write the checkpoint file again as soon as possible 
(ie, on first commit) instead of waiting until we hit the 
`OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT` to keep the time frame in which we have 
consistent state but not checkpoint file low?
   
   Thus, I am wondering if we should just not delete the checkpoint file when 
we init the task for non-eos instead? And just remove this additional condition?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##########
@@ -38,13 +41,39 @@
  */
 final class StateManagerUtil {
     static final String CHECKPOINT_FILE_NAME = ".checkpoint";
+    static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;
 
     private StateManagerUtil() {}
 
     static RecordConverter converterForStore(final StateStore store) {
         return isTimestamped(store) ? rawValueToTimestampedValue() : 
identity();
     }
 
+    static boolean checkpointNeeded(final boolean enforceCheckpoint,
+                                    final Map<TopicPartition, Long> 
oldOffsetSnapshot,
+                                    final Map<TopicPartition, Long> 
newOffsetSnapshot) {
+        // we should always have the old snapshot post completing the register 
state stores;
+        // if it is null it means the registration is not done and hence we 
should not overwrite the checkpoint
+        if (oldOffsetSnapshot == null)

Review comment:
       nit: add `{ }`

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -248,12 +248,11 @@ public void handleAssignment(final Map<TaskId, 
Set<TopicPartition>> activeTasks,
         );
 
         final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions = 
new LinkedHashMap<>();
-
         final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new 
HashMap<>(activeTasks);
         final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new 
HashMap<>(standbyTasks);
-        final List<Task> tasksToClose = new LinkedList<>();
-        final Set<Task> tasksToRecycle = new HashSet<>();
-        final Set<Task> dirtyTasks = new HashSet<>();
+        final List<Task> tasksToRecycle = new LinkedList<>();
+        final List<Task> tasksToCloseClean = new LinkedList<>();
+        final List<Task> tasksToCloseDirty = new LinkedList<>();

Review comment:
       Why do we maintain `List` instead of `Set` ? It seem more natural to me 
to have a `Set` (ie, should we instead switch `tasksToRecyle` to use a `Set`, 
too)?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -368,7 +306,102 @@ public void handleAssignment(final Map<TaskId, 
Set<TopicPartition>> activeTasks,
                 addNewTask(task);
             }
         }
+    }
+
+    private void handleCloseAndRecycle(final List<Task> tasksToRecycle,
+                                       final List<Task> tasksToCloseClean,
+                                       final List<Task> tasksToCloseDirty,
+                                       final Map<TaskId, Set<TopicPartition>> 
activeTasksToCreate,
+                                       final Map<TaskId, Set<TopicPartition>> 
standbyTasksToCreate,
+                                       final LinkedHashMap<TaskId, 
RuntimeException> taskCloseExceptions) {
+        if (!tasksToCloseDirty.isEmpty()) {
+            throw new IllegalArgumentException("Tasks to close-dirty should be 
empty");
+        }
+
+        // for all tasks to close or recycle, we should first right a 
checkpoint as in post-commit
+        final List<Task> tasksToCheckpoint = new 
ArrayList<>(tasksToCloseClean);
+        tasksToCheckpoint.addAll(tasksToRecycle);
+        for (final Task task : tasksToCheckpoint) {
+            try {
+                // Note that we are not actually committing here but just 
check if we need to write checkpoint file:
+                // 1) for active tasks prepareCommit should return empty if it 
has committed during suspension successfully,
+                //    and their changelog positions should not change at all 
postCommit would not write the checkpoint again.
+                // 2) for standby tasks prepareCommit should always return 
empty, and then in postCommit we would probably
+                //    write the checkpoint file.
+                final Map<TopicPartition, OffsetAndMetadata> offsets = 
task.prepareCommit();
+                if (!offsets.isEmpty()) {
+                    log.error("Task {} should has been committed when it was 
suspended, but it reports non-empty " +
+                                    "offsets {} to commit; it means it fails 
during last commit and hence should be closed dirty",
+                            task.id(), offsets);
+
+                    tasksToCloseDirty.add(task);
+                } else if (!task.isActive()) {
+                    // For standby tasks, always try to first suspend before 
committing (checkpointing) it;
+                    // Since standby tasks do not actually need to commit 
offsets but only need to
+                    // flush / checkpoint state stores, so we only need to 
call postCommit here.
+                    task.suspend();
+
+                    task.postCommit(true);
+                }
+            } catch (final RuntimeException e) {
+                final String uncleanMessage = String.format(
+                        "Failed to checkpoint task %s. Attempting to close 
remaining tasks before re-throwing:",
+                        task.id());
+                log.error(uncleanMessage, e);
+                taskCloseExceptions.put(task.id(), e);
+                // We've already recorded the exception (which is the point of 
clean).
+                // Now, we should go ahead and complete the close because a 
half-closed task is no good to anyone.
+                tasksToCloseDirty.add(task);
+            }
+        }
 
+        for (final Task task : tasksToCloseClean) {
+            try {
+                if (!tasksToCloseDirty.contains(task)) {
+                    completeTaskCloseClean(task);
+                    cleanUpTaskProducer(task, taskCloseExceptions);
+                    tasks.remove(task.id());
+                }
+            } catch (final RuntimeException e) {
+                final String uncleanMessage = String.format(
+                        "Failed to close task %s cleanly. Attempting to close 
remaining tasks before re-throwing:",
+                        task.id());
+                log.error(uncleanMessage, e);
+                taskCloseExceptions.put(task.id(), e);
+                tasksToCloseDirty.add(task);
+            }
+        }
+
+        for (final Task task : tasksToRecycle) {
+            final Task newTask;
+            try {
+                if (!tasksToCloseDirty.contains(task)) {

Review comment:
       as above.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -344,40 +343,41 @@ public void resume() {
     }
 
     /**
+     * @throws StreamsException fatal error that should cause the thread to die
+     * @throws TaskMigratedException recoverable error that would cause the 
task to be removed
      * @return offsets that should be committed for this task
      */
     @Override
     public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
-        final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit;

Review comment:
       Why do we remove this? It might be personal preference, but the old code 
was "cleaner" IMHO? (Feel free to ignore  this comment.)

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -428,53 +428,28 @@ public void resume() {
         return committableOffsets;
     }
 
-    /**
-     * This should only be called if the attempted commit succeeded for this 
task
-     */
     @Override
-    public void postCommit() {
-        commitRequested = false;
-
+    public void postCommit(final boolean enforceCheckpoint) {
         switch (state()) {
             case CREATED:
                 // We should never write a checkpoint for a CREATED task as we 
may overwrite an existing checkpoint
                 // with empty uninitialized offsets
-                log.debug("Skipped writing checkpoint for created task");
+                log.debug("Skipped writing checkpoint for {} task", state());
 
                 break;
 
             case RESTORING:
-                writeCheckpoint();
-                log.debug("Finalized commit for restoring task");
+            case SUSPENDED:

Review comment:
       Nit: Can we keep `SUSPENDED` after `RUNNING` case? We use the same order 
in all methods and always follow the "natural" state transition order, that is 
CREATE, RESTORING, RUNNING, SUSPENDED, CLOSED.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##########
@@ -38,13 +41,39 @@
  */
 final class StateManagerUtil {
     static final String CHECKPOINT_FILE_NAME = ".checkpoint";
+    static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;
 
     private StateManagerUtil() {}
 
     static RecordConverter converterForStore(final StateStore store) {
         return isTimestamped(store) ? rawValueToTimestampedValue() : 
identity();
     }
 
+    static boolean checkpointNeeded(final boolean enforceCheckpoint,
+                                    final Map<TopicPartition, Long> 
oldOffsetSnapshot,
+                                    final Map<TopicPartition, Long> 
newOffsetSnapshot) {
+        // we should always have the old snapshot post completing the register 
state stores;
+        // if it is null it means the registration is not done and hence we 
should not overwrite the checkpoint
+        if (oldOffsetSnapshot == null)
+            return false;
+
+        // if the previous snapshot is empty while the current snapshot is not 
then we should always checkpoint;
+        // note if the task is stateless or stateful but no stores logged, the 
snapshot would also be empty
+        // and hence it's okay to not checkpoint
+        if (oldOffsetSnapshot.isEmpty() && !newOffsetSnapshot.isEmpty())
+            return true;
+
+        // we can checkpoint if the the difference between the current and the 
previous snapshot is large enough
+        long totalOffsetDelta = 0L;
+        for (final Map.Entry<TopicPartition, Long> entry : 
newOffsetSnapshot.entrySet()) {
+            totalOffsetDelta += 
Math.abs(oldOffsetSnapshot.getOrDefault(entry.getKey(), 0L) - entry.getValue());
+        }
+
+        // when enforcing checkpoint is required, we should overwrite the 
checkpoint if it is different from the old one;
+        // otherwise, we only overwrite the checkpoint if it is largely 
different from the old one
+        return enforceCheckpoint ? totalOffsetDelta > 0 : totalOffsetDelta > 
OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;

Review comment:
       I would expect that writing the checkpoint file should be cheap anyway 
(it's just some metadata, how bad can it be?) -- or are you worried about the 
blocking flush to the file system? Thus, I don't consider double-checkpointing 
as a real issue? Might be a case of pre-mature optimization? It might simplify 
our code if we just blindly write the checkpoint data even if it did not 
change? -- But I am fine either way.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -428,53 +428,28 @@ public void resume() {
         return committableOffsets;
     }
 
-    /**
-     * This should only be called if the attempted commit succeeded for this 
task
-     */
     @Override
-    public void postCommit() {
-        commitRequested = false;
-
+    public void postCommit(final boolean enforceCheckpoint) {
         switch (state()) {
             case CREATED:
                 // We should never write a checkpoint for a CREATED task as we 
may overwrite an existing checkpoint
                 // with empty uninitialized offsets
-                log.debug("Skipped writing checkpoint for created task");
+                log.debug("Skipped writing checkpoint for {} task", state());

Review comment:
       nit: Why this? We know that we are in state `created` ?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -368,7 +306,102 @@ public void handleAssignment(final Map<TaskId, 
Set<TopicPartition>> activeTasks,
                 addNewTask(task);
             }
         }
+    }
+
+    private void handleCloseAndRecycle(final List<Task> tasksToRecycle,
+                                       final List<Task> tasksToCloseClean,
+                                       final List<Task> tasksToCloseDirty,
+                                       final Map<TaskId, Set<TopicPartition>> 
activeTasksToCreate,
+                                       final Map<TaskId, Set<TopicPartition>> 
standbyTasksToCreate,
+                                       final LinkedHashMap<TaskId, 
RuntimeException> taskCloseExceptions) {
+        if (!tasksToCloseDirty.isEmpty()) {
+            throw new IllegalArgumentException("Tasks to close-dirty should be 
empty");
+        }
+
+        // for all tasks to close or recycle, we should first right a 
checkpoint as in post-commit
+        final List<Task> tasksToCheckpoint = new 
ArrayList<>(tasksToCloseClean);
+        tasksToCheckpoint.addAll(tasksToRecycle);
+        for (final Task task : tasksToCheckpoint) {
+            try {
+                // Note that we are not actually committing here but just 
check if we need to write checkpoint file:
+                // 1) for active tasks prepareCommit should return empty if it 
has committed during suspension successfully,
+                //    and their changelog positions should not change at all 
postCommit would not write the checkpoint again.
+                // 2) for standby tasks prepareCommit should always return 
empty, and then in postCommit we would probably
+                //    write the checkpoint file.
+                final Map<TopicPartition, OffsetAndMetadata> offsets = 
task.prepareCommit();
+                if (!offsets.isEmpty()) {
+                    log.error("Task {} should has been committed when it was 
suspended, but it reports non-empty " +
+                                    "offsets {} to commit; it means it fails 
during last commit and hence should be closed dirty",
+                            task.id(), offsets);
+
+                    tasksToCloseDirty.add(task);

Review comment:
       We should remove `task` from `taskToCloseClean`/`taskToRecycle` here to 
maintain the invariant that a task is only in either one of the three sets/list 
but never in multiple at the same time. (cf. my comment below)
   
   I understand that you want to share this loop for "cleanClose" and "recycle" 
we can extract it into its own method to achieve this.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -368,7 +306,102 @@ public void handleAssignment(final Map<TaskId, 
Set<TopicPartition>> activeTasks,
                 addNewTask(task);
             }
         }
+    }
+
+    private void handleCloseAndRecycle(final List<Task> tasksToRecycle,
+                                       final List<Task> tasksToCloseClean,
+                                       final List<Task> tasksToCloseDirty,
+                                       final Map<TaskId, Set<TopicPartition>> 
activeTasksToCreate,
+                                       final Map<TaskId, Set<TopicPartition>> 
standbyTasksToCreate,
+                                       final LinkedHashMap<TaskId, 
RuntimeException> taskCloseExceptions) {
+        if (!tasksToCloseDirty.isEmpty()) {
+            throw new IllegalArgumentException("Tasks to close-dirty should be 
empty");
+        }
+
+        // for all tasks to close or recycle, we should first right a 
checkpoint as in post-commit
+        final List<Task> tasksToCheckpoint = new 
ArrayList<>(tasksToCloseClean);
+        tasksToCheckpoint.addAll(tasksToRecycle);
+        for (final Task task : tasksToCheckpoint) {
+            try {
+                // Note that we are not actually committing here but just 
check if we need to write checkpoint file:
+                // 1) for active tasks prepareCommit should return empty if it 
has committed during suspension successfully,
+                //    and their changelog positions should not change at all 
postCommit would not write the checkpoint again.
+                // 2) for standby tasks prepareCommit should always return 
empty, and then in postCommit we would probably
+                //    write the checkpoint file.
+                final Map<TopicPartition, OffsetAndMetadata> offsets = 
task.prepareCommit();
+                if (!offsets.isEmpty()) {
+                    log.error("Task {} should has been committed when it was 
suspended, but it reports non-empty " +
+                                    "offsets {} to commit; it means it fails 
during last commit and hence should be closed dirty",
+                            task.id(), offsets);
+
+                    tasksToCloseDirty.add(task);
+                } else if (!task.isActive()) {
+                    // For standby tasks, always try to first suspend before 
committing (checkpointing) it;
+                    // Since standby tasks do not actually need to commit 
offsets but only need to
+                    // flush / checkpoint state stores, so we only need to 
call postCommit here.
+                    task.suspend();
+
+                    task.postCommit(true);
+                }
+            } catch (final RuntimeException e) {
+                final String uncleanMessage = String.format(
+                        "Failed to checkpoint task %s. Attempting to close 
remaining tasks before re-throwing:",
+                        task.id());
+                log.error(uncleanMessage, e);
+                taskCloseExceptions.put(task.id(), e);

Review comment:
       should it be `putIfAbsent` (cf `cleanUpTaskProducer()`)

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -368,7 +306,102 @@ public void handleAssignment(final Map<TaskId, 
Set<TopicPartition>> activeTasks,
                 addNewTask(task);
             }
         }
+    }
+
+    private void handleCloseAndRecycle(final List<Task> tasksToRecycle,
+                                       final List<Task> tasksToCloseClean,
+                                       final List<Task> tasksToCloseDirty,
+                                       final Map<TaskId, Set<TopicPartition>> 
activeTasksToCreate,
+                                       final Map<TaskId, Set<TopicPartition>> 
standbyTasksToCreate,
+                                       final LinkedHashMap<TaskId, 
RuntimeException> taskCloseExceptions) {
+        if (!tasksToCloseDirty.isEmpty()) {
+            throw new IllegalArgumentException("Tasks to close-dirty should be 
empty");
+        }
+
+        // for all tasks to close or recycle, we should first right a 
checkpoint as in post-commit
+        final List<Task> tasksToCheckpoint = new 
ArrayList<>(tasksToCloseClean);
+        tasksToCheckpoint.addAll(tasksToRecycle);
+        for (final Task task : tasksToCheckpoint) {
+            try {
+                // Note that we are not actually committing here but just 
check if we need to write checkpoint file:
+                // 1) for active tasks prepareCommit should return empty if it 
has committed during suspension successfully,
+                //    and their changelog positions should not change at all 
postCommit would not write the checkpoint again.
+                // 2) for standby tasks prepareCommit should always return 
empty, and then in postCommit we would probably
+                //    write the checkpoint file.
+                final Map<TopicPartition, OffsetAndMetadata> offsets = 
task.prepareCommit();
+                if (!offsets.isEmpty()) {
+                    log.error("Task {} should has been committed when it was 
suspended, but it reports non-empty " +
+                                    "offsets {} to commit; it means it fails 
during last commit and hence should be closed dirty",
+                            task.id(), offsets);
+
+                    tasksToCloseDirty.add(task);
+                } else if (!task.isActive()) {
+                    // For standby tasks, always try to first suspend before 
committing (checkpointing) it;
+                    // Since standby tasks do not actually need to commit 
offsets but only need to
+                    // flush / checkpoint state stores, so we only need to 
call postCommit here.
+                    task.suspend();
+
+                    task.postCommit(true);
+                }
+            } catch (final RuntimeException e) {
+                final String uncleanMessage = String.format(
+                        "Failed to checkpoint task %s. Attempting to close 
remaining tasks before re-throwing:",
+                        task.id());
+                log.error(uncleanMessage, e);
+                taskCloseExceptions.put(task.id(), e);
+                // We've already recorded the exception (which is the point of 
clean).
+                // Now, we should go ahead and complete the close because a 
half-closed task is no good to anyone.
+                tasksToCloseDirty.add(task);
+            }
+        }
 
+        for (final Task task : tasksToCloseClean) {
+            try {
+                if (!tasksToCloseDirty.contains(task)) {
+                    completeTaskCloseClean(task);
+                    cleanUpTaskProducer(task, taskCloseExceptions);
+                    tasks.remove(task.id());
+                }
+            } catch (final RuntimeException e) {
+                final String uncleanMessage = String.format(
+                        "Failed to close task %s cleanly. Attempting to close 
remaining tasks before re-throwing:",
+                        task.id());
+                log.error(uncleanMessage, e);
+                taskCloseExceptions.put(task.id(), e);

Review comment:
       should it be `putIfAbsent` (cf `cleanUpTaskProducer()`)

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -479,24 +512,20 @@ boolean tryToCompleteRestoration() {
     void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
         final Set<TopicPartition> remainingRevokedPartitions = new 
HashSet<>(revokedPartitions);
 
-        final Set<Task> revokedTasks = new HashSet<>();
-        final Set<Task> additionalTasksForCommitting = new HashSet<>();
-        final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> 
consumedOffsetsAndMetadataPerTask = new HashMap<>();
-
+        final Set<Task> revokedActiveTasks = new HashSet<>();
+        final Set<Task> nonRevokedActiveTasks = new HashSet<>();
+        final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> 
consumedOffsetsPerTask = new HashMap<>();
         final AtomicReference<RuntimeException> firstException = new 
AtomicReference<>(null);
+
         for (final Task task : activeTaskIterable()) {
             if 
(remainingRevokedPartitions.containsAll(task.inputPartitions())) {
-                try {
-                    task.suspend();
-                    revokedTasks.add(task);
-                } catch (final RuntimeException e) {
-                    log.error("Caught the following exception while trying to 
suspend revoked task " + task.id(), e);
-                    firstException.compareAndSet(null, new 
StreamsException("Failed to suspend " + task.id(), e));
-                }
+                // when the task input partitions are included in the revoked 
list,
+                // this is an active task and should be revoked
+                revokedActiveTasks.add(task);
+                remainingRevokedPartitions.removeAll(task.inputPartitions());
             } else if (task.commitNeeded()) {
-                additionalTasksForCommitting.add(task);
+                nonRevokedActiveTasks.add(task);

Review comment:
       Why this renaming? -- the set does not contain "all" non-revoked active 
tasks -- only those, that need committing?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -243,18 +242,24 @@ public void handleAssignment(final Map<TaskId, 
Set<TopicPartition>> activeTasks,
 
         for (final Task task : tasksToClose) {
             try {
-                if (task.isActive()) {

Review comment:
       What other PR?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -368,7 +306,102 @@ public void handleAssignment(final Map<TaskId, 
Set<TopicPartition>> activeTasks,
                 addNewTask(task);
             }
         }
+    }
+
+    private void handleCloseAndRecycle(final List<Task> tasksToRecycle,
+                                       final List<Task> tasksToCloseClean,
+                                       final List<Task> tasksToCloseDirty,
+                                       final Map<TaskId, Set<TopicPartition>> 
activeTasksToCreate,
+                                       final Map<TaskId, Set<TopicPartition>> 
standbyTasksToCreate,
+                                       final LinkedHashMap<TaskId, 
RuntimeException> taskCloseExceptions) {
+        if (!tasksToCloseDirty.isEmpty()) {
+            throw new IllegalArgumentException("Tasks to close-dirty should be 
empty");
+        }
+
+        // for all tasks to close or recycle, we should first right a 
checkpoint as in post-commit
+        final List<Task> tasksToCheckpoint = new 
ArrayList<>(tasksToCloseClean);
+        tasksToCheckpoint.addAll(tasksToRecycle);
+        for (final Task task : tasksToCheckpoint) {
+            try {
+                // Note that we are not actually committing here but just 
check if we need to write checkpoint file:
+                // 1) for active tasks prepareCommit should return empty if it 
has committed during suspension successfully,
+                //    and their changelog positions should not change at all 
postCommit would not write the checkpoint again.
+                // 2) for standby tasks prepareCommit should always return 
empty, and then in postCommit we would probably
+                //    write the checkpoint file.
+                final Map<TopicPartition, OffsetAndMetadata> offsets = 
task.prepareCommit();
+                if (!offsets.isEmpty()) {
+                    log.error("Task {} should has been committed when it was 
suspended, but it reports non-empty " +
+                                    "offsets {} to commit; it means it fails 
during last commit and hence should be closed dirty",
+                            task.id(), offsets);
+
+                    tasksToCloseDirty.add(task);
+                } else if (!task.isActive()) {
+                    // For standby tasks, always try to first suspend before 
committing (checkpointing) it;
+                    // Since standby tasks do not actually need to commit 
offsets but only need to
+                    // flush / checkpoint state stores, so we only need to 
call postCommit here.
+                    task.suspend();
+
+                    task.postCommit(true);
+                }
+            } catch (final RuntimeException e) {
+                final String uncleanMessage = String.format(
+                        "Failed to checkpoint task %s. Attempting to close 
remaining tasks before re-throwing:",
+                        task.id());
+                log.error(uncleanMessage, e);
+                taskCloseExceptions.put(task.id(), e);
+                // We've already recorded the exception (which is the point of 
clean).
+                // Now, we should go ahead and complete the close because a 
half-closed task is no good to anyone.
+                tasksToCloseDirty.add(task);

Review comment:
       as above.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -454,6 +456,41 @@ public void flush() {
         }
     }
 
+    public void flushCache() {
+        RuntimeException firstException = null;
+        // attempting to flush the stores
+        if (!stores.isEmpty()) {
+            log.debug("Flushing all store caches registered in the state 
manager: {}", stores);
+            for (final StateStoreMetadata metadata : stores.values()) {
+                final StateStore store = metadata.stateStore;
+
+                try {
+                    // buffer should be flushed to send all records to 
changelog
+                    if (store instanceof TimeOrderedKeyValueBuffer) {

Review comment:
       I agree. IMHO, we should get rid of KTable/store cache all together and 
only have a "changelog-writer-cache" that regular stores and suppress() can use 
to reduce the write load on the changelog topic. For downstream rate control, 
users should use suppress() (and we might want to try to unify suppress() and 
the upstream store somehow eventually to avoid the current redundancy)

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -368,7 +306,102 @@ public void handleAssignment(final Map<TaskId, 
Set<TopicPartition>> activeTasks,
                 addNewTask(task);
             }
         }
+    }
+
+    private void handleCloseAndRecycle(final List<Task> tasksToRecycle,
+                                       final List<Task> tasksToCloseClean,
+                                       final List<Task> tasksToCloseDirty,
+                                       final Map<TaskId, Set<TopicPartition>> 
activeTasksToCreate,
+                                       final Map<TaskId, Set<TopicPartition>> 
standbyTasksToCreate,
+                                       final LinkedHashMap<TaskId, 
RuntimeException> taskCloseExceptions) {
+        if (!tasksToCloseDirty.isEmpty()) {
+            throw new IllegalArgumentException("Tasks to close-dirty should be 
empty");
+        }

Review comment:
       While I usually prefer checks like this, it seems unnecessary here? 
(Feel free to ignore.)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to