[ https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642553#comment-16642553 ]

ASF GitHub Bot commented on KAFKA-7192:
---------------------------------------

mjsax closed pull request #5657: KAFKA-7192: Wipe out state store if EOS is turned on and checkpoint file does not exist
URL: https://github.com/apache/kafka/pull/5657

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 410212e1ff0..eeaed893482 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -198,4 +198,10 @@ public ThreadCache getCache() {
     public void initialized() {
         initialized = true;
     }
+
+    @Override
+    public void uninitialize() {
+        initialized = false;
+    }
+
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index baea4af62f1..fcf2f6b13b7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -107,7 +107,7 @@ public final String applicationId() {
     }
 
     @Override
-    public final Set<TopicPartition> partitions() {
+    public Set<TopicPartition> partitions() {
         return partitions;
     }
 
@@ -226,6 +226,9 @@ void registerStateStores() {
         }
     }
 
+    void reinitializeStateStoresForPartitions(final TopicPartition partition) {
+        stateMgr.reinitializeStateStoresForPartitions(partition, processorContext);
+    }
 
     /**
     * @throws ProcessorStateException if there is an error while closing the state manager
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 0d9d04de5cc..cfce57588e0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -50,7 +50,7 @@
     // IQ may access this map.
     private Map<TaskId, Task> running = new ConcurrentHashMap<>();
     private Map<TopicPartition, Task> runningByPartition = new HashMap<>();
-    private Map<TopicPartition, Task> restoringByPartition = new HashMap<>();
+    private Map<TopicPartition, StreamTask> restoringByPartition = new HashMap<>();
     private int committed = 0;
 
 
@@ -122,7 +122,8 @@ void addNewTask(final Task task) {
             try {
                 if (!entry.getValue().initializeStateStores()) {
                    log.debug("Transitioning {} {} to restoring", taskTypeName, entry.getKey());
-                    addToRestoring(entry.getValue());
+                    // cast is safe, because StandbyTasks always return `true` in `initializeStateStores()` above
+                    addToRestoring((StreamTask) entry.getValue());
                 } else {
                     transitionToRunning(entry.getValue(), readyPartitions);
                 }
@@ -278,7 +279,7 @@ boolean maybeResumeSuspendedTask(final TaskId taskId, final Set<TopicPartition>
         return false;
     }
 
-    private void addToRestoring(final Task task) {
+    private void addToRestoring(final StreamTask task) {
         restoring.put(task.id(), task);
         for (TopicPartition topicPartition : task.partitions()) {
             restoringByPartition.put(topicPartition, task);
@@ -307,7 +308,7 @@ private void transitionToRunning(final Task task, final Set<TopicPartition> read
     }
 
     @Override
-    public Task restoringTaskFor(final TopicPartition partition) {
+    public StreamTask restoringTaskFor(final TopicPartition partition) {
         return restoringByPartition.get(partition);
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
index 57bb3ac81a6..b5719b111f0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
@@ -53,4 +53,9 @@
      * Mark this context as being initialized
      */
     void initialized();
+
+    /**
+     * Mark this context as being uninitialized
+     */
+    void uninitialize();
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 5536ac12be7..9ccb458cf66 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -19,8 +19,10 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
@@ -33,9 +35,11 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 
 public class ProcessorStateManager implements StateManager {
@@ -49,6 +53,7 @@
     private final String logPrefix;
     private final boolean isStandby;
     private final ChangelogReader changelogReader;
+    private final boolean eosEnabled;
     private final Map<String, StateStore> stores;
     private final Map<String, StateStore> globalStores;
     private final Map<TopicPartition, Long> offsetLimits;
@@ -98,6 +103,7 @@ public ProcessorStateManager(final TaskId taskId,
         checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
         checkpointedOffsets = new HashMap<>(checkpoint.read());
 
+        this.eosEnabled = eosEnabled;
         if (eosEnabled) {
            // delete the checkpoint file after we finish loading its stored offsets
             checkpoint.delete();
@@ -176,6 +182,61 @@ public void register(final StateStore store,
         return partitionsAndOffsets;
     }
 
+    void reinitializeStateStoresForPartitions(final TopicPartition topicPartition,
+                                              final InternalProcessorContext processorContext) {
+        final Map<String, String> changelogTopicToStore = inverseOneToOneMap(storeToChangelogTopic);
+        final Set<String> storeToBeReinitialized = new HashSet<>();
+        final Map<String, StateStore> storesCopy = new HashMap<>(stores);
+
+        checkpointedOffsets.remove(topicPartition);
+        storeToBeReinitialized.add(changelogTopicToStore.get(topicPartition.topic()));
+
+        if (!eosEnabled) {
+            try {
+                checkpoint.write(checkpointedOffsets);
+            } catch (final IOException fatalException) {
+                log.error("Failed to write offset checkpoint file to {} while re-initializing {}: {}", checkpoint, stores, fatalException);
+                throw new StreamsException("Failed to reinitialize stores.", fatalException);
+            }
+        }
+
+        for (final Map.Entry<String, StateStore> entry : storesCopy.entrySet()) {
+            final StateStore stateStore = entry.getValue();
+            final String storeName = stateStore.name();
+            if (storeToBeReinitialized.contains(storeName)) {
+                try {
+                    stateStore.close();
+                } catch (final RuntimeException ignoreAndSwallow) { /* ignore */ }
+                processorContext.uninitialize();
+                stores.remove(entry.getKey());
+
+                try {
+                    Utils.delete(new File(baseDir + File.separator + "rocksdb" + File.separator + storeName));
+                } catch (final IOException fatalException) {
+                    log.error("Failed to reinitialize store {}.", storeName, fatalException);
+                    throw new StreamsException(String.format("Failed to reinitialize store %s.", storeName), fatalException);
+                }
+
+                try {
+                    Utils.delete(new File(baseDir + File.separator + storeName));
+                } catch (final IOException fatalException) {
+                    log.error("Failed to reinitialize store {}.", storeName, fatalException);
+                    throw new StreamsException(String.format("Failed to reinitialize store %s.", storeName), fatalException);
+                }
+
+                stateStore.init(processorContext, stateStore);
+            }
+        }
+    }
+
+    private Map<String, String> inverseOneToOneMap(final Map<String, String> origin) {
+        final Map<String, String> reversedMap = new HashMap<>();
+        for (final Map.Entry<String, String> entry : origin.entrySet()) {
+            reversedMap.put(entry.getValue(), entry.getKey());
+        }
+        return reversedMap;
+    }
+
    List<ConsumerRecord<byte[], byte[]>> updateStandbyStates(final TopicPartition storePartition,
                                                             final List<ConsumerRecord<byte[], byte[]>> records) {
         final long limit = offsetLimit(storePartition);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RestoringTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RestoringTasks.java
index 6ed28fdf63c..3671b493f19 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RestoringTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RestoringTasks.java
@@ -19,5 +19,5 @@
 import org.apache.kafka.common.TopicPartition;
 
 public interface RestoringTasks {
-    Task restoringTaskFor(final TopicPartition partition);
+    StreamTask restoringTaskFor(final TopicPartition partition);
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
index 33dce9e7558..e0bac939b8b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
@@ -26,13 +26,13 @@
 
     static final int NO_CHECKPOINT = -1;
 
-    private final Long checkpoint;
     private final long offsetLimit;
     private final boolean persistent;
     private final String storeName;
     private final TopicPartition partition;
     private final CompositeRestoreListener compositeRestoreListener;
 
+    private long checkpointOffset;
     private long restoredOffset;
     private long startingOffset;
     private long endingOffset;
@@ -45,7 +45,7 @@
                   final String storeName) {
         this.partition = partition;
         this.compositeRestoreListener = compositeRestoreListener;
-        this.checkpoint = checkpoint;
+        this.checkpointOffset = checkpoint == null ? NO_CHECKPOINT : checkpoint;
         this.offsetLimit = offsetLimit;
         this.persistent = persistent;
         this.storeName = storeName;
@@ -56,7 +56,15 @@ public TopicPartition partition() {
     }
 
     long checkpoint() {
-        return checkpoint == null ? NO_CHECKPOINT : checkpoint;
+        return checkpointOffset;
+    }
+
+    void setCheckpointOffset(final long checkpointOffset) {
+        this.checkpointOffset = checkpointOffset;
+    }
+
+    public String storeName() {
+        return storeName;
     }
 
     void restoreStarted() {
@@ -67,7 +75,8 @@ void restoreDone() {
         compositeRestoreListener.onRestoreEnd(partition, storeName, restoredNumRecords());
     }
 
-    void restoreBatchCompleted(long currentRestoredOffset, int numRestored) {
+    void restoreBatchCompleted(final long currentRestoredOffset,
+                               final int numRestored) {
         compositeRestoreListener.onBatchRestored(partition, storeName, currentRestoredOffset, numRestored);
     }
 
@@ -79,7 +88,7 @@ boolean isPersistent() {
         return persistent;
     }
 
-    void setUserRestoreListener(StateRestoreListener userRestoreListener) {
+    void setUserRestoreListener(final StateRestoreListener userRestoreListener) {
         this.compositeRestoreListener.setUserRestoreListener(userRestoreListener);
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 8d85b1d8faf..34350c17eb0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -69,7 +69,7 @@ public void register(final StateRestorer restorer) {
      */
     public Collection<TopicPartition> restore(final RestoringTasks active) {
         if (!needsInitializing.isEmpty()) {
-            initialize();
+            initialize(active);
         }
 
         if (needsRestoring.isEmpty()) {
@@ -90,7 +90,7 @@ public void register(final StateRestorer restorer) {
         return completed();
     }
 
-    private void initialize() {
+    private void initialize(final RestoringTasks active) {
         if (!consumer.subscription().isEmpty()) {
            throw new IllegalStateException("Restore consumer should not be subscribed to any topics (" + consumer.subscription() + ")");
         }
@@ -99,8 +99,8 @@ private void initialize() {
        // the needsInitializing map is not empty, meaning we do not know the metadata for some of them yet
         refreshChangelogInfo();
 
-        Map<TopicPartition, StateRestorer> initializable = new HashMap<>();
-        for (Map.Entry<TopicPartition, StateRestorer> entry : needsInitializing.entrySet()) {
+        final Map<TopicPartition, StateRestorer> initializable = new HashMap<>();
+        for (final Map.Entry<TopicPartition, StateRestorer> entry : needsInitializing.entrySet()) {
             final TopicPartition topicPartition = entry.getKey();
             if (hasPartition(topicPartition)) {
                 initializable.put(entry.getKey(), entry.getValue());
@@ -144,11 +144,12 @@ private void initialize() {
 
         // set up restorer for those initializable
         if (!initializable.isEmpty()) {
-            startRestoration(initializable);
+            startRestoration(initializable, active);
         }
     }
 
-    private void startRestoration(final Map<TopicPartition, StateRestorer> initialized) {
+    private void startRestoration(final Map<TopicPartition, StateRestorer> initialized,
+                                  final RestoringTasks active) {
         log.debug("Start restoring state stores from changelog topics {}", initialized.keySet());
 
         final Set<TopicPartition> assignment = new HashSet<>(consumer.assignment());
@@ -157,26 +158,47 @@ private void startRestoration(final Map<TopicPartition, StateRestorer> initializ
 
         final List<StateRestorer> needsPositionUpdate = new ArrayList<>();
         for (final StateRestorer restorer : initialized.values()) {
+            final TopicPartition restoringPartition = restorer.partition();
             if (restorer.checkpoint() != StateRestorer.NO_CHECKPOINT) {
-                consumer.seek(restorer.partition(), restorer.checkpoint());
-                logRestoreOffsets(restorer.partition(),
-                        restorer.checkpoint(),
-                        endOffsets.get(restorer.partition()));
-                restorer.setStartingOffset(consumer.position(restorer.partition()));
+                consumer.seek(restoringPartition, restorer.checkpoint());
+                logRestoreOffsets(restoringPartition,
+                                  restorer.checkpoint(),
+                                  endOffsets.get(restoringPartition));
+                restorer.setStartingOffset(consumer.position(restoringPartition));
                 restorer.restoreStarted();
             } else {
-                consumer.seekToBeginning(Collections.singletonList(restorer.partition()));
+                consumer.seekToBeginning(Collections.singletonList(restoringPartition));
                 needsPositionUpdate.add(restorer);
             }
         }
 
         for (final StateRestorer restorer : needsPositionUpdate) {
-            final long position = consumer.position(restorer.partition());
-            logRestoreOffsets(restorer.partition(),
-                              position,
-                              endOffsets.get(restorer.partition()));
-            restorer.setStartingOffset(position);
-            restorer.restoreStarted();
+            final TopicPartition restoringPartition = restorer.partition();
+            final StreamTask task = active.restoringTaskFor(restoringPartition);
+
+            // If the checkpoint does not exist, the task was not shut down gracefully before;
+            // in this case, if EOS is turned on, we should wipe out the state and re-initialize the task
+            if (task.eosEnabled) {
+                log.info("No checkpoint found for task {} state store {} changelog {} with EOS turned on. " +
+                    "Reinitializing the task and restoring its state from the beginning.", task.id, restorer.storeName(), restoringPartition);
+
+                // we move the partitions here, because they will be added back within
+                // `task.reinitializeStateStoresForPartitions()` that calls `register()` internally again
+                needsInitializing.remove(restoringPartition);
+                restorer.setCheckpointOffset(consumer.position(restoringPartition));
+
+                task.reinitializeStateStoresForPartitions(restoringPartition);
+                stateRestorers.get(restoringPartition).restoreStarted();
+            } else {
+                log.info("Restoring task {}'s state store {} from the beginning of the changelog {}", task.id, restorer.storeName(), restoringPartition);
+
+                final long position = consumer.position(restoringPartition);
+                logRestoreOffsets(restoringPartition,
+                                  position,
+                                  endOffsets.get(restoringPartition));
+                restorer.setStartingOffset(position);
+                restorer.restoreStarted();
+            }
         }
 
         needsRestoring.putAll(initialized);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 6c7b2b43c9d..d07781f5b24 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -392,7 +392,7 @@ public void shouldNotViolateEosIfOneTaskFailsWithState() throws Exception {
         // the app commits after each 10 records per partition, and thus will have 2*5 uncommitted writes
         // and store updates (ie, another 5 uncommitted writes to a changelog topic per partition)
         //
-        // the failure gets inject after 20 committed and 30 uncommitted records got received
+        // the failure gets injected after 20 committed and 10 uncommitted records got received
         // -> the failure only kills one thread
         // after fail over, we should read 40 committed records and the state stores should contain the correct sums
         // per key (even if some records got processed twice)
@@ -402,7 +402,7 @@ public void shouldNotViolateEosIfOneTaskFailsWithState() throws Exception {
             streams.start();
 
             final List<KeyValue<Long, Long>> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L);
-            final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = prepareData(10L, 15L, 0L, 1L);
+            final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = prepareData(10L, 15L, 0L, 1L, 2L, 3L);
 
             final List<KeyValue<Long, Long>> dataBeforeFailure = new ArrayList<>();
             dataBeforeFailure.addAll(committedDataBeforeFailure);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
index 5c8b7c44033..01439add706 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
@@ -40,8 +40,8 @@
 
 public class AssignedTasksTest {
 
-    private final Task t1 = EasyMock.createMock(Task.class);
-    private final Task t2 = EasyMock.createMock(Task.class);
+    private final StreamTask t1 = EasyMock.createMock(StreamTask.class);
+    private final StreamTask t2 = EasyMock.createMock(StreamTask.class);
     private final TopicPartition tp1 = new TopicPartition("t1", 0);
     private final TopicPartition tp2 = new TopicPartition("t2", 0);
     private final TopicPartition changeLog1 = new TopicPartition("cl1", 0);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index bb0c51e1546..a0e2140e92d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -59,7 +59,7 @@
     @Mock(type = MockType.NICE)
     private RestoringTasks active;
     @Mock(type = MockType.NICE)
-    private Task task;
+    private StreamTask task;
 
    private final MockStateRestoreListener callback = new MockStateRestoreListener();
    private final CompositeRestoreListener restoreListener = new CompositeRestoreListener(callback);
@@ -107,6 +107,10 @@ public void shouldRestoreAllMessagesFromBeginningWhenCheckpointNull() {
         final int messages = 10;
         setupConsumer(messages, topicPartition);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
+
+        expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+        replay(active, task);
+
         changelogReader.restore(active);
         assertThat(callback.restored.size(), equalTo(messages));
     }
@@ -115,8 +119,7 @@ public void shouldRestoreAllMessagesFromBeginningWhenCheckpointNull() {
     public void shouldRestoreMessagesFromCheckpoint() {
         final int messages = 10;
         setupConsumer(messages, topicPartition);
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, 5L, Long.MAX_VALUE, true,
-                                                   "storeName"));
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, 5L, Long.MAX_VALUE, true, "storeName"));
 
         changelogReader.restore(active);
         assertThat(callback.restored.size(), equalTo(5));
@@ -126,9 +129,9 @@ public void shouldRestoreMessagesFromCheckpoint() {
     public void shouldClearAssignmentAtEndOfRestore() {
         final int messages = 1;
         setupConsumer(messages, topicPartition);
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
-                                                   "storeName"));
-
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
+        expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+        replay(active, task);
         changelogReader.restore(active);
         assertThat(consumer.assignment(), equalTo(Collections.<TopicPartition>emptySet()));
     }
@@ -136,9 +139,10 @@ public void shouldClearAssignmentAtEndOfRestore() {
     @Test
     public void shouldRestoreToLimitWhenSupplied() {
         setupConsumer(10, topicPartition);
-        final StateRestorer restorer = new StateRestorer(topicPartition, restoreListener, null, 3, true,
-                                                         "storeName");
+        final StateRestorer restorer = new StateRestorer(topicPartition, restoreListener, null, 3, true, "storeName");
         changelogReader.register(restorer);
+        expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+        replay(active, task);
         changelogReader.restore(active);
         assertThat(callback.restored.size(), equalTo(3));
         assertThat(restorer.restoredOffset(), equalTo(3L));
@@ -156,14 +160,14 @@ public void shouldRestoreMultipleStores() {
         setupConsumer(5, one);
         setupConsumer(3, two);
 
-        changelogReader
-            .register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName1"));
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName1"));
         changelogReader.register(new StateRestorer(one, restoreListener1, null, Long.MAX_VALUE, true, "storeName2"));
         changelogReader.register(new StateRestorer(two, restoreListener2, null, Long.MAX_VALUE, true, "storeName3"));
 
-        expect(active.restoringTaskFor(one)).andReturn(null);
-        expect(active.restoringTaskFor(two)).andReturn(null);
-        replay(active);
+        expect(active.restoringTaskFor(one)).andStubReturn(task);
+        expect(active.restoringTaskFor(two)).andStubReturn(task);
+        expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+        replay(active, task);
         changelogReader.restore(active);
 
         assertThat(callback.restored.size(), equalTo(10));
@@ -188,9 +192,10 @@ public void shouldRestoreAndNotifyMultipleStores() throws Exception {
         changelogReader.register(new StateRestorer(one, restoreListener1, null, Long.MAX_VALUE, true, "storeName2"));
         changelogReader.register(new StateRestorer(two, restoreListener2, null, Long.MAX_VALUE, true, "storeName3"));
 
-        expect(active.restoringTaskFor(one)).andReturn(null);
-        expect(active.restoringTaskFor(two)).andReturn(null);
-        replay(active);
+        expect(active.restoringTaskFor(one)).andStubReturn(task);
+        expect(active.restoringTaskFor(two)).andStubReturn(task);
+        expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+        replay(active, task);
         changelogReader.restore(active);
 
         assertThat(callback.restored.size(), equalTo(10));
@@ -210,8 +215,10 @@ public void shouldRestoreAndNotifyMultipleStores() throws Exception {
     @Test
     public void shouldOnlyReportTheLastRestoredOffset() {
         setupConsumer(10, topicPartition);
-        changelogReader
-            .register(new StateRestorer(topicPartition, restoreListener, null, 5, true, "storeName1"));
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, 5, true, "storeName1"));
+        expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+        replay(active, task);
+
         changelogReader.restore(active);
 
         assertThat(callback.restored.size(), equalTo(5));
@@ -270,6 +277,8 @@ public void shouldNotRestoreAnythingWhenCheckpointAtEndOffset() {
     public void shouldReturnRestoredOffsetsForPersistentStores() {
         setupConsumer(10, topicPartition);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
+        expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+        replay(active, task);
         changelogReader.restore(active);
        final Map<TopicPartition, Long> restoredOffsets = changelogReader.restoredOffsets();
        assertThat(restoredOffsets, equalTo(Collections.singletonMap(topicPartition, 10L)));
@@ -279,6 +288,8 @@ public void shouldReturnRestoredOffsetsForPersistentStores() {
     public void shouldNotReturnRestoredOffsetsForNonPersistentStore() {
         setupConsumer(10, topicPartition);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false, "storeName"));
+        expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+        replay(active, task);
         changelogReader.restore(active);
         final Map<TopicPartition, Long> restoredOffsets = changelogReader.restoredOffsets();
         assertThat(restoredOffsets, equalTo(Collections.<TopicPartition, Long>emptyMap()));
@@ -292,8 +303,9 @@ public void shouldIgnoreNullKeysWhenRestoring() {
        consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), 1, (byte[]) null, bytes));
        consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), 2, bytes, bytes));
        consumer.assign(Collections.singletonList(topicPartition));
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false,
-                                                   "storeName"));
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false, "storeName"));
+        expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+        replay(active, task);
         changelogReader.restore(active);
 
         assertThat(callback.restored, CoreMatchers.equalTo(Utils.mkList(KeyValue.pair(bytes, bytes), KeyValue.pair(bytes, bytes))));
@@ -318,10 +330,11 @@ public void shouldRestorePartitionsRegisteredPostInitialization() {
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false, "storeName"));
 
         final TopicPartition postInitialization = new TopicPartition("other", 0);
-        expect(active.restoringTaskFor(topicPartition)).andReturn(null);
-        expect(active.restoringTaskFor(topicPartition)).andReturn(null);
-        expect(active.restoringTaskFor(postInitialization)).andReturn(null);
-        replay(active);
+        expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+        expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+        expect(active.restoringTaskFor(postInitialization)).andStubReturn(task);
+        expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+        replay(active, task);
 
         assertTrue(changelogReader.restore(active).isEmpty());
 
@@ -348,7 +361,7 @@ public void shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestore
         consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 5L));
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
 
-        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
         replay(active);
 
         try {
@@ -371,8 +384,8 @@ public void shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestore
         consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 6L));
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
 
-        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
-        replay(active);
+        expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+        replay(active, task);
         try {
             changelogReader.restore(active);
             fail("Should have thrown task migrated exception");


 

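In short, the change moves the restore-position decision in StoreChangelogReader.startRestoration(): when a StateRestorer comes up with no checkpoint and the owning StreamTask has EOS enabled, the task's local state is wiped via reinitializeStateStoresForPartitions() and replayed from the beginning of the changelog instead of being reused. A minimal sketch of that decision follows; the names (onRestoreStart, wipeAndRestoreFromBeginning, resumeFromOffset) are hypothetical stand-ins for illustration, not the PR's identifiers:

    import org.apache.kafka.common.TopicPartition;

    // Sketch only: mirrors the restore-time decision this patch adds; not the actual Kafka classes.
    final class RestoreDecisionSketch {

        static final long NO_CHECKPOINT = -1L; // stand-in for StateRestorer.NO_CHECKPOINT

        void onRestoreStart(final long checkpointOffset, final boolean eosEnabled, final TopicPartition partition) {
            if (checkpointOffset == NO_CHECKPOINT && eosEnabled) {
                // a missing checkpoint file means the previous run did not shut down cleanly;
                // under exactly-once the local store may contain uncommitted writes, so it
                // must be deleted and rebuilt from offset 0 of the changelog
                wipeAndRestoreFromBeginning(partition);
            } else {
                // at-least-once, or a valid checkpoint: resume restoration as before
                resumeFromOffset(partition, checkpointOffset);
            }
        }

        void wipeAndRestoreFromBeginning(final TopicPartition partition) { /* delete local store dirs, seek to beginning */ }

        void resumeFromOffset(final TopicPartition partition, final long offset) { /* consumer.seek(partition, offset) */ }
    }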
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> State-store can desynchronise with changelog
> --------------------------------------------
>
>                 Key: KAFKA-7192
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7192
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.11.0.3, 1.1.1
>            Reporter: Jon Bates
>            Assignee: Guozhang Wang
>            Priority: Critical
>              Labels: bugs
>             Fix For: 0.11.0.4, 2.0.1, 2.1.0
>
>
> n.b. this bug has been verified with exactly-once processing enabled
> Consider the following scenario:
>  * A record, N, is read into a Kafka topology
>  * the state store is updated
>  * the topology crashes
> h3. *Expected behaviour:*
>  # Node is restarted
>  # Offset was never updated, so record N is reprocessed
>  # State-store is reset to position N-1
>  # Record is reprocessed
> h3. *Actual Behaviour*
>  # Node is restarted
>  # Record N is reprocessed (good)
>  # The state store has the state from the previous processing
> I'd consider this a corruption of the state-store, hence the critical 
> Priority, although High may be more appropriate.
> I wrote a proof-of-concept here, which demonstrates the problem on Linux:
> [https://github.com/spadger/kafka-streams-sad-state-store]
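For context, a minimal sketch of the kind of topology that can exhibit this, assuming the 2.0-era Kafka Streams API; the application id, topic, and store names are illustrative and not taken from the linked proof-of-concept:

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Materialized;

    // Sketch: an EOS topology with a persistent store. If the process crashes after the
    // local store was updated but before the transaction committed, the reused store is
    // ahead of the committed changelog on restart -- which is what this fix guards against.
    public class EosStoreExample {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "eos-store-example"); // illustrative
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
            // the setting under which the bug was verified
            props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

            final StreamsBuilder builder = new StreamsBuilder();
            final KStream<String, Long> input = builder.stream("input-topic"); // illustrative
            input.groupByKey()
                 .reduce(Long::sum, Materialized.as("sum-store")); // RocksDB store + changelog topic

            new KafkaStreams(builder.build(), props).start();
        }
    }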



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
