[ https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16615257#comment-16615257 ]
ASF GitHub Bot commented on KAFKA-7192: --------------------------------------- mjsax closed pull request #5641: KAFKA-7192: Wipe out state store if EOS is turned on and checkpoint file does not exist URL: https://github.com/apache/kafka/pull/5641 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java index 04af9f269b7..c094b838a1c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java @@ -193,4 +193,10 @@ public ThreadCache getCache() { public void initialized() { initialized = true; } + + @Override + public void uninitialize() { + initialized = false; + } + } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 7f6ac7ca614..f8f6416b3e5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -226,6 +226,10 @@ void initStateStores() { } } + void reinitializeStateStoresForPartitions(final TopicPartition partitions) { + stateMgr.reinitializeStateStoresForPartitions(partitions, processorContext); + } + /** * @throws ProcessorStateException if there is an error while closing the state manager * @param writeCheckpoint boolean indicating if a checkpoint file should be written diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java index d59ec2b2c1b..ad4868f8fce 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java @@ -298,7 +298,7 @@ private void describe(final StringBuilder builder, return suspended.values(); } - Collection<T> restoringTasks() { + public Collection<T> restoringTasks() { return Collections.unmodifiableCollection(restoring.values()); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java index 5ebc34c4e92..e82ee2c354f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java @@ -37,7 +37,7 @@ * Restore all registered state stores by reading from their changelogs. * @return all topic partitions that have been restored */ - Collection<TopicPartition> restore(); + Collection<TopicPartition> restore(final Collection<StreamTask> restoringTasks); /** * @return the restored offsets for all persistent stores. 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java index 57bb3ac81a6..b5719b111f0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java @@ -53,4 +53,9 @@ * Mark this contex as being initialized */ void initialized(); + + /** + * Mark this context as being uninitialized + */ + void uninitialize(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 34a87ce03be..93e7ffc5a06 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -18,6 +18,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.errors.LockException; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.errors.StreamsException; @@ -33,9 +34,11 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; public class ProcessorStateManager implements StateManager { @@ -50,6 +53,7 @@ private final String logPrefix; private final boolean isStandby; private final ChangelogReader changelogReader; + private final boolean eosEnabled; private final Map<String, StateStore> stores; private final Map<String, StateStore> globalStores; private final Map<TopicPartition, Long> 
offsetLimits; @@ -106,6 +110,7 @@ public ProcessorStateManager(final TaskId taskId, checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME)); checkpointedOffsets = new HashMap<>(checkpoint.read()); + this.eosEnabled = eosEnabled; if (eosEnabled) { // delete the checkpoint file after finish loading its stored offsets checkpoint.delete(); @@ -169,7 +174,8 @@ public void register(final StateStore store, stateRestoreCallback, checkpointedOffsets.get(storePartition), offsetLimit(storePartition), - store.persistent() + store.persistent(), + store.name() ); changelogReader.register(restorer); @@ -178,6 +184,61 @@ public void register(final StateStore store, stores.put(store.name(), store); } + void reinitializeStateStoresForPartitions(final TopicPartition topicPartition, + final InternalProcessorContext processorContext) { + final Map<String, String> changelogTopicToStore = inverseOneToOneMap(storeToChangelogTopic); + final Set<String> storeToBeReinitialized = new HashSet<>(); + final Map<String, StateStore> storesCopy = new HashMap<>(stores); + + checkpointedOffsets.remove(topicPartition); + storeToBeReinitialized.add(changelogTopicToStore.get(topicPartition.topic())); + + if (!eosEnabled) { + try { + checkpoint.write(checkpointedOffsets); + } catch (final IOException fatalException) { + log.error("Failed to write offset checkpoint file to {} while re-initializing {}: {}", checkpoint, stores, fatalException); + throw new StreamsException("Failed to reinitialize stores.", fatalException); + } + } + + for (final Map.Entry<String, StateStore> entry : storesCopy.entrySet()) { + final StateStore stateStore = entry.getValue(); + final String storeName = stateStore.name(); + if (storeToBeReinitialized.contains(storeName)) { + try { + stateStore.close(); + } catch (final RuntimeException ignoreAndSwallow) { /* ignore */ } + processorContext.uninitialize(); + stores.remove(entry.getKey()); + + try { + Utils.delete(new File(baseDir + File.separator + "rocksdb" + 
File.separator + storeName)); + } catch (final IOException fatalException) { + log.error("Failed to reinitialize store {}.", storeName, fatalException); + throw new StreamsException(String.format("Failed to reinitialize store %s.", storeName), fatalException); + } + + try { + Utils.delete(new File(baseDir + File.separator + storeName)); + } catch (final IOException fatalException) { + log.error("Failed to reinitialize store {}.", storeName, fatalException); + throw new StreamsException(String.format("Failed to reinitialize store %s.", storeName), fatalException); + } + + stateStore.init(processorContext, stateStore); + } + } + } + + private Map<String, String> inverseOneToOneMap(final Map<String, String> origin) { + final Map<String, String> reversedMap = new HashMap<>(); + for (final Map.Entry<String, String> entry : origin.entrySet()) { + reversedMap.put(entry.getValue(), entry.getKey()); + } + return reversedMap; + } + @Override public Map<TopicPartition, Long> checkpointed() { final Map<TopicPartition, Long> partitionsAndOffsets = new HashMap<>(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java index 79bfd1d0f49..3c5efe8a1e7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java @@ -22,12 +22,13 @@ public class StateRestorer { static final int NO_CHECKPOINT = -1; - private final Long checkpoint; private final long offsetLimit; private final boolean persistent; + private final String storeName; private final TopicPartition partition; private final StateRestoreCallback stateRestoreCallback; + private long checkpointOffset; private long restoredOffset; private long startingOffset; @@ -35,12 +36,14 @@ final StateRestoreCallback stateRestoreCallback, final Long checkpoint, final long offsetLimit, 
- final boolean persistent) { + final boolean persistent, + final String storeName) { this.partition = partition; this.stateRestoreCallback = stateRestoreCallback; - this.checkpoint = checkpoint; + this.checkpointOffset = checkpoint == null ? NO_CHECKPOINT : checkpoint; this.offsetLimit = offsetLimit; this.persistent = persistent; + this.storeName = storeName; } public TopicPartition partition() { @@ -48,7 +51,15 @@ public TopicPartition partition() { } long checkpoint() { - return checkpoint == null ? NO_CHECKPOINT : checkpoint; + return checkpointOffset; + } + + void setCheckpointOffset(final long checkpointOffset) { + this.checkpointOffset = checkpointOffset; + } + + public String storeName() { + return storeName; } void restore(final byte[] key, final byte[] value) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index 34dcb759d8a..305bf10997d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -60,13 +60,16 @@ public StoreChangelogReader(final Consumer<byte[], byte[]> consumer) { @Override public void register(final StateRestorer restorer) { - stateRestorers.put(restorer.partition(), restorer); + if (!stateRestorers.containsKey(restorer.partition())) { + stateRestorers.put(restorer.partition(), restorer); + log.trace("Added restorer for changelog {}", restorer.partition()); + } needsInitializing.put(restorer.partition(), restorer); } - public Collection<TopicPartition> restore() { + public Collection<TopicPartition> restore(final Collection<StreamTask> restoringTasks) { if (!needsInitializing.isEmpty()) { - initialize(); + initialize(restoringTasks); } if (needsRestoring.isEmpty()) { @@ -87,7 +90,7 @@ public void register(final StateRestorer restorer) { 
return completed(); } - private void initialize() { + private void initialize(final Collection<StreamTask> restoringTasks) { if (!consumer.subscription().isEmpty()) { throw new IllegalStateException("Restore consumer should not be subscribed to any topics (" + consumer.subscription() + ")"); } @@ -139,11 +142,12 @@ private void initialize() { // set up restorer for those initializable if (!initializable.isEmpty()) { - startRestoration(initializable); + startRestoration(initializable, restoringTasks); } } - private void startRestoration(final Map<TopicPartition, StateRestorer> initialized) { + private void startRestoration(final Map<TopicPartition, StateRestorer> initialized, + final Collection<StreamTask> restoringTasks) { log.debug("{} Start restoring state stores from changelog topics {}", logPrefix, initialized.keySet()); final Set<TopicPartition> assignment = new HashSet<>(consumer.assignment()); @@ -152,24 +156,48 @@ private void startRestoration(final Map<TopicPartition, StateRestorer> initializ final List<StateRestorer> needsPositionUpdate = new ArrayList<>(); for (final StateRestorer restorer : initialized.values()) { + final TopicPartition restoringPartition = restorer.partition(); if (restorer.checkpoint() != StateRestorer.NO_CHECKPOINT) { - consumer.seek(restorer.partition(), restorer.checkpoint()); - logRestoreOffsets(restorer.partition(), - restorer.checkpoint(), - endOffsets.get(restorer.partition())); - restorer.setStartingOffset(consumer.position(restorer.partition())); + consumer.seek(restoringPartition, restorer.checkpoint()); + logRestoreOffsets( + restoringPartition, + restorer.checkpoint(), + endOffsets.get(restoringPartition)); + restorer.setStartingOffset(consumer.position(restoringPartition)); } else { - consumer.seekToBeginning(Collections.singletonList(restorer.partition())); + consumer.seekToBeginning(Collections.singletonList(restoringPartition)); needsPositionUpdate.add(restorer); } } for (final StateRestorer restorer : 
needsPositionUpdate) { - final long position = consumer.position(restorer.partition()); - logRestoreOffsets(restorer.partition(), - position, - endOffsets.get(restorer.partition())); - restorer.setStartingOffset(position); + final TopicPartition restoringPartition = restorer.partition(); + + for (final StreamTask task : restoringTasks) { + if (task.changelogPartitions().contains(restoringPartition) || task.partitions().contains(restoringPartition)) { + if (task.eosEnabled) { + log.info("No checkpoint found for task {} state store {} changelog {} with EOS turned on. " + + "Reinitializing the task and restore its state from the beginning.", task.id, restorer.storeName(), restorer.partition()); + + needsInitializing.remove(restoringPartition); + initialized.put(restoringPartition, restorer); + restorer.setCheckpointOffset(consumer.position(restoringPartition)); + + task.reinitializeStateStoresForPartitions(restoringPartition); + } else { + log.info("Restoring task {}'s state store {} from beginning of the changelog {} ", task.id, restorer.storeName(), restorer.partition()); + + final long position = consumer.position(restoringPartition); + logRestoreOffsets( + restoringPartition, + position, + endOffsets.get(restoringPartition)); + restorer.setStartingOffset(position); + } + } + } + + } needsRestoring.putAll(initialized); @@ -220,7 +248,7 @@ public void reset() { } private void restorePartition(final ConsumerRecords<byte[], byte[]> allRecords, - final TopicPartition topicPartition) { + final TopicPartition topicPartition) { final StateRestorer restorer = stateRestorers.get(topicPartition); final Long endOffset = endOffsets.get(topicPartition); final long pos = processNext(allRecords.records(topicPartition), restorer, endOffset); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 210b070e852..6b7f4f1934e 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -532,7 +532,7 @@ private void tryTransitToRunning() { active.initializeNewTasks(); standby.initializeNewTasks(); - final Collection<TopicPartition> restored = storeChangelogReader.restore(); + final Collection<TopicPartition> restored = storeChangelogReader.restore(active.restoringTasks()); final Set<TopicPartition> resumed = active.updateRestored(restored); if (!resumed.isEmpty()) { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java index 0c3b36aa84b..7f709507e35 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java @@ -392,7 +392,7 @@ public void shouldNotViolateEosIfOneTaskFailsWithState() throws Exception { // the app commits after each 10 records per partition, and thus will have 2*5 uncommitted writes // and store updates (ie, another 5 uncommitted writes to a changelog topic per partition) // - // the failure gets inject after 20 committed and 30 uncommitted records got received + // the failure gets inject after 20 committed and 10 uncommitted records got received // -> the failure only kills one thread // after fail over, we should read 40 committed records and the state stores should contain the correct sums // per key (even if some records got processed twice) @@ -402,7 +402,7 @@ public void shouldNotViolateEosIfOneTaskFailsWithState() throws Exception { streams.start(); final List<KeyValue<Long, Long>> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L); - final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = prepareData(10L, 15L, 0L, 1L); + final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = 
prepareData(10L, 15L, 0L, 1L, 2L, 3L); final List<KeyValue<Long, Long>> dataBeforeFailure = new ArrayList<>(); dataBeforeFailure.addAll(committedDataBeforeFailure); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java index 6968f3319ec..3a8ebdaf22f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java @@ -28,7 +28,7 @@ private static final long OFFSET_LIMIT = 50; private final MockRestoreCallback callback = new MockRestoreCallback(); - private final StateRestorer restorer = new StateRestorer(new TopicPartition("topic", 1), callback, null, OFFSET_LIMIT, true); + private final StateRestorer restorer = new StateRestorer(new TopicPartition("topic", 1), callback, null, OFFSET_LIMIT, true, "store"); @Test public void shouldCallRestoreOnRestoreCallback() throws Exception { @@ -53,7 +53,7 @@ public void shouldBeCompletedIfEndOffsetAndRecordOffsetAreZero() throws Exceptio @Test public void shouldBeCompletedIfOffsetAndOffsetLimitAreZero() throws Exception { - final StateRestorer restorer = new StateRestorer(new TopicPartition("topic", 1), callback, null, 0, true); + final StateRestorer restorer = new StateRestorer(new TopicPartition("topic", 1), callback, null, 0, true, "store"); assertTrue(restorer.hasCompleted(0, 10)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index a43f08344bf..24820f840b2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -59,8 +59,8 @@ public 
void shouldRequestTopicsAndHandleTimeoutException() throws Exception { }; final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer); - changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true)); - changelogReader.restore(); + changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true, "store")); + changelogReader.restore(Collections.<StreamTask>emptySet()); assertTrue(functionCalled.get()); } @@ -68,7 +68,7 @@ public void shouldRequestTopicsAndHandleTimeoutException() throws Exception { public void shouldThrowExceptionIfConsumerHasCurrentSubscription() throws Exception { consumer.subscribe(Collections.singleton("sometopic")); try { - changelogReader.restore(); + changelogReader.restore(Collections.<StreamTask>emptySet()); fail("Should have thrown IllegalStateException"); } catch (final IllegalStateException e) { // ok @@ -79,9 +79,9 @@ public void shouldThrowExceptionIfConsumerHasCurrentSubscription() throws Except public void shouldRestoreAllMessagesFromBeginningWhenCheckpointNull() throws Exception { final int messages = 10; setupConsumer(messages, topicPartition); - changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true)); + changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true, "store")); - changelogReader.restore(); + changelogReader.restore(Collections.<StreamTask>emptySet()); assertThat(callback.restored.size(), equalTo(messages)); } @@ -89,9 +89,9 @@ public void shouldRestoreAllMessagesFromBeginningWhenCheckpointNull() throws Exc public void shouldRestoreMessagesFromCheckpoint() throws Exception { final int messages = 10; setupConsumer(messages, topicPartition); - changelogReader.register(new StateRestorer(topicPartition, callback, 5L, Long.MAX_VALUE, true)); + changelogReader.register(new StateRestorer(topicPartition, callback, 5L, Long.MAX_VALUE, true, "store")); - 
changelogReader.restore(); + changelogReader.restore(Collections.<StreamTask>emptySet()); assertThat(callback.restored.size(), equalTo(5)); } @@ -99,18 +99,18 @@ public void shouldRestoreMessagesFromCheckpoint() throws Exception { public void shouldClearAssignmentAtEndOfRestore() throws Exception { final int messages = 1; setupConsumer(messages, topicPartition); - changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true)); + changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true, "store")); - changelogReader.restore(); + changelogReader.restore(Collections.<StreamTask>emptySet()); assertThat(consumer.assignment(), equalTo(Collections.<TopicPartition>emptySet())); } @Test public void shouldRestoreToLimitWhenSupplied() throws Exception { setupConsumer(10, topicPartition); - final StateRestorer restorer = new StateRestorer(topicPartition, callback, null, 3, true); + final StateRestorer restorer = new StateRestorer(topicPartition, callback, null, 3, true, "store"); changelogReader.register(restorer); - changelogReader.restore(); + changelogReader.restore(Collections.<StreamTask>emptySet()); assertThat(callback.restored.size(), equalTo(3)); assertThat(restorer.restoredOffset(), equalTo(3L)); } @@ -125,11 +125,11 @@ public void shouldRestoreMultipleStores() throws Exception { setupConsumer(5, one); setupConsumer(3, two); - changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true)); - changelogReader.register(new StateRestorer(one, callbackOne, null, Long.MAX_VALUE, true)); - changelogReader.register(new StateRestorer(two, callbackTwo, null, Long.MAX_VALUE, true)); + changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true, "store")); + changelogReader.register(new StateRestorer(one, callbackOne, null, Long.MAX_VALUE, true, "store")); + changelogReader.register(new StateRestorer(two, callbackTwo, null, 
Long.MAX_VALUE, true, "store")); - changelogReader.restore(); + changelogReader.restore(Collections.<StreamTask>emptySet()); assertThat(callback.restored.size(), equalTo(10)); assertThat(callbackOne.restored.size(), equalTo(5)); @@ -138,11 +138,11 @@ public void shouldRestoreMultipleStores() throws Exception { @Test public void shouldNotRestoreAnythingWhenPartitionIsEmpty() throws Exception { - final StateRestorer restorer = new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true); + final StateRestorer restorer = new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true, "store"); setupConsumer(0, topicPartition); changelogReader.register(restorer); - changelogReader.restore(); + changelogReader.restore(Collections.<StreamTask>emptySet()); assertThat(callback.restored.size(), equalTo(0)); assertThat(restorer.restoredOffset(), equalTo(0L)); } @@ -151,11 +151,11 @@ public void shouldNotRestoreAnythingWhenPartitionIsEmpty() throws Exception { public void shouldNotRestoreAnythingWhenCheckpointAtEndOffset() throws Exception { final Long endOffset = 10L; setupConsumer(endOffset, topicPartition); - final StateRestorer restorer = new StateRestorer(topicPartition, callback, endOffset, Long.MAX_VALUE, true); + final StateRestorer restorer = new StateRestorer(topicPartition, callback, endOffset, Long.MAX_VALUE, true, "store"); changelogReader.register(restorer); - changelogReader.restore(); + changelogReader.restore(Collections.<StreamTask>emptySet()); assertThat(callback.restored.size(), equalTo(0)); assertThat(restorer.restoredOffset(), equalTo(endOffset)); } @@ -163,8 +163,8 @@ public void shouldNotRestoreAnythingWhenCheckpointAtEndOffset() throws Exception @Test public void shouldReturnRestoredOffsetsForPersistentStores() throws Exception { setupConsumer(10, topicPartition); - changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true)); - changelogReader.restore(); + changelogReader.register(new 
StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true, "store")); + changelogReader.restore(Collections.<StreamTask>emptySet()); final Map<TopicPartition, Long> restoredOffsets = changelogReader.restoredOffsets(); assertThat(restoredOffsets, equalTo(Collections.singletonMap(topicPartition, 10L))); } @@ -172,8 +172,8 @@ public void shouldReturnRestoredOffsetsForPersistentStores() throws Exception { @Test public void shouldNotReturnRestoredOffsetsForNonPersistentStore() throws Exception { setupConsumer(10, topicPartition); - changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false)); - changelogReader.restore(); + changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false, "store")); + changelogReader.restore(Collections.<StreamTask>emptySet()); final Map<TopicPartition, Long> restoredOffsets = changelogReader.restoredOffsets(); assertThat(restoredOffsets, equalTo(Collections.<TopicPartition, Long>emptyMap())); } @@ -186,8 +186,8 @@ public void shouldIgnoreNullKeysWhenRestoring() throws Exception { consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), 1, (byte[]) null, bytes)); consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), 2, bytes, bytes)); consumer.assign(Collections.singletonList(topicPartition)); - changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false)); - changelogReader.restore(); + changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false, "store")); + changelogReader.restore(Collections.<StreamTask>emptySet()); assertThat(callback.restored, CoreMatchers.equalTo(Utils.mkList(KeyValue.pair(bytes, bytes), KeyValue.pair(bytes, bytes)))); } @@ -200,15 +200,15 @@ public void shouldReturnCompletedPartitionsOnEachRestoreCall() { consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), 
topicPartition.partition(), i, bytes, bytes)); } consumer.assign(Collections.singletonList(topicPartition)); - changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false)); + changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false, "store")); - final Collection<TopicPartition> completedFirstTime = changelogReader.restore(); + final Collection<TopicPartition> completedFirstTime = changelogReader.restore(Collections.<StreamTask>emptySet()); assertTrue(completedFirstTime.isEmpty()); for (int i = 5; i < 10; i++) { consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), i, bytes, bytes)); } final Collection<TopicPartition> expected = Collections.singleton(topicPartition); - assertThat(changelogReader.restore(), equalTo(expected)); + assertThat(changelogReader.restore(Collections.<StreamTask>emptySet()), equalTo(expected)); } private void setupConsumer(final long messages, final TopicPartition topicPartition) { @@ -237,8 +237,8 @@ private void assignPartition(final long messages, final TopicPartition topicPart public void shouldCompleteImmediatelyWhenEndOffsetIs0() { final Collection<TopicPartition> expected = Collections.singleton(topicPartition); setupConsumer(0, topicPartition); - changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true)); - final Collection<TopicPartition> restored = changelogReader.restore(); + changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true, "store")); + final Collection<TopicPartition> restored = changelogReader.restore(Collections.<StreamTask>emptySet()); assertThat(restored, equalTo(expected)); } @@ -248,9 +248,9 @@ public void shouldRestorePartitionsRegisteredPostInitialization() { setupConsumer(1, topicPartition); consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 10L)); - changelogReader.register(new 
StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false)); + changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false, "store")); - assertTrue(changelogReader.restore().isEmpty()); + assertTrue(changelogReader.restore(Collections.<StreamTask>emptySet()).isEmpty()); addRecords(9, topicPartition, 1); @@ -259,12 +259,12 @@ public void shouldRestorePartitionsRegisteredPostInitialization() { consumer.updateBeginningOffsets(Collections.singletonMap(postInitialization, 0L)); consumer.updateEndOffsets(Collections.singletonMap(postInitialization, 3L)); - changelogReader.register(new StateRestorer(postInitialization, callbackTwo, null, Long.MAX_VALUE, false)); + changelogReader.register(new StateRestorer(postInitialization, callbackTwo, null, Long.MAX_VALUE, false, "store")); final Collection<TopicPartition> expected = Utils.mkSet(topicPartition, postInitialization); consumer.assign(expected); - assertThat(changelogReader.restore(), equalTo(expected)); + assertThat(changelogReader.restore(Collections.<StreamTask>emptySet()), equalTo(expected)); assertThat(callback.restored.size(), equalTo(10)); assertThat(callbackTwo.restored.size(), equalTo(3)); } diff --git a/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java b/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java index 54fd858393c..93ce801be1c 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java +++ b/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.processor.internals.ChangelogReader; import org.apache.kafka.streams.processor.internals.StateRestorer; +import org.apache.kafka.streams.processor.internals.StreamTask; import java.util.Collection; import java.util.Collections; @@ -35,7 +36,7 @@ public void register(final StateRestorer restorer) { } @Override - public Collection<TopicPartition> 
restore() { + public Collection<TopicPartition> restore(final Collection<StreamTask> restoringTasks) { return registered; } diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java index cb56fa1b1a6..fed8752ba40 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java @@ -142,6 +142,9 @@ public ThreadCache getCache() { @Override public void initialized() {} + @Override + public void uninitialize() {} + @Override public File stateDir() { if (stateDir == null) { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > State-store can desynchronise with changelog > -------------------------------------------- > > Key: KAFKA-7192 > URL: https://issues.apache.org/jira/browse/KAFKA-7192 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 1.1.1 > Reporter: Jon Bates > Assignee: Guozhang Wang > Priority: Critical > Labels: bugs > Fix For: 2.0.1, 2.1.0 > > > N.B. This bug has been verified with exactly-once processing enabled. > Consider the following scenario: > * A record, N, is read into a Kafka topology > * the state store is updated > * the topology crashes > h3. *Expected behaviour:* > # Node is restarted > # Offset was never updated, so record N is reprocessed > # State-store is reset to position N-1 > # Record is reprocessed > h3. *Actual Behaviour* > # Node is restarted > # Record N is reprocessed (good) > # The state store has the state from the previous processing > I'd consider this a corruption of the state-store, hence the Critical > Priority, although High may be more appropriate.
> I wrote a proof-of-concept here, which demonstrates the problem on Linux: > [https://github.com/spadger/kafka-streams-sad-state-store] -- This message was sent by Atlassian JIRA (v7.6.3#76005)