[ https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16559096#comment-16559096 ]

ASF GitHub Bot commented on KAFKA-7192:
---------------------------------------

guozhangwang closed pull request #5421: KAFKA-7192: Wipe out if EOS is turned on and checkpoint file does not exist
URL: https://github.com/apache/kafka/pull/5421

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 188ff473038..94e4c71d9c2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -137,6 +137,10 @@ public String toString() {
         return toString("");
     }
 
+    public boolean isEosEnabled() {
+        return eosEnabled;
+    }
+
     /**
      * Produces a string representation containing useful information about a Task starting with the given indent.
      * This is useful in debugging scenarios.
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
index 33dce9e7558..c1a41cefc23 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
@@ -55,6 +55,10 @@ public TopicPartition partition() {
         return partition;
     }
 
+    public String storeName() {
+        return storeName;
+    }
+
     long checkpoint() {
         return checkpoint == null ? NO_CHECKPOINT : checkpoint;
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 07af8019aef..1927b5a7af7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -71,7 +71,7 @@ public void register(final StateRestorer restorer) {
 
     public Collection<TopicPartition> restore(final RestoringTasks active) {
         if (!needsInitializing.isEmpty()) {
-            initialize();
+            initialize(active);
         }
 
         if (needsRestoring.isEmpty()) {
@@ -111,7 +111,7 @@ public void register(final StateRestorer restorer) {
         return completed();
     }
 
-    private void initialize() {
+    private void initialize(final RestoringTasks active) {
         if (!restoreConsumer.subscription().isEmpty()) {
             throw new StreamsException("Restore consumer should not be subscribed to any topics (" + restoreConsumer.subscription() + ")");
         }
@@ -165,11 +165,12 @@ private void initialize() {
 
         // set up restorer for those initializable
         if (!initializable.isEmpty()) {
-            startRestoration(initializable);
+            startRestoration(initializable, active);
         }
     }
 
-    private void startRestoration(final Map<TopicPartition, StateRestorer> initialized) {
+    private void startRestoration(final Map<TopicPartition, StateRestorer> initialized,
+                                  final RestoringTasks active) {
         log.debug("Start restoring state stores from changelog topics {}", initialized.keySet());
 
         final Set<TopicPartition> assignment = new HashSet<>(restoreConsumer.assignment());
@@ -186,6 +187,18 @@ private void startRestoration(final Map<TopicPartition, StateRestorer> initializ
                 restorer.setStartingOffset(restoreConsumer.position(restorer.partition()));
                 restorer.restoreStarted();
             } else {
+                final StreamTask task = active.restoringTaskFor(restorer.partition());
+
+                // If the checkpoint does not exist, the task was not shut down gracefully before;
+                // in this case, if EOS is turned on, we should wipe out the state and re-initialize the task
+                if (task.isEosEnabled()) {
+                    log.info("No checkpoint found for task {} state store {} changelog {} with EOS turned on. " +
+                            "Reinitializing the task and restoring its state from the beginning.", task.id, restorer.storeName(), restorer.partition());
+                    task.reinitializeStateStoresForPartitions(Collections.singleton(restorer.partition()));
+                } else {
+                    log.info("Restoring task {}'s state store {} from the beginning of the changelog {}", task.id, restorer.storeName(), restorer.partition());
+                }
+
                 restoreConsumer.seekToBeginning(Collections.singletonList(restorer.partition()));
                 needsPositionUpdate.add(restorer);
             }
@@ -280,6 +293,9 @@ private long processNext(final List<ConsumerRecord<byte[], byte[]>> records,
         if (!restoreRecords.isEmpty()) {
             restorer.restore(restoreRecords);
             restorer.restoreBatchCompleted(lastRestoredOffset, records.size());
+
+            log.trace("Restored from {} to {} with {} records, ending offset is {}, next starting position is {}",
+                    restorer.partition(), restorer.storeName(), records.size(), lastRestoredOffset, nextPosition);
         }
 
         return nextPosition;
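
Condensed, the change to StoreChangelogReader above amounts to the decision sketched below. This is a simplified, hypothetical sketch (the class name, method name, and the Runnable stand-in are illustrative, not from the PR):

    import java.util.Collections;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.TopicPartition;

    final class MissingCheckpointSketch {
        // A missing checkpoint means the previous shutdown was not clean, so
        // under EOS the local store may contain uncommitted writes and must be
        // wiped and rebuilt; without EOS, replaying the changelog suffices.
        static void onMissingCheckpoint(final Consumer<byte[], byte[]> restoreConsumer,
                                        final TopicPartition changelogPartition,
                                        final boolean eosEnabled,
                                        final Runnable wipeAndReinitStores) {
            // wipeAndReinitStores stands in for the task's
            // reinitializeStateStoresForPartitions(Collections.singleton(changelogPartition))
            if (eosEnabled) {
                wipeAndReinitStores.run();
            }
            // In both branches, the changelog is replayed from the earliest offset.
            restoreConsumer.seekToBeginning(Collections.singletonList(changelogPartition));
        }
    }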
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 30c90c23bb3..770f579ad98 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -391,8 +391,9 @@ public void shouldNotViolateEosIfOneTaskFailsWithState() throws Exception {
         // the app is supposed to emit all 40 update records into the output topic
         // the app commits after each 10 records per partition, and thus will have 2*5 uncommitted writes
         // and store updates (ie, another 5 uncommitted writes to a changelog topic per partition)
+        // the uncommitted batch sends some data for a new key to validate that, upon resuming, it will not show up in the store
         //
-        // the failure gets inject after 20 committed and 30 uncommitted records got received
+        // the failure gets injected after 20 committed and 10 uncommitted records got received
         // -> the failure only kills one thread
         // after fail over, we should read 40 committed records and the state stores should contain the correct sums
         // per key (even if some records got processed twice)
@@ -402,7 +403,7 @@ public void shouldNotViolateEosIfOneTaskFailsWithState() throws Exception {
             streams.start();
 
             final List<KeyValue<Long, Long>> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L);
-            final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = prepareData(10L, 15L, 0L, 1L);
+            final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = prepareData(10L, 15L, 0L, 1L, 2L, 3L);
 
             final List<KeyValue<Long, Long>> dataBeforeFailure = new ArrayList<>();
             dataBeforeFailure.addAll(committedDataBeforeFailure);
@@ -610,10 +611,6 @@ public void init(final ProcessorContext context) {
 
                     @Override
                     public KeyValue<Long, Long> transform(final Long key, final Long value) {
-                        if (errorInjected.compareAndSet(true, false)) {
-                            // only tries to fail once on one of the task
-                            throw new RuntimeException("Injected test exception.");
-                        }
                         if (gcInjected.compareAndSet(true, false)) {
                             while (doGC) {
                                 try {
@@ -631,16 +628,27 @@ public void init(final ProcessorContext context) {
 
                         if (state != null) {
                             Long sum = state.get(key);
+
                             if (sum == null) {
                                 sum = value;
                             } else {
                                 sum += value;
                             }
                             state.put(key, sum);
-                            context.forward(key, sum);
-                            return null;
+                            state.flush();
+                        }
+
+
+                        if (errorInjected.compareAndSet(true, false)) {
+                            // only tries to fail once on one of the tasks
+                            throw new RuntimeException("Injected test exception.");
+                        }
+
+                        if (state != null) {
+                            return new KeyValue<>(key, state.get(key));
+                        } else {
+                            return new KeyValue<>(key, value);
                         }
-                        return new KeyValue<>(key, value);
                     }
 
                     @Override
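
The reordering in the transformer above is deliberate: the store is now flushed before the injected failure, so the uncommitted write reaches the changelog topic and the EOS wipe-out path has something to undo on restart. A minimal sketch of that flush-then-fail pattern (the class and field names are illustrative; the real transformer lives in EosIntegrationTest):

    import java.util.concurrent.atomic.AtomicBoolean;

    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.state.KeyValueStore;

    final class FlushThenFailSketch {
        private final KeyValueStore<Long, Long> state;  // illustrative field
        private final AtomicBoolean errorInjected;      // illustrative field

        FlushThenFailSketch(final KeyValueStore<Long, Long> state,
                            final AtomicBoolean errorInjected) {
            this.state = state;
            this.errorInjected = errorInjected;
        }

        KeyValue<Long, Long> transform(final Long key, final Long value) {
            final Long current = state.get(key);
            final Long sum = current == null ? value : current + value;
            state.put(key, sum);
            state.flush();  // push the uncommitted write into the changelog before failing
            if (errorInjected.compareAndSet(true, false)) {
                // Crash after the write reached the changelog but before the
                // transaction commits; on restart, EOS must wipe this write.
                throw new RuntimeException("Injected test exception.");
            }
            return new KeyValue<>(key, sum);
        }
    }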
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 90abf32477f..1e74d47808e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -119,7 +119,10 @@ public void shouldRestoreAllMessagesFromBeginningWhenCheckpointNull() {
         final int messages = 10;
         setupConsumer(messages, topicPartition);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
+        expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+        replay(active, task);
         changelogReader.restore(active);
+
         assertThat(callback.restored.size(), equalTo(messages));
     }
 
@@ -136,8 +139,8 @@ public void shouldRecoverFromInvalidOffsetExceptionAndFinishRestore() {
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
                                                    "storeName"));
 
-        EasyMock.expect(active.restoringTaskFor(topicPartition)).andReturn(task);
-        EasyMock.replay(active);
+        EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+        EasyMock.replay(active, task);
 
         // first restore call "fails" but we should not die with an exception
         assertEquals(0, changelogReader.restore(active).size());
@@ -164,7 +167,8 @@ public void shouldClearAssignmentAtEndOfRestore() {
         setupConsumer(messages, topicPartition);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
                                                    "storeName"));
-
+        expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+        replay(active, task);
         changelogReader.restore(active);
         assertThat(consumer.assignment(), equalTo(Collections.<TopicPartition>emptySet()));
     }
@@ -175,6 +179,8 @@ public void shouldRestoreToLimitWhenSupplied() {
         final StateRestorer restorer = new StateRestorer(topicPartition, restoreListener, null, 3, true,
                                                          "storeName");
         changelogReader.register(restorer);
+        expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+        replay(active, task);
         changelogReader.restore(active);
         assertThat(callback.restored.size(), equalTo(3));
         assertThat(restorer.restoredOffset(), equalTo(3L));
@@ -192,14 +198,14 @@ public void shouldRestoreMultipleStores() {
         setupConsumer(5, one);
         setupConsumer(3, two);
 
-        changelogReader
-            .register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName1"));
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName1"));
         changelogReader.register(new StateRestorer(one, restoreListener1, null, Long.MAX_VALUE, true, "storeName2"));
         changelogReader.register(new StateRestorer(two, restoreListener2, null, Long.MAX_VALUE, true, "storeName3"));
 
-        expect(active.restoringTaskFor(one)).andReturn(null);
-        expect(active.restoringTaskFor(two)).andReturn(null);
-        replay(active);
+        expect(active.restoringTaskFor(one)).andStubReturn(task);
+        expect(active.restoringTaskFor(two)).andStubReturn(task);
+        expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+        replay(active, task);
         changelogReader.restore(active);
 
         assertThat(callback.restored.size(), equalTo(10));
@@ -224,9 +230,13 @@ public void shouldRestoreAndNotifyMultipleStores() throws Exception {
         changelogReader.register(new StateRestorer(one, restoreListener1, null, Long.MAX_VALUE, true, "storeName2"));
         changelogReader.register(new StateRestorer(two, restoreListener2, null, Long.MAX_VALUE, true, "storeName3"));
 
-        expect(active.restoringTaskFor(one)).andReturn(null);
-        expect(active.restoringTaskFor(two)).andReturn(null);
-        replay(active);
+        expect(active.restoringTaskFor(one)).andReturn(task);
+        expect(active.restoringTaskFor(two)).andReturn(task);
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+        replay(active, task);
+        changelogReader.restore(active);
+
         changelogReader.restore(active);
 
         assertThat(callback.restored.size(), equalTo(10));
@@ -248,6 +258,8 @@ public void shouldOnlyReportTheLastRestoredOffset() {
         setupConsumer(10, topicPartition);
         changelogReader
             .register(new StateRestorer(topicPartition, restoreListener, null, 5, true, "storeName1"));
+        expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+        replay(active, task);
         changelogReader.restore(active);
 
         assertThat(callback.restored.size(), equalTo(5));
@@ -306,7 +318,10 @@ public void shouldNotRestoreAnythingWhenCheckpointAtEndOffset() {
     public void shouldReturnRestoredOffsetsForPersistentStores() {
         setupConsumer(10, topicPartition);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
+        expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+        replay(active, task);
         changelogReader.restore(active);
+
         final Map<TopicPartition, Long> restoredOffsets = changelogReader.restoredOffsets();
         assertThat(restoredOffsets, equalTo(Collections.singletonMap(topicPartition, 10L)));
     }
@@ -315,6 +330,8 @@ public void shouldReturnRestoredOffsetsForPersistentStores() {
     public void shouldNotReturnRestoredOffsetsForNonPersistentStore() {
         setupConsumer(10, topicPartition);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false, "storeName"));
+        expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+        replay(active, task);
         changelogReader.restore(active);
         final Map<TopicPartition, Long> restoredOffsets = changelogReader.restoredOffsets();
         assertThat(restoredOffsets, equalTo(Collections.<TopicPartition, Long>emptyMap()));
@@ -330,6 +347,8 @@ public void shouldIgnoreNullKeysWhenRestoring() {
         consumer.assign(Collections.singletonList(topicPartition));
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false,
                                                    "storeName"));
+        expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+        replay(active, task);
         changelogReader.restore(active);
 
         assertThat(callback.restored, CoreMatchers.equalTo(Utils.mkList(KeyValue.pair(bytes, bytes), KeyValue.pair(bytes, bytes))));
@@ -340,6 +359,9 @@ public void shouldCompleteImmediatelyWhenEndOffsetIs0() {
         final Collection<TopicPartition> expected = Collections.singleton(topicPartition);
         setupConsumer(0, topicPartition);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "store"));
+        expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+        replay(active, task);
+
         final Collection<TopicPartition> restored = changelogReader.restore(active);
         assertThat(restored, equalTo(expected));
     }
@@ -354,10 +376,9 @@ public void shouldRestorePartitionsRegisteredPostInitialization() {
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false, "storeName"));
 
         final TopicPartition postInitialization = new TopicPartition("other", 0);
-        expect(active.restoringTaskFor(topicPartition)).andReturn(null);
-        expect(active.restoringTaskFor(topicPartition)).andReturn(null);
-        expect(active.restoringTaskFor(postInitialization)).andReturn(null);
-        replay(active);
+        expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+        expect(active.restoringTaskFor(postInitialization)).andStubReturn(task);
+        replay(active, task);
 
         assertTrue(changelogReader.restore(active).isEmpty());
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> State-store can desynchronise with changelog
> --------------------------------------------
>
>                 Key: KAFKA-7192
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7192
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.1
>            Reporter: Jon Bates
>            Priority: Critical
>
> n.b. this bug has been verified with exactly-once processing enabled
> Consider the following scenario:
>  * A record, N, is read into a Kafka topology
>  * the state store is updated
>  * the topology crashes
> h3. *Expected behaviour:*
>  # Node is restarted
>  # Offset was never updated, so record N is reprocessed
>  # State-store is reset to position N-1
>  # Record is reprocessed
> h3. *Actual Behaviour*
>  # Node is restarted
>  # Record N is reprocessed (good)
>  # The state store has the state from the previous processing
> I'd consider this a corruption of the state-store, hence the Critical priority, although High may be more appropriate.
> I wrote a proof-of-concept here, which demonstrates the problem on Linux:
> [https://github.com/spadger/kafka-streams-sad-state-store]
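
A minimal sketch of the reported scenario (the topic names, serdes, and the kill point are hypothetical; the linked PoC repo above is the authoritative reproduction):

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Produced;

    public final class SadStateStoreRepro {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "sad-state-store");    // hypothetical
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // hypothetical
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

            final StreamsBuilder builder = new StreamsBuilder();
            builder.<String, String>stream("input")             // hypothetical topic
                   .groupByKey()
                   .count(Materialized.as("counts"))            // store backed by a changelog topic
                   .toStream()
                   .to("output", Produced.with(Serdes.String(), Serdes.Long()));

            // To reproduce: SIGKILL the process after the store update for record N
            // but before the transaction commits; on restart the local store still
            // holds the aborted update unless it is wiped and rebuilt from the changelog.
            new KafkaStreams(builder.build(), props).start();
        }
    }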



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
