[https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16615257#comment-16615257]
ASF GitHub Bot commented on KAFKA-7192:
---------------------------------------
mjsax closed pull request #5641: KAFKA-7192: Wipe out state store if EOS is
turned on and checkpoint file does not exist
URL: https://github.com/apache/kafka/pull/5641
This PR was merged from a forked repository. Because GitHub hides the
original diff of a merged pull request from a fork, the diff is reproduced
below for the sake of provenance:
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 04af9f269b7..c094b838a1c 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -193,4 +193,10 @@ public ThreadCache getCache() {
public void initialized() {
initialized = true;
}
+
+ @Override
+ public void uninitialize() {
+ initialized = false;
+ }
+
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 7f6ac7ca614..f8f6416b3e5 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -226,6 +226,10 @@ void initStateStores() {
}
}
+ void reinitializeStateStoresForPartitions(final TopicPartition partitions)
{
+ stateMgr.reinitializeStateStoresForPartitions(partitions,
processorContext);
+ }
+
/**
* @throws ProcessorStateException if there is an error while closing the
state manager
* @param writeCheckpoint boolean indicating if a checkpoint file should
be written
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index d59ec2b2c1b..ad4868f8fce 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -298,7 +298,7 @@ private void describe(final StringBuilder builder,
return suspended.values();
}
- Collection<T> restoringTasks() {
+ public Collection<T> restoringTasks() {
return Collections.unmodifiableCollection(restoring.values());
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
index 5ebc34c4e92..e82ee2c354f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
@@ -37,7 +37,7 @@
* Restore all registered state stores by reading from their changelogs.
* @return all topic partitions that have been restored
*/
- Collection<TopicPartition> restore();
+ Collection<TopicPartition> restore(final Collection<StreamTask>
restoringTasks);
/**
* @return the restored offsets for all persistent stores.
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
index 57bb3ac81a6..b5719b111f0 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
@@ -53,4 +53,9 @@
* Mark this contex as being initialized
*/
void initialized();
+
+ /**
+ * Mark this context as being uninitialized
+ */
+ void uninitialize();
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 34a87ce03be..93e7ffc5a06 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -18,6 +18,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
@@ -33,9 +34,11 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
public class ProcessorStateManager implements StateManager {
@@ -50,6 +53,7 @@
private final String logPrefix;
private final boolean isStandby;
private final ChangelogReader changelogReader;
+ private final boolean eosEnabled;
private final Map<String, StateStore> stores;
private final Map<String, StateStore> globalStores;
private final Map<TopicPartition, Long> offsetLimits;
@@ -106,6 +110,7 @@ public ProcessorStateManager(final TaskId taskId,
checkpoint = new OffsetCheckpoint(new File(baseDir,
CHECKPOINT_FILE_NAME));
checkpointedOffsets = new HashMap<>(checkpoint.read());
+ this.eosEnabled = eosEnabled;
if (eosEnabled) {
// delete the checkpoint file after finish loading its stored
offsets
checkpoint.delete();
@@ -169,7 +174,8 @@ public void register(final StateStore store,
stateRestoreCallback,
checkpointedOffsets.get(storePartition),
offsetLimit(storePartition),
- store.persistent()
+
store.persistent(),
+ store.name()
);
changelogReader.register(restorer);
@@ -178,6 +184,61 @@ public void register(final StateStore store,
stores.put(store.name(), store);
}
+ void reinitializeStateStoresForPartitions(final TopicPartition
topicPartition,
+ final InternalProcessorContext
processorContext) {
+ final Map<String, String> changelogTopicToStore =
inverseOneToOneMap(storeToChangelogTopic);
+ final Set<String> storeToBeReinitialized = new HashSet<>();
+ final Map<String, StateStore> storesCopy = new HashMap<>(stores);
+
+ checkpointedOffsets.remove(topicPartition);
+
storeToBeReinitialized.add(changelogTopicToStore.get(topicPartition.topic()));
+
+ if (!eosEnabled) {
+ try {
+ checkpoint.write(checkpointedOffsets);
+ } catch (final IOException fatalException) {
+ log.error("Failed to write offset checkpoint file to {} while
re-initializing {}: {}", checkpoint, stores, fatalException);
+ throw new StreamsException("Failed to reinitialize stores.",
fatalException);
+ }
+ }
+
+ for (final Map.Entry<String, StateStore> entry :
storesCopy.entrySet()) {
+ final StateStore stateStore = entry.getValue();
+ final String storeName = stateStore.name();
+ if (storeToBeReinitialized.contains(storeName)) {
+ try {
+ stateStore.close();
+ } catch (final RuntimeException ignoreAndSwallow) { /* ignore
*/ }
+ processorContext.uninitialize();
+ stores.remove(entry.getKey());
+
+ try {
+ Utils.delete(new File(baseDir + File.separator + "rocksdb"
+ File.separator + storeName));
+ } catch (final IOException fatalException) {
+ log.error("Failed to reinitialize store {}.", storeName,
fatalException);
+ throw new StreamsException(String.format("Failed to
reinitialize store %s.", storeName), fatalException);
+ }
+
+ try {
+ Utils.delete(new File(baseDir + File.separator +
storeName));
+ } catch (final IOException fatalException) {
+ log.error("Failed to reinitialize store {}.", storeName,
fatalException);
+ throw new StreamsException(String.format("Failed to
reinitialize store %s.", storeName), fatalException);
+ }
+
+ stateStore.init(processorContext, stateStore);
+ }
+ }
+ }
+
+ private Map<String, String> inverseOneToOneMap(final Map<String, String>
origin) {
+ final Map<String, String> reversedMap = new HashMap<>();
+ for (final Map.Entry<String, String> entry : origin.entrySet()) {
+ reversedMap.put(entry.getValue(), entry.getKey());
+ }
+ return reversedMap;
+ }
+
@Override
public Map<TopicPartition, Long> checkpointed() {
final Map<TopicPartition, Long> partitionsAndOffsets = new HashMap<>();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
index 79bfd1d0f49..3c5efe8a1e7 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
@@ -22,12 +22,13 @@
public class StateRestorer {
static final int NO_CHECKPOINT = -1;
- private final Long checkpoint;
private final long offsetLimit;
private final boolean persistent;
+ private final String storeName;
private final TopicPartition partition;
private final StateRestoreCallback stateRestoreCallback;
+ private long checkpointOffset;
private long restoredOffset;
private long startingOffset;
@@ -35,12 +36,14 @@
final StateRestoreCallback stateRestoreCallback,
final Long checkpoint,
final long offsetLimit,
- final boolean persistent) {
+ final boolean persistent,
+ final String storeName) {
this.partition = partition;
this.stateRestoreCallback = stateRestoreCallback;
- this.checkpoint = checkpoint;
+ this.checkpointOffset = checkpoint == null ? NO_CHECKPOINT :
checkpoint;
this.offsetLimit = offsetLimit;
this.persistent = persistent;
+ this.storeName = storeName;
}
public TopicPartition partition() {
@@ -48,7 +51,15 @@ public TopicPartition partition() {
}
long checkpoint() {
- return checkpoint == null ? NO_CHECKPOINT : checkpoint;
+ return checkpointOffset;
+ }
+
+ void setCheckpointOffset(final long checkpointOffset) {
+ this.checkpointOffset = checkpointOffset;
+ }
+
+ public String storeName() {
+ return storeName;
}
void restore(final byte[] key, final byte[] value) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 34dcb759d8a..305bf10997d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -60,13 +60,16 @@ public StoreChangelogReader(final Consumer<byte[], byte[]>
consumer) {
@Override
public void register(final StateRestorer restorer) {
- stateRestorers.put(restorer.partition(), restorer);
+ if (!stateRestorers.containsKey(restorer.partition())) {
+ stateRestorers.put(restorer.partition(), restorer);
+ log.trace("Added restorer for changelog {}", restorer.partition());
+ }
needsInitializing.put(restorer.partition(), restorer);
}
- public Collection<TopicPartition> restore() {
+ public Collection<TopicPartition> restore(final Collection<StreamTask>
restoringTasks) {
if (!needsInitializing.isEmpty()) {
- initialize();
+ initialize(restoringTasks);
}
if (needsRestoring.isEmpty()) {
@@ -87,7 +90,7 @@ public void register(final StateRestorer restorer) {
return completed();
}
- private void initialize() {
+ private void initialize(final Collection<StreamTask> restoringTasks) {
if (!consumer.subscription().isEmpty()) {
throw new IllegalStateException("Restore consumer should not be
subscribed to any topics (" + consumer.subscription() + ")");
}
@@ -139,11 +142,12 @@ private void initialize() {
// set up restorer for those initializable
if (!initializable.isEmpty()) {
- startRestoration(initializable);
+ startRestoration(initializable, restoringTasks);
}
}
- private void startRestoration(final Map<TopicPartition, StateRestorer>
initialized) {
+ private void startRestoration(final Map<TopicPartition, StateRestorer>
initialized,
+ final Collection<StreamTask> restoringTasks)
{
log.debug("{} Start restoring state stores from changelog topics {}",
logPrefix, initialized.keySet());
final Set<TopicPartition> assignment = new
HashSet<>(consumer.assignment());
@@ -152,24 +156,48 @@ private void startRestoration(final Map<TopicPartition,
StateRestorer> initializ
final List<StateRestorer> needsPositionUpdate = new ArrayList<>();
for (final StateRestorer restorer : initialized.values()) {
+ final TopicPartition restoringPartition = restorer.partition();
if (restorer.checkpoint() != StateRestorer.NO_CHECKPOINT) {
- consumer.seek(restorer.partition(), restorer.checkpoint());
- logRestoreOffsets(restorer.partition(),
- restorer.checkpoint(),
- endOffsets.get(restorer.partition()));
-
restorer.setStartingOffset(consumer.position(restorer.partition()));
+ consumer.seek(restoringPartition, restorer.checkpoint());
+ logRestoreOffsets(
+ restoringPartition,
+ restorer.checkpoint(),
+ endOffsets.get(restoringPartition));
+
restorer.setStartingOffset(consumer.position(restoringPartition));
} else {
-
consumer.seekToBeginning(Collections.singletonList(restorer.partition()));
+
consumer.seekToBeginning(Collections.singletonList(restoringPartition));
needsPositionUpdate.add(restorer);
}
}
for (final StateRestorer restorer : needsPositionUpdate) {
- final long position = consumer.position(restorer.partition());
- logRestoreOffsets(restorer.partition(),
- position,
- endOffsets.get(restorer.partition()));
- restorer.setStartingOffset(position);
+ final TopicPartition restoringPartition = restorer.partition();
+
+ for (final StreamTask task : restoringTasks) {
+ if (task.changelogPartitions().contains(restoringPartition) ||
task.partitions().contains(restoringPartition)) {
+ if (task.eosEnabled) {
+ log.info("No checkpoint found for task {} state store
{} changelog {} with EOS turned on. " +
+ "Reinitializing the task and restore its state
from the beginning.", task.id, restorer.storeName(), restorer.partition());
+
+ needsInitializing.remove(restoringPartition);
+ initialized.put(restoringPartition, restorer);
+
restorer.setCheckpointOffset(consumer.position(restoringPartition));
+
+
task.reinitializeStateStoresForPartitions(restoringPartition);
+ } else {
+ log.info("Restoring task {}'s state store {} from
beginning of the changelog {} ", task.id, restorer.storeName(),
restorer.partition());
+
+ final long position =
consumer.position(restoringPartition);
+ logRestoreOffsets(
+ restoringPartition,
+ position,
+ endOffsets.get(restoringPartition));
+ restorer.setStartingOffset(position);
+ }
+ }
+ }
+
+
}
needsRestoring.putAll(initialized);
@@ -220,7 +248,7 @@ public void reset() {
}
private void restorePartition(final ConsumerRecords<byte[], byte[]>
allRecords,
- final TopicPartition topicPartition) {
+ final TopicPartition topicPartition) {
final StateRestorer restorer = stateRestorers.get(topicPartition);
final Long endOffset = endOffsets.get(topicPartition);
final long pos = processNext(allRecords.records(topicPartition),
restorer, endOffset);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 210b070e852..6b7f4f1934e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -532,7 +532,7 @@ private void tryTransitToRunning() {
active.initializeNewTasks();
standby.initializeNewTasks();
- final Collection<TopicPartition> restored =
storeChangelogReader.restore();
+ final Collection<TopicPartition> restored =
storeChangelogReader.restore(active.restoringTasks());
final Set<TopicPartition> resumed = active.updateRestored(restored);
if (!resumed.isEmpty()) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 0c3b36aa84b..7f709507e35 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -392,7 +392,7 @@ public void shouldNotViolateEosIfOneTaskFailsWithState()
throws Exception {
// the app commits after each 10 records per partition, and thus will
have 2*5 uncommitted writes
// and store updates (ie, another 5 uncommitted writes to a changelog
topic per partition)
//
- // the failure gets inject after 20 committed and 30 uncommitted
records got received
+ // the failure gets inject after 20 committed and 10 uncommitted
records got received
// -> the failure only kills one thread
// after fail over, we should read 40 committed records and the state
stores should contain the correct sums
// per key (even if some records got processed twice)
@@ -402,7 +402,7 @@ public void shouldNotViolateEosIfOneTaskFailsWithState()
throws Exception {
streams.start();
final List<KeyValue<Long, Long>> committedDataBeforeFailure =
prepareData(0L, 10L, 0L, 1L);
- final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure =
prepareData(10L, 15L, 0L, 1L);
+ final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure =
prepareData(10L, 15L, 0L, 1L, 2L, 3L);
final List<KeyValue<Long, Long>> dataBeforeFailure = new
ArrayList<>();
dataBeforeFailure.addAll(committedDataBeforeFailure);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
index 6968f3319ec..3a8ebdaf22f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
@@ -28,7 +28,7 @@
private static final long OFFSET_LIMIT = 50;
private final MockRestoreCallback callback = new MockRestoreCallback();
- private final StateRestorer restorer = new StateRestorer(new
TopicPartition("topic", 1), callback, null, OFFSET_LIMIT, true);
+ private final StateRestorer restorer = new StateRestorer(new
TopicPartition("topic", 1), callback, null, OFFSET_LIMIT, true, "store");
@Test
public void shouldCallRestoreOnRestoreCallback() throws Exception {
@@ -53,7 +53,7 @@ public void
shouldBeCompletedIfEndOffsetAndRecordOffsetAreZero() throws Exceptio
@Test
public void shouldBeCompletedIfOffsetAndOffsetLimitAreZero() throws
Exception {
- final StateRestorer restorer = new StateRestorer(new
TopicPartition("topic", 1), callback, null, 0, true);
+ final StateRestorer restorer = new StateRestorer(new
TopicPartition("topic", 1), callback, null, 0, true, "store");
assertTrue(restorer.hasCompleted(0, 10));
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index a43f08344bf..24820f840b2 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -59,8 +59,8 @@ public void shouldRequestTopicsAndHandleTimeoutException()
throws Exception {
};
final StoreChangelogReader changelogReader = new
StoreChangelogReader(consumer);
- changelogReader.register(new StateRestorer(topicPartition, callback,
null, Long.MAX_VALUE, true));
- changelogReader.restore();
+ changelogReader.register(new StateRestorer(topicPartition, callback,
null, Long.MAX_VALUE, true, "store"));
+ changelogReader.restore(Collections.<StreamTask>emptySet());
assertTrue(functionCalled.get());
}
@@ -68,7 +68,7 @@ public void shouldRequestTopicsAndHandleTimeoutException()
throws Exception {
public void shouldThrowExceptionIfConsumerHasCurrentSubscription() throws
Exception {
consumer.subscribe(Collections.singleton("sometopic"));
try {
- changelogReader.restore();
+ changelogReader.restore(Collections.<StreamTask>emptySet());
fail("Should have thrown IllegalStateException");
} catch (final IllegalStateException e) {
// ok
@@ -79,9 +79,9 @@ public void
shouldThrowExceptionIfConsumerHasCurrentSubscription() throws Except
public void shouldRestoreAllMessagesFromBeginningWhenCheckpointNull()
throws Exception {
final int messages = 10;
setupConsumer(messages, topicPartition);
- changelogReader.register(new StateRestorer(topicPartition, callback,
null, Long.MAX_VALUE, true));
+ changelogReader.register(new StateRestorer(topicPartition, callback,
null, Long.MAX_VALUE, true, "store"));
- changelogReader.restore();
+ changelogReader.restore(Collections.<StreamTask>emptySet());
assertThat(callback.restored.size(), equalTo(messages));
}
@@ -89,9 +89,9 @@ public void
shouldRestoreAllMessagesFromBeginningWhenCheckpointNull() throws Exc
public void shouldRestoreMessagesFromCheckpoint() throws Exception {
final int messages = 10;
setupConsumer(messages, topicPartition);
- changelogReader.register(new StateRestorer(topicPartition, callback,
5L, Long.MAX_VALUE, true));
+ changelogReader.register(new StateRestorer(topicPartition, callback,
5L, Long.MAX_VALUE, true, "store"));
- changelogReader.restore();
+ changelogReader.restore(Collections.<StreamTask>emptySet());
assertThat(callback.restored.size(), equalTo(5));
}
@@ -99,18 +99,18 @@ public void shouldRestoreMessagesFromCheckpoint() throws
Exception {
public void shouldClearAssignmentAtEndOfRestore() throws Exception {
final int messages = 1;
setupConsumer(messages, topicPartition);
- changelogReader.register(new StateRestorer(topicPartition, callback,
null, Long.MAX_VALUE, true));
+ changelogReader.register(new StateRestorer(topicPartition, callback,
null, Long.MAX_VALUE, true, "store"));
- changelogReader.restore();
+ changelogReader.restore(Collections.<StreamTask>emptySet());
assertThat(consumer.assignment(),
equalTo(Collections.<TopicPartition>emptySet()));
}
@Test
public void shouldRestoreToLimitWhenSupplied() throws Exception {
setupConsumer(10, topicPartition);
- final StateRestorer restorer = new StateRestorer(topicPartition,
callback, null, 3, true);
+ final StateRestorer restorer = new StateRestorer(topicPartition,
callback, null, 3, true, "store");
changelogReader.register(restorer);
- changelogReader.restore();
+ changelogReader.restore(Collections.<StreamTask>emptySet());
assertThat(callback.restored.size(), equalTo(3));
assertThat(restorer.restoredOffset(), equalTo(3L));
}
@@ -125,11 +125,11 @@ public void shouldRestoreMultipleStores() throws
Exception {
setupConsumer(5, one);
setupConsumer(3, two);
- changelogReader.register(new StateRestorer(topicPartition, callback,
null, Long.MAX_VALUE, true));
- changelogReader.register(new StateRestorer(one, callbackOne, null,
Long.MAX_VALUE, true));
- changelogReader.register(new StateRestorer(two, callbackTwo, null,
Long.MAX_VALUE, true));
+ changelogReader.register(new StateRestorer(topicPartition, callback,
null, Long.MAX_VALUE, true, "store"));
+ changelogReader.register(new StateRestorer(one, callbackOne, null,
Long.MAX_VALUE, true, "store"));
+ changelogReader.register(new StateRestorer(two, callbackTwo, null,
Long.MAX_VALUE, true, "store"));
- changelogReader.restore();
+ changelogReader.restore(Collections.<StreamTask>emptySet());
assertThat(callback.restored.size(), equalTo(10));
assertThat(callbackOne.restored.size(), equalTo(5));
@@ -138,11 +138,11 @@ public void shouldRestoreMultipleStores() throws
Exception {
@Test
public void shouldNotRestoreAnythingWhenPartitionIsEmpty() throws
Exception {
- final StateRestorer restorer = new StateRestorer(topicPartition,
callback, null, Long.MAX_VALUE, true);
+ final StateRestorer restorer = new StateRestorer(topicPartition,
callback, null, Long.MAX_VALUE, true, "store");
setupConsumer(0, topicPartition);
changelogReader.register(restorer);
- changelogReader.restore();
+ changelogReader.restore(Collections.<StreamTask>emptySet());
assertThat(callback.restored.size(), equalTo(0));
assertThat(restorer.restoredOffset(), equalTo(0L));
}
@@ -151,11 +151,11 @@ public void
shouldNotRestoreAnythingWhenPartitionIsEmpty() throws Exception {
public void shouldNotRestoreAnythingWhenCheckpointAtEndOffset() throws
Exception {
final Long endOffset = 10L;
setupConsumer(endOffset, topicPartition);
- final StateRestorer restorer = new StateRestorer(topicPartition,
callback, endOffset, Long.MAX_VALUE, true);
+ final StateRestorer restorer = new StateRestorer(topicPartition,
callback, endOffset, Long.MAX_VALUE, true, "store");
changelogReader.register(restorer);
- changelogReader.restore();
+ changelogReader.restore(Collections.<StreamTask>emptySet());
assertThat(callback.restored.size(), equalTo(0));
assertThat(restorer.restoredOffset(), equalTo(endOffset));
}
@@ -163,8 +163,8 @@ public void
shouldNotRestoreAnythingWhenCheckpointAtEndOffset() throws Exception
@Test
public void shouldReturnRestoredOffsetsForPersistentStores() throws
Exception {
setupConsumer(10, topicPartition);
- changelogReader.register(new StateRestorer(topicPartition, callback,
null, Long.MAX_VALUE, true));
- changelogReader.restore();
+ changelogReader.register(new StateRestorer(topicPartition, callback,
null, Long.MAX_VALUE, true, "store"));
+ changelogReader.restore(Collections.<StreamTask>emptySet());
final Map<TopicPartition, Long> restoredOffsets =
changelogReader.restoredOffsets();
assertThat(restoredOffsets,
equalTo(Collections.singletonMap(topicPartition, 10L)));
}
@@ -172,8 +172,8 @@ public void
shouldReturnRestoredOffsetsForPersistentStores() throws Exception {
@Test
public void shouldNotReturnRestoredOffsetsForNonPersistentStore() throws
Exception {
setupConsumer(10, topicPartition);
- changelogReader.register(new StateRestorer(topicPartition, callback,
null, Long.MAX_VALUE, false));
- changelogReader.restore();
+ changelogReader.register(new StateRestorer(topicPartition, callback,
null, Long.MAX_VALUE, false, "store"));
+ changelogReader.restore(Collections.<StreamTask>emptySet());
final Map<TopicPartition, Long> restoredOffsets =
changelogReader.restoredOffsets();
assertThat(restoredOffsets, equalTo(Collections.<TopicPartition,
Long>emptyMap()));
}
@@ -186,8 +186,8 @@ public void shouldIgnoreNullKeysWhenRestoring() throws
Exception {
consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(),
topicPartition.partition(), 1, (byte[]) null, bytes));
consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(),
topicPartition.partition(), 2, bytes, bytes));
consumer.assign(Collections.singletonList(topicPartition));
- changelogReader.register(new StateRestorer(topicPartition, callback,
null, Long.MAX_VALUE, false));
- changelogReader.restore();
+ changelogReader.register(new StateRestorer(topicPartition, callback,
null, Long.MAX_VALUE, false, "store"));
+ changelogReader.restore(Collections.<StreamTask>emptySet());
assertThat(callback.restored,
CoreMatchers.equalTo(Utils.mkList(KeyValue.pair(bytes, bytes),
KeyValue.pair(bytes, bytes))));
}
@@ -200,15 +200,15 @@ public void
shouldReturnCompletedPartitionsOnEachRestoreCall() {
consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(),
topicPartition.partition(), i, bytes, bytes));
}
consumer.assign(Collections.singletonList(topicPartition));
- changelogReader.register(new StateRestorer(topicPartition, callback,
null, Long.MAX_VALUE, false));
+ changelogReader.register(new StateRestorer(topicPartition, callback,
null, Long.MAX_VALUE, false, "store"));
- final Collection<TopicPartition> completedFirstTime =
changelogReader.restore();
+ final Collection<TopicPartition> completedFirstTime =
changelogReader.restore(Collections.<StreamTask>emptySet());
assertTrue(completedFirstTime.isEmpty());
for (int i = 5; i < 10; i++) {
consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(),
topicPartition.partition(), i, bytes, bytes));
}
final Collection<TopicPartition> expected =
Collections.singleton(topicPartition);
- assertThat(changelogReader.restore(), equalTo(expected));
+
assertThat(changelogReader.restore(Collections.<StreamTask>emptySet()),
equalTo(expected));
}
private void setupConsumer(final long messages, final TopicPartition
topicPartition) {
@@ -237,8 +237,8 @@ private void assignPartition(final long messages, final
TopicPartition topicPart
public void shouldCompleteImmediatelyWhenEndOffsetIs0() {
final Collection<TopicPartition> expected =
Collections.singleton(topicPartition);
setupConsumer(0, topicPartition);
- changelogReader.register(new StateRestorer(topicPartition, callback,
null, Long.MAX_VALUE, true));
- final Collection<TopicPartition> restored = changelogReader.restore();
+ changelogReader.register(new StateRestorer(topicPartition, callback,
null, Long.MAX_VALUE, true, "store"));
+ final Collection<TopicPartition> restored =
changelogReader.restore(Collections.<StreamTask>emptySet());
assertThat(restored, equalTo(expected));
}
@@ -248,9 +248,9 @@ public void
shouldRestorePartitionsRegisteredPostInitialization() {
setupConsumer(1, topicPartition);
consumer.updateEndOffsets(Collections.singletonMap(topicPartition,
10L));
- changelogReader.register(new StateRestorer(topicPartition, callback,
null, Long.MAX_VALUE, false));
+ changelogReader.register(new StateRestorer(topicPartition, callback,
null, Long.MAX_VALUE, false, "store"));
- assertTrue(changelogReader.restore().isEmpty());
+
assertTrue(changelogReader.restore(Collections.<StreamTask>emptySet()).isEmpty());
addRecords(9, topicPartition, 1);
@@ -259,12 +259,12 @@ public void
shouldRestorePartitionsRegisteredPostInitialization() {
consumer.updateBeginningOffsets(Collections.singletonMap(postInitialization,
0L));
consumer.updateEndOffsets(Collections.singletonMap(postInitialization,
3L));
- changelogReader.register(new StateRestorer(postInitialization,
callbackTwo, null, Long.MAX_VALUE, false));
+ changelogReader.register(new StateRestorer(postInitialization,
callbackTwo, null, Long.MAX_VALUE, false, "store"));
final Collection<TopicPartition> expected =
Utils.mkSet(topicPartition, postInitialization);
consumer.assign(expected);
- assertThat(changelogReader.restore(), equalTo(expected));
+
assertThat(changelogReader.restore(Collections.<StreamTask>emptySet()),
equalTo(expected));
assertThat(callback.restored.size(), equalTo(10));
assertThat(callbackTwo.restored.size(), equalTo(3));
}
diff --git
a/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java
b/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java
index 54fd858393c..93ce801be1c 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java
@@ -19,6 +19,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.StateRestorer;
+import org.apache.kafka.streams.processor.internals.StreamTask;
import java.util.Collection;
import java.util.Collections;
@@ -35,7 +36,7 @@ public void register(final StateRestorer restorer) {
}
@Override
- public Collection<TopicPartition> restore() {
+ public Collection<TopicPartition> restore(final Collection<StreamTask>
restoringTasks) {
return registered;
}
diff --git
a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index cb56fa1b1a6..fed8752ba40 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -142,6 +142,9 @@ public ThreadCache getCache() {
@Override
public void initialized() {}
+ @Override
+ public void uninitialize() {}
+
@Override
public File stateDir() {
if (stateDir == null) {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> State-store can desynchronise with changelog
> --------------------------------------------
>
> Key: KAFKA-7192
> URL: https://issues.apache.org/jira/browse/KAFKA-7192
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.1.1
> Reporter: Jon Bates
> Assignee: Guozhang Wang
> Priority: Critical
> Labels: bugs
> Fix For: 2.0.1, 2.1.0
>
>
> N.B. This bug has been verified with exactly-once processing enabled.
> Consider the following scenario:
> * A record, N is read into a Kafka topology
> * the state store is updated
> * the topology crashes
> h3. *Expected behaviour:*
> # Node is restarted
> # Offset was never updated, so record N is reprocessed
> # State-store is reset to position N-1
> # Record is reprocessed
> h3. *Actual behaviour:*
> # Node is restarted
> # Record N is reprocessed (good)
> # The state store still contains the update from the previous (crashed) processing of record N, so N is effectively applied twice
> I'd consider this a corruption of the state store, hence the Critical
> priority, although High may be more appropriate.
> I wrote a proof-of-concept here, which demonstrates the problem on Linux:
> [https://github.com/spadger/kafka-streams-sad-state-store]
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)