[ https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16559096#comment-16559096 ]
ASF GitHub Bot commented on KAFKA-7192: --------------------------------------- guozhangwang closed pull request #5421: KAFKA-7192: Wipe out if EOS is turned on and checkpoint file does not exist URL: https://github.com/apache/kafka/pull/5421 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 188ff473038..94e4c71d9c2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -137,6 +137,10 @@ public String toString() { return toString(""); } + public boolean isEosEnabled() { + return eosEnabled; + } + /** * Produces a string representation containing useful information about a Task starting with the given indent. * This is useful in debugging scenarios. diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java index 33dce9e7558..c1a41cefc23 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java @@ -55,6 +55,10 @@ public TopicPartition partition() { return partition; } + public String storeName() { + return storeName; + } + long checkpoint() { return checkpoint == null ? 
NO_CHECKPOINT : checkpoint; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index 07af8019aef..1927b5a7af7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -71,7 +71,7 @@ public void register(final StateRestorer restorer) { public Collection<TopicPartition> restore(final RestoringTasks active) { if (!needsInitializing.isEmpty()) { - initialize(); + initialize(active); } if (needsRestoring.isEmpty()) { @@ -111,7 +111,7 @@ public void register(final StateRestorer restorer) { return completed(); } - private void initialize() { + private void initialize(final RestoringTasks active) { if (!restoreConsumer.subscription().isEmpty()) { throw new StreamsException("Restore consumer should not be subscribed to any topics (" + restoreConsumer.subscription() + ")"); } @@ -165,11 +165,12 @@ private void initialize() { // set up restorer for those initializable if (!initializable.isEmpty()) { - startRestoration(initializable); + startRestoration(initializable, active); } } - private void startRestoration(final Map<TopicPartition, StateRestorer> initialized) { + private void startRestoration(final Map<TopicPartition, StateRestorer> initialized, + final RestoringTasks active) { log.debug("Start restoring state stores from changelog topics {}", initialized.keySet()); final Set<TopicPartition> assignment = new HashSet<>(restoreConsumer.assignment()); @@ -186,6 +187,18 @@ private void startRestoration(final Map<TopicPartition, StateRestorer> initializ restorer.setStartingOffset(restoreConsumer.position(restorer.partition())); restorer.restoreStarted(); } else { + final StreamTask task = active.restoringTaskFor(restorer.partition()); + + // If checkpoint does not exist it means 
the task was not shutdown gracefully before; + // and in this case if EOS is turned on we should wipe out the state and re-initialize the task + if (task.isEosEnabled()) { + log.info("No checkpoint found for task {} state store {} changelog {} with EOS turned on. " + + "Reinitializing the task and restore its state from the beginning.", task.id, restorer.storeName(), restorer.partition()); + task.reinitializeStateStoresForPartitions(Collections.singleton(restorer.partition())); + } else { + log.info("Restoring task {}'s state store {} from beginning of the changelog {} ", task.id, restorer.storeName(), restorer.partition()); + } + restoreConsumer.seekToBeginning(Collections.singletonList(restorer.partition())); needsPositionUpdate.add(restorer); } @@ -280,6 +293,9 @@ private long processNext(final List<ConsumerRecord<byte[], byte[]>> records, if (!restoreRecords.isEmpty()) { restorer.restore(restoreRecords); restorer.restoreBatchCompleted(lastRestoredOffset, records.size()); + + log.trace("Restored from {} to {} with {} records, ending offset is {}, next starting position is {}", + restorer.partition(), restorer.storeName(), records.size(), lastRestoredOffset, nextPosition); } return nextPosition; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java index 30c90c23bb3..770f579ad98 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java @@ -391,8 +391,9 @@ public void shouldNotViolateEosIfOneTaskFailsWithState() throws Exception { // the app is supposed to emit all 40 update records into the output topic // the app commits after each 10 records per partition, and thus will have 2*5 uncommitted writes // and store updates (ie, another 5 uncommitted writes to a changelog topic per partition) + // in the uncommitted 
batch sending some data for the new key to validate that upon resuming they will not be shown up in the store // - // the failure gets inject after 20 committed and 30 uncommitted records got received + // the failure gets inject after 20 committed and 10 uncommitted records got received // -> the failure only kills one thread // after fail over, we should read 40 committed records and the state stores should contain the correct sums // per key (even if some records got processed twice) @@ -402,7 +403,7 @@ public void shouldNotViolateEosIfOneTaskFailsWithState() throws Exception { streams.start(); final List<KeyValue<Long, Long>> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L); - final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = prepareData(10L, 15L, 0L, 1L); + final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = prepareData(10L, 15L, 0L, 1L, 2L, 3L); final List<KeyValue<Long, Long>> dataBeforeFailure = new ArrayList<>(); dataBeforeFailure.addAll(committedDataBeforeFailure); @@ -610,10 +611,6 @@ public void init(final ProcessorContext context) { @Override public KeyValue<Long, Long> transform(final Long key, final Long value) { - if (errorInjected.compareAndSet(true, false)) { - // only tries to fail once on one of the task - throw new RuntimeException("Injected test exception."); - } if (gcInjected.compareAndSet(true, false)) { while (doGC) { try { @@ -631,16 +628,27 @@ public void init(final ProcessorContext context) { if (state != null) { Long sum = state.get(key); + if (sum == null) { sum = value; } else { sum += value; } state.put(key, sum); - context.forward(key, sum); - return null; + state.flush(); + } + + + if (errorInjected.compareAndSet(true, false)) { + // only tries to fail once on one of the task + throw new RuntimeException("Injected test exception."); + } + + if (state != null) { + return new KeyValue<>(key, state.get(key)); + } else { + return new KeyValue<>(key, value); } - return new KeyValue<>(key, value); } 
@Override diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index 90abf32477f..1e74d47808e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -119,7 +119,10 @@ public void shouldRestoreAllMessagesFromBeginningWhenCheckpointNull() { final int messages = 10; setupConsumer(messages, topicPartition); changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName")); + expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); + replay(active, task); changelogReader.restore(active); + assertThat(callback.restored.size(), equalTo(messages)); } @@ -136,8 +139,8 @@ public void shouldRecoverFromInvalidOffsetExceptionAndFinishRestore() { changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName")); - EasyMock.expect(active.restoringTaskFor(topicPartition)).andReturn(task); - EasyMock.replay(active); + EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); + EasyMock.replay(active, task); // first restore call "fails" but we should not die with an exception assertEquals(0, changelogReader.restore(active).size()); @@ -164,7 +167,8 @@ public void shouldClearAssignmentAtEndOfRestore() { setupConsumer(messages, topicPartition); changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName")); - + expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); + replay(active, task); changelogReader.restore(active); assertThat(consumer.assignment(), equalTo(Collections.<TopicPartition>emptySet())); } @@ -175,6 +179,8 @@ public void shouldRestoreToLimitWhenSupplied() 
{ final StateRestorer restorer = new StateRestorer(topicPartition, restoreListener, null, 3, true, "storeName"); changelogReader.register(restorer); + expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); + replay(active, task); changelogReader.restore(active); assertThat(callback.restored.size(), equalTo(3)); assertThat(restorer.restoredOffset(), equalTo(3L)); @@ -192,14 +198,14 @@ public void shouldRestoreMultipleStores() { setupConsumer(5, one); setupConsumer(3, two); - changelogReader - .register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName1")); + changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName1")); changelogReader.register(new StateRestorer(one, restoreListener1, null, Long.MAX_VALUE, true, "storeName2")); changelogReader.register(new StateRestorer(two, restoreListener2, null, Long.MAX_VALUE, true, "storeName3")); - expect(active.restoringTaskFor(one)).andReturn(null); - expect(active.restoringTaskFor(two)).andReturn(null); - replay(active); + expect(active.restoringTaskFor(one)).andStubReturn(task); + expect(active.restoringTaskFor(two)).andStubReturn(task); + expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); + replay(active, task); changelogReader.restore(active); assertThat(callback.restored.size(), equalTo(10)); @@ -224,9 +230,13 @@ public void shouldRestoreAndNotifyMultipleStores() throws Exception { changelogReader.register(new StateRestorer(one, restoreListener1, null, Long.MAX_VALUE, true, "storeName2")); changelogReader.register(new StateRestorer(two, restoreListener2, null, Long.MAX_VALUE, true, "storeName3")); - expect(active.restoringTaskFor(one)).andReturn(null); - expect(active.restoringTaskFor(two)).andReturn(null); - replay(active); + expect(active.restoringTaskFor(one)).andReturn(task); + expect(active.restoringTaskFor(two)).andReturn(task); + 
expect(active.restoringTaskFor(topicPartition)).andReturn(task); + expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); + replay(active, task); + changelogReader.restore(active); + changelogReader.restore(active); assertThat(callback.restored.size(), equalTo(10)); @@ -248,6 +258,8 @@ public void shouldOnlyReportTheLastRestoredOffset() { setupConsumer(10, topicPartition); changelogReader .register(new StateRestorer(topicPartition, restoreListener, null, 5, true, "storeName1")); + expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); + replay(active, task); changelogReader.restore(active); assertThat(callback.restored.size(), equalTo(5)); @@ -306,7 +318,10 @@ public void shouldNotRestoreAnythingWhenCheckpointAtEndOffset() { public void shouldReturnRestoredOffsetsForPersistentStores() { setupConsumer(10, topicPartition); changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName")); + expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); + replay(active, task); changelogReader.restore(active); + final Map<TopicPartition, Long> restoredOffsets = changelogReader.restoredOffsets(); assertThat(restoredOffsets, equalTo(Collections.singletonMap(topicPartition, 10L))); } @@ -315,6 +330,8 @@ public void shouldReturnRestoredOffsetsForPersistentStores() { public void shouldNotReturnRestoredOffsetsForNonPersistentStore() { setupConsumer(10, topicPartition); changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false, "storeName")); + expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); + replay(active, task); changelogReader.restore(active); final Map<TopicPartition, Long> restoredOffsets = changelogReader.restoredOffsets(); assertThat(restoredOffsets, equalTo(Collections.<TopicPartition, Long>emptyMap())); @@ -330,6 +347,8 @@ public void shouldIgnoreNullKeysWhenRestoring() { 
consumer.assign(Collections.singletonList(topicPartition)); changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false, "storeName")); + expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); + replay(active, task); changelogReader.restore(active); assertThat(callback.restored, CoreMatchers.equalTo(Utils.mkList(KeyValue.pair(bytes, bytes), KeyValue.pair(bytes, bytes)))); @@ -340,6 +359,9 @@ public void shouldCompleteImmediatelyWhenEndOffsetIs0() { final Collection<TopicPartition> expected = Collections.singleton(topicPartition); setupConsumer(0, topicPartition); changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "store")); + expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); + replay(active, task); + final Collection<TopicPartition> restored = changelogReader.restore(active); assertThat(restored, equalTo(expected)); } @@ -354,10 +376,9 @@ public void shouldRestorePartitionsRegisteredPostInitialization() { changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false, "storeName")); final TopicPartition postInitialization = new TopicPartition("other", 0); - expect(active.restoringTaskFor(topicPartition)).andReturn(null); - expect(active.restoringTaskFor(topicPartition)).andReturn(null); - expect(active.restoringTaskFor(postInitialization)).andReturn(null); - replay(active); + expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); + expect(active.restoringTaskFor(postInitialization)).andStubReturn(task); + replay(active, task); assertTrue(changelogReader.restore(active).isEmpty()); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > State-store can desynchronise with changelog > -------------------------------------------- > > Key: KAFKA-7192 > URL: https://issues.apache.org/jira/browse/KAFKA-7192 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 1.1.1 > Reporter: Jon Bates > Priority: Critical > > N.B. this bug has been verified with exactly-once processing enabled > Consider the following scenario: > * A record, N, is read into a Kafka topology > * The state store is updated > * The topology crashes > h3. *Expected behaviour:* > # Node is restarted > # Offset was never updated, so record N is reprocessed > # State-store is reset to position N-1 > # Record is reprocessed > h3. *Actual behaviour:* > # Node is restarted > # Record N is reprocessed (good) > # The state store has the state from the previous processing > I'd consider this a corruption of the state-store, hence the Critical > priority, although High may be more appropriate. > I wrote a proof-of-concept here, which demonstrates the problem on Linux: > [https://github.com/spadger/kafka-streams-sad-state-store] -- This message was sent by Atlassian JIRA (v7.6.3#76005)