This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 1f9aa01 KAFKA-7672 : force write checkpoint during StreamTask #suspend (#6115) 1f9aa01 is described below commit 1f9aa01a5b3b59d90499a059d719af03483d5130 Author: Boyang Chen <bche...@outlook.com> AuthorDate: Fri Feb 22 21:49:28 2019 -0800 KAFKA-7672 : force write checkpoint during StreamTask #suspend (#6115) This fix addresses issue #2 described in KAFKA-7672 (https://issues.apache.org/jira/browse/KAFKA-7672). In the current setup, we write the offset checkpoint file when EOS is turned on during #suspend, which introduces a potential race condition during the StateManager #closeSuspend call. To mitigate the problem, we attempt to always write the checkpoint file in the #suspend call. Reviewers: Guozhang Wang <wangg...@gmail.com>, Matthias J. Sax <mj...@apache.org>, John Roesler <j...@confluent.io>, Bill Bejeck <bbej...@gmail.com> --- .../processor/internals/AbstractStateManager.java | 2 +- .../streams/processor/internals/AbstractTask.java | 5 ++-- .../internals/GlobalStateManagerImpl.java | 3 +-- .../processor/internals/GlobalStateUpdateTask.java | 2 +- .../processor/internals/ProcessorStateManager.java | 23 ++++++++++++++---- .../streams/processor/internals/StandbyTask.java | 6 +---- .../streams/processor/internals/StateManager.java | 3 +-- .../processor/internals/StoreChangelogReader.java | 4 ++++ .../streams/processor/internals/StreamTask.java | 14 +++++++++++ .../internals/GlobalStateManagerImplTest.java | 28 +++++++--------------- .../processor/internals/GlobalStateTaskTest.java | 5 ++-- .../internals/ProcessorStateManagerTest.java | 25 +++++++++---------- .../processor/internals/StandbyTaskTest.java | 3 ++- .../processor/internals/StateManagerStub.java | 2 +- .../apache/kafka/test/GlobalStateManagerStub.java | 2 +- 15 files changed, 71 insertions(+), 56 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java index 36482aa..c306468 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java @@ -41,7 +41,7 @@ abstract class AbstractStateManager implements StateManager { static final String CHECKPOINT_FILE_NAME = ".checkpoint"; final File baseDir; - private final boolean eosEnabled; + final boolean eosEnabled; OffsetCheckpoint checkpoint; final Map<TopicPartition, Long> checkpointableOffsets = new HashMap<>(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 1838eb7..3387801 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -240,14 +240,13 @@ public abstract class AbstractTask implements Task { } /** - * @param writeCheckpoint boolean indicating if a checkpoint file should be written * @throws ProcessorStateException if there is an error while closing the state manager */ - void closeStateManager(final boolean writeCheckpoint) throws ProcessorStateException { + void closeStateManager(final boolean clean) throws ProcessorStateException { ProcessorStateException exception = null; log.trace("Closing state manager"); try { - stateMgr.close(writeCheckpoint ? 
activeTaskCheckpointableOffsets() : null); + stateMgr.close(clean); } catch (final ProcessorStateException e) { exception = e; } finally { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index 48319a5..7682814 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -318,7 +318,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob @Override - public void close(final Map<TopicPartition, Long> offsets) throws IOException { + public void close(final boolean clean) throws IOException { try { if (globalStores.isEmpty()) { return; @@ -341,7 +341,6 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob if (closeFailed.length() > 0) { throw new ProcessorStateException("Exceptions caught during close of 1 or more global state globalStores\n" + closeFailed); } - checkpoint(offsets); } finally { stateDirectory.unlockGlobalState(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java index 202fa36..29c861e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java @@ -105,7 +105,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer { } public void close() throws IOException { - stateMgr.close(offsets); + stateMgr.close(true); } private void initTopology() { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index f75f185..5c54ee7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -89,6 +89,9 @@ public class ProcessorStateManager extends AbstractStateManager { // load the checkpoint information checkpointableOffsets.putAll(checkpoint.read()); + + log.trace("Checkpointable offsets read from checkpoint: {}", checkpointableOffsets); + if (eosEnabled) { // delete the checkpoint file after finish loading its stored offsets checkpoint.delete(); @@ -140,7 +143,7 @@ public class ProcessorStateManager extends AbstractStateManager { restoreCallbacks.put(topic, stateRestoreCallback); recordConverters.put(topic, recordConverter); } else { - log.trace("Restoring state store {} from changelog topic {}", storeName, topic); + log.trace("Restoring state store {} from changelog topic {} at checkpoint {}", storeName, topic, checkpointableOffsets.get(storePartition)); final StateRestorer restorer = new StateRestorer( storePartition, @@ -254,7 +257,7 @@ public class ProcessorStateManager extends AbstractStateManager { * @throws ProcessorStateException if any error happens when closing the state stores */ @Override - public void close(final Map<TopicPartition, Long> ackedOffsets) throws ProcessorStateException { + public void close(final boolean clean) throws ProcessorStateException { ProcessorStateException firstException = null; // attempting to close the stores, just in case they // are not closed by a ProcessorNode yet @@ -271,11 +274,17 @@ public class ProcessorStateManager extends AbstractStateManager { log.error("Failed to close state store {}: ", store.name(), e); } } + stores.clear(); + } - if (ackedOffsets != null) { - checkpoint(ackedOffsets); + if (!clean && eosEnabled && checkpoint != null) { + // delete the checkpoint file if 
this is an unclean close + try { + checkpoint.delete(); + checkpoint = null; + } catch (final IOException e) { + throw new ProcessorStateException(String.format("%sError while deleting the checkpoint file", logPrefix), e); } - stores.clear(); } if (firstException != null) { @@ -287,6 +296,7 @@ public class ProcessorStateManager extends AbstractStateManager { @Override public void checkpoint(final Map<TopicPartition, Long> checkpointableOffsets) { this.checkpointableOffsets.putAll(changelogReader.restoredOffsets()); + log.trace("Checkpointable offsets updated with restored offsets: {}", this.checkpointableOffsets); for (final StateStore store : stores.values()) { final String storeName = store.name(); // only checkpoint the offset to the offsets file if @@ -302,6 +312,9 @@ public class ProcessorStateManager extends AbstractStateManager { } } } + + log.trace("Checkpointable offsets updated with active acked offsets: {}", this.checkpointableOffsets); + // write the checkpoint file before closing if (checkpoint == null) { checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME)); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index bd8ba34..749b2ed 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -127,8 +127,6 @@ public class StandbyTask extends AbstractTask { * - {@link #commit()} * - close state * <pre> - * @param clean ignored by {@code StandbyTask} as it can always try to close cleanly - * (ie, commit, flush, and write checkpoint file) * @param isZombie ignored by {@code StandbyTask} as it can never be a zombie */ @Override @@ -138,14 +136,12 @@ public class StandbyTask extends AbstractTask { return; } log.debug("Closing"); - boolean committedSuccessfully = false; try { if 
(clean) { commit(); - committedSuccessfully = true; } } finally { - closeStateManager(committedSuccessfully); + closeStateManager(true); } taskClosed = true; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java index f6efde6..74cf12b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java @@ -24,7 +24,6 @@ import org.apache.kafka.streams.processor.StateStore; import java.io.File; import java.io.IOException; import java.util.Collection; -import java.util.Map; interface StateManager extends Checkpointable { File baseDir(); @@ -41,7 +40,7 @@ interface StateManager extends Checkpointable { void reinitializeStateStoresForPartitions(final Collection<TopicPartition> partitions, final InternalProcessorContext processorContext); - void close(final Map<TopicPartition, Long> offsets) throws IOException; + void close(final boolean clean) throws IOException; StateStore getGlobalStore(final String name); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index 59fb36f..c8a9842 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -195,6 +195,8 @@ public class StoreChangelogReader implements ChangelogReader { for (final TopicPartition partition : initialized) { final StateRestorer restorer = stateRestorers.get(partition); if (restorer.checkpoint() != StateRestorer.NO_CHECKPOINT) { + log.trace("Found checkpoint {} from changelog {} for store {}.", restorer.checkpoint(), partition, restorer.storeName()); + 
restoreConsumer.seek(partition, restorer.checkpoint()); logRestoreOffsets(partition, restorer.checkpoint(), @@ -202,6 +204,8 @@ public class StoreChangelogReader implements ChangelogReader { restorer.setStartingOffset(restoreConsumer.position(partition)); restorer.restoreStarted(); } else { + log.trace("Did not find checkpoint from changelog {} for store {}, rewinding to beginning.", partition, restorer.storeName()); + restoreConsumer.seekToBeginning(Collections.singletonList(partition)); needsPositionUpdate.add(restorer); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index c4fecef..6dbf394 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -33,6 +33,7 @@ import org.apache.kafka.common.metrics.stats.Rate; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.DeserializationExceptionHandler; +import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.errors.ProductionExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskMigratedException; @@ -45,6 +46,7 @@ import org.apache.kafka.streams.processor.internals.metrics.CumulativeCount; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.internals.ThreadCache; +import java.io.IOException; import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -296,6 +298,15 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator producer = producerSupplier.get(); producer.initTransactions(); recordCollector.init(producer); + + if (stateMgr.checkpoint != null) { + try { + 
stateMgr.checkpoint.delete(); + stateMgr.checkpoint = null; + } catch (final IOException e) { + throw new ProcessorStateException(String.format("%sError while deleting the checkpoint file", logPrefix), e); + } + } } } @@ -567,6 +578,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator commit(false); } finally { if (eosEnabled) { + + stateMgr.checkpoint(activeTaskCheckpointableOffsets()); + try { recordCollector.close(); } catch (final ProducerFencedException e) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java index 809a0b6..5dd5f86 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java @@ -371,22 +371,11 @@ public class GlobalStateManagerImplTest { initializeConsumer(1, 0, t2); stateManager.register(store2, stateRestoreCallback); - stateManager.close(Collections.emptyMap()); + stateManager.close(true); assertFalse(store1.isOpen()); assertFalse(store2.isOpen()); } - @Test - public void shouldWriteCheckpointsOnClose() throws IOException { - stateManager.initialize(); - initializeConsumer(1, 0, t1); - stateManager.register(store1, stateRestoreCallback); - final Map<TopicPartition, Long> expected = Collections.singletonMap(t1, 25L); - stateManager.close(expected); - final Map<TopicPartition, Long> result = readOffsetsCheckpoint(); - assertEquals(expected, result); - } - @Test(expected = ProcessorStateException.class) public void shouldThrowProcessorStateStoreExceptionIfStoreCloseFailed() throws IOException { stateManager.initialize(); @@ -398,7 +387,7 @@ public class GlobalStateManagerImplTest { } }, stateRestoreCallback); - stateManager.close(Collections.emptyMap()); + stateManager.close(true); } @Test @@ -415,7 
+404,7 @@ public class GlobalStateManagerImplTest { @Test public void shouldUnlockGlobalStateDirectoryOnClose() throws IOException { stateManager.initialize(); - stateManager.close(Collections.emptyMap()); + stateManager.close(true); final StateDirectory stateDir = new StateDirectory(streamsConfig, new MockTime(), true); try { // should be able to get the lock now as it should've been released in close @@ -438,9 +427,9 @@ public class GlobalStateManagerImplTest { super.close(); } }, stateRestoreCallback); - stateManager.close(Collections.emptyMap()); + stateManager.close(true); - stateManager.close(Collections.emptyMap()); + stateManager.close(true); } @Test @@ -460,7 +449,7 @@ public class GlobalStateManagerImplTest { stateManager.register(store2, stateRestoreCallback); try { - stateManager.close(Collections.emptyMap()); + stateManager.close(true); } catch (final ProcessorStateException e) { // expected } @@ -539,7 +528,8 @@ public class GlobalStateManagerImplTest { stateManager.initialize(); initializeConsumer(10, 0, t1); stateManager.register(store1, stateRestoreCallback); - stateManager.close(Collections.emptyMap()); + stateManager.checkpoint(Collections.emptyMap()); + stateManager.close(true); final Map<TopicPartition, Long> checkpointMap = stateManager.checkpointed(); assertThat(checkpointMap, equalTo(Collections.singletonMap(t1, 10L))); @@ -551,7 +541,7 @@ public class GlobalStateManagerImplTest { stateManager.initialize(); initializeConsumer(10, 0, t3); stateManager.register(store3, stateRestoreCallback); - stateManager.close(Collections.emptyMap()); + stateManager.close(true); assertThat(readOffsetsCheckpoint(), equalTo(Collections.emptyMap())); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java index 53d374b..9001255 100644 --- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java @@ -204,15 +204,14 @@ public class GlobalStateTaskTest { @Test - public void shouldCloseStateManagerWithOffsets() throws IOException { + public void shouldFlushStateManagerWithOffsets() throws IOException { final Map<TopicPartition, Long> expectedOffsets = new HashMap<>(); expectedOffsets.put(t1, 52L); expectedOffsets.put(t2, 100L); globalStateTask.initialize(); globalStateTask.update(new ConsumerRecord<>(topic1, 1, 51, "foo".getBytes(), "foo".getBytes())); - globalStateTask.close(); + globalStateTask.flushState(); assertEquals(expectedOffsets, stateMgr.checkpointed()); - assertTrue(stateMgr.closed); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index 119790f..7d82982 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -130,7 +130,7 @@ public class ProcessorStateManagerTest { assertThat(batchingRestoreCallback.getRestoredRecords().size(), is(1)); assertTrue(batchingRestoreCallback.getRestoredRecords().contains(expectedKeyValue)); } finally { - stateMgr.close(Collections.emptyMap()); + stateMgr.close(true); } } @@ -153,7 +153,7 @@ public class ProcessorStateManagerTest { assertTrue(persistentStore.keys.contains(intKey)); assertEquals(9, persistentStore.values.get(0).length); } finally { - stateMgr.close(Collections.emptyMap()); + stateMgr.close(true); } } @@ -176,7 +176,7 @@ public class ProcessorStateManagerTest { assertTrue(persistentStore.keys.contains(intKey)); assertEquals(17, persistentStore.values.get(0).length); } finally { - 
stateMgr.close(Collections.emptyMap()); + stateMgr.close(true); } } @@ -204,7 +204,7 @@ public class ProcessorStateManagerTest { stateMgr.register(persistentStore, persistentStore.stateRestoreCallback); assertTrue(changelogReader.wasRegistered(new TopicPartition(persistentStoreTopicName, 2))); } finally { - stateMgr.close(Collections.emptyMap()); + stateMgr.close(true); } } @@ -231,7 +231,7 @@ public class ProcessorStateManagerTest { stateMgr.register(nonPersistentStore, nonPersistentStore.stateRestoreCallback); assertTrue(changelogReader.wasRegistered(new TopicPartition(nonPersistentStoreTopicName, 2))); } finally { - stateMgr.close(Collections.emptyMap()); + stateMgr.close(true); } } @@ -292,7 +292,7 @@ public class ProcessorStateManagerTest { assertEquals(-1L, (long) changeLogOffsets.get(partition3)); } finally { - stateMgr.close(Collections.emptyMap()); + stateMgr.close(true); } } @@ -315,7 +315,7 @@ public class ProcessorStateManagerTest { assertEquals(mockKeyValueStore, stateMgr.getStore(nonPersistentStoreName)); } finally { - stateMgr.close(Collections.emptyMap()); + stateMgr.close(true); } } @@ -352,7 +352,8 @@ public class ProcessorStateManagerTest { } finally { // close the state manager with the ack'ed offsets stateMgr.flush(); - stateMgr.close(ackedOffsets); + stateMgr.checkpoint(ackedOffsets); + stateMgr.close(true); } // make sure all stores are closed, and the checkpoint file is written. 
assertTrue(persistentStore.flushed); @@ -398,7 +399,7 @@ public class ProcessorStateManagerTest { false, logContext); stateMgr.register(persistentStore, persistentStore.stateRestoreCallback); - stateMgr.close(null); + stateMgr.close(true); final Map<TopicPartition, Long> read = checkpoint.read(); assertThat(read, equalTo(offsets)); } @@ -583,7 +584,7 @@ public class ProcessorStateManagerTest { stateManager.register(stateStore, stateStore.stateRestoreCallback); try { - stateManager.close(Collections.emptyMap()); + stateManager.close(true); fail("Should throw ProcessorStateException if store close throws exception"); } catch (final ProcessorStateException e) { // pass @@ -696,7 +697,7 @@ public class ProcessorStateManagerTest { stateManager.register(stateStore2, stateStore2.stateRestoreCallback); try { - stateManager.close(Collections.emptyMap()); + stateManager.close(true); } catch (final ProcessorStateException expected) { /* ignode */ } Assert.assertTrue(closedStore.get()); } @@ -721,7 +722,7 @@ public class ProcessorStateManagerTest { assertFalse(checkpointFile.exists()); } finally { if (stateManager != null) { - stateManager.close(null); + stateManager.close(true); } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index 52599c6..a57c407 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -366,6 +366,7 @@ public class StandbyTaskTest { singletonList(makeWindowedConsumerRecord(changelogName, 10, 1, 0L, 60_000L)) ); + task.suspend(); task.closeStateManager(true); final File taskDir = stateDirectory.directoryForTask(taskId); @@ -592,7 +593,7 @@ public class StandbyTaskTest { } @Override - void closeStateManager(final boolean writeCheckpoint) throws ProcessorStateException { + 
void closeStateManager(final boolean clean) throws ProcessorStateException { closedStateManager.set(true); } }; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java index b14731d..37b44d3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java @@ -45,7 +45,7 @@ public class StateManagerStub implements StateManager { public void flush() {} @Override - public void close(final Map<TopicPartition, Long> offsets) throws IOException {} + public void close(final boolean clean) throws IOException {} @Override public StateStore getGlobalStore(final String name) { diff --git a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java index a97a91e..ccb1762 100644 --- a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java +++ b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java @@ -66,7 +66,7 @@ public class GlobalStateManagerStub implements GlobalStateManager { public void flush() {} @Override - public void close(final Map<TopicPartition, Long> offsets) throws IOException { + public void close(final boolean clean) throws IOException { this.offsets.putAll(offsets); closed = true; }