[ https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692414#comment-16692414 ]

ASF GitHub Bot commented on KAFKA-7192:
---------------------------------------

mjsax closed pull request #5767: KAFKA-7192: Wipe out state store if EOS is turned on and checkpoint file does not exist
URL: https://github.com/apache/kafka/pull/5767

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is reproduced below for the sake of provenance:

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
index a99e45147b9..40ce79c289e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
@@ -24,4 +24,8 @@
         super(logContext, "standby task");
     }
 
+    void addToRestoring(final StandbyTask task) {
+        throw new UnsupportedOperationException("Standby tasks cannot be restored actively.");
+    }
+
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
index 7b05f6488e7..98c2bbd2563 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
@@ -23,12 +23,21 @@
 import org.apache.kafka.streams.processor.TaskId;
 import org.slf4j.Logger;
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 
 class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements RestoringTasks {
     private final Logger log;
+    private final Map<TaskId, StreamTask> restoring = new HashMap<>();
+    private final Set<TopicPartition> restoredPartitions = new HashSet<>();
+    private final Map<TopicPartition, StreamTask> restoringByPartition = new HashMap<>();
     private final TaskAction<StreamTask> maybeCommitAction;
     private int committed = 0;
 
@@ -59,6 +68,52 @@ public StreamTask restoringTaskFor(final TopicPartition partition) {
         return restoringByPartition.get(partition);
     }
 
+    void updateRestored(final Collection<TopicPartition> restored) {
+        if (restored.isEmpty()) {
+            return;
+        }
+        log.trace("Stream task changelog partitions that have completed 
restoring so far: {}", restored);
+        restoredPartitions.addAll(restored);
+        for (final Iterator<Map.Entry<TaskId, StreamTask>> it = 
restoring.entrySet().iterator(); it.hasNext(); ) {
+            final Map.Entry<TaskId, StreamTask> entry = it.next();
+            final StreamTask task = entry.getValue();
+            if (restoredPartitions.containsAll(task.changelogPartitions())) {
+                transitionToRunning(task);
+                it.remove();
+                log.trace("Stream task {} completed restoration as all its 
changelog partitions {} have been applied to restore state",
+                    task.id(),
+                    task.changelogPartitions());
+            } else {
+                if (log.isTraceEnabled()) {
+                    final HashSet<TopicPartition> outstandingPartitions = new 
HashSet<>(task.changelogPartitions());
+                    outstandingPartitions.removeAll(restoredPartitions);
+                    log.trace("Stream task {} cannot resume processing yet 
since some of its changelog partitions have not completed restoring: {}",
+                        task.id(),
+                        outstandingPartitions);
+                }
+            }
+        }
+        if (allTasksRunning()) {
+            restoredPartitions.clear();
+        }
+    }
+
+    void addToRestoring(final StreamTask task) {
+        restoring.put(task.id(), task);
+        for (final TopicPartition topicPartition : task.partitions()) {
+            restoringByPartition.put(topicPartition, task);
+        }
+        for (final TopicPartition topicPartition : task.changelogPartitions()) {
+            restoringByPartition.put(topicPartition, task);
+        }
+    }
+
+    boolean allTasksRunning() {
+        return created.isEmpty()
+            && suspended.isEmpty()
+            && restoring.isEmpty();
+    }
+
     /**
      * @throws TaskMigratedException if committing offsets failed (non-EOS)
      *                               or if the task producer got fenced (EOS)
@@ -139,4 +194,43 @@ int punctuate() {
         return punctuated;
     }
 
+    RuntimeException suspend() {
+        final AtomicReference<RuntimeException> firstException = new AtomicReference<>(super.suspend());
+        log.trace("Close restoring stream task {}", restoring.keySet());
+        firstException.compareAndSet(null, closeNonRunningTasks(restoring.values()));
+        restoring.clear();
+        restoringByPartition.clear();
+        return firstException.get();
+    }
+
+    public String toString(final String indent) {
+        final StringBuilder builder = new StringBuilder();
+        builder.append(super.toString(indent));
+        describe(builder, restoring.values(), indent, "Restoring:");
+        return builder.toString();
+    }
+
+    List<StreamTask> allTasks() {
+        final List<StreamTask> tasks = super.allTasks();
+        tasks.addAll(restoring.values());
+        return tasks;
+    }
+
+    Collection<StreamTask> restoringTasks() {
+        return Collections.unmodifiableCollection(restoring.values());
+    }
+
+    Set<TaskId> allAssignedTaskIds() {
+        final Set<TaskId> taskIds = super.allAssignedTaskIds();
+        taskIds.addAll(restoring.keySet());
+        return taskIds;
+    }
+
+    void clear() {
+        super.clear();
+        restoring.clear();
+        restoringByPartition.clear();
+        restoredPartitions.clear();
+    }
+
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index da402bc251c..52b1335f951 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -40,15 +40,12 @@
     private final Logger log;
     private final String taskTypeName;
     private final TaskAction<T> commitAction;
-    private Map<TaskId, T> created = new HashMap<>();
-    private Map<TaskId, T> suspended = new HashMap<>();
-    private Map<TaskId, T> restoring = new HashMap<>();
-    private Set<TopicPartition> restoredPartitions = new HashSet<>();
-    private Set<TaskId> previousActiveTasks = new HashSet<>();
+    Map<TaskId, T> created = new HashMap<>();
+    Map<TaskId, T> suspended = new HashMap<>();
+    private final Set<TaskId> previousActiveTasks = new HashSet<>();
     // IQ may access this map.
     Map<TaskId, T> running = new ConcurrentHashMap<>();
-    private Map<TopicPartition, T> runningByPartition = new HashMap<>();
-    Map<TopicPartition, T> restoringByPartition = new HashMap<>();
+    private final Map<TopicPartition, T> runningByPartition = new HashMap<>();
 
     AssignedTasks(final LogContext logContext,
                   final String taskTypeName) {
@@ -99,42 +96,11 @@ void initializeNewTasks() {
         }
     }
 
-    void updateRestored(final Collection<TopicPartition> restored) {
-        if (restored.isEmpty()) {
-            return;
-        }
-        log.trace("{} changelog partitions that have completed restoring so 
far: {}", taskTypeName, restored);
-        restoredPartitions.addAll(restored);
-        for (final Iterator<Map.Entry<TaskId, T>> it = 
restoring.entrySet().iterator(); it.hasNext(); ) {
-            final Map.Entry<TaskId, T> entry = it.next();
-            final T task = entry.getValue();
-            if (restoredPartitions.containsAll(task.changelogPartitions())) {
-                transitionToRunning(task);
-                it.remove();
-                log.trace("{} {} completed restoration as all its changelog 
partitions {} have been applied to restore state",
-                        taskTypeName,
-                        task.id(),
-                        task.changelogPartitions());
-            } else {
-                if (log.isTraceEnabled()) {
-                    final HashSet<TopicPartition> outstandingPartitions = new 
HashSet<>(task.changelogPartitions());
-                    outstandingPartitions.removeAll(restoredPartitions);
-                    log.trace("{} {} cannot resume processing yet since some 
of its changelog partitions have not completed restoring: {}",
-                              taskTypeName,
-                              task.id(),
-                              outstandingPartitions);
-                }
-            }
-        }
-        if (allTasksRunning()) {
-            restoredPartitions.clear();
-        }
-    }
+    abstract void addToRestoring(final T task);
 
     boolean allTasksRunning() {
         return created.isEmpty()
-                && suspended.isEmpty()
-                && restoring.isEmpty();
+                && suspended.isEmpty();
     }
 
     Collection<T> running() {
@@ -145,21 +111,17 @@ RuntimeException suspend() {
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
         log.trace("Suspending running {} {}", taskTypeName, runningTaskIds());
         firstException.compareAndSet(null, suspendTasks(running.values()));
-        log.trace("Close restoring {} {}", taskTypeName, restoring.keySet());
-        firstException.compareAndSet(null, closeNonRunningTasks(restoring.values()));
         log.trace("Close created {} {}", taskTypeName, created.keySet());
         firstException.compareAndSet(null, closeNonRunningTasks(created.values()));
         previousActiveTasks.clear();
         previousActiveTasks.addAll(running.keySet());
         running.clear();
-        restoring.clear();
         created.clear();
         runningByPartition.clear();
-        restoringByPartition.clear();
         return firstException.get();
     }
 
-    private RuntimeException closeNonRunningTasks(final Collection<T> tasks) {
+    RuntimeException closeNonRunningTasks(final Collection<T> tasks) {
         RuntimeException exception = null;
         for (final T task : tasks) {
             try {
@@ -176,7 +138,7 @@ private RuntimeException closeNonRunningTasks(final Collection<T> tasks) {
 
     private RuntimeException suspendTasks(final Collection<T> tasks) {
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
-        for (Iterator<T> it = tasks.iterator(); it.hasNext(); ) {
+        for (final Iterator<T> it = tasks.iterator(); it.hasNext(); ) {
             final T task = it.next();
             try {
                 task.suspend();
@@ -247,27 +209,17 @@ boolean maybeResumeSuspendedTask(final TaskId taskId, final Set<TopicPartition>
         return false;
     }
 
-    private void addToRestoring(final T task) {
-        restoring.put(task.id(), task);
-        for (TopicPartition topicPartition : task.partitions()) {
-            restoringByPartition.put(topicPartition, task);
-        }
-        for (TopicPartition topicPartition : task.changelogPartitions()) {
-            restoringByPartition.put(topicPartition, task);
-        }
-    }
-
     /**
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
-    private void transitionToRunning(final T task) {
+    void transitionToRunning(final T task) {
         log.debug("transitioning {} {} to running", taskTypeName, task.id());
         running.put(task.id(), task);
         task.initializeTopology();
-        for (TopicPartition topicPartition : task.partitions()) {
+        for (final TopicPartition topicPartition : task.partitions()) {
             runningByPartition.put(topicPartition, task);
         }
-        for (TopicPartition topicPartition : task.changelogPartitions()) {
+        for (final TopicPartition topicPartition : task.changelogPartitions()) {
             runningByPartition.put(topicPartition, task);
         }
     }
@@ -288,15 +240,14 @@ public String toString(final String indent) {
         final StringBuilder builder = new StringBuilder();
         describe(builder, running.values(), indent, "Running:");
         describe(builder, suspended.values(), indent, "Suspended:");
-        describe(builder, restoring.values(), indent, "Restoring:");
         describe(builder, created.values(), indent, "New:");
         return builder.toString();
     }
 
-    private void describe(final StringBuilder builder,
-                          final Collection<T> tasks,
-                          final String indent,
-                          final String name) {
+    void describe(final StringBuilder builder,
+                  final Collection<T> tasks,
+                  final String indent,
+                  final String name) {
         builder.append(indent).append(name);
         for (final T t : tasks) {
             builder.append(indent).append(t.toString(indent + "\t\t"));
@@ -304,35 +255,27 @@ private void describe(final StringBuilder builder,
         builder.append("\n");
     }
 
-    private List<T> allTasks() {
+    List<T> allTasks() {
         final List<T> tasks = new ArrayList<>();
         tasks.addAll(running.values());
         tasks.addAll(suspended.values());
-        tasks.addAll(restoring.values());
         tasks.addAll(created.values());
         return tasks;
     }
 
-    Collection<T> restoringTasks() {
-        return Collections.unmodifiableCollection(restoring.values());
-    }
-
     Set<TaskId> allAssignedTaskIds() {
         final Set<TaskId> taskIds = new HashSet<>();
         taskIds.addAll(running.keySet());
         taskIds.addAll(suspended.keySet());
-        taskIds.addAll(restoring.keySet());
         taskIds.addAll(created.keySet());
         return taskIds;
     }
 
     void clear() {
         runningByPartition.clear();
-        restoringByPartition.clear();
         running.clear();
         created.clear();
         suspended.clear();
-        restoredPartitions.clear();
     }
 
     Set<TaskId> previousTaskIds() {
@@ -351,7 +294,7 @@ int commit() {
     void applyToRunningTasks(final TaskAction<T> action) {
         RuntimeException firstException = null;
 
-        for (Iterator<T> it = running().iterator(); it.hasNext(); ) {
+        for (final Iterator<T> it = running().iterator(); it.hasNext(); ) {
             final T task = it.next();
             try {
                 action.apply(task);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
index 33dce9e7558..e0bac939b8b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
@@ -26,13 +26,13 @@
 
     static final int NO_CHECKPOINT = -1;
 
-    private final Long checkpoint;
     private final long offsetLimit;
     private final boolean persistent;
     private final String storeName;
     private final TopicPartition partition;
     private final CompositeRestoreListener compositeRestoreListener;
 
+    private long checkpointOffset;
     private long restoredOffset;
     private long startingOffset;
     private long endingOffset;
@@ -45,7 +45,7 @@
                   final String storeName) {
         this.partition = partition;
         this.compositeRestoreListener = compositeRestoreListener;
-        this.checkpoint = checkpoint;
+        this.checkpointOffset = checkpoint == null ? NO_CHECKPOINT : checkpoint;
         this.offsetLimit = offsetLimit;
         this.persistent = persistent;
         this.storeName = storeName;
@@ -56,7 +56,15 @@ public TopicPartition partition() {
     }
 
     long checkpoint() {
-        return checkpoint == null ? NO_CHECKPOINT : checkpoint;
+        return checkpointOffset;
+    }
+
+    void setCheckpointOffset(final long checkpointOffset) {
+        this.checkpointOffset = checkpointOffset;
+    }
+
+    public String storeName() {
+        return storeName;
     }
 
     void restoreStarted() {
@@ -67,7 +75,8 @@ void restoreDone() {
         compositeRestoreListener.onRestoreEnd(partition, storeName, restoredNumRecords());
     }
 
-    void restoreBatchCompleted(long currentRestoredOffset, int numRestored) {
+    void restoreBatchCompleted(final long currentRestoredOffset,
+                               final int numRestored) {
         compositeRestoreListener.onBatchRestored(partition, storeName, currentRestoredOffset, numRestored);
     }
 
@@ -79,7 +88,7 @@ boolean isPersistent() {
         return persistent;
     }
 
-    void setUserRestoreListener(StateRestoreListener userRestoreListener) {
+    void setUserRestoreListener(final StateRestoreListener userRestoreListener) {
         this.compositeRestoreListener.setUserRestoreListener(userRestoreListener);
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 5fcba76570e..e0fc82de016 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -61,9 +61,14 @@ public StoreChangelogReader(final Consumer<byte[], byte[]> restoreConsumer,
 
     @Override
     public void register(final StateRestorer restorer) {
-        restorer.setUserRestoreListener(userStateRestoreListener);
-        stateRestorers.put(restorer.partition(), restorer);
-        needsInitializing.put(restorer.partition(), restorer);
+        final StateRestorer existingRestorer = stateRestorers.get(restorer.partition());
+        if (existingRestorer == null) {
+            restorer.setUserRestoreListener(userStateRestoreListener);
+            stateRestorers.put(restorer.partition(), restorer);
+            needsInitializing.put(restorer.partition(), restorer);
+        } else {
+            needsInitializing.put(restorer.partition(), existingRestorer);
+        }
     }
 
     /**
@@ -71,7 +76,7 @@ public void register(final StateRestorer restorer) {
      */
     public Collection<TopicPartition> restore(final RestoringTasks active) {
         if (!needsInitializing.isEmpty()) {
-            initialize();
+            initialize(active);
         }
 
         if (needsRestoring.isEmpty()) {
@@ -103,7 +108,7 @@ public void register(final StateRestorer restorer) {
         return completed();
     }
 
-    private void initialize() {
+    private void initialize(final RestoringTasks active) {
         if (!restoreConsumer.subscription().isEmpty()) {
             throw new StreamsException("Restore consumer should not be subscribed to any topics (" + restoreConsumer.subscription() + ")");
         }
@@ -112,8 +117,8 @@ private void initialize() {
         // the needsInitializing map is not empty, meaning we do not know the metadata for some of them yet
         refreshChangelogInfo();
 
-        Map<TopicPartition, StateRestorer> initializable = new HashMap<>();
-        for (Map.Entry<TopicPartition, StateRestorer> entry : 
needsInitializing.entrySet()) {
+        final Map<TopicPartition, StateRestorer> initializable = new 
HashMap<>();
+        for (final Map.Entry<TopicPartition, StateRestorer> entry : 
needsInitializing.entrySet()) {
             final TopicPartition topicPartition = entry.getKey();
             if (hasPartition(topicPartition)) {
                 initializable.put(entry.getKey(), entry.getValue());
@@ -157,11 +162,12 @@ private void initialize() {
 
         // set up restorer for those initializable
         if (!initializable.isEmpty()) {
-            startRestoration(initializable);
+            startRestoration(initializable, active);
         }
     }
 
-    private void startRestoration(final Map<TopicPartition, StateRestorer> initialized) {
+    private void startRestoration(final Map<TopicPartition, StateRestorer> initialized,
+                                  final RestoringTasks active) {
         log.debug("Start restoring state stores from changelog topics {}", initialized.keySet());
 
         final Set<TopicPartition> assignment = new HashSet<>(restoreConsumer.assignment());
@@ -170,26 +176,42 @@ private void startRestoration(final Map<TopicPartition, StateRestorer> initializ
 
         final List<StateRestorer> needsPositionUpdate = new ArrayList<>();
         for (final StateRestorer restorer : initialized.values()) {
+            final TopicPartition restoringPartition = restorer.partition();
             if (restorer.checkpoint() != StateRestorer.NO_CHECKPOINT) {
-                restoreConsumer.seek(restorer.partition(), restorer.checkpoint());
-                logRestoreOffsets(restorer.partition(),
+                restoreConsumer.seek(restoringPartition, restorer.checkpoint());
+                logRestoreOffsets(restoringPartition,
                                   restorer.checkpoint(),
-                                  endOffsets.get(restorer.partition()));
-                restorer.setStartingOffset(restoreConsumer.position(restorer.partition()));
+                                  endOffsets.get(restoringPartition));
+                restorer.setStartingOffset(restoreConsumer.position(restoringPartition));
                 restorer.restoreStarted();
             } else {
-                restoreConsumer.seekToBeginning(Collections.singletonList(restorer.partition()));
+                restoreConsumer.seekToBeginning(Collections.singletonList(restoringPartition));
                 needsPositionUpdate.add(restorer);
             }
         }
 
         for (final StateRestorer restorer : needsPositionUpdate) {
-            final long position = restoreConsumer.position(restorer.partition());
-            logRestoreOffsets(restorer.partition(),
-                              position,
-                              endOffsets.get(restorer.partition()));
-            restorer.setStartingOffset(position);
-            restorer.restoreStarted();
+            final TopicPartition restoringPartition = restorer.partition();
+            final StreamTask task = active.restoringTaskFor(restoringPartition);
+            // If checkpoint does not exist it means the task was not shutdown gracefully before;
+            // and in this case if EOS is turned on we should wipe out the state and re-initialize the task
+            if (task.eosEnabled) {
+                log.info("No checkpoint found for task {} state store {} changelog {} with EOS turned on. " +
+                    "Reinitializing the task and restore its state from the beginning.", task.id, restorer.storeName(), restoringPartition);
+                // we move the partitions here, because they will be added back within
+                // `task.reinitializeStateStoresForPartitions()` that calls `register()` internally again
+                needsInitializing.remove(restoringPartition);
+                restorer.setCheckpointOffset(restoreConsumer.position(restoringPartition));
+                task.reinitializeStateStoresForPartitions(Collections.singleton(restoringPartition));
+            } else {
+                log.info("Restoring task {}'s state store {} from beginning of the changelog {} ", task.id, restorer.storeName(), restoringPartition);
+                final long position = restoreConsumer.position(restoringPartition);
+                logRestoreOffsets(restoringPartition,
+                    position,
+                    endOffsets.get(restoringPartition));
+                restorer.setStartingOffset(position);
+                restorer.restoreStarted();
+            }
         }
 
         needsRestoring.putAll(initialized);
@@ -280,7 +302,7 @@ private long processNext(final List<ConsumerRecord<byte[], byte[]>> records,
                              final Long endOffset) {
         final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
         long nextPosition = -1;
-        int numberRecords = records.size();
+        final int numberRecords = records.size();
         int numberRestored = 0;
         long lastRestoredOffset = -1;
         for (final ConsumerRecord<byte[], byte[]> record : records) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 6c7b2b43c9d..f6811c66702 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -317,7 +317,7 @@ public void shouldNotViolateEosIfOneTaskFails() throws Exception {
         // the app is supposed to copy all 40 records into the output topic
         // the app commits after each 10 records per partition, and thus will have 2*5 uncommitted writes
         //
-        // the failure gets inject after 20 committed and 30 uncommitted records got received
+        // the failure gets inject after 20 committed and 10 uncommitted records got received
         // -> the failure only kills one thread
         // after fail over, we should read 40 committed records (even if 50 record got written)
 
@@ -392,7 +392,7 @@ public void shouldNotViolateEosIfOneTaskFailsWithState() throws Exception {
         // the app commits after each 10 records per partition, and thus will have 2*5 uncommitted writes
         // and store updates (ie, another 5 uncommitted writes to a changelog topic per partition)
         //
-        // the failure gets inject after 20 committed and 30 uncommitted records got received
+        // the failure gets inject after 20 committed and 10 uncommitted records got received
         // -> the failure only kills one thread
         // after fail over, we should read 40 committed records and the state stores should contain the correct sums
         // per key (even if some records got processed twice)
@@ -402,7 +402,7 @@ public void shouldNotViolateEosIfOneTaskFailsWithState() throws Exception {
             streams.start();
 
             final List<KeyValue<Long, Long>> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L);
-            final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = prepareData(10L, 15L, 0L, 1L);
+            final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = prepareData(10L, 15L, 0L, 1L, 2L, 3L);
 
             final List<KeyValue<Long, Long>> dataBeforeFailure = new ArrayList<>();
             dataBeforeFailure.addAll(committedDataBeforeFailure);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index c65d4efadb1..a117dc30adf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -100,7 +100,10 @@ public void shouldRequestTopicsAndHandleTimeoutException() {
     public void shouldThrowExceptionIfConsumerHasCurrentSubscription() {
         final StateRestorer mockRestorer = EasyMock.mock(StateRestorer.class);
         mockRestorer.setUserRestoreListener(stateRestoreListener);
-        expect(mockRestorer.partition()).andReturn(new TopicPartition("sometopic", 0)).andReturn(new TopicPartition("sometopic", 0));
+        expect(mockRestorer.partition())
+            .andReturn(new TopicPartition("sometopic", 0))
+            .andReturn(new TopicPartition("sometopic", 0))
+            .andReturn(new TopicPartition("sometopic", 0));
         EasyMock.replay(mockRestorer);
         changelogReader.register(mockRestorer);
 
@@ -119,6 +122,10 @@ public void shouldRestoreAllMessagesFromBeginningWhenCheckpointNull() {
         final int messages = 10;
         setupConsumer(messages, topicPartition);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
+
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+
         changelogReader.restore(active);
         assertThat(callback.restored.size(), equalTo(messages));
     }
@@ -133,10 +140,9 @@ public void shouldRecoverFromInvalidOffsetExceptionAndFinishRestore() {
                 return Collections.singleton(topicPartition);
             }
         });
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
-                                                   "storeName"));
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
 
-        EasyMock.expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        EasyMock.expect(active.restoringTaskFor(topicPartition)).andReturn(task).andReturn(task);
         EasyMock.replay(active);
 
         // first restore call "fails" but we should not die with an exception
@@ -165,6 +171,9 @@ public void shouldClearAssignmentAtEndOfRestore() {
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
                                                    "storeName"));
 
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+
         changelogReader.restore(active);
         assertThat(consumer.assignment(), equalTo(Collections.<TopicPartition>emptySet()));
     }
@@ -175,6 +184,10 @@ public void shouldRestoreToLimitWhenSupplied() {
         final StateRestorer restorer = new StateRestorer(topicPartition, restoreListener, null, 3, true,
                                                          "storeName");
         changelogReader.register(restorer);
+
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+
         changelogReader.restore(active);
         assertThat(callback.restored.size(), equalTo(3));
         assertThat(restorer.restoredOffset(), equalTo(3L));
@@ -197,8 +210,9 @@ public void shouldRestoreMultipleStores() {
         changelogReader.register(new StateRestorer(one, restoreListener1, null, Long.MAX_VALUE, true, "storeName2"));
         changelogReader.register(new StateRestorer(two, restoreListener2, null, Long.MAX_VALUE, true, "storeName3"));
 
-        expect(active.restoringTaskFor(one)).andReturn(null);
-        expect(active.restoringTaskFor(two)).andReturn(null);
+        expect(active.restoringTaskFor(one)).andReturn(task);
+        expect(active.restoringTaskFor(two)).andReturn(task);
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
         replay(active);
         changelogReader.restore(active);
 
@@ -224,8 +238,9 @@ public void shouldRestoreAndNotifyMultipleStores() throws Exception {
         changelogReader.register(new StateRestorer(one, restoreListener1, null, Long.MAX_VALUE, true, "storeName2"));
         changelogReader.register(new StateRestorer(two, restoreListener2, null, Long.MAX_VALUE, true, "storeName3"));
 
-        expect(active.restoringTaskFor(one)).andReturn(null);
-        expect(active.restoringTaskFor(two)).andReturn(null);
+        expect(active.restoringTaskFor(one)).andReturn(task);
+        expect(active.restoringTaskFor(two)).andReturn(task);
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
         replay(active);
         changelogReader.restore(active);
 
@@ -246,8 +261,11 @@ public void shouldRestoreAndNotifyMultipleStores() throws Exception {
     @Test
     public void shouldOnlyReportTheLastRestoredOffset() {
         setupConsumer(10, topicPartition);
-        changelogReader
-            .register(new StateRestorer(topicPartition, restoreListener, null, 5, true, "storeName1"));
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, 5, true, "storeName1"));
+
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+
         changelogReader.restore(active);
 
         assertThat(callback.restored.size(), equalTo(5));
@@ -306,6 +324,10 @@ public void shouldNotRestoreAnythingWhenCheckpointAtEndOffset() {
     public void shouldReturnRestoredOffsetsForPersistentStores() {
         setupConsumer(10, topicPartition);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
+
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+
         changelogReader.restore(active);
         final Map<TopicPartition, Long> restoredOffsets = 
changelogReader.restoredOffsets();
         assertThat(restoredOffsets, 
equalTo(Collections.singletonMap(topicPartition, 10L)));
@@ -315,6 +337,10 @@ public void shouldReturnRestoredOffsetsForPersistentStores() {
     public void shouldNotReturnRestoredOffsetsForNonPersistentStore() {
         setupConsumer(10, topicPartition);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false, "storeName"));
+
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+
         changelogReader.restore(active);
         final Map<TopicPartition, Long> restoredOffsets = 
changelogReader.restoredOffsets();
         assertThat(restoredOffsets, equalTo(Collections.<TopicPartition, 
Long>emptyMap()));
@@ -328,8 +354,11 @@ public void shouldIgnoreNullKeysWhenRestoring() {
         consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), 1, (byte[]) null, bytes));
         consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), 2, bytes, bytes));
         consumer.assign(Collections.singletonList(topicPartition));
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false,
-                                                   "storeName"));
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false, "storeName"));
+
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+
         changelogReader.restore(active);
 
         assertThat(callback.restored, CoreMatchers.equalTo(Utils.mkList(KeyValue.pair(bytes, bytes), KeyValue.pair(bytes, bytes))));
@@ -354,9 +383,10 @@ public void shouldRestorePartitionsRegisteredPostInitialization() {
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false, "storeName"));
 
         final TopicPartition postInitialization = new TopicPartition("other", 0);
-        expect(active.restoringTaskFor(topicPartition)).andReturn(null);
-        expect(active.restoringTaskFor(topicPartition)).andReturn(null);
-        expect(active.restoringTaskFor(postInitialization)).andReturn(null);
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        expect(active.restoringTaskFor(postInitialization)).andReturn(task);
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
         replay(active);
 
         assertTrue(changelogReader.restore(active).isEmpty());
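

The heart of the patch is the new branch in StoreChangelogReader.startRestoration():
a restorer that has no checkpoint and whose owning task runs with exactly-once
semantics now triggers a wipe-and-reinitialize instead of a plain
restore-from-beginning, because the local store may contain writes from an
aborted transaction. Below is a condensed, hypothetical sketch of just that
decision; "Task" and "Restorer" are simplified stand-ins, not the real
StreamTask/StateRestorer internals.

// A condensed, hypothetical sketch of the decision added to
// StoreChangelogReader.startRestoration(). "Task" and "Restorer" are
// simplified stand-ins for StreamTask and StateRestorer, not the real API.
public class RestorationDecisionSketch {

    static final long NO_CHECKPOINT = -1L; // mirrors StateRestorer.NO_CHECKPOINT

    static final class Task {
        final boolean eosEnabled;
        Task(final boolean eosEnabled) { this.eosEnabled = eosEnabled; }
    }

    static final class Restorer {
        final long checkpoint; // NO_CHECKPOINT when no checkpoint file was found
        Restorer(final long checkpoint) { this.checkpoint = checkpoint; }
    }

    static String decide(final Task task, final Restorer restorer) {
        if (restorer.checkpoint != NO_CHECKPOINT) {
            // A checkpoint means the previous run shut down cleanly:
            // resume restoration from the checkpointed offset.
            return "seek changelog to checkpoint " + restorer.checkpoint;
        }
        if (task.eosEnabled) {
            // No checkpoint under EOS means the previous run died mid-transaction,
            // so the local store may contain uncommitted writes: wipe and rebuild.
            return "wipe state store and reinitialize task";
        }
        // At-least-once: replaying the whole changelog over existing state is safe.
        return "restore from beginning of changelog";
    }

    public static void main(final String[] args) {
        System.out.println(decide(new Task(true), new Restorer(NO_CHECKPOINT)));  // wipe
        System.out.println(decide(new Task(false), new Restorer(NO_CHECKPOINT))); // restore from start
        System.out.println(decide(new Task(true), new Restorer(42L)));            // seek to 42
    }
}

Running the sketch prints the three possible outcomes; only the
EOS-without-checkpoint combination triggers the wipe.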


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> State-store can desynchronise with changelog
> --------------------------------------------
>
>                 Key: KAFKA-7192
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7192
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.11.0.3, 1.0.2, 1.1.1, 2.0.0
>            Reporter: Jon Bates
>            Assignee: Guozhang Wang
>            Priority: Critical
>              Labels: bugs
>             Fix For: 0.11.0.4, 1.0.3, 1.1.2, 2.0.1, 2.1.0
>
>
> n.b. this bug has been verified with exactly-once processing enabled
> Consider the following scenario:
>  * A record, N is read into a Kafka topology
>  * the state store is updated
>  * the topology crashes
> h3. *Expected behaviour:*
>  # Node is restarted
>  # Offset was never updated, so record N is reprocessed
>  # State-store is reset to position N-1
>  # Record is reprocessed
> h3. *Actual Behaviour*
>  # Node is restarted
>  # Record N is reprocessed (good)
>  # The state store has the state from the previous processing
> I'd consider this a corruption of the state-store, hence the critical 
> Priority, although High may be more appropriate.
> I wrote a proof-of-concept here, which demonstrates the problem on Linux:
> [https://github.com/spadger/kafka-streams-sad-state-store]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
