This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new ef6a832  KAFKA-10166: always write checkpoint before closing an 
(initialized) task (#8926)
ef6a832 is described below

commit ef6a832ff673e5e25a205a4915e271392719bcf0
Author: A. Sophie Blee-Goldman <sop...@confluent.io>
AuthorDate: Fri Jun 26 15:11:56 2020 -0700

    KAFKA-10166: always write checkpoint before closing an (initialized) task 
(#8926)
    
    This should address at least some of the excessive TaskCorruptedExceptions 
we've been seeing lately. Basically, at the moment we only commit tasks if 
commitNeeded is true -- this seems obvious by definition. But the problem is we 
do some essential cleanup in postCommit that should always be done before a 
task is closed:
    
    * clear the PartitionGroup
    * write the checkpoint
    
    The second is actually fine to skip when commitNeeded = false with ALOS, as 
we will have already written a checkpoint during the last commit. But for EOS, 
we only write the checkpoint before a close -- so even if there is no new 
pending data since the last commit, we have to write the current offsets. If we 
don't, the task will be assumed dirty and we will run into our friend the 
TaskCorruptedException during (re)initialization.
    
    To fix this, we should just always call prepareCommit and postCommit at the 
TaskManager level. Within the task, it can decide whether or not to actually do 
something in those methods based on commitNeeded.
    
    One subtle issue is that we still need to avoid checkpointing a task that 
was still in CREATED, to avoid potentially overwriting an existing checkpoint 
with uninitialized empty offsets. Unfortunately we always suspend a task before 
closing and committing, so we lose the information about whether the task was in 
CREATED or RUNNING/RESTORING by the time we get to the checkpoint. For this we 
introduce a special flag to keep track of whether a suspended task should 
actually be checkpointed or not.
    
    Reviewers: Guozhang Wang <wangg...@gmail.com>
---
 .../streams/processor/internals/StandbyTask.java   |  87 +++++++++++---
 .../streams/processor/internals/StreamTask.java    |  66 ++++++++---
 .../streams/processor/internals/TaskManager.java   | 105 ++++++++---------
 .../processor/internals/StandbyTaskTest.java       |  25 +++-
 .../processor/internals/StreamTaskTest.java        | 128 ++++++++++++++-------
 .../processor/internals/TaskManagerTest.java       |  23 ++--
 .../apache/kafka/streams/TopologyTestDriver.java   |   2 +
 7 files changed, 281 insertions(+), 155 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 1aa68bc..3826780 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -46,14 +46,15 @@ public class StandbyTask extends AbstractTask implements 
Task {
     private final InternalProcessorContext processorContext;
     private final StreamsMetricsImpl streamsMetrics;
 
-    private Map<TopicPartition, Long> offsetSnapshotSinceLastCommit;
+    private boolean checkpointNeededForSuspended = false;
+    private Map<TopicPartition, Long> offsetSnapshotSinceLastCommit = new 
HashMap<>();
 
     /**
      * @param id             the ID of this task
      * @param partitions     input topic partitions, used for thread metadata 
only
      * @param topology       the instance of {@link ProcessorTopology}
      * @param config         the {@link StreamsConfig} specified by the user
-     * @param streamsMetrics        the {@link StreamsMetrics} created by the 
thread
+     * @param streamsMetrics the {@link StreamsMetrics} created by the thread
      * @param stateMgr       the {@link ProcessorStateManager} for this task
      * @param stateDirectory the {@link StateDirectory} created by the thread
      */
@@ -93,6 +94,9 @@ public class StandbyTask extends AbstractTask implements Task 
{
         if (state() == State.CREATED) {
             StateManagerUtil.registerStateStores(log, logPrefix, topology, 
stateMgr, stateDirectory, processorContext);
 
+            // initialize the snapshot with the current offsets as we don't 
need to commit then until they change
+            offsetSnapshotSinceLastCommit = new 
HashMap<>(stateMgr.changelogOffsets());
+
             // no topology needs initialized, we can transit to RUNNING
             // right after registered the stores
             transitionTo(State.RESTORING);
@@ -115,8 +119,15 @@ public class StandbyTask extends AbstractTask implements 
Task {
     public void suspend() {
         switch (state()) {
             case CREATED:
+                log.info("Suspended created");
+                checkpointNeededForSuspended = false;
+                transitionTo(State.SUSPENDED);
+
+                break;
+
             case RUNNING:
-                log.info("Suspended {}", state());
+                log.info("Suspended running");
+                checkpointNeededForSuspended = true;
                 transitionTo(State.SUSPENDED);
 
                 break;
@@ -144,18 +155,27 @@ public class StandbyTask extends AbstractTask implements 
Task {
     }
 
     /**
-     * 1. flush store
-     * 2. write checkpoint file
+     * Flush stores before a commit
      *
      * @throws StreamsException fatal error, should close the thread
      */
     @Override
     public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
-        if (state() == State.RUNNING || state() == State.SUSPENDED) {
-            stateMgr.flush();
-            log.debug("Prepared task for committing");
-        } else {
-            throw new IllegalStateException("Illegal state " + state() + " 
while preparing standby task " + id + " for committing ");
+        switch (state()) {
+            case CREATED:
+                log.debug("Skipped preparing created task for commit");
+
+                break;
+
+            case RUNNING:
+            case SUSPENDED:
+                stateMgr.flush();
+                log.debug("Prepared {} task for committing", state());
+
+                break;
+
+            default:
+                throw new IllegalStateException("Illegal state " + state() + " 
while preparing standby task " + id + " for committing ");
         }
 
         return Collections.emptyMap();
@@ -163,14 +183,36 @@ public class StandbyTask extends AbstractTask implements 
Task {
 
     @Override
     public void postCommit() {
-        if (state() == State.RUNNING || state() == State.SUSPENDED) {
-            // since there's no written offsets we can checkpoint with empty 
map,
-            // and the state current offset would be used to checkpoint
-            stateMgr.checkpoint(Collections.emptyMap());
-            offsetSnapshotSinceLastCommit = new 
HashMap<>(stateMgr.changelogOffsets());
-            log.debug("Finalized commit");
-        } else {
-            throw new IllegalStateException("Illegal state " + state() + " 
while post committing standby task " + id);
+        switch (state()) {
+            case CREATED:
+                // We should never write a checkpoint for a CREATED task as we 
may overwrite an existing checkpoint
+                // with empty uninitialized offsets
+                log.debug("Skipped writing checkpoint for created task");
+
+                break;
+
+            case RUNNING:
+                if (commitNeeded()) {
+                    writeCheckpoint();
+                }
+                log.debug("Finalized commit for running task");
+
+                break;
+
+            case SUSPENDED:
+                // don't overwrite the existing checkpoint file if we haven't 
actually initialized the offsets yet
+                if (checkpointNeededForSuspended) {
+                    writeCheckpoint();
+                    log.debug("Finalized commit for suspended task");
+                    checkpointNeededForSuspended = false;
+                } else {
+                    log.debug("Skipped writing checkpoint for uninitialized 
suspended task");
+                }
+
+                break;
+
+            default:
+                throw new IllegalStateException("Illegal state " + state() + " 
while post committing standby task " + id);
         }
     }
 
@@ -203,6 +245,13 @@ public class StandbyTask extends AbstractTask implements 
Task {
         log.info("Closed clean and recycled state");
     }
 
+    private void writeCheckpoint() {
+        // since there's no written offsets we can checkpoint with empty map,
+        // and the state's current offset would be used to checkpoint
+        stateMgr.checkpoint(Collections.emptyMap());
+        offsetSnapshotSinceLastCommit = new 
HashMap<>(stateMgr.changelogOffsets());
+    }
+
     private void close(final boolean clean) {
         switch (state()) {
             case SUSPENDED:
@@ -243,7 +292,7 @@ public class StandbyTask extends AbstractTask implements 
Task {
     @Override
     public boolean commitNeeded() {
         // we can commit if the store's offset has changed since last commit
-        return offsetSnapshotSinceLastCommit == null || 
!offsetSnapshotSinceLastCommit.equals(stateMgr.changelogOffsets());
+        return 
!offsetSnapshotSinceLastCommit.equals(stateMgr.changelogOffsets());
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 4b27436..abdd567 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -58,6 +58,7 @@ import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static java.util.Collections.emptyMap;
 import static java.util.Collections.singleton;
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
 
@@ -106,6 +107,7 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
     private long idleStartTimeMs;
     private boolean commitNeeded = false;
     private boolean commitRequested = false;
+    private boolean checkpointNeededForSuspended = false;
 
     public StreamTask(final TaskId id,
                       final Set<TopicPartition> partitions,
@@ -249,8 +251,15 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
     public void suspend() {
         switch (state()) {
             case CREATED:
+                log.info("Suspended created");
+                checkpointNeededForSuspended = false;
+                transitionTo(State.SUSPENDED);
+
+                break;
+
             case RESTORING:
-                log.info("Suspended {}", state());
+                log.info("Suspended restoring");
+                checkpointNeededForSuspended = true;
                 transitionTo(State.SUSPENDED);
 
                 break;
@@ -259,6 +268,7 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
                 try {
                     // use try-catch to ensure state transition to SUSPENDED 
even if user code throws in `Processor#close()`
                     closeTopology();
+                    checkpointNeededForSuspended = true;
                 } finally {
                     transitionTo(State.SUSPENDED);
                     log.info("Suspended running");
@@ -338,18 +348,28 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
      */
     @Override
     public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
+        final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit;
         switch (state()) {
-            case RUNNING:
+            case CREATED:
             case RESTORING:
+            case RUNNING:
             case SUSPENDED:
                 stateMgr.flush();
-                recordCollector.flush();
-
-                log.debug("Prepared task for committing");
+                // the commitNeeded flag just indicates whether we have 
reached RUNNING and processed any new data,
+                // so it only indicates whether the record collector should be 
flushed or not, whereas the state
+                // manager should always be flushed; either there is newly 
restored data or the flush will be a no-op
+                if (commitNeeded) {
+                    recordCollector.flush();
+
+                    log.debug("Prepared {} task for committing", state());
+                    offsetsToCommit = committableOffsetsAndMetadata();
+                } else {
+                    log.debug("Skipped preparing {} task for commit since 
there is nothing to commit", state());
+                    offsetsToCommit = Collections.emptyMap();
+                }
 
                 break;
 
-            case CREATED:
             case CLOSED:
                 throw new IllegalStateException("Illegal state " + state() + " 
while preparing active task " + id + " for committing");
 
@@ -357,7 +377,7 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
                 throw new IllegalStateException("Unknown state " + state() + " 
while preparing active task " + id + " for committing");
         }
 
-        return committableOffsetsAndMetadata();
+        return offsetsToCommit;
     }
 
     private Map<TopicPartition, OffsetAndMetadata> 
committableOffsetsAndMetadata() {
@@ -399,7 +419,7 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
                 break;
 
             case CLOSED:
-                throw new IllegalStateException("Illegal state " + state() + " 
while getting commitable offsets for active task " + id);
+                throw new IllegalStateException("Illegal state " + state() + " 
while getting committable offsets for active task " + id);
 
             default:
                 throw new IllegalStateException("Unknown state " + state() + " 
while post committing active task " + id);
@@ -414,11 +434,18 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
     @Override
     public void postCommit() {
         commitRequested = false;
-        commitNeeded = false;
 
         switch (state()) {
+            case CREATED:
+                // We should never write a checkpoint for a CREATED task as we 
may overwrite an existing checkpoint
+                // with empty uninitialized offsets
+                log.debug("Skipped writing checkpoint for created task");
+
+                break;
+
             case RESTORING:
                 writeCheckpoint();
+                log.debug("Finalized commit for restoring task");
 
                 break;
 
@@ -426,6 +453,7 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
                 if (!eosEnabled) {
                     writeCheckpoint();
                 }
+                log.debug("Finalized commit for running task");
 
                 break;
 
@@ -438,19 +466,24 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
                  */
                 partitionGroup.clear();
 
-                writeCheckpoint();
+                if (checkpointNeededForSuspended) {
+                    // Make sure we don't overwrite the checkpoint if the 
suspended task was still in CREATED
+                    // and had not yet initialized offsets
+                    writeCheckpoint();
+                    log.debug("Finalized commit for suspended task");
+                    checkpointNeededForSuspended = false;
+                } else {
+                    log.debug("Skipped writing checkpoint for uninitialized 
suspended task");
+                }
 
                 break;
 
-            case CREATED:
             case CLOSED:
                 throw new IllegalStateException("Illegal state " + state() + " 
while post committing active task " + id);
 
             default:
                 throw new IllegalStateException("Unknown state " + state() + " 
while post committing active task " + id);
         }
-
-        log.debug("Finalized commit");
     }
 
     private Map<TopicPartition, Long> extractPartitionTimes() {
@@ -511,10 +544,11 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
 
     private void writeCheckpoint() {
         if (commitNeeded) {
-            log.error("Tried to write a checkpoint with pending uncommitted 
data, should complete the commit first.");
-            throw new IllegalStateException("A checkpoint should only be 
written if no commit is needed.");
+            stateMgr.checkpoint(checkpointableOffsets());
+        } else {
+            stateMgr.checkpoint(emptyMap());
         }
-        stateMgr.checkpoint(checkpointableOffsets());
+        commitNeeded = false;
     }
 
     private void validateClean() {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 025e9f4..173efff 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -242,15 +242,12 @@ public class TaskManager {
 
         for (final Task task : tasksToClose) {
             try {
-                task.suspend(); // Should be a no-op for active tasks since 
they're suspended in handleRevocation
-                if (task.commitNeeded()) {
-                    if (task.isActive()) {
-                        log.error("Active task {} was revoked and should have 
already been committed", task.id());
-                        throw new IllegalStateException("Revoked active task 
was not committed during handleRevocation");
-                    } else {
-                        task.prepareCommit();
-                        task.postCommit();
-                    }
+                if (!task.isActive()) {
+                    // Active tasks should have already been suspended and 
committed during handleRevocation, but
+                    // standbys must be suspended/committed/closed all here
+                    task.suspend();
+                    task.prepareCommit();
+                    task.postCommit();
                 }
                 completeTaskCloseClean(task);
                 cleanUpTaskProducer(task, taskCloseExceptions);
@@ -437,17 +434,16 @@ public class TaskManager {
     void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
         final Set<TopicPartition> remainingRevokedPartitions = new 
HashSet<>(revokedPartitions);
 
-        final Set<Task> tasksToCommit = new HashSet<>();
+        final Set<Task> revokedTasks = new HashSet<>();
         final Set<Task> additionalTasksForCommitting = new HashSet<>();
+        final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> 
consumedOffsetsAndMetadataPerTask = new HashMap<>();
 
         final AtomicReference<RuntimeException> firstException = new 
AtomicReference<>(null);
         for (final Task task : activeTaskIterable()) {
             if 
(remainingRevokedPartitions.containsAll(task.inputPartitions())) {
                 try {
                     task.suspend();
-                    if (task.commitNeeded()) {
-                        tasksToCommit.add(task);
-                    }
+                    revokedTasks.add(task);
                 } catch (final RuntimeException e) {
                     log.error("Caught the following exception while trying to 
suspend revoked task " + task.id(), e);
                     firstException.compareAndSet(null, new 
StreamsException("Failed to suspend " + task.id(), e));
@@ -469,21 +465,20 @@ public class TaskManager {
             throw suspendException;
         }
 
+        prepareCommitAndAddOffsetsToMap(revokedTasks, 
consumedOffsetsAndMetadataPerTask);
+
         // If using eos-beta, if we must commit any task then we must commit 
all of them
         // TODO: when KAFKA-9450 is done this will be less expensive, and we 
can simplify by always committing everything
-        if (processingMode ==  EXACTLY_ONCE_BETA && !tasksToCommit.isEmpty()) {
-            tasksToCommit.addAll(additionalTasksForCommitting);
-        }
+        final boolean shouldCommitAdditionalTasks =
+            processingMode == EXACTLY_ONCE_BETA && 
!consumedOffsetsAndMetadataPerTask.isEmpty();
 
-        final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> 
consumedOffsetsAndMetadataPerTask = new HashMap<>();
-        for (final Task task : tasksToCommit) {
-            final Map<TopicPartition, OffsetAndMetadata> committableOffsets = 
task.prepareCommit();
-            consumedOffsetsAndMetadataPerTask.put(task.id(), 
committableOffsets);
+        if (shouldCommitAdditionalTasks) {
+            prepareCommitAndAddOffsetsToMap(additionalTasksForCommitting, 
consumedOffsetsAndMetadataPerTask);
         }
 
         commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
 
-        for (final Task task : tasksToCommit) {
+        for (final Task task : revokedTasks) {
             try {
                 task.postCommit();
             } catch (final RuntimeException e) {
@@ -492,9 +487,30 @@ public class TaskManager {
             }
         }
 
-        final RuntimeException commitException = firstException.get();
-        if (commitException != null) {
-            throw commitException;
+        if (shouldCommitAdditionalTasks) {
+            for (final Task task : additionalTasksForCommitting) {
+                try {
+                    task.postCommit();
+                } catch (final RuntimeException e) {
+                    log.error("Exception caught while post-committing task " + 
task.id(), e);
+                    firstException.compareAndSet(null, e);
+                }
+            }
+        }
+
+        final RuntimeException postCommitException = firstException.get();
+        if (postCommitException != null) {
+            throw postCommitException;
+        }
+    }
+
+    private void prepareCommitAndAddOffsetsToMap(final Set<Task> 
tasksToPrepare,
+                                                 final Map<TaskId, 
Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask) {
+        for (final Task task : tasksToPrepare) {
+            final Map<TopicPartition, OffsetAndMetadata> committableOffsets = 
task.prepareCommit();
+            if (!committableOffsets.isEmpty()) {
+                consumedOffsetsAndMetadataPerTask.put(task.id(), 
committableOffsets);
+            }
         }
     }
 
@@ -723,17 +739,17 @@ public class TaskManager {
         if (!clean) {
             return activeTaskIterable();
         }
+        final Set<Task> tasksToCommit = new HashSet<>();
         final Set<Task> tasksToCloseDirty = new HashSet<>();
         final Set<Task> tasksToCloseClean = new HashSet<>();
-        final Set<Task> tasksToCommit = new HashSet<>();
         final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> 
consumedOffsetsAndMetadataPerTask = new HashMap<>();
 
         for (final Task task : activeTaskIterable()) {
             try {
                 task.suspend();
-                if (task.commitNeeded()) {
-                    final Map<TopicPartition, OffsetAndMetadata> 
committableOffsets = task.prepareCommit();
-                    tasksToCommit.add(task);
+                final Map<TopicPartition, OffsetAndMetadata> 
committableOffsets = task.prepareCommit();
+                tasksToCommit.add(task);
+                if (!committableOffsets.isEmpty()) {
                     consumedOffsetsAndMetadataPerTask.put(task.id(), 
committableOffsets);
                 }
                 tasksToCloseClean.add(task);
@@ -754,7 +770,7 @@ public class TaskManager {
             try {
                 commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
 
-                for (final Task task : tasksToCommit) {
+                for (final Task task : activeTaskIterable()) {
                     try {
                         task.postCommit();
                     } catch (final RuntimeException e) {
@@ -794,17 +810,13 @@ public class TaskManager {
             return standbyTaskIterable();
         }
         final Set<Task> tasksToCloseDirty = new HashSet<>();
-        final Set<Task> tasksToCloseClean = new HashSet<>();
-        final Set<Task> tasksToCommit = new HashSet<>();
 
         for (final Task task : standbyTaskIterable()) {
             try {
                 task.suspend();
-                if (task.commitNeeded()) {
-                    task.prepareCommit();
-                    tasksToCommit.add(task);
-                }
-                tasksToCloseClean.add(task);
+                task.prepareCommit();
+                task.postCommit();
+                completeTaskCloseClean(task);
             } catch (final TaskMigratedException e) {
                 // just ignore the exception as it doesn't matter during 
shutdown
                 tasksToCloseDirty.add(task);
@@ -813,27 +825,6 @@ public class TaskManager {
                 tasksToCloseDirty.add(task);
             }
         }
-
-        for (final Task task : tasksToCommit) {
-            try {
-                task.postCommit();
-            } catch (final RuntimeException e) {
-                log.error("Exception caught while post-committing standby task 
" + task.id(), e);
-                firstException.compareAndSet(null, e);
-                tasksToCloseDirty.add(task);
-                tasksToCloseClean.remove(task);
-            }
-        }
-
-        for (final Task task : tasksToCloseClean) {
-            try {
-                completeTaskCloseClean(task);
-            } catch (final RuntimeException e) {
-                log.error("Exception caught while clean-closing standby task " 
+ task.id(), e);
-                firstException.compareAndSet(null, e);
-                tasksToCloseDirty.add(task);
-            }
-        }
         return tasksToCloseDirty;
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index f98d630..ba07d55 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -171,6 +171,7 @@ public class StandbyTaskTest {
 
     @Test
     public void shouldTransitToRunningAfterInitialization() {
+        
EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
         stateManager.registerStateStores(EasyMock.anyObject(), 
EasyMock.anyObject());
         
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
         EasyMock.replay(stateManager);
@@ -195,12 +196,15 @@ public class StandbyTaskTest {
     public void shouldThrowIfCommittingOnIllegalState() {
         EasyMock.replay(stateManager);
         task = createStandbyTask();
+        task.suspend();
+        task.closeClean();
 
         assertThrows(IllegalStateException.class, task::prepareCommit);
     }
 
     @Test
     public void shouldFlushAndCheckpointStateManagerOnCommit() {
+        
EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
         stateManager.flush();
         EasyMock.expectLastCall();
         stateManager.checkpoint(EasyMock.eq(Collections.emptyMap()));
@@ -230,6 +234,7 @@ public class StandbyTaskTest {
 
     @Test
     public void shouldNotCommitAndThrowOnCloseDirty() {
+        
EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
         stateManager.close();
         EasyMock.expectLastCall().andThrow(new 
ProcessorStateException("KABOOM!")).anyTimes();
         stateManager.flush();
@@ -255,6 +260,7 @@ public class StandbyTaskTest {
 
     @Test
     public void shouldNotThrowFromStateManagerCloseInCloseDirty() {
+        
EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
         stateManager.close();
         EasyMock.expectLastCall().andThrow(new 
RuntimeException("KABOOM!")).anyTimes();
         EasyMock.replay(stateManager);
@@ -270,6 +276,7 @@ public class StandbyTaskTest {
 
     @Test
     public void shouldSuspendAndCommitBeforeCloseClean() {
+        
EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
         stateManager.close();
         EasyMock.expectLastCall();
         stateManager.checkpoint(EasyMock.eq(Collections.emptyMap()));
@@ -311,7 +318,7 @@ public class StandbyTaskTest {
         EasyMock.expect(stateManager.changelogOffsets())
             .andReturn(Collections.singletonMap(partition, 50L))
             .andReturn(Collections.singletonMap(partition, 50L))
-            .andReturn(Collections.singletonMap(partition, 60L));
+            .andReturn(Collections.singletonMap(partition, 60L)).anyTimes();
         stateManager.flush();
         EasyMock.expectLastCall();
         stateManager.checkpoint(EasyMock.eq(Collections.emptyMap()));
@@ -321,22 +328,21 @@ public class StandbyTaskTest {
         task = createStandbyTask();
         task.initializeIfNeeded();
 
-        assertTrue(task.commitNeeded());
-
-        task.prepareCommit();
-        task.postCommit();
-
         // do not need to commit if there's no update
         assertFalse(task.commitNeeded());
 
         // could commit if the offset advanced
         assertTrue(task.commitNeeded());
 
+        task.prepareCommit();
+        task.postCommit();
+
         EasyMock.verify(stateManager);
     }
 
     @Test
     public void shouldThrowOnCloseCleanError() {
+        
EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
         stateManager.close();
         EasyMock.expectLastCall().andThrow(new 
RuntimeException("KABOOM!")).anyTimes();
         
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition)).anyTimes();
@@ -360,6 +366,9 @@ public class StandbyTaskTest {
 
     @Test
     public void shouldThrowOnCloseCleanCheckpointError() {
+        EasyMock.expect(stateManager.changelogOffsets())
+            .andReturn(Collections.emptyMap())
+            .andReturn(Collections.singletonMap(partition, 0L));
         stateManager.checkpoint(EasyMock.anyObject());
         EasyMock.expectLastCall().andThrow(new 
RuntimeException("KABOOM!")).anyTimes();
         
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
@@ -385,6 +394,7 @@ public class StandbyTaskTest {
 
     @Test
     public void shouldUnregisterMetricsInCloseClean() {
+        
EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
         
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
         EasyMock.replay(stateManager);
 
@@ -400,6 +410,7 @@ public class StandbyTaskTest {
 
     @Test
     public void shouldUnregisterMetricsInCloseDirty() {
+        
EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
         
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
         EasyMock.replay(stateManager);
 
@@ -498,6 +509,7 @@ public class StandbyTaskTest {
 
     @Test
     public void shouldRecycleTask() {
+        
EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
         stateManager.recycle();
         EasyMock.replay(stateManager);
 
@@ -528,6 +540,7 @@ public class StandbyTaskTest {
 
     @Test
     public void shouldAlwaysSuspendRunningTasks() {
+        
EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
         EasyMock.replay(stateManager);
         task = createStandbyTask();
         task.initializeIfNeeded();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 9607470..facbf2e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -82,7 +82,10 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static java.util.Arrays.asList;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkProperties;
@@ -1203,32 +1206,6 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldCommitWhenSuspending() throws IOException {
-        stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
-        EasyMock.expect(stateDirectory.lock(taskId)).andReturn(true);
-        
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.singletonMap(changelogPartition,
 10L)).anyTimes();
-        
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
-        
stateManager.checkpoint(EasyMock.eq(Collections.singletonMap(changelogPartition,
 10L)));
-        EasyMock.expectLastCall();
-        EasyMock.replay(recordCollector, stateDirectory, stateManager);
-
-        task = createStatefulTask(createConfig(false, "100"), true);
-
-        task.initializeIfNeeded();
-        task.completeRestoration();
-
-        task.suspend();
-        task.prepareCommit();
-        task.postCommit();
-
-        assertEquals(Task.State.SUSPENDED, task.state());
-        assertTrue(source1.closed);
-        assertTrue(source2.closed);
-
-        EasyMock.verify(stateManager);
-    }
-
-    @Test
     public void 
shouldReadCommittedOffsetAndRethrowTimeoutWhenCompleteRestoration() throws 
IOException {
         stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
         EasyMock.expect(stateDirectory.lock(taskId)).andReturn(true);
@@ -1469,7 +1446,6 @@ public class StreamTaskTest {
     @Test
     public void shouldThrowIfCommittingOnIllegalState() {
         task = createStatelessTask(createConfig(false, "100"), 
StreamsConfig.METRICS_LATEST);
-        assertThrows(IllegalStateException.class, task::prepareCommit);
 
         task.transitionTo(Task.State.SUSPENDED);
         task.transitionTo(Task.State.CLOSED);
@@ -1479,7 +1455,6 @@ public class StreamTaskTest {
     @Test
     public void shouldThrowIfPostCommittingOnIllegalState() {
         task = createStatelessTask(createConfig(false, "100"), 
StreamsConfig.METRICS_LATEST);
-        assertThrows(IllegalStateException.class, task::postCommit);
 
         task.transitionTo(Task.State.SUSPENDED);
         task.transitionTo(Task.State.CLOSED);
@@ -1487,6 +1462,61 @@ public class StreamTaskTest {
     }
 
     @Test
+    public void shouldSkipCheckpointingSuspendedCreatedTask() {
+        stateManager.checkpoint(EasyMock.anyObject());
+        EasyMock.expectLastCall().andThrow(new AssertionError("Should not have 
tried to checkpoint"));
+        EasyMock.replay(stateManager);
+
+        task = createStatefulTask(createConfig(false, "100"), true);
+        task.suspend();
+        task.postCommit();
+    }
+
+    @Test
+    public void shouldCheckpointWithEmptyOffsetsForSuspendedRestoringTask() {
+        stateManager.checkpoint(emptyMap());
+        EasyMock.replay(stateManager);
+
+        task = createStatefulTask(createConfig(false, "100"), true);
+        task.initializeIfNeeded();
+        task.suspend();
+        task.postCommit();
+        EasyMock.verify(stateManager);
+    }
+
+    @Test
+    public void 
shouldCheckpointWithEmptyOffsetsForSuspendedRunningTaskWithNoCommitNeeded() {
+        stateManager.checkpoint(emptyMap());
+        EasyMock.replay(stateManager);
+
+        task = createStatefulTask(createConfig(false, "100"), true);
+        task.initializeIfNeeded();
+        task.completeRestoration();
+        task.suspend();
+        task.postCommit();
+        EasyMock.verify(stateManager);
+    }
+
+    @Test
+    public void 
shouldCheckpointTheConsumedOffsetsForSuspendedRunningTaskWithCommitNeeded() {
+        final Map<TopicPartition, Long> checkpointableOffsets = 
singletonMap(partition1, 0L);
+        stateManager.checkpoint(EasyMock.eq(checkpointableOffsets));
+        
EasyMock.expect(recordCollector.offsets()).andReturn(checkpointableOffsets).anyTimes();
+        EasyMock.replay(stateManager, recordCollector);
+
+        task = createStatefulTask(createConfig(false, "0"), true);
+        task.initializeIfNeeded();
+        task.completeRestoration();
+        task.addRecords(partition1, singleton(getConsumerRecord(partition1, 
10)));
+        task.process(100L);
+        assertTrue(task.commitNeeded());
+
+        task.suspend();
+        task.postCommit();
+        EasyMock.verify(stateManager, recordCollector);
+    }
+
+    @Test
     public void shouldReturnStateManagerChangelogOffsets() {
         
EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.singletonMap(partition1,
 50L)).anyTimes();
         
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition1)).anyTimes();
@@ -1560,7 +1590,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldNotCommitOnCloseRestoring() {
+    public void shouldCheckpointOnCloseRestoring() {
         stateManager.flush();
         EasyMock.expectLastCall();
         stateManager.checkpoint(EasyMock.eq(Collections.emptyMap()));
@@ -1584,30 +1614,33 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldCommitOnCloseClean() {
+    public void shouldCheckpointOffsetsOnPostCommitIfCommitNeeded() {
         final long offset = 543L;
+        final long consumedOffset = 345L;
 
         
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.singletonMap(changelogPartition,
 offset)).anyTimes();
-        stateManager.close();
-        EasyMock.expectLastCall();
-        
stateManager.checkpoint(EasyMock.eq(Collections.singletonMap(changelogPartition,
 offset)));
         EasyMock.expectLastCall();
+        stateManager.checkpoint(EasyMock.eq(mkMap(
+            mkEntry(changelogPartition, offset),
+            mkEntry(partition1, consumedOffset)
+        )));
         
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
         EasyMock.replay(recordCollector, stateManager);
         final MetricName metricName = setupCloseTaskMetric();
 
-        task = createOptimizedStatefulTask(createConfig(false, "100"), 
consumer);
+        task = createOptimizedStatefulTask(createConfig(false, "0"), consumer);
         task.initializeIfNeeded();
         task.completeRestoration();
+
+        task.addRecords(partition1, 
singletonList(getConsumerRecord(partition1, consumedOffset)));
+        task.process(100L);
+        assertTrue(task.commitNeeded());
+
         task.suspend();
         task.prepareCommit();
         task.postCommit();
-        task.closeClean();
-
-        assertEquals(Task.State.CLOSED, task.state());
 
-        final double expectedCloseTaskMetric = 1.0;
-        verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, 
metricName);
+        assertEquals(Task.State.SUSPENDED, task.state());
 
         EasyMock.verify(stateManager);
     }
@@ -1616,8 +1649,8 @@ public class StreamTaskTest {
     public void shouldSwallowExceptionOnCloseCleanError() {
         final long offset = 543L;
 
-        
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.singletonMap(changelogPartition,
 offset)).anyTimes();
-        
stateManager.checkpoint(EasyMock.eq(Collections.singletonMap(changelogPartition,
 offset)));
+        
EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
+        
stateManager.checkpoint(EasyMock.eq(Collections.singletonMap(partition1, 
offset)));
         EasyMock.expectLastCall();
         
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(changelogPartition)).anyTimes();
         stateManager.close();
@@ -1629,6 +1662,10 @@ public class StreamTaskTest {
         task.initializeIfNeeded();
         task.completeRestoration();
 
+        task.addRecords(partition1, 
singletonList(getConsumerRecord(partition1, offset)));
+        task.process(100L);
+        assertTrue(task.commitNeeded());
+
         task.suspend();
         task.prepareCommit();
         task.postCommit();
@@ -1679,9 +1716,8 @@ public class StreamTaskTest {
     @Test
     public void shouldThrowOnCloseCleanCheckpointError() {
         final long offset = 543L;
-
-        
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.singletonMap(changelogPartition,
 offset));
-        stateManager.checkpoint(Collections.singletonMap(changelogPartition, 
offset));
+        EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap());
+        stateManager.checkpoint(Collections.singletonMap(partition1, offset));
         EasyMock.expectLastCall().andThrow(new 
ProcessorStateException("KABOOM!")).anyTimes();
         stateManager.close();
         EasyMock.expectLastCall().andThrow(new AssertionError("Close should 
not be called!")).anyTimes();
@@ -1692,6 +1728,10 @@ public class StreamTaskTest {
         task = createOptimizedStatefulTask(createConfig(false, "100"), 
consumer);
         task.initializeIfNeeded();
 
+        task.addRecords(partition1, 
singletonList(getConsumerRecord(partition1, offset)));
+        task.process(100L);
+        assertTrue(task.commitNeeded());
+
         task.suspend();
         task.prepareCommit();
         assertThrows(ProcessorStateException.class, () -> task.postCommit());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 23166d1..36730b5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -79,6 +79,7 @@ import static java.util.Collections.singletonMap;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.common.utils.Utils.union;
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.anyString;
 import static org.easymock.EasyMock.eq;
@@ -414,11 +415,10 @@ public class TaskManagerTest {
     }
 
     @Test
-    public void shouldCloseDirtyActiveUnassignedTasksWhenErrorSuspendingTask() 
{
+    public void 
shouldCloseDirtyActiveUnassignedTasksWhenErrorCleanClosingTask() {
         final StateMachineTask task00 = new StateMachineTask(taskId00, 
taskId00Partitions, true) {
             @Override
-            public void suspend() {
-                super.suspend();
+            public void closeClean() {
                 throw new RuntimeException("KABOOM!");
             }
         };
@@ -436,6 +436,7 @@ public class TaskManagerTest {
         replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, 
consumer, changeLogReader);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
+        taskManager.handleRevocation(taskId00Partitions);
 
         final RuntimeException thrown = assertThrows(
             RuntimeException.class,
@@ -536,6 +537,8 @@ public class TaskManagerTest {
         assertThat(taskManager.tryToCompleteRestoration(), is(true));
         assertThat(task00.state(), is(Task.State.RUNNING));
 
+        taskManager.handleRevocation(taskId00Partitions);
+
         final RuntimeException thrown = assertThrows(
             RuntimeException.class,
             () -> taskManager.handleAssignment(emptyMap(), emptyMap())
@@ -1399,7 +1402,7 @@ public class TaskManagerTest {
     }
 
     @Test
-    public void shouldCloseActiveTasksDirtyAndPropagateSuspendException() {
+    public void 
shouldSuspendAllRevokedActiveTasksAndPropagateSuspendException() {
         setUpTaskManager(StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA);
 
         final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, 
true);
@@ -1418,21 +1421,15 @@ public class TaskManagerTest {
         taskManager.tasks().put(taskId01, task01);
         taskManager.tasks().put(taskId02, task02);
 
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId02);
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId01);
-
         replay(activeTaskCreator);
 
         final RuntimeException thrown = assertThrows(RuntimeException.class,
-            () -> taskManager.handleAssignment(mkMap(mkEntry(taskId00, 
taskId00Partitions)), Collections.emptyMap()));
+            () -> taskManager.handleRevocation(union(HashSet::new, 
taskId01Partitions, taskId02Partitions)));
         assertThat(thrown.getCause().getMessage(), is("task 0_1 suspend 
boom!"));
 
         assertThat(task00.state(), is(Task.State.CREATED));
-        assertThat(task01.state(), is(Task.State.CLOSED));
-        assertThat(task02.state(), is(Task.State.CLOSED));
-
-        // All the tasks involving in the commit should already be removed.
-        assertThat(taskManager.tasks(), is(Collections.singletonMap(taskId00, 
task00)));
+        assertThat(task01.state(), is(Task.State.SUSPENDED));
+        assertThat(task02.state(), is(Task.State.SUSPENDED));
 
         verify(activeTaskCreator);
     }
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 7615612..31659bb 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -1175,6 +1175,8 @@ public class TopologyTestDriver implements Closeable {
     public void close() {
         if (task != null) {
             task.suspend();
+            task.prepareCommit();
+            task.postCommit();
             task.closeClean();
         }
         if (globalStateTask != null) {

Reply via email to