This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 0fc029c6a47 KAFKA-14299: Fix pause and resume with state updater (#13025) 0fc029c6a47 is described below commit 0fc029c6a47a7a930a2b078569de1173cdb547a4 Author: Lucas Brutschy <lucas...@users.noreply.github.com> AuthorDate: Tue Feb 21 19:17:09 2023 +0100 KAFKA-14299: Fix pause and resume with state updater (#13025) * Fixes required to make the PauseResumeIntegrationTest pass. It was not enabled and it does not pass for the state updater code path. * Make sure no progress is made on paused topologies. The state updater restored one round of polls from the restore consumer before realizing that a newly added task was already in paused state when being added. * Wake up state updater when tasks are being resumed. If a task is resumed, it may be necessary to wake up the state updater from waiting on the tasksAndActions condition. * Make sure that allTasks methods also return the tasks that are currently being restored. * Enable PauseResumeIntegrationTest and upgrade it to JUnit5. Reviewers: Bruno Cadonna <cado...@apache.org>, Guozhang Wang <wangg...@gmail.com> --- .../org/apache/kafka/streams/KafkaStreams.java | 1 + .../processor/internals/DefaultStateUpdater.java | 65 ++++++++++---- .../streams/processor/internals/ReadOnlyTask.java | 2 +- .../streams/processor/internals/StateUpdater.java | 5 ++ .../streams/processor/internals/StreamThread.java | 7 +- .../streams/processor/internals/TaskManager.java | 35 +++++++- .../KafkaStreamsNamedTopologyWrapper.java | 2 + .../integration/PauseResumeIntegrationTest.java | 100 ++++++++++++--------- .../internals/DefaultStateUpdaterTest.java | 1 + .../processor/internals/ReadOnlyTaskTest.java | 1 + .../processor/internals/StreamThreadTest.java | 4 +- .../processor/internals/TaskManagerTest.java | 32 +++++++ 12 files changed, 189 insertions(+), 66 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 03b778ab6ec..c05e4c6c1ac 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -1751,6 +1751,7 @@ public class KafkaStreams implements AutoCloseable { } else { topologyMetadata.resumeTopology(UNNAMED_TOPOLOGY); } + threads.forEach(StreamThread::signalResume); } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index d96593c5011..ae6618c304f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -116,6 +116,7 @@ public class DefaultStateUpdater implements StateUpdater { private void runOnce() throws InterruptedException { performActionsOnTasks(); + resumeTasks(); restoreTasks(); checkAllUpdatingTaskStates(time.milliseconds()); waitIfAllChangelogsCompletelyRead(); @@ -140,6 +141,16 @@ public class DefaultStateUpdater implements StateUpdater { } } + private void resumeTasks() { + if (isTopologyResumed.compareAndSet(true, false)) { + for (final Task task : pausedTasks.values()) { + if (!topologyMetadata.isPaused(task.id().topologyName())) { + resumeTask(task); + } + } + } + } + private void restoreTasks() { try { changelogReader.restore(updatingTasks); @@ -229,7 +240,7 @@ public class DefaultStateUpdater implements StateUpdater { if (isRunning.get() && changelogReader.allChangelogsCompleted()) { tasksAndActionsLock.lock(); try { - while (tasksAndActions.isEmpty()) { + while (tasksAndActions.isEmpty() && !isTopologyResumed.get()) { tasksAndActionsCondition.await(); } } finally { @@ -258,21 +269,39 @@ public class DefaultStateUpdater implements StateUpdater { } private void addTask(final Task task) { + final TaskId taskId = task.id(); + + Task existingTask = pausedTasks.get(taskId); + if (existingTask != null) { + throw new IllegalStateException( + (existingTask.isActive() ? "Active" : "Standby") + " task " + taskId + " already exist in paused tasks, " + + "should not try to add another " + (task.isActive() ? "active" : "standby") + " task with the same id. " + + BUG_ERROR_MESSAGE); + } + existingTask = updatingTasks.get(taskId); + if (existingTask != null) { + throw new IllegalStateException( + (existingTask.isActive() ? "Active" : "Standby") + " task " + taskId + " already exist in updating tasks, " + + "should not try to add another " + (task.isActive() ? "active" : "standby") + " task with the same id. " + + BUG_ERROR_MESSAGE); + } + if (isStateless(task)) { addToRestoredTasks((StreamTask) task); - log.info("Stateless active task " + task.id() + " was added to the restored tasks of the state updater"); + log.info("Stateless active task " + taskId + " was added to the restored tasks of the state updater"); + } else if (topologyMetadata.isPaused(taskId.topologyName())) { + pausedTasks.put(taskId, task); + changelogReader.register(task.changelogPartitions(), task.stateManager()); + log.debug((task.isActive() ? "Active" : "Standby") + + " task " + taskId + " was directly added to the paused tasks."); } else { - final Task existingTask = updatingTasks.putIfAbsent(task.id(), task); - if (existingTask != null) { - throw new IllegalStateException((existingTask.isActive() ? "Active" : "Standby") + " task " + task.id() + " already exist, " + - "should not try to add another " + (task.isActive() ? "active" : "standby") + " task with the same id. " + BUG_ERROR_MESSAGE); - } + updatingTasks.put(taskId, task); changelogReader.register(task.changelogPartitions(), task.stateManager()); if (task.isActive()) { - log.info("Stateful active task " + task.id() + " was added to the state updater"); + log.info("Stateful active task " + taskId + " was added to the state updater"); changelogReader.enforceRestoreActive(); } else { - log.info("Standby task " + task.id() + " was added to the state updater"); + log.info("Standby task " + taskId + " was added to the state updater"); if (updatingTasks.size() == 1) { changelogReader.transitToUpdateStandby(); } @@ -388,12 +417,6 @@ public class DefaultStateUpdater implements StateUpdater { } } - for (final Task task : pausedTasks.values()) { - if (!topologyMetadata.isPaused(task.id().topologyName())) { - resumeTask(task); - } - } - lastCommitMs = now; } } @@ -411,6 +434,7 @@ public class DefaultStateUpdater implements StateUpdater { private final Condition restoredActiveTasksCondition = restoredActiveTasksLock.newCondition(); private final BlockingQueue<ExceptionAndTasks> exceptionsAndFailedTasks = new LinkedBlockingQueue<>(); private final BlockingQueue<Task> removedTasks = new LinkedBlockingQueue<>(); + private final AtomicBoolean isTopologyResumed = new AtomicBoolean(false); private final long commitIntervalMs; private long lastCommitMs; @@ -506,6 +530,17 @@ public class DefaultStateUpdater implements StateUpdater { } } + @Override + public void signalResume() { + tasksAndActionsLock.lock(); + try { + isTopologyResumed.set(true); + tasksAndActionsCondition.signalAll(); + } finally { + tasksAndActionsLock.unlock(); + } + } + @Override public Set<StreamTask> drainRestoredActiveTasks(final Duration timeout) { final long timeoutMs = timeout.toMillis(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java index e26b6ca29fd..ee3989cf62e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java @@ -200,7 +200,7 @@ public class ReadOnlyTask implements Task { @Override public Map<TopicPartition, Long> changelogOffsets() { - throw new UnsupportedOperationException("This task is read-only"); + return task.changelogOffsets(); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java index 10ac51874d2..3153472bf92 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java @@ -94,6 +94,11 @@ public interface StateUpdater { */ void remove(final TaskId taskId); + /** + * Wakes up the state updater if it is currently dormant, to check if a paused task should be resumed. + */ + void signalResume(); + /** * Drains the restored active tasks from the state updater. * diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 3fb1e2f3cb9..02bd74a027d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -1096,6 +1096,11 @@ public class StreamThread extends Thread { return isAlive(); } + // Call method when a topology is resumed + public void signalResume() { + taskManager.signalResume(); + } + /** * Try to commit all active tasks owned by this thread. * @@ -1113,7 +1118,7 @@ public class StreamThread extends Thread { } committed = taskManager.commit( - taskManager.allTasks() + taskManager.allOwnedTasks() .values() .stream() .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 2a5b3d2bacf..cf83cb27eba 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -1115,6 +1115,12 @@ public class TaskManager { } } + public void signalResume() { + if (stateUpdater != null) { + stateUpdater.signalResume(); + } + } + /** * Compute the offset total summed across all stores in a task. Includes offset sum for any tasks we own the * lock for, which includes assigned and unassigned tasks we locked in {@link #tryToLockAllNonEmptyTaskDirectories()}. @@ -1532,15 +1538,38 @@ public class TaskManager { } Map<TaskId, Task> allTasks() { + // not bothering with an unmodifiable map, since the tasks themselves are mutable, but + // if any outside code modifies the map or the tasks, it would be a severe transgression. + if (stateUpdater != null) { + final Map<TaskId, Task> ret = stateUpdater.getTasks().stream().collect(Collectors.toMap(Task::id, x -> x)); + ret.putAll(tasks.allTasksPerId()); + return ret; + } else { + return tasks.allTasksPerId(); + } + } + + /** + * Returns tasks owned by the stream thread. With state updater disabled, these are all tasks. With + * state updater enabled, this does not return any tasks currently owned by the state updater. + * @return + */ + Map<TaskId, Task> allOwnedTasks() { // not bothering with an unmodifiable map, since the tasks themselves are mutable, but // if any outside code modifies the map or the tasks, it would be a severe transgression. return tasks.allTasksPerId(); } Set<Task> readOnlyAllTasks() { - // need to make sure the returned set is unmodifiable as it could be accessed - // by other thread than the StreamThread owning this task manager; - return Collections.unmodifiableSet(tasks.allTasks()); + // not bothering with an unmodifiable map, since the tasks themselves are mutable, but + // if any outside code modifies the map or the tasks, it would be a severe transgression. + if (stateUpdater != null) { + final HashSet<Task> ret = new HashSet<>(stateUpdater.getTasks()); + ret.addAll(tasks.allTasks()); + return Collections.unmodifiableSet(ret); + } else { + return Collections.unmodifiableSet(tasks.allTasks()); + } } Map<TaskId, Task> notPausedTasks() { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java index 4704d1d4df7..dd70ad0abbf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java @@ -40,6 +40,7 @@ import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; +import org.apache.kafka.streams.processor.internals.StreamThread; import org.apache.kafka.streams.processor.internals.Task; import org.apache.kafka.streams.processor.internals.TopologyMetadata; import org.slf4j.Logger; @@ -276,6 +277,7 @@ public class KafkaStreamsNamedTopologyWrapper extends KafkaStreams { */ public void resumeNamedTopology(final String topologyName) { topologyMetadata.resumeTopology(topologyName); + threads.forEach(StreamThread::signalResume); } /** diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java index e09ba997b2e..5e1331bf675 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java @@ -35,17 +35,16 @@ import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNa import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.Stores; -import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.TestUtils; import org.hamcrest.CoreMatchers; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; import java.time.Duration; import java.util.ArrayList; @@ -53,6 +52,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Properties; +import java.util.stream.Stream; import static java.util.Arrays.asList; import static java.util.Collections.singletonList; @@ -69,7 +69,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -@Category({IntegrationTest.class}) +@Tag("integration") public class PauseResumeIntegrationTest { private static final Duration STARTUP_TIMEOUT = Duration.ofSeconds(45); public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); @@ -101,10 +101,14 @@ public class PauseResumeIntegrationTest { private KafkaStreams kafkaStreams, kafkaStreams2; private KafkaStreamsNamedTopologyWrapper streamsNamedTopologyWrapper; - @Rule - public final TestName testName = new TestName(); - @BeforeClass + private static Stream<Boolean> parameters() { + return Stream.of( + Boolean.TRUE, + Boolean.FALSE); + } + + @BeforeAll public static void startCluster() throws Exception { CLUSTER.start(); producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), @@ -113,18 +117,18 @@ public class PauseResumeIntegrationTest { StringDeserializer.class, LongDeserializer.class); } - @AfterClass + @AfterAll public static void closeCluster() { CLUSTER.stop(); } - @Before - public void createTopics() throws InterruptedException { + @BeforeEach + public void createTopics(final TestInfo testInfo) throws InterruptedException { cleanStateBeforeTest(CLUSTER, 1, INPUT_STREAM_1, INPUT_STREAM_2, OUTPUT_STREAM_1, OUTPUT_STREAM_2); - appId = safeUniqueTestName(PauseResumeIntegrationTest.class, testName); + appId = safeUniqueTestName(PauseResumeIntegrationTest.class, testInfo); } - private Properties props() { + private Properties props(final boolean stateUpdaterEnabled) { final Properties properties = new Properties(); properties.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); @@ -136,10 +140,11 @@ public class PauseResumeIntegrationTest { properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000); + properties.put(StreamsConfig.InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled); return properties; } - @After + @AfterEach public void shutdown() throws InterruptedException { for (final KafkaStreams streams : Arrays.asList(kafkaStreams, kafkaStreams2, streamsNamedTopologyWrapper)) { if (streams != null) { @@ -152,9 +157,10 @@ public class PauseResumeIntegrationTest { IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, CLUSTER.time); } - @Test - public void shouldPauseAndResumeKafkaStreams() throws Exception { - kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1); + @ParameterizedTest + @MethodSource("parameters") + public void shouldPauseAndResumeKafkaStreams(final boolean stateUpdaterEnabled) throws Exception { + kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1, stateUpdaterEnabled); kafkaStreams.start(); waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT); @@ -176,9 +182,10 @@ public class PauseResumeIntegrationTest { assertTopicSize(OUTPUT_STREAM_1, 10); } - @Test - public void shouldAllowForTopologiesToStartPaused() throws Exception { - kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1); + @ParameterizedTest + @MethodSource("parameters") + public void shouldAllowForTopologiesToStartPaused(final boolean stateUpdaterEnabled) throws Exception { + kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1, stateUpdaterEnabled); kafkaStreams.pause(); kafkaStreams.start(); waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT); @@ -195,9 +202,10 @@ public class PauseResumeIntegrationTest { assertTopicSize(OUTPUT_STREAM_1, 5); } - @Test - public void shouldPauseAndResumeKafkaStreamsWithNamedTopologies() throws Exception { - streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props()); + @ParameterizedTest + @MethodSource("parameters") + public void shouldPauseAndResumeKafkaStreamsWithNamedTopologies(final boolean stateUpdaterEnabled) throws Exception { + streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props(stateUpdaterEnabled)); final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1(); final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2(); @@ -229,9 +237,10 @@ public class PauseResumeIntegrationTest { awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA2); } - @Test - public void shouldPauseAndResumeAllKafkaStreamsWithNamedTopologies() throws Exception { - streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props()); + @ParameterizedTest + @MethodSource("parameters") + public void shouldPauseAndResumeAllKafkaStreamsWithNamedTopologies(final boolean stateUpdaterEnabled) throws Exception { + streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props(stateUpdaterEnabled)); final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1(); final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2(); @@ -264,9 +273,10 @@ public class PauseResumeIntegrationTest { assertTopicSize(OUTPUT_STREAM_2, 5); } - @Test - public void shouldAllowForNamedTopologiesToStartPaused() throws Exception { - streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props()); + @ParameterizedTest + @MethodSource("parameters") + public void shouldAllowForNamedTopologiesToStartPaused(final boolean stateUpdaterEnabled) throws Exception { + streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props(stateUpdaterEnabled)); final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1(); final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2(); @@ -306,18 +316,19 @@ public class PauseResumeIntegrationTest { assertTopicSize(OUTPUT_STREAM_2, 5); } - @Test - public void pauseResumeShouldWorkAcrossInstances() throws Exception { + @ParameterizedTest + @MethodSource("parameters") + public void pauseResumeShouldWorkAcrossInstances(final boolean stateUpdaterEnabled) throws Exception { produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA); - kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1); + kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1, stateUpdaterEnabled); kafkaStreams.pause(); kafkaStreams.start(); waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT); assertTrue(kafkaStreams.isPaused()); - kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_2); + kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_2, stateUpdaterEnabled); kafkaStreams2.pause(); kafkaStreams2.start(); waitForApplicationState(singletonList(kafkaStreams2), State.RUNNING, STARTUP_TIMEOUT); @@ -337,11 +348,12 @@ public class PauseResumeIntegrationTest { awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA); } - @Test - public void pausedTopologyShouldNotRestoreStateStores() throws Exception { - final Properties properties1 = props(); + @ParameterizedTest + @MethodSource("parameters") + public void pausedTopologyShouldNotRestoreStateStores(final boolean stateUpdaterEnabled) throws Exception { + final Properties properties1 = props(stateUpdaterEnabled); properties1.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); - final Properties properties2 = props(); + final Properties properties2 = props(stateUpdaterEnabled); properties2.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA); @@ -385,8 +397,8 @@ public class PauseResumeIntegrationTest { assertEquals(stateStoreLag1, stateStoreLag2); } - private KafkaStreams buildKafkaStreams(final String outputTopic) { - return buildKafkaStreams(outputTopic, props()); + private KafkaStreams buildKafkaStreams(final String outputTopic, final boolean stateUpdaterEnabled) { + return buildKafkaStreams(outputTopic, props(stateUpdaterEnabled)); } private KafkaStreams buildKafkaStreams(final String outputTopic, final Properties properties) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java index 3e7449f9411..b0c0ba7c156 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java @@ -1047,6 +1047,7 @@ class DefaultStateUpdaterTest { verifyUpdatingTasks(); when(topologyMetadata.isPaused(null)).thenReturn(false); + stateUpdater.signalResume(); verifyPausedTasks(); verifyUpdatingTasks(task); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java index cd5da873981..9b780dee6bb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java @@ -41,6 +41,7 @@ class ReadOnlyTaskTest { add("changelogPartitions"); add("commitRequested"); add("isActive"); + add("changelogOffsets"); add("state"); add("id"); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index dc9259a1f91..a8768ddd3eb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -2734,7 +2734,7 @@ public class StreamThreadTest { expect(task3.state()).andReturn(Task.State.CREATED).anyTimes(); expect(task3.id()).andReturn(taskId3).anyTimes(); - expect(taskManager.allTasks()).andReturn(mkMap( + expect(taskManager.allOwnedTasks()).andReturn(mkMap( mkEntry(taskId1, task1), mkEntry(taskId2, task2), mkEntry(taskId3, task3) @@ -3084,7 +3084,7 @@ public class StreamThreadTest { expect(runningTask.state()).andStubReturn(Task.State.RUNNING); expect(runningTask.id()).andStubReturn(taskId); - expect(taskManager.allTasks()).andStubReturn(Collections.singletonMap(taskId, runningTask)); + expect(taskManager.allOwnedTasks()).andStubReturn(Collections.singletonMap(taskId, runningTask)); expect(taskManager.commit(Collections.singleton(runningTask))).andStubReturn(1); return taskManager; } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 00d44b045b1..f43de372388 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -497,6 +497,38 @@ public class TaskManagerTest { Mockito.verify(tasks).addPendingTaskToRecycle(standbyTaskToRecycle.id(), standbyTaskToRecycle.inputPartitions()); } + @Test + public void shouldReturnStateUpdaterTasksInAllTasks() { + final StreamTask activeTask = statefulTask(taskId03, taskId03ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId03Partitions).build(); + final StandbyTask standbyTask = standbyTask(taskId02, taskId02ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId02Partitions).build(); + final TasksRegistry tasks = Mockito.mock(TasksRegistry.class); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + + when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTask)); + when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId03, activeTask))); + assertEquals(taskManager.allTasks(), mkMap(mkEntry(taskId03, activeTask), mkEntry(taskId02, standbyTask))); + } + + @Test + public void shouldNotReturnStateUpdaterTasksInOwnedTasks() { + final StreamTask activeTask = statefulTask(taskId03, taskId03ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId03Partitions).build(); + final StandbyTask standbyTask = standbyTask(taskId02, taskId02ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId02Partitions).build(); + final TasksRegistry tasks = Mockito.mock(TasksRegistry.class); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + + when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTask)); + when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId03, activeTask))); + assertEquals(taskManager.allOwnedTasks(), mkMap(mkEntry(taskId03, activeTask))); + } + @Test public void shouldCreateActiveTaskDuringAssignment() { final StreamTask activeTaskToBeCreated = statefulTask(taskId03, taskId03ChangelogPartitions)