This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1f9aa01   KAFKA-7672 : force write checkpoint during StreamTask 
#suspend (#6115)
1f9aa01 is described below

commit 1f9aa01a5b3b59d90499a059d719af03483d5130
Author: Boyang Chen <bche...@outlook.com>
AuthorDate: Fri Feb 22 21:49:28 2019 -0800

     KAFKA-7672 : force write checkpoint during StreamTask #suspend (#6115)
    
    This fix addresses issue #2 pointed out in 
https://issues.apache.org/jira/browse/KAFKA-7672
    In the current setup, we write the offset checkpoint file when EOS is 
turned on during #suspend, which introduces a potential race condition during 
the StateManager #closeSuspend call. To mitigate the problem, we attempt to 
always write the checkpoint file in the #suspend call.
    
    Reviewers: Guozhang Wang <wangg...@gmail.com>, Matthias J. Sax 
<mj...@apache.org>,  John Roesler <j...@confluent.io>, Bill Bejeck 
<bbej...@gmail.com>
---
 .../processor/internals/AbstractStateManager.java  |  2 +-
 .../streams/processor/internals/AbstractTask.java  |  5 ++--
 .../internals/GlobalStateManagerImpl.java          |  3 +--
 .../processor/internals/GlobalStateUpdateTask.java |  2 +-
 .../processor/internals/ProcessorStateManager.java | 23 ++++++++++++++----
 .../streams/processor/internals/StandbyTask.java   |  6 +----
 .../streams/processor/internals/StateManager.java  |  3 +--
 .../processor/internals/StoreChangelogReader.java  |  4 ++++
 .../streams/processor/internals/StreamTask.java    | 14 +++++++++++
 .../internals/GlobalStateManagerImplTest.java      | 28 +++++++---------------
 .../processor/internals/GlobalStateTaskTest.java   |  5 ++--
 .../internals/ProcessorStateManagerTest.java       | 25 +++++++++----------
 .../processor/internals/StandbyTaskTest.java       |  3 ++-
 .../processor/internals/StateManagerStub.java      |  2 +-
 .../apache/kafka/test/GlobalStateManagerStub.java  |  2 +-
 15 files changed, 71 insertions(+), 56 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
index 36482aa..c306468 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
@@ -41,7 +41,7 @@ abstract class AbstractStateManager implements StateManager {
     static final String CHECKPOINT_FILE_NAME = ".checkpoint";
 
     final File baseDir;
-    private final boolean eosEnabled;
+    final boolean eosEnabled;
     OffsetCheckpoint checkpoint;
 
     final Map<TopicPartition, Long> checkpointableOffsets = new HashMap<>();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 1838eb7..3387801 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -240,14 +240,13 @@ public abstract class AbstractTask implements Task {
     }
 
     /**
-     * @param writeCheckpoint boolean indicating if a checkpoint file should 
be written
      * @throws ProcessorStateException if there is an error while closing the 
state manager
      */
-    void closeStateManager(final boolean writeCheckpoint) throws 
ProcessorStateException {
+    void closeStateManager(final boolean clean) throws ProcessorStateException 
{
         ProcessorStateException exception = null;
         log.trace("Closing state manager");
         try {
-            stateMgr.close(writeCheckpoint ? activeTaskCheckpointableOffsets() 
: null);
+            stateMgr.close(clean);
         } catch (final ProcessorStateException e) {
             exception = e;
         } finally {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 48319a5..7682814 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -318,7 +318,7 @@ public class GlobalStateManagerImpl extends 
AbstractStateManager implements Glob
 
 
     @Override
-    public void close(final Map<TopicPartition, Long> offsets) throws 
IOException {
+    public void close(final boolean clean) throws IOException {
         try {
             if (globalStores.isEmpty()) {
                 return;
@@ -341,7 +341,6 @@ public class GlobalStateManagerImpl extends 
AbstractStateManager implements Glob
             if (closeFailed.length() > 0) {
                 throw new ProcessorStateException("Exceptions caught during 
close of 1 or more global state globalStores\n" + closeFailed);
             }
-            checkpoint(offsets);
         } finally {
             stateDirectory.unlockGlobalState();
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
index 202fa36..29c861e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -105,7 +105,7 @@ public class GlobalStateUpdateTask implements 
GlobalStateMaintainer {
     }
 
     public void close() throws IOException {
-        stateMgr.close(offsets);
+        stateMgr.close(true);
     }
 
     private void initTopology() {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index f75f185..5c54ee7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -89,6 +89,9 @@ public class ProcessorStateManager extends 
AbstractStateManager {
 
         // load the checkpoint information
         checkpointableOffsets.putAll(checkpoint.read());
+
+        log.trace("Checkpointable offsets read from checkpoint: {}", 
checkpointableOffsets);
+
         if (eosEnabled) {
             // delete the checkpoint file after finish loading its stored 
offsets
             checkpoint.delete();
@@ -140,7 +143,7 @@ public class ProcessorStateManager extends 
AbstractStateManager {
             restoreCallbacks.put(topic, stateRestoreCallback);
             recordConverters.put(topic, recordConverter);
         } else {
-            log.trace("Restoring state store {} from changelog topic {}", 
storeName, topic);
+            log.trace("Restoring state store {} from changelog topic {} at 
checkpoint {}", storeName, topic, checkpointableOffsets.get(storePartition));
 
             final StateRestorer restorer = new StateRestorer(
                 storePartition,
@@ -254,7 +257,7 @@ public class ProcessorStateManager extends 
AbstractStateManager {
      * @throws ProcessorStateException if any error happens when closing the 
state stores
      */
     @Override
-    public void close(final Map<TopicPartition, Long> ackedOffsets) throws 
ProcessorStateException {
+    public void close(final boolean clean) throws ProcessorStateException {
         ProcessorStateException firstException = null;
         // attempting to close the stores, just in case they
         // are not closed by a ProcessorNode yet
@@ -271,11 +274,17 @@ public class ProcessorStateManager extends 
AbstractStateManager {
                     log.error("Failed to close state store {}: ", 
store.name(), e);
                 }
             }
+            stores.clear();
+        }
 
-            if (ackedOffsets != null) {
-                checkpoint(ackedOffsets);
+        if (!clean && eosEnabled && checkpoint != null) {
+            // delete the checkpoint file if this is an unclean close
+            try {
+                checkpoint.delete();
+                checkpoint = null;
+            } catch (final IOException e) {
+                throw new ProcessorStateException(String.format("%sError while 
deleting the checkpoint file", logPrefix), e);
             }
-            stores.clear();
         }
 
         if (firstException != null) {
@@ -287,6 +296,7 @@ public class ProcessorStateManager extends 
AbstractStateManager {
     @Override
     public void checkpoint(final Map<TopicPartition, Long> 
checkpointableOffsets) {
         this.checkpointableOffsets.putAll(changelogReader.restoredOffsets());
+        log.trace("Checkpointable offsets updated with restored offsets: {}", 
this.checkpointableOffsets);
         for (final StateStore store : stores.values()) {
             final String storeName = store.name();
             // only checkpoint the offset to the offsets file if
@@ -302,6 +312,9 @@ public class ProcessorStateManager extends 
AbstractStateManager {
                 }
             }
         }
+
+        log.trace("Checkpointable offsets updated with active acked offsets: 
{}", this.checkpointableOffsets);
+
         // write the checkpoint file before closing
         if (checkpoint == null) {
             checkpoint = new OffsetCheckpoint(new File(baseDir, 
CHECKPOINT_FILE_NAME));
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index bd8ba34..749b2ed 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -127,8 +127,6 @@ public class StandbyTask extends AbstractTask {
      * - {@link #commit()}
      * - close state
      * <pre>
-     * @param clean ignored by {@code StandbyTask} as it can always try to 
close cleanly
-     *              (ie, commit, flush, and write checkpoint file)
      * @param isZombie ignored by {@code StandbyTask} as it can never be a 
zombie
      */
     @Override
@@ -138,14 +136,12 @@ public class StandbyTask extends AbstractTask {
             return;
         }
         log.debug("Closing");
-        boolean committedSuccessfully = false;
         try {
             if (clean) {
                 commit();
-                committedSuccessfully = true;
             }
         } finally {
-            closeStateManager(committedSuccessfully);
+            closeStateManager(true);
         }
 
         taskClosed = true;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
index f6efde6..74cf12b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
@@ -24,7 +24,6 @@ import org.apache.kafka.streams.processor.StateStore;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collection;
-import java.util.Map;
 
 interface StateManager extends Checkpointable {
     File baseDir();
@@ -41,7 +40,7 @@ interface StateManager extends Checkpointable {
     void reinitializeStateStoresForPartitions(final Collection<TopicPartition> 
partitions,
                                               final InternalProcessorContext 
processorContext);
 
-    void close(final Map<TopicPartition, Long> offsets) throws IOException;
+    void close(final boolean clean) throws IOException;
 
     StateStore getGlobalStore(final String name);
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 59fb36f..c8a9842 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -195,6 +195,8 @@ public class StoreChangelogReader implements 
ChangelogReader {
         for (final TopicPartition partition : initialized) {
             final StateRestorer restorer = stateRestorers.get(partition);
             if (restorer.checkpoint() != StateRestorer.NO_CHECKPOINT) {
+                log.trace("Found checkpoint {} from changelog {} for store 
{}.", restorer.checkpoint(), partition, restorer.storeName());
+
                 restoreConsumer.seek(partition, restorer.checkpoint());
                 logRestoreOffsets(partition,
                         restorer.checkpoint(),
@@ -202,6 +204,8 @@ public class StoreChangelogReader implements 
ChangelogReader {
                 
restorer.setStartingOffset(restoreConsumer.position(partition));
                 restorer.restoreStarted();
             } else {
+                log.trace("Did not find checkpoint from changelog {} for store 
{}, rewinding to beginning.", partition, restorer.storeName());
+
                 
restoreConsumer.seekToBeginning(Collections.singletonList(partition));
                 needsPositionUpdate.add(restorer);
             }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index c4fecef..6dbf394 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
+import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.ProductionExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
@@ -45,6 +46,7 @@ import 
org.apache.kafka.streams.processor.internals.metrics.CumulativeCount;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
@@ -296,6 +298,15 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
             producer = producerSupplier.get();
             producer.initTransactions();
             recordCollector.init(producer);
+
+            if (stateMgr.checkpoint != null) {
+                try {
+                    stateMgr.checkpoint.delete();
+                    stateMgr.checkpoint = null;
+                } catch (final IOException e) {
+                    throw new ProcessorStateException(String.format("%sError 
while deleting the checkpoint file", logPrefix), e);
+                }
+            }
         }
     }
 
@@ -567,6 +578,9 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
                 commit(false);
             } finally {
                 if (eosEnabled) {
+
+                    stateMgr.checkpoint(activeTaskCheckpointableOffsets());
+
                     try {
                         recordCollector.close();
                     } catch (final ProducerFencedException e) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 809a0b6..5dd5f86 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -371,22 +371,11 @@ public class GlobalStateManagerImplTest {
         initializeConsumer(1, 0, t2);
         stateManager.register(store2, stateRestoreCallback);
 
-        stateManager.close(Collections.emptyMap());
+        stateManager.close(true);
         assertFalse(store1.isOpen());
         assertFalse(store2.isOpen());
     }
 
-    @Test
-    public void shouldWriteCheckpointsOnClose() throws IOException {
-        stateManager.initialize();
-        initializeConsumer(1, 0, t1);
-        stateManager.register(store1, stateRestoreCallback);
-        final Map<TopicPartition, Long> expected = 
Collections.singletonMap(t1, 25L);
-        stateManager.close(expected);
-        final Map<TopicPartition, Long> result = readOffsetsCheckpoint();
-        assertEquals(expected, result);
-    }
-
     @Test(expected = ProcessorStateException.class)
     public void shouldThrowProcessorStateStoreExceptionIfStoreCloseFailed() 
throws IOException {
         stateManager.initialize();
@@ -398,7 +387,7 @@ public class GlobalStateManagerImplTest {
             }
         }, stateRestoreCallback);
 
-        stateManager.close(Collections.emptyMap());
+        stateManager.close(true);
     }
 
     @Test
@@ -415,7 +404,7 @@ public class GlobalStateManagerImplTest {
     @Test
     public void shouldUnlockGlobalStateDirectoryOnClose() throws IOException {
         stateManager.initialize();
-        stateManager.close(Collections.emptyMap());
+        stateManager.close(true);
         final StateDirectory stateDir = new StateDirectory(streamsConfig, new 
MockTime(), true);
         try {
             // should be able to get the lock now as it should've been 
released in close
@@ -438,9 +427,9 @@ public class GlobalStateManagerImplTest {
                 super.close();
             }
         }, stateRestoreCallback);
-        stateManager.close(Collections.emptyMap());
+        stateManager.close(true);
 
-        stateManager.close(Collections.emptyMap());
+        stateManager.close(true);
     }
 
     @Test
@@ -460,7 +449,7 @@ public class GlobalStateManagerImplTest {
         stateManager.register(store2, stateRestoreCallback);
 
         try {
-            stateManager.close(Collections.emptyMap());
+            stateManager.close(true);
         } catch (final ProcessorStateException e) {
             // expected
         }
@@ -539,7 +528,8 @@ public class GlobalStateManagerImplTest {
         stateManager.initialize();
         initializeConsumer(10, 0, t1);
         stateManager.register(store1, stateRestoreCallback);
-        stateManager.close(Collections.emptyMap());
+        stateManager.checkpoint(Collections.emptyMap());
+        stateManager.close(true);
 
         final Map<TopicPartition, Long> checkpointMap = 
stateManager.checkpointed();
         assertThat(checkpointMap, equalTo(Collections.singletonMap(t1, 10L)));
@@ -551,7 +541,7 @@ public class GlobalStateManagerImplTest {
         stateManager.initialize();
         initializeConsumer(10, 0, t3);
         stateManager.register(store3, stateRestoreCallback);
-        stateManager.close(Collections.emptyMap());
+        stateManager.close(true);
 
         assertThat(readOffsetsCheckpoint(), equalTo(Collections.emptyMap()));
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
index 53d374b..9001255 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
@@ -204,15 +204,14 @@ public class GlobalStateTaskTest {
 
 
     @Test
-    public void shouldCloseStateManagerWithOffsets() throws IOException {
+    public void shouldFlushStateManagerWithOffsets() throws IOException {
         final Map<TopicPartition, Long> expectedOffsets = new HashMap<>();
         expectedOffsets.put(t1, 52L);
         expectedOffsets.put(t2, 100L);
         globalStateTask.initialize();
         globalStateTask.update(new ConsumerRecord<>(topic1, 1, 51, 
"foo".getBytes(), "foo".getBytes()));
-        globalStateTask.close();
+        globalStateTask.flushState();
         assertEquals(expectedOffsets, stateMgr.checkpointed());
-        assertTrue(stateMgr.closed);
     }
 
     @Test
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 119790f..7d82982 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -130,7 +130,7 @@ public class ProcessorStateManagerTest {
             assertThat(batchingRestoreCallback.getRestoredRecords().size(), 
is(1));
             
assertTrue(batchingRestoreCallback.getRestoredRecords().contains(expectedKeyValue));
         } finally {
-            stateMgr.close(Collections.emptyMap());
+            stateMgr.close(true);
         }
     }
 
@@ -153,7 +153,7 @@ public class ProcessorStateManagerTest {
             assertTrue(persistentStore.keys.contains(intKey));
             assertEquals(9, persistentStore.values.get(0).length);
         } finally {
-            stateMgr.close(Collections.emptyMap());
+            stateMgr.close(true);
         }
     }
 
@@ -176,7 +176,7 @@ public class ProcessorStateManagerTest {
             assertTrue(persistentStore.keys.contains(intKey));
             assertEquals(17, persistentStore.values.get(0).length);
         } finally {
-            stateMgr.close(Collections.emptyMap());
+            stateMgr.close(true);
         }
     }
 
@@ -204,7 +204,7 @@ public class ProcessorStateManagerTest {
             stateMgr.register(persistentStore, 
persistentStore.stateRestoreCallback);
             assertTrue(changelogReader.wasRegistered(new 
TopicPartition(persistentStoreTopicName, 2)));
         } finally {
-            stateMgr.close(Collections.emptyMap());
+            stateMgr.close(true);
         }
     }
 
@@ -231,7 +231,7 @@ public class ProcessorStateManagerTest {
             stateMgr.register(nonPersistentStore, 
nonPersistentStore.stateRestoreCallback);
             assertTrue(changelogReader.wasRegistered(new 
TopicPartition(nonPersistentStoreTopicName, 2)));
         } finally {
-            stateMgr.close(Collections.emptyMap());
+            stateMgr.close(true);
         }
     }
 
@@ -292,7 +292,7 @@ public class ProcessorStateManagerTest {
             assertEquals(-1L, (long) changeLogOffsets.get(partition3));
 
         } finally {
-            stateMgr.close(Collections.emptyMap());
+            stateMgr.close(true);
         }
     }
 
@@ -315,7 +315,7 @@ public class ProcessorStateManagerTest {
             assertEquals(mockKeyValueStore, 
stateMgr.getStore(nonPersistentStoreName));
 
         } finally {
-            stateMgr.close(Collections.emptyMap());
+            stateMgr.close(true);
         }
     }
 
@@ -352,7 +352,8 @@ public class ProcessorStateManagerTest {
         } finally {
             // close the state manager with the ack'ed offsets
             stateMgr.flush();
-            stateMgr.close(ackedOffsets);
+            stateMgr.checkpoint(ackedOffsets);
+            stateMgr.close(true);
         }
         // make sure all stores are closed, and the checkpoint file is written.
         assertTrue(persistentStore.flushed);
@@ -398,7 +399,7 @@ public class ProcessorStateManagerTest {
             false,
             logContext);
         stateMgr.register(persistentStore, 
persistentStore.stateRestoreCallback);
-        stateMgr.close(null);
+        stateMgr.close(true);
         final Map<TopicPartition, Long> read = checkpoint.read();
         assertThat(read, equalTo(offsets));
     }
@@ -583,7 +584,7 @@ public class ProcessorStateManagerTest {
         stateManager.register(stateStore, stateStore.stateRestoreCallback);
 
         try {
-            stateManager.close(Collections.emptyMap());
+            stateManager.close(true);
             fail("Should throw ProcessorStateException if store close throws 
exception");
         } catch (final ProcessorStateException e) {
             // pass
@@ -696,7 +697,7 @@ public class ProcessorStateManagerTest {
         stateManager.register(stateStore2, stateStore2.stateRestoreCallback);
 
         try {
-            stateManager.close(Collections.emptyMap());
+            stateManager.close(true);
         } catch (final ProcessorStateException expected) { /* ignode */ }
         Assert.assertTrue(closedStore.get());
     }
@@ -721,7 +722,7 @@ public class ProcessorStateManagerTest {
             assertFalse(checkpointFile.exists());
         } finally {
             if (stateManager != null) {
-                stateManager.close(null);
+                stateManager.close(true);
             }
         }
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 52599c6..a57c407 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -366,6 +366,7 @@ public class StandbyTaskTest {
             singletonList(makeWindowedConsumerRecord(changelogName, 10, 1, 0L, 
60_000L))
         );
 
+        task.suspend();
         task.closeStateManager(true);
 
         final File taskDir = stateDirectory.directoryForTask(taskId);
@@ -592,7 +593,7 @@ public class StandbyTaskTest {
             }
 
             @Override
-            void closeStateManager(final boolean writeCheckpoint) throws 
ProcessorStateException {
+            void closeStateManager(final boolean clean) throws 
ProcessorStateException {
                 closedStateManager.set(true);
             }
         };
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java
index b14731d..37b44d3 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java
@@ -45,7 +45,7 @@ public class StateManagerStub implements StateManager {
     public void flush() {}
 
     @Override
-    public void close(final Map<TopicPartition, Long> offsets) throws 
IOException {}
+    public void close(final boolean clean) throws IOException {}
 
     @Override
     public StateStore getGlobalStore(final String name) {
diff --git 
a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java 
b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
index a97a91e..ccb1762 100644
--- a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
+++ b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
@@ -66,7 +66,7 @@ public class GlobalStateManagerStub implements 
GlobalStateManager {
     public void flush() {}
 
     @Override
-    public void close(final Map<TopicPartition, Long> offsets) throws 
IOException {
+    public void close(final boolean clean) throws IOException {
         this.offsets.putAll(offsets);
         closed = true;
     }

Reply via email to