This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 309e0f986e KAFKA-10199: Add PAUSE in state updater (#12386) 309e0f986e is described below commit 309e0f986e97be966c797f7729eb1e94ef5400a9 Author: Guozhang Wang <wangg...@gmail.com> AuthorDate: Mon Jul 18 16:42:48 2022 -0700 KAFKA-10199: Add PAUSE in state updater (#12386) * Add pause action to task-updater. * When removing a task, also check in the paused tasks in addition to removed tasks. * Also I realized we do not check if tasks with the same id are added, so I add that check in this PR as well. Reviewers: Bruno Cadonna <cado...@apache.org> --- .../processor/internals/DefaultStateUpdater.java | 62 ++++- .../streams/processor/internals/StateUpdater.java | 13 + .../processor/internals/StoreChangelogReader.java | 2 +- .../streams/processor/internals/TaskAndAction.java | 10 +- .../internals/DefaultStateUpdaterTest.java | 282 ++++++++++++++++++++- .../processor/internals/TaskAndActionTest.java | 20 ++ 6 files changed, 379 insertions(+), 10 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index 22fd48a4ab..08959bee00 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -83,7 +83,7 @@ public class DefaultStateUpdater implements StateUpdater { } public boolean onlyStandbyTasksLeft() { - return !updatingTasks.isEmpty() && updatingTasks.values().stream().allMatch(t -> !t.isActive()); + return !updatingTasks.isEmpty() && updatingTasks.values().stream().noneMatch(Task::isActive); } @Override @@ -125,6 +125,9 @@ public class DefaultStateUpdater implements StateUpdater { case REMOVE: removeTask(taskAndAction.getTaskId()); break; + case PAUSE: + pauseTask(taskAndAction.getTaskId()); + break; } } } finally { @@ -243,7 +246,12 @@ public class DefaultStateUpdater implements StateUpdater { addToRestoredTasks((StreamTask) task); log.debug("Stateless active task " + task.id() + " was added to the restored tasks of the state updater"); } else { - updatingTasks.put(task.id(), task); + final Task existingTask = updatingTasks.putIfAbsent(task.id(), task); + if (existingTask != null) { + throw new IllegalStateException((existingTask.isActive() ? "Active" : "Standby") + " task " + task.id() + " already exist, " + + "should not try to add another " + (task.isActive() ? "Active" : "Standby") + " task with the same id. " + BUG_ERROR_MESSAGE); + } + if (task.isActive()) { log.debug("Stateful active task " + task.id() + " was added to the updating tasks of the state updater"); changelogReader.enforceRestoreActive(); @@ -257,8 +265,9 @@ public class DefaultStateUpdater implements StateUpdater { } private void removeTask(final TaskId taskId) { - final Task task = updatingTasks.get(taskId); - if (task != null) { + final Task task; + if (updatingTasks.containsKey(taskId)) { + task = updatingTasks.get(taskId); task.maybeCheckpoint(true); final Collection<TopicPartition> changelogPartitions = task.changelogPartitions(); changelogReader.unregister(changelogPartitions); @@ -267,8 +276,31 @@ public class DefaultStateUpdater implements StateUpdater { transitToUpdateStandbysIfOnlyStandbysLeft(); log.debug((task.isActive() ? "Active" : "Standby") + " task " + task.id() + " was removed from the updating tasks and added to the removed tasks."); + } else if (pausedTasks.containsKey(taskId)) { + task = pausedTasks.get(taskId); + final Collection<TopicPartition> changelogPartitions = task.changelogPartitions(); + changelogReader.unregister(changelogPartitions); + removedTasks.add(task); + pausedTasks.remove(taskId); + log.debug((task.isActive() ? "Active" : "Standby") + + " task " + task.id() + " was removed from the paused tasks and added to the removed tasks."); + } else { + log.debug("Task " + taskId + " was not removed since it is not updating or paused."); + } + } + + private void pauseTask(final TaskId taskId) { + final Task task = updatingTasks.get(taskId); + if (task != null) { + // do not need to unregister changelog partitions for paused tasks + task.maybeCheckpoint(true); + pausedTasks.put(taskId, task); + updatingTasks.remove(taskId); + transitToUpdateStandbysIfOnlyStandbysLeft(); + log.debug((task.isActive() ? "Active" : "Standby") + + " task " + task.id() + " was paused from the updating tasks and added to the paused tasks."); } else { - log.debug("Task " + taskId + " was not removed since it is not updating."); + log.debug("Task " + taskId + " was not paused since it is not updating."); } } @@ -333,6 +365,7 @@ public class DefaultStateUpdater implements StateUpdater { private final Condition restoredActiveTasksCondition = restoredActiveTasksLock.newCondition(); private final BlockingQueue<ExceptionAndTasks> exceptionsAndFailedTasks = new LinkedBlockingQueue<>(); private final BlockingQueue<Task> removedTasks = new LinkedBlockingQueue<>(); + private final Map<TaskId, Task> pausedTasks = new ConcurrentHashMap<>(); private final long commitIntervalMs; private long lastCommitMs; @@ -407,6 +440,17 @@ public class DefaultStateUpdater implements StateUpdater { } } + @Override + public void pause(final TaskId taskId) { + tasksAndActionsLock.lock(); + try { + tasksAndActions.add(TaskAndAction.createPauseTask(taskId)); + tasksAndActionsCondition.signalAll(); + } finally { + tasksAndActionsLock.unlock(); + } + } + @Override public Set<StreamTask> drainRestoredActiveTasks(final Duration timeout) { final long timeoutMs = timeout.toMillis(); @@ -478,6 +522,10 @@ public class DefaultStateUpdater implements StateUpdater { return Collections.unmodifiableSet(new HashSet<>(removedTasks)); } + public Set<Task> getPausedTasks() { + return Collections.unmodifiableSet(new HashSet<>(pausedTasks.values())); + } + @Override public Set<Task> getTasks() { return executeWithQueuesLocked(() -> getStreamOfTasks().collect(Collectors.toSet())); @@ -520,6 +568,8 @@ public class DefaultStateUpdater implements StateUpdater { restoredActiveTasks.stream(), Stream.concat( exceptionsAndFailedTasks.stream().flatMap(exceptionAndTasks -> exceptionAndTasks.getTasks().stream()), - removedTasks.stream())))); + Stream.concat( + getPausedTasks().stream(), + removedTasks.stream()))))); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java index 1b229bc818..516e47436b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java @@ -94,6 +94,19 @@ public interface StateUpdater { */ void remove(final TaskId taskId); + /** + * Pause a task (active or standby) from restoring in the state updater. + * + * This method does not block until the task is paused. + * + * Restored tasks, removed tasks and failed tasks are not paused so this action would be an no-op for them. + * Stateless tasks will never be paused since they are immediately added to the + * restored active tasks. + * + * @param taskId ID of the task to remove + */ + void pause(final TaskId taskId); + /** * Drains the restored active tasks from the state updater. * diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index 5240534ce7..f8926e70bb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -481,7 +481,7 @@ public class StoreChangelogReader implements ChangelogReader { } private void pauseResumePartitions(final Map<TaskId, Task> tasks, - final Set<TopicPartition> restoringChangelogs) { + final Set<TopicPartition> restoringChangelogs) { if (state == ChangelogReaderState.ACTIVE_RESTORING) { updatePartitionsByType(tasks, restoringChangelogs, TaskType.ACTIVE); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAndAction.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAndAction.java index 4c4316a864..585374c339 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAndAction.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAndAction.java @@ -24,7 +24,8 @@ public class TaskAndAction { enum Action { ADD, - REMOVE + REMOVE, + PAUSE } private final Task task; @@ -47,6 +48,11 @@ public class TaskAndAction { return new TaskAndAction(null, taskId, Action.REMOVE); } + public static TaskAndAction createPauseTask(final TaskId taskId) { + Objects.requireNonNull(taskId, "Task ID of task to pause is null!"); + return new TaskAndAction(null, taskId, Action.PAUSE); + } + public Task getTask() { if (action != Action.ADD) { throw new IllegalStateException("Action type " + action + " cannot have a task!"); @@ -55,7 +61,7 @@ public class TaskAndAction { } public TaskId getTaskId() { - if (action != Action.REMOVE) { + if (action != Action.REMOVE && action != Action.PAUSE) { throw new IllegalStateException("Action type " + action + " cannot have a task ID!"); } return taskId; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java index 465ae4a1c5..14b8237fe7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java @@ -83,7 +83,6 @@ class DefaultStateUpdaterTest { private final Time time = new MockTime(1L); private final StreamsConfig config = new StreamsConfig(configProps()); private final ChangelogReader changelogReader = mock(ChangelogReader.class); - private final java.util.function.Consumer<Set<TopicPartition>> offsetResetter = topicPartitions -> { }; private final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, time); @AfterEach @@ -152,6 +151,42 @@ class DefaultStateUpdaterTest { } } + @Test + public void shouldThrowIfAddingActiveTasksWithSameId() throws Exception { + final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + shouldThrowIfAddingTasksWithSameId(task1, task2); + } + + @Test + public void shouldThrowIfAddingStandbyTasksWithSameId() throws Exception { + final StandbyTask task1 = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0)); + final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0)); + shouldThrowIfAddingTasksWithSameId(task1, task2); + } + + @Test + public void shouldThrowIfAddingActiveAndStandbyTaskWithSameId() throws Exception { + final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0)); + shouldThrowIfAddingTasksWithSameId(task1, task2); + } + + @Test + public void shouldThrowIfAddingStandbyAndActiveTaskWithSameId() throws Exception { + final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0)); + shouldThrowIfAddingTasksWithSameId(task2, task1); + } + + private void shouldThrowIfAddingTasksWithSameId(final Task task1, final Task task2) throws Exception { + stateUpdater.start(); + stateUpdater.add(task1); + stateUpdater.add(task2); + + verifyFailedTasks(IllegalStateException.class, task1); + } + @Test public void shouldImmediatelyAddSingleStatelessTaskToRestoredTasks() throws Exception { final StreamTask task1 = createStatelessTaskInStateRestoring(TASK_0_0); @@ -177,6 +212,7 @@ class DefaultStateUpdaterTest { verifyUpdatingTasks(); verifyExceptionsAndFailedTasks(); verifyRemovedTasks(); + verifyPausedTasks(); } @Test @@ -200,6 +236,7 @@ class DefaultStateUpdaterTest { verifyUpdatingTasks(); verifyExceptionsAndFailedTasks(); verifyRemovedTasks(); + verifyPausedTasks(); verify(changelogReader, times(1)).enforceRestoreActive(); verify(changelogReader, atLeast(3)).restore(anyMap()); verify(changelogReader, never()).transitToUpdateStandby(); @@ -231,6 +268,7 @@ class DefaultStateUpdaterTest { verifyUpdatingTasks(); verifyExceptionsAndFailedTasks(); verifyRemovedTasks(); + verifyPausedTasks(); verify(changelogReader, times(3)).enforceRestoreActive(); verify(changelogReader, atLeast(4)).restore(anyMap()); verify(changelogReader, never()).transitToUpdateStandby(); @@ -286,6 +324,7 @@ class DefaultStateUpdaterTest { verifyRestoredActiveTasks(); verifyExceptionsAndFailedTasks(); verifyRemovedTasks(); + verifyPausedTasks(); verify(changelogReader, times(1)).transitToUpdateStandby(); verify(changelogReader, timeout(VERIFICATION_TIMEOUT).atLeast(1)).restore(anyMap()); verify(changelogReader, never()).enforceRestoreActive(); @@ -314,6 +353,7 @@ class DefaultStateUpdaterTest { verifyUpdatingStandbyTasks(task4, task3); verifyExceptionsAndFailedTasks(); verifyRemovedTasks(); + verifyPausedTasks(); verify(changelogReader, atLeast(3)).restore(anyMap()); final InOrder orderVerifier = inOrder(changelogReader, task1, task2); orderVerifier.verify(changelogReader, times(2)).enforceRestoreActive(); @@ -424,10 +464,37 @@ class DefaultStateUpdaterTest { verifyCheckpointTasks(true, task); verifyRestoredActiveTasks(); verifyUpdatingTasks(); + verifyPausedTasks(); verifyExceptionsAndFailedTasks(); verify(changelogReader).unregister(task.changelogPartitions()); } + @Test + public void shouldRemovePausedTask() throws Exception { + final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_B_0)); + + stateUpdater.start(); + stateUpdater.add(task1); + stateUpdater.add(task2); + + stateUpdater.pause(task1.id()); + stateUpdater.pause(task2.id()); + + verifyPausedTasks(task1, task2); + verifyRemovedTasks(); + verifyUpdatingTasks(); + + stateUpdater.remove(task1.id()); + stateUpdater.remove(task2.id()); + + verifyRemovedTasks(task1, task2); + verifyPausedTasks(); + verifyCheckpointTasks(true, task1, task2); + verifyUpdatingTasks(); + verifyExceptionsAndFailedTasks(); + } + @Test public void shouldNotRemoveActiveStatefulTaskFromRestoredActiveTasks() throws Exception { final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); @@ -455,6 +522,7 @@ class DefaultStateUpdaterTest { verifyRemovedTasks(controlTask); verifyRestoredActiveTasks(task); verifyUpdatingTasks(); + verifyPausedTasks(); verifyExceptionsAndFailedTasks(); } @@ -493,11 +561,168 @@ class DefaultStateUpdaterTest { stateUpdater.remove(controlTask.id()); verifyRemovedTasks(controlTask); + verifyPausedTasks(); verifyExceptionsAndFailedTasks(expectedExceptionAndTasks); verifyUpdatingTasks(); verifyRestoredActiveTasks(); } + @Test + public void shouldPauseActiveStatefulTask() throws Exception { + final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + shouldPauseStatefulTask(task); + verify(changelogReader, never()).transitToUpdateStandby(); + } + + @Test + public void shouldPauseStandbyTask() throws Exception { + final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + shouldPauseStatefulTask(task); + verify(changelogReader, times(1)).transitToUpdateStandby(); + } + + @Test + public void shouldPauseActiveTaskAndTransitToUpdateStandby() throws Exception { + final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_B_0)); + + stateUpdater.start(); + stateUpdater.add(task1); + stateUpdater.add(task2); + + stateUpdater.pause(task1.id()); + + verifyPausedTasks(task1); + verifyCheckpointTasks(true, task1); + verifyRestoredActiveTasks(); + verifyRemovedTasks(); + verifyUpdatingTasks(task2); + verifyExceptionsAndFailedTasks(); + verify(changelogReader, times(1)).enforceRestoreActive(); + verify(changelogReader, times(1)).transitToUpdateStandby(); + } + + private void shouldPauseStatefulTask(final Task task) throws Exception { + stateUpdater.start(); + stateUpdater.add(task); + + stateUpdater.pause(task.id()); + + verifyPausedTasks(task); + verifyCheckpointTasks(true, task); + verifyRestoredActiveTasks(); + verifyRemovedTasks(); + verifyUpdatingTasks(); + verifyExceptionsAndFailedTasks(); + } + + @Test + public void shouldIgnorePausingNotExistTasks() throws Exception { + stateUpdater.start(); + stateUpdater.pause(TASK_0_0); + + verifyPausedTasks(); + verifyRestoredActiveTasks(); + verifyRemovedTasks(); + verifyUpdatingTasks(); + verifyExceptionsAndFailedTasks(); + } + + @Test + public void shouldNotPauseActiveStatefulTaskInRestoredActiveTasks() throws Exception { + final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0)); + when(changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0)); + when(changelogReader.allChangelogsCompleted()).thenReturn(false); + stateUpdater.start(); + stateUpdater.add(task); + stateUpdater.add(controlTask); + verifyRestoredActiveTasks(task); + + stateUpdater.pause(task.id()); + stateUpdater.pause(controlTask.id()); + + verifyPausedTasks(controlTask); + verifyRestoredActiveTasks(task); + verifyUpdatingTasks(); + verifyExceptionsAndFailedTasks(); + } + + @Test + public void shouldNotPauseActiveStatefulTaskInFailedTasks() throws Exception { + final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + shouldNotPauseTaskInFailedTasks(task); + } + + @Test + public void shouldNotPauseStandbyTaskInFailedTasks() throws Exception { + final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + shouldNotPauseTaskInFailedTasks(task); + } + + private void shouldNotPauseTaskInFailedTasks(final Task task) throws Exception { + final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0)); + final StreamsException streamsException = new StreamsException("Something happened", task.id()); + when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet()); + when(changelogReader.allChangelogsCompleted()).thenReturn(false); + final Map<TaskId, Task> updatingTasks = mkMap( + mkEntry(task.id(), task), + mkEntry(controlTask.id(), controlTask) + ); + doThrow(streamsException) + .doNothing() + .when(changelogReader).restore(updatingTasks); + stateUpdater.start(); + + stateUpdater.add(task); + stateUpdater.add(controlTask); + final ExceptionAndTasks expectedExceptionAndTasks = new ExceptionAndTasks(mkSet(task), streamsException); + verifyExceptionsAndFailedTasks(expectedExceptionAndTasks); + + stateUpdater.pause(task.id()); + stateUpdater.pause(controlTask.id()); + + verifyPausedTasks(controlTask); + verifyExceptionsAndFailedTasks(expectedExceptionAndTasks); + verifyUpdatingTasks(); + verifyRestoredActiveTasks(); + } + + @Test + public void shouldNotPauseActiveStatefulTaskInRemovedTasks() throws Exception { + final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + shouldNotPauseTaskInRemovedTasks(task); + } + + @Test + public void shouldNotPauseStandbyTaskInRemovedTasks() throws Exception { + final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + shouldNotPauseTaskInRemovedTasks(task); + } + + private void shouldNotPauseTaskInRemovedTasks(final Task task) throws Exception { + when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet()); + when(changelogReader.allChangelogsCompleted()).thenReturn(false); + stateUpdater.start(); + stateUpdater.add(task); + + stateUpdater.remove(task.id()); + + verifyRemovedTasks(task); + verifyCheckpointTasks(true, task); + verifyRestoredActiveTasks(); + verifyUpdatingTasks(); + verifyPausedTasks(); + verifyExceptionsAndFailedTasks(); + verify(changelogReader).unregister(task.changelogPartitions()); + + stateUpdater.pause(task.id()); + + verifyRemovedTasks(task); + verifyUpdatingTasks(); + verifyPausedTasks(); + } + @Test public void shouldDrainRemovedTasks() throws Exception { assertTrue(stateUpdater.drainRemovedTasks().isEmpty()); @@ -543,6 +768,7 @@ class DefaultStateUpdaterTest { final ExceptionAndTasks expectedExceptionAndTasks = new ExceptionAndTasks(mkSet(task1, task2), streamsException); verifyExceptionsAndFailedTasks(expectedExceptionAndTasks); verifyRemovedTasks(); + verifyPausedTasks(); verifyUpdatingTasks(); verifyRestoredActiveTasks(); } @@ -582,6 +808,7 @@ class DefaultStateUpdaterTest { verifyUpdatingTasks(task2); verifyRestoredActiveTasks(); verifyRemovedTasks(); + verifyPausedTasks(); } @Test @@ -630,6 +857,7 @@ class DefaultStateUpdaterTest { verifyUpdatingTasks(); verifyRestoredActiveTasks(); verifyRemovedTasks(); + verifyPausedTasks(); } @Test @@ -882,6 +1110,22 @@ class DefaultStateUpdaterTest { verifyGetTasks(mkSet(), mkSet()); } + @Test + public void shouldGetTasksFromPausedTasks() throws Exception { + final StreamTask activeTask = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + final StandbyTask standbyTask = createStandbyTaskInStateRunning(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_A_0)); + stateUpdater.start(); + stateUpdater.add(activeTask); + stateUpdater.add(standbyTask); + + stateUpdater.pause(activeTask.id()); + stateUpdater.pause(standbyTask.id()); + + verifyPausedTasks(activeTask, standbyTask); + + verifyGetTasks(mkSet(activeTask), mkSet(standbyTask)); + } + private void verifyGetTasks(final Set<StreamTask> expectedActiveTasks, final Set<StandbyTask> expectedStandbyTasks) { final Set<Task> tasks = stateUpdater.getTasks(); @@ -983,6 +1227,24 @@ class DefaultStateUpdaterTest { } } + private void verifyPausedTasks(final Task... tasks) throws Exception { + if (tasks.length == 0) { + assertTrue(stateUpdater.getPausedTasks().isEmpty()); + } else { + final Set<Task> expectedPausedTasks = mkSet(tasks); + final Set<Task> pausedTasks = new HashSet<>(); + waitForCondition( + () -> { + pausedTasks.addAll(stateUpdater.getPausedTasks()); + return pausedTasks.containsAll(expectedPausedTasks) + && pausedTasks.size() == expectedPausedTasks.size(); + }, + VERIFICATION_TIMEOUT, + "Did not get all paused task within the given timeout!" + ); + } + } + private void verifyDrainingRemovedTasks(final Task... tasks) throws Exception { final Set<Task> expectedRemovedTasks = mkSet(tasks); final Set<Task> removedTasks = new HashSet<>(); @@ -1012,6 +1274,24 @@ class DefaultStateUpdaterTest { ); } + private void verifyFailedTasks(final Class<? extends RuntimeException> clazz, final Task... tasks) throws Exception { + final List<Task> expectedFailedTasks = Arrays.asList(tasks); + final Set<Task> failedTasks = new HashSet<>(); + waitForCondition( + () -> { + for (final ExceptionAndTasks exceptionsAndTasks : stateUpdater.getExceptionsAndFailedTasks()) { + if (clazz.isInstance(exceptionsAndTasks.exception())) { + failedTasks.addAll(exceptionsAndTasks.getTasks()); + } + } + return failedTasks.containsAll(expectedFailedTasks) + && failedTasks.size() == expectedFailedTasks.size(); + }, + VERIFICATION_TIMEOUT, + "Did not get all exceptions and failed tasks within the given timeout!" + ); + } + private void verifyDrainingExceptionsAndFailedTasks(final ExceptionAndTasks... exceptionsAndTasks) throws Exception { final List<ExceptionAndTasks> expectedExceptionAndTasks = Arrays.asList(exceptionsAndTasks); final List<ExceptionAndTasks> failedTasks = new ArrayList<>(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskAndActionTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskAndActionTest.java index 39b927ee09..f994ef75c9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskAndActionTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskAndActionTest.java @@ -20,8 +20,10 @@ import org.apache.kafka.streams.processor.TaskId; import org.junit.jupiter.api.Test; import static org.apache.kafka.streams.processor.internals.TaskAndAction.Action.ADD; +import static org.apache.kafka.streams.processor.internals.TaskAndAction.Action.PAUSE; import static org.apache.kafka.streams.processor.internals.TaskAndAction.Action.REMOVE; import static org.apache.kafka.streams.processor.internals.TaskAndAction.createAddTask; +import static org.apache.kafka.streams.processor.internals.TaskAndAction.createPauseTask; import static org.apache.kafka.streams.processor.internals.TaskAndAction.createRemoveTask; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -54,6 +56,18 @@ class TaskAndActionTest { assertEquals("Action type REMOVE cannot have a task!", exception.getMessage()); } + @Test + public void shouldCreatePauseTaskAction() { + final TaskId taskId = new TaskId(0, 0); + + final TaskAndAction pauseTask = createPauseTask(taskId); + + assertEquals(PAUSE, pauseTask.getAction()); + assertEquals(taskId, pauseTask.getTaskId()); + final Exception exception = assertThrows(IllegalStateException.class, pauseTask::getTask); + assertEquals("Action type PAUSE cannot have a task!", exception.getMessage()); + } + @Test public void shouldThrowIfAddTaskActionIsCreatedWithNullTask() { final Exception exception = assertThrows(NullPointerException.class, () -> createAddTask(null)); @@ -65,4 +79,10 @@ class TaskAndActionTest { final Exception exception = assertThrows(NullPointerException.class, () -> createRemoveTask(null)); assertTrue(exception.getMessage().contains("Task ID of task to remove is null!")); } + + @Test + public void shouldThrowIfPauseTaskActionIsCreatedWithNullTaskId() { + final Exception exception = assertThrows(NullPointerException.class, () -> createPauseTask(null)); + assertTrue(exception.getMessage().contains("Task ID of task to pause is null!")); + } } \ No newline at end of file