[ https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16615257#comment-16615257 ]

ASF GitHub Bot commented on KAFKA-7192:
---------------------------------------

mjsax closed pull request #5641: KAFKA-7192: Wipe out state store if EOS is 
turned on and checkpoint file does not exist
URL: https://github.com/apache/kafka/pull/5641
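
In short, the change makes the restoration path treat a missing checkpoint
differently depending on the processing guarantee: with EOS enabled, the
task's local state store is wiped and re-created before the changelog is
replayed from the beginning, since the store may contain uncommitted writes
from an unclean shutdown. A minimal sketch of that decision, using
hypothetical stand-in names rather than the real classes in the diff below:

    // Hypothetical stand-alone sketch of the added decision; not the literal patch.
    final class RestoreDecisionSketch {

        enum Action { SEEK_TO_CHECKPOINT, WIPE_AND_RESTORE_FROM_BEGINNING, RESTORE_FROM_BEGINNING }

        static final long NO_CHECKPOINT = -1L; // mirrors StateRestorer.NO_CHECKPOINT

        // Under EOS the checkpoint file is deleted right after its offsets are
        // loaded, so a missing checkpoint on restart implies an unclean shutdown
        // and a possibly dirty store that must be wiped before replaying.
        static Action decide(final long checkpointOffset, final boolean eosEnabled) {
            if (checkpointOffset != NO_CHECKPOINT) {
                return Action.SEEK_TO_CHECKPOINT;         // resume from the checkpoint
            }
            return eosEnabled
                ? Action.WIPE_AND_RESTORE_FROM_BEGINNING  // behavior added by this PR
                : Action.RESTORE_FROM_BEGINNING;          // pre-existing behavior
        }
    }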
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 04af9f269b7..c094b838a1c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -193,4 +193,10 @@ public ThreadCache getCache() {
     public void initialized() {
         initialized = true;
     }
+
+    @Override
+    public void uninitialize() {
+        initialized = false;
+    }
+
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 7f6ac7ca614..f8f6416b3e5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -226,6 +226,10 @@ void initStateStores() {
         }
     }
 
+    void reinitializeStateStoresForPartitions(final TopicPartition partitions) {
+        stateMgr.reinitializeStateStoresForPartitions(partitions, processorContext);
+    }
+
     /**
      * @throws ProcessorStateException if there is an error while closing the state manager
      * @param writeCheckpoint boolean indicating if a checkpoint file should be written
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index d59ec2b2c1b..ad4868f8fce 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -298,7 +298,7 @@ private void describe(final StringBuilder builder,
         return suspended.values();
     }
 
-    Collection<T> restoringTasks() {
+    public Collection<T> restoringTasks() {
         return Collections.unmodifiableCollection(restoring.values());
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
index 5ebc34c4e92..e82ee2c354f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
@@ -37,7 +37,7 @@
      * Restore all registered state stores by reading from their changelogs.
      * @return all topic partitions that have been restored
      */
-    Collection<TopicPartition> restore();
+    Collection<TopicPartition> restore(final Collection<StreamTask> restoringTasks);
 
     /**
      * @return the restored offsets for all persistent stores.
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
index 57bb3ac81a6..b5719b111f0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
@@ -53,4 +53,9 @@
      * Mark this context as being initialized
      */
     void initialized();
+
+    /**
+     * Mark this context as being uninitialized
+     */
+    void uninitialize();
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 34a87ce03be..93e7ffc5a06 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -18,6 +18,7 @@
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
@@ -33,9 +34,11 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 
 public class ProcessorStateManager implements StateManager {
@@ -50,6 +53,7 @@
     private final String logPrefix;
     private final boolean isStandby;
     private final ChangelogReader changelogReader;
+    private final boolean eosEnabled;
     private final Map<String, StateStore> stores;
     private final Map<String, StateStore> globalStores;
     private final Map<TopicPartition, Long> offsetLimits;
@@ -106,6 +110,7 @@ public ProcessorStateManager(final TaskId taskId,
         checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
         checkpointedOffsets = new HashMap<>(checkpoint.read());
 
+        this.eosEnabled = eosEnabled;
         if (eosEnabled) {
             // delete the checkpoint file after finish loading its stored offsets
             checkpoint.delete();
@@ -169,7 +174,8 @@ public void register(final StateStore store,
                                                              stateRestoreCallback,
                                                              checkpointedOffsets.get(storePartition),
                                                              offsetLimit(storePartition),
-                                                             store.persistent()
+                                                             store.persistent(),
+                                                             store.name()
             );
 
             changelogReader.register(restorer);
@@ -178,6 +184,61 @@ public void register(final StateStore store,
         stores.put(store.name(), store);
     }
 
+    void reinitializeStateStoresForPartitions(final TopicPartition topicPartition,
+                                              final InternalProcessorContext processorContext) {
+        final Map<String, String> changelogTopicToStore = inverseOneToOneMap(storeToChangelogTopic);
+        final Set<String> storeToBeReinitialized = new HashSet<>();
+        final Map<String, StateStore> storesCopy = new HashMap<>(stores);
+
+        checkpointedOffsets.remove(topicPartition);
+        storeToBeReinitialized.add(changelogTopicToStore.get(topicPartition.topic()));
+
+        if (!eosEnabled) {
+            try {
+                checkpoint.write(checkpointedOffsets);
+            } catch (final IOException fatalException) {
+                log.error("Failed to write offset checkpoint file to {} while re-initializing {}: {}", checkpoint, stores, fatalException);
+                throw new StreamsException("Failed to reinitialize stores.", fatalException);
+            }
+        }
+
+        for (final Map.Entry<String, StateStore> entry : storesCopy.entrySet()) {
+            final StateStore stateStore = entry.getValue();
+            final String storeName = stateStore.name();
+            if (storeToBeReinitialized.contains(storeName)) {
+                try {
+                    stateStore.close();
+                } catch (final RuntimeException ignoreAndSwallow) { /* ignore */ }
+                processorContext.uninitialize();
+                stores.remove(entry.getKey());
+
+                try {
+                    Utils.delete(new File(baseDir + File.separator + "rocksdb" + File.separator + storeName));
+                } catch (final IOException fatalException) {
+                    log.error("Failed to reinitialize store {}.", storeName, fatalException);
+                    throw new StreamsException(String.format("Failed to reinitialize store %s.", storeName), fatalException);
+                }
+
+                try {
+                    Utils.delete(new File(baseDir + File.separator + storeName));
+                } catch (final IOException fatalException) {
+                    log.error("Failed to reinitialize store {}.", storeName, fatalException);
+                    throw new StreamsException(String.format("Failed to reinitialize store %s.", storeName), fatalException);
+                }
+
+                stateStore.init(processorContext, stateStore);
+            }
+        }
+    }
+
+    private Map<String, String> inverseOneToOneMap(final Map<String, String> origin) {
+        final Map<String, String> reversedMap = new HashMap<>();
+        for (final Map.Entry<String, String> entry : origin.entrySet()) {
+            reversedMap.put(entry.getValue(), entry.getKey());
+        }
+        return reversedMap;
+    }
+
     @Override
     public Map<TopicPartition, Long> checkpointed() {
         final Map<TopicPartition, Long> partitionsAndOffsets = new HashMap<>();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
index 79bfd1d0f49..3c5efe8a1e7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
@@ -22,12 +22,13 @@
 public class StateRestorer {
     static final int NO_CHECKPOINT = -1;
 
-    private final Long checkpoint;
     private final long offsetLimit;
     private final boolean persistent;
+    private final String storeName;
     private final TopicPartition partition;
     private final StateRestoreCallback stateRestoreCallback;
 
+    private long checkpointOffset;
     private long restoredOffset;
     private long startingOffset;
 
@@ -35,12 +36,14 @@
                   final StateRestoreCallback stateRestoreCallback,
                   final Long checkpoint,
                   final long offsetLimit,
-                  final boolean persistent) {
+                  final boolean persistent,
+                  final String storeName) {
         this.partition = partition;
         this.stateRestoreCallback = stateRestoreCallback;
-        this.checkpoint = checkpoint;
+        this.checkpointOffset = checkpoint == null ? NO_CHECKPOINT : checkpoint;
         this.offsetLimit = offsetLimit;
         this.persistent = persistent;
+        this.storeName = storeName;
     }
 
     public TopicPartition partition() {
@@ -48,7 +51,15 @@ public TopicPartition partition() {
     }
 
     long checkpoint() {
-        return checkpoint == null ? NO_CHECKPOINT : checkpoint;
+        return checkpointOffset;
+    }
+
+    void setCheckpointOffset(final long checkpointOffset) {
+        this.checkpointOffset = checkpointOffset;
+    }
+
+    public String storeName() {
+        return storeName;
     }
 
     void restore(final byte[] key, final byte[] value) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 34dcb759d8a..305bf10997d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -60,13 +60,16 @@ public StoreChangelogReader(final Consumer<byte[], byte[]> consumer) {
 
     @Override
     public void register(final StateRestorer restorer) {
-        stateRestorers.put(restorer.partition(), restorer);
+        if (!stateRestorers.containsKey(restorer.partition())) {
+            stateRestorers.put(restorer.partition(), restorer);
+            log.trace("Added restorer for changelog {}", restorer.partition());
+        }
         needsInitializing.put(restorer.partition(), restorer);
     }
 
-    public Collection<TopicPartition> restore() {
+    public Collection<TopicPartition> restore(final Collection<StreamTask> restoringTasks) {
         if (!needsInitializing.isEmpty()) {
-            initialize();
+            initialize(restoringTasks);
         }
 
         if (needsRestoring.isEmpty()) {
@@ -87,7 +90,7 @@ public void register(final StateRestorer restorer) {
         return completed();
     }
 
-    private void initialize() {
+    private void initialize(final Collection<StreamTask> restoringTasks) {
         if (!consumer.subscription().isEmpty()) {
             throw new IllegalStateException("Restore consumer should not be 
subscribed to any topics (" + consumer.subscription() + ")");
         }
@@ -139,11 +142,12 @@ private void initialize() {
 
         // set up restorer for those initializable
         if (!initializable.isEmpty()) {
-            startRestoration(initializable);
+            startRestoration(initializable, restoringTasks);
         }
     }
 
-    private void startRestoration(final Map<TopicPartition, StateRestorer> initialized) {
+    private void startRestoration(final Map<TopicPartition, StateRestorer> initialized,
+                                  final Collection<StreamTask> restoringTasks) {
         log.debug("{} Start restoring state stores from changelog topics {}", logPrefix, initialized.keySet());
 
         final Set<TopicPartition> assignment = new HashSet<>(consumer.assignment());
@@ -152,24 +156,48 @@ private void startRestoration(final Map<TopicPartition, StateRestorer> initializ
 
         final List<StateRestorer> needsPositionUpdate = new ArrayList<>();
         for (final StateRestorer restorer : initialized.values()) {
+            final TopicPartition restoringPartition = restorer.partition();
             if (restorer.checkpoint() != StateRestorer.NO_CHECKPOINT) {
-                consumer.seek(restorer.partition(), restorer.checkpoint());
-                logRestoreOffsets(restorer.partition(),
-                        restorer.checkpoint(),
-                        endOffsets.get(restorer.partition()));
-                restorer.setStartingOffset(consumer.position(restorer.partition()));
+                consumer.seek(restoringPartition, restorer.checkpoint());
+                logRestoreOffsets(
+                    restoringPartition,
+                    restorer.checkpoint(),
+                    endOffsets.get(restoringPartition));
+                restorer.setStartingOffset(consumer.position(restoringPartition));
             } else {
-                consumer.seekToBeginning(Collections.singletonList(restorer.partition()));
+                consumer.seekToBeginning(Collections.singletonList(restoringPartition));
                 needsPositionUpdate.add(restorer);
             }
         }
 
         for (final StateRestorer restorer : needsPositionUpdate) {
-            final long position = consumer.position(restorer.partition());
-            logRestoreOffsets(restorer.partition(),
-                    position,
-                    endOffsets.get(restorer.partition()));
-            restorer.setStartingOffset(position);
+            final TopicPartition restoringPartition = restorer.partition();
+
+            for (final StreamTask task : restoringTasks) {
+                if (task.changelogPartitions().contains(restoringPartition) || task.partitions().contains(restoringPartition)) {
+                    if (task.eosEnabled) {
+                        log.info("No checkpoint found for task {} state store {} changelog {} with EOS turned on. " +
+                            "Reinitializing the task and restore its state from the beginning.", task.id, restorer.storeName(), restorer.partition());
+
+                        needsInitializing.remove(restoringPartition);
+                        initialized.put(restoringPartition, restorer);
+                        restorer.setCheckpointOffset(consumer.position(restoringPartition));
+
+                        task.reinitializeStateStoresForPartitions(restoringPartition);
+                    } else {
+                        log.info("Restoring task {}'s state store {} from beginning of the changelog {} ", task.id, restorer.storeName(), restorer.partition());
+
+                        final long position = consumer.position(restoringPartition);
+                        logRestoreOffsets(
+                            restoringPartition,
+                            position,
+                            endOffsets.get(restoringPartition));
+                        restorer.setStartingOffset(position);
+                    }
+                }
+            }
+
+
         }
 
         needsRestoring.putAll(initialized);
@@ -220,7 +248,7 @@ public void reset() {
     }
 
     private void restorePartition(final ConsumerRecords<byte[], byte[]> allRecords,
-                                    final TopicPartition topicPartition) {
+                                  final TopicPartition topicPartition) {
         final StateRestorer restorer = stateRestorers.get(topicPartition);
         final Long endOffset = endOffsets.get(topicPartition);
         final long pos = processNext(allRecords.records(topicPartition), restorer, endOffset);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 210b070e852..6b7f4f1934e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -532,7 +532,7 @@ private void tryTransitToRunning() {
         active.initializeNewTasks();
         standby.initializeNewTasks();
 
-        final Collection<TopicPartition> restored = storeChangelogReader.restore();
+        final Collection<TopicPartition> restored = storeChangelogReader.restore(active.restoringTasks());
         final Set<TopicPartition> resumed = active.updateRestored(restored);
 
         if (!resumed.isEmpty()) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 0c3b36aa84b..7f709507e35 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -392,7 +392,7 @@ public void shouldNotViolateEosIfOneTaskFailsWithState() throws Exception {
         // the app commits after each 10 records per partition, and thus will have 2*5 uncommitted writes
         // and store updates (ie, another 5 uncommitted writes to a changelog topic per partition)
         //
-        // the failure gets injected after 20 committed and 30 uncommitted records got received
+        // the failure gets injected after 20 committed and 10 uncommitted records got received
         // -> the failure only kills one thread
         // after fail over, we should read 40 committed records and the state stores should contain the correct sums
         // per key (even if some records got processed twice)
@@ -402,7 +402,7 @@ public void shouldNotViolateEosIfOneTaskFailsWithState() throws Exception {
             streams.start();
 
             final List<KeyValue<Long, Long>> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L);
-            final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = prepareData(10L, 15L, 0L, 1L);
+            final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = prepareData(10L, 15L, 0L, 1L, 2L, 3L);
 
             final List<KeyValue<Long, Long>> dataBeforeFailure = new ArrayList<>();
             dataBeforeFailure.addAll(committedDataBeforeFailure);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
index 6968f3319ec..3a8ebdaf22f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
@@ -28,7 +28,7 @@
 
     private static final long OFFSET_LIMIT = 50;
     private final MockRestoreCallback callback = new MockRestoreCallback();
-    private final StateRestorer restorer = new StateRestorer(new TopicPartition("topic", 1), callback, null, OFFSET_LIMIT, true);
+    private final StateRestorer restorer = new StateRestorer(new TopicPartition("topic", 1), callback, null, OFFSET_LIMIT, true, "store");
 
     @Test
     public void shouldCallRestoreOnRestoreCallback() throws Exception {
@@ -53,7 +53,7 @@ public void shouldBeCompletedIfEndOffsetAndRecordOffsetAreZero() throws Exceptio
 
     @Test
     public void shouldBeCompletedIfOffsetAndOffsetLimitAreZero() throws Exception {
-        final StateRestorer restorer = new StateRestorer(new TopicPartition("topic", 1), callback, null, 0, true);
+        final StateRestorer restorer = new StateRestorer(new TopicPartition("topic", 1), callback, null, 0, true, "store");
         assertTrue(restorer.hasCompleted(0, 10));
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index a43f08344bf..24820f840b2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -59,8 +59,8 @@ public void shouldRequestTopicsAndHandleTimeoutException() throws Exception {
         };
 
         final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer);
-        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true));
-        changelogReader.restore();
+        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true, "store"));
+        changelogReader.restore(Collections.<StreamTask>emptySet());
         assertTrue(functionCalled.get());
     }
 
@@ -68,7 +68,7 @@ public void shouldRequestTopicsAndHandleTimeoutException() throws Exception {
     public void shouldThrowExceptionIfConsumerHasCurrentSubscription() throws Exception {
         consumer.subscribe(Collections.singleton("sometopic"));
         try {
-            changelogReader.restore();
+            changelogReader.restore(Collections.<StreamTask>emptySet());
             fail("Should have thrown IllegalStateException");
         } catch (final IllegalStateException e) {
             // ok
@@ -79,9 +79,9 @@ public void shouldThrowExceptionIfConsumerHasCurrentSubscription() throws Except
     public void shouldRestoreAllMessagesFromBeginningWhenCheckpointNull() throws Exception {
         final int messages = 10;
         setupConsumer(messages, topicPartition);
-        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true));
+        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true, "store"));
 
-        changelogReader.restore();
+        changelogReader.restore(Collections.<StreamTask>emptySet());
         assertThat(callback.restored.size(), equalTo(messages));
     }
 
@@ -89,9 +89,9 @@ public void shouldRestoreAllMessagesFromBeginningWhenCheckpointNull() throws Exc
     public void shouldRestoreMessagesFromCheckpoint() throws Exception {
         final int messages = 10;
         setupConsumer(messages, topicPartition);
-        changelogReader.register(new StateRestorer(topicPartition, callback, 5L, Long.MAX_VALUE, true));
+        changelogReader.register(new StateRestorer(topicPartition, callback, 5L, Long.MAX_VALUE, true, "store"));
 
-        changelogReader.restore();
+        changelogReader.restore(Collections.<StreamTask>emptySet());
         assertThat(callback.restored.size(), equalTo(5));
     }
 
@@ -99,18 +99,18 @@ public void shouldRestoreMessagesFromCheckpoint() throws Exception {
     public void shouldClearAssignmentAtEndOfRestore() throws Exception {
         final int messages = 1;
         setupConsumer(messages, topicPartition);
-        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true));
+        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true, "store"));
 
-        changelogReader.restore();
+        changelogReader.restore(Collections.<StreamTask>emptySet());
         assertThat(consumer.assignment(), equalTo(Collections.<TopicPartition>emptySet()));
     }
 
     @Test
     public void shouldRestoreToLimitWhenSupplied() throws Exception {
         setupConsumer(10, topicPartition);
-        final StateRestorer restorer = new StateRestorer(topicPartition, callback, null, 3, true);
+        final StateRestorer restorer = new StateRestorer(topicPartition, callback, null, 3, true, "store");
         changelogReader.register(restorer);
-        changelogReader.restore();
+        changelogReader.restore(Collections.<StreamTask>emptySet());
         assertThat(callback.restored.size(), equalTo(3));
         assertThat(restorer.restoredOffset(), equalTo(3L));
     }
@@ -125,11 +125,11 @@ public void shouldRestoreMultipleStores() throws Exception {
         setupConsumer(5, one);
         setupConsumer(3, two);
 
-        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true));
-        changelogReader.register(new StateRestorer(one, callbackOne, null, Long.MAX_VALUE, true));
-        changelogReader.register(new StateRestorer(two, callbackTwo, null, Long.MAX_VALUE, true));
+        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true, "store"));
+        changelogReader.register(new StateRestorer(one, callbackOne, null, Long.MAX_VALUE, true, "store"));
+        changelogReader.register(new StateRestorer(two, callbackTwo, null, Long.MAX_VALUE, true, "store"));
 
-        changelogReader.restore();
+        changelogReader.restore(Collections.<StreamTask>emptySet());
 
         assertThat(callback.restored.size(), equalTo(10));
         assertThat(callbackOne.restored.size(), equalTo(5));
@@ -138,11 +138,11 @@ public void shouldRestoreMultipleStores() throws Exception {
 
     @Test
     public void shouldNotRestoreAnythingWhenPartitionIsEmpty() throws Exception {
-        final StateRestorer restorer = new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true);
+        final StateRestorer restorer = new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true, "store");
         setupConsumer(0, topicPartition);
         changelogReader.register(restorer);
 
-        changelogReader.restore();
+        changelogReader.restore(Collections.<StreamTask>emptySet());
         assertThat(callback.restored.size(), equalTo(0));
         assertThat(restorer.restoredOffset(), equalTo(0L));
     }
@@ -151,11 +151,11 @@ public void shouldNotRestoreAnythingWhenPartitionIsEmpty() throws Exception {
     public void shouldNotRestoreAnythingWhenCheckpointAtEndOffset() throws Exception {
        final Long endOffset = 10L;
         setupConsumer(endOffset, topicPartition);
-        final StateRestorer restorer = new StateRestorer(topicPartition, callback, endOffset, Long.MAX_VALUE, true);
+        final StateRestorer restorer = new StateRestorer(topicPartition, callback, endOffset, Long.MAX_VALUE, true, "store");
 
         changelogReader.register(restorer);
 
-        changelogReader.restore();
+        changelogReader.restore(Collections.<StreamTask>emptySet());
         assertThat(callback.restored.size(), equalTo(0));
         assertThat(restorer.restoredOffset(), equalTo(endOffset));
     }
@@ -163,8 +163,8 @@ public void shouldNotRestoreAnythingWhenCheckpointAtEndOffset() throws Exception
     @Test
     public void shouldReturnRestoredOffsetsForPersistentStores() throws Exception {
         setupConsumer(10, topicPartition);
-        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true));
-        changelogReader.restore();
+        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true, "store"));
+        changelogReader.restore(Collections.<StreamTask>emptySet());
         final Map<TopicPartition, Long> restoredOffsets = changelogReader.restoredOffsets();
         assertThat(restoredOffsets, equalTo(Collections.singletonMap(topicPartition, 10L)));
     }
@@ -172,8 +172,8 @@ public void shouldReturnRestoredOffsetsForPersistentStores() throws Exception {
     @Test
     public void shouldNotReturnRestoredOffsetsForNonPersistentStore() throws Exception {
         setupConsumer(10, topicPartition);
-        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false));
-        changelogReader.restore();
+        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false, "store"));
+        changelogReader.restore(Collections.<StreamTask>emptySet());
         final Map<TopicPartition, Long> restoredOffsets = changelogReader.restoredOffsets();
         assertThat(restoredOffsets, equalTo(Collections.<TopicPartition, Long>emptyMap()));
     }
@@ -186,8 +186,8 @@ public void shouldIgnoreNullKeysWhenRestoring() throws Exception {
         consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), 1, (byte[]) null, bytes));
         consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), 2, bytes, bytes));
         consumer.assign(Collections.singletonList(topicPartition));
-        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false));
-        changelogReader.restore();
+        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false, "store"));
+        changelogReader.restore(Collections.<StreamTask>emptySet());
 
         assertThat(callback.restored, CoreMatchers.equalTo(Utils.mkList(KeyValue.pair(bytes, bytes), KeyValue.pair(bytes, bytes))));
     }
@@ -200,15 +200,15 @@ public void shouldReturnCompletedPartitionsOnEachRestoreCall() {
             consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), i, bytes, bytes));
         }
         consumer.assign(Collections.singletonList(topicPartition));
-        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false));
+        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false, "store"));
 
-        final Collection<TopicPartition> completedFirstTime = changelogReader.restore();
+        final Collection<TopicPartition> completedFirstTime = changelogReader.restore(Collections.<StreamTask>emptySet());
         assertTrue(completedFirstTime.isEmpty());
         for (int i = 5; i < 10; i++) {
             consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), i, bytes, bytes));
         }
         final Collection<TopicPartition> expected = Collections.singleton(topicPartition);
-        assertThat(changelogReader.restore(), equalTo(expected));
+        assertThat(changelogReader.restore(Collections.<StreamTask>emptySet()), equalTo(expected));
     }
 
     private void setupConsumer(final long messages, final TopicPartition topicPartition) {
@@ -237,8 +237,8 @@ private void assignPartition(final long messages, final TopicPartition topicPart
     public void shouldCompleteImmediatelyWhenEndOffsetIs0() {
         final Collection<TopicPartition> expected = Collections.singleton(topicPartition);
         setupConsumer(0, topicPartition);
-        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true));
-        final Collection<TopicPartition> restored = changelogReader.restore();
+        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true, "store"));
+        final Collection<TopicPartition> restored = changelogReader.restore(Collections.<StreamTask>emptySet());
         assertThat(restored, equalTo(expected));
     }
 
@@ -248,9 +248,9 @@ public void shouldRestorePartitionsRegisteredPostInitialization() {
 
         setupConsumer(1, topicPartition);
         consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 10L));
-        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false));
+        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false, "store"));
 
-        assertTrue(changelogReader.restore().isEmpty());
+        assertTrue(changelogReader.restore(Collections.<StreamTask>emptySet()).isEmpty());
 
         addRecords(9, topicPartition, 1);
 
@@ -259,12 +259,12 @@ public void shouldRestorePartitionsRegisteredPostInitialization() {
         consumer.updateBeginningOffsets(Collections.singletonMap(postInitialization, 0L));
         consumer.updateEndOffsets(Collections.singletonMap(postInitialization, 3L));
 
-        changelogReader.register(new StateRestorer(postInitialization, callbackTwo, null, Long.MAX_VALUE, false));
+        changelogReader.register(new StateRestorer(postInitialization, callbackTwo, null, Long.MAX_VALUE, false, "store"));
 
         final Collection<TopicPartition> expected = Utils.mkSet(topicPartition, postInitialization);
         consumer.assign(expected);
 
-        assertThat(changelogReader.restore(), equalTo(expected));
+        assertThat(changelogReader.restore(Collections.<StreamTask>emptySet()), equalTo(expected));
         assertThat(callback.restored.size(), equalTo(10));
         assertThat(callbackTwo.restored.size(), equalTo(3));
     }
diff --git a/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java b/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java
index 54fd858393c..93ce801be1c 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java
@@ -19,6 +19,7 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.processor.internals.ChangelogReader;
 import org.apache.kafka.streams.processor.internals.StateRestorer;
+import org.apache.kafka.streams.processor.internals.StreamTask;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -35,7 +36,7 @@ public void register(final StateRestorer restorer) {
     }
 
     @Override
-    public Collection<TopicPartition> restore() {
+    public Collection<TopicPartition> restore(final Collection<StreamTask> restoringTasks) {
         return registered;
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index cb56fa1b1a6..fed8752ba40 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -142,6 +142,9 @@ public ThreadCache getCache() {
     @Override
     public void initialized() {}
 
+    @Override
+    public void uninitialize() {}
+
     @Override
     public File stateDir() {
         if (stateDir == null) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> State-store can desynchronise with changelog
> --------------------------------------------
>
>                 Key: KAFKA-7192
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7192
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.1
>            Reporter: Jon Bates
>            Assignee: Guozhang Wang
>            Priority: Critical
>              Labels: bugs
>             Fix For: 2.0.1, 2.1.0
>
>
> n.b. this bug has been verified with exactly-once processing enabled
> Consider the following scenario:
>  * A record, N, is read into a Kafka topology
>  * the state store is updated
>  * the topology crashes
> h3. *Expected behaviour:*
>  # Node is restarted
>  # The offset was never committed, so record N will be reprocessed
>  # State-store is reset to position N-1
>  # Record N is reprocessed
> h3. *Actual Behaviour*
>  # Node is restarted
>  # Record N is reprocessed (good)
>  # The state store still holds the state from the previous, uncommitted processing
> I'd consider this a corruption of the state-store, hence the critical 
> Priority, although High may be more appropriate.
> I wrote a proof-of-concept here, which demonstrates the problem on Linux:
> [https://github.com/spadger/kafka-streams-sad-state-store]
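
The linked proof-of-concept amounts to a stateful, EOS-enabled topology of
roughly the following shape (a hypothetical minimal sketch; topic names,
serdes, and the aggregation are illustrative and not taken from the PoC
repository):

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.Materialized;

    public class SadStateStoreSketch {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "sad-state-store");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());

            final StreamsBuilder builder = new StreamsBuilder();
            builder.<String, Long>stream("input")
                   .groupByKey()
                   // The RocksDB store backing this aggregation is updated before
                   // the transaction commits; killing the process here leaves the
                   // local store ahead of both the changelog and the input offset.
                   .aggregate(() -> 0L, (key, value, agg) -> agg + value,
                              Materialized.as("sum-store"))
                   .toStream()
                   .to("output");

            new KafkaStreams(builder.build(), props).start();
        }
    }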



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
