vvcephei commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r453121093
##########
File path:
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
##########
@@ -1177,6 +1180,109 @@ public void
shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommi
assertEquals(1, thread.activeTasks().size());
}
+ @Test
+ public void shouldReinitializeRevivedTasksInAnyState() {
+ final StreamThread thread = createStreamThread(CLIENT_ID, new
StreamsConfig(configProps(false)), false);
+
+ final String storeName = "store";
+ final String storeChangelog = "stream-thread-test-store-changelog";
+ final TopicPartition storeChangelogTopicPartition = new
TopicPartition(storeChangelog, 1);
+
+ internalTopologyBuilder.addSource(null, "name", null, null, null,
topic1);
+ final AtomicBoolean shouldThrow = new AtomicBoolean(false);
+ final AtomicBoolean processed = new AtomicBoolean(false);
+ internalTopologyBuilder.addProcessor("proc", new
ProcessorSupplier<Object, Object>() {
+ @Override
+ public Processor<Object, Object> get() {
+ return new Processor<Object, Object>() {
+ private ProcessorContext context;
+
+ @Override
+ public void init(final ProcessorContext context) {
+ this.context = context;
+ }
+
+ @Override
+ public void process(final Object key, final Object value) {
+ if (shouldThrow.get()) {
+ throw new
TaskCorruptedException(singletonMap(task1, new
HashSet<TopicPartition>(singleton(storeChangelogTopicPartition))));
+ } else {
+ processed.set(true);
+ }
+ }
+
+ @Override
+ public void close() {
+
+ }
+ };
+ }
+ }, "name");
+ internalTopologyBuilder.addStateStore(
+ Stores.keyValueStoreBuilder(
+ Stores.persistentKeyValueStore(storeName),
+ Serdes.String(),
+ Serdes.String()
+ ),
+ "proc"
+ );
+ internalTopologyBuilder.addSink("out", "output", null, null, null,
"proc");
Review comment:
Uh, no. It was just copied with the rest of the test. Sorry about that.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]