This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b47c4d8598 KAFKA-10199: Remove tasks from state updater on revocation 
(#12520)
b47c4d8598 is described below

commit b47c4d859805068de6a8fe8de3bda5e7a21132e2
Author: Bruno Cadonna <cado...@apache.org>
AuthorDate: Wed Aug 17 20:13:34 2022 +0200

    KAFKA-10199: Remove tasks from state updater on revocation (#12520)
    
    Removes tasks from the state updater when the input partitions of the tasks 
are revoked during a rebalance.
    
    Reviewers: Guozhang Wang <wangg...@gmail.com>
---
 .../streams/processor/internals/TaskManager.java   | 16 +++++
 .../processor/internals/TaskManagerTest.java       | 81 ++++++++++++++++++++++
 2 files changed, 97 insertions(+)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 4bba28a3f3..bab05a5184 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -757,6 +757,8 @@ public class TaskManager {
             }
         }
 
+        removeRevokedTasksFromStateUpdater(remainingRevokedPartitions);
+
         if (!remainingRevokedPartitions.isEmpty()) {
             log.debug("The following revoked partitions {} are missing from 
the current task partitions. It could "
                           + "potentially be due to race condition of consumer 
detecting the heartbeat failure, or the tasks " +
@@ -842,6 +844,20 @@ public class TaskManager {
         }
     }
 
+    private void removeRevokedTasksFromStateUpdater(final Set<TopicPartition> 
remainingRevokedPartitions) {
+        if (stateUpdater != null) {
+            for (final Task restoringTask : stateUpdater.getTasks()) {
+                if (restoringTask.isActive()) {
+                    if 
(remainingRevokedPartitions.containsAll(restoringTask.inputPartitions())) {
+                        tasks.addPendingTaskToClose(restoringTask.id());
+                        stateUpdater.remove(restoringTask.id());
+                        
remainingRevokedPartitions.removeAll(restoringTask.inputPartitions());
+                    }
+                }
+            }
+        }
+    }
+
     private void prepareCommitAndAddOffsetsToMap(final Set<Task> 
tasksToPrepare,
                                                  final Map<Task, 
Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask) {
         for (final Task task : tasksToPrepare) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 133541bfac..ff52ad5ae9 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -751,6 +751,87 @@ public class TaskManagerTest {
         assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums));
     }
 
+    @Test
+    public void 
shouldRemoveStatefulTaskWithRevokedInputPartitionsFromStateUpdaterOnRevocation()
 {
+        final StreamTask task = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId00Partitions).build();
+        final TaskManager taskManager = setupForRevocation(mkSet(task), 
mkSet(task));
+
+        taskManager.handleRevocation(taskId00Partitions);
+
+        Mockito.verify(stateUpdater).remove(task.id());
+
+        taskManager.tryToCompleteRestoration(time.milliseconds(), null);
+
+        Mockito.verify(task).closeClean();
+    }
+
+    public void 
shouldRemoveMultipleStatefulTaskWithRevokedInputPartitionsFromStateUpdaterOnRevocation()
 {
+        final StreamTask task1 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId00Partitions).build();
+        final StreamTask task2 = statefulTask(taskId01, 
taskId01ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId01Partitions).build();
+        final TaskManager taskManager = setupForRevocation(mkSet(task1, 
task2), mkSet(task1, task2));
+
+        taskManager.handleRevocation(union(HashSet::new, taskId00Partitions, 
taskId01Partitions));
+
+        Mockito.verify(stateUpdater).remove(task1.id());
+        Mockito.verify(stateUpdater).remove(task2.id());
+
+        taskManager.tryToCompleteRestoration(time.milliseconds(), null);
+
+        Mockito.verify(task1).closeClean();
+        Mockito.verify(task2).closeClean();
+    }
+
+    @Test
+    public void 
shouldNotRemoveStatefulTaskWithoutRevokedInputPartitionsFromStateUpdaterOnRevocation()
 {
+        final StreamTask task = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId00Partitions).build();
+        final TaskManager taskManager = setupForRevocation(mkSet(task), 
Collections.emptySet());
+
+        taskManager.handleRevocation(taskId01Partitions);
+
+        Mockito.verify(stateUpdater, never()).remove(task.id());
+
+        taskManager.tryToCompleteRestoration(time.milliseconds(), null);
+
+        Mockito.verify(task, never()).closeClean();
+    }
+
+    @Test
+    public void shouldNotRemoveStandbyTaskFromStateUpdaterOnRevocation() {
+        final StandbyTask task = standbyTask(taskId00, 
taskId00ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId00Partitions).build();
+        final TaskManager taskManager = setupForRevocation(mkSet(task), 
Collections.emptySet());
+
+        taskManager.handleRevocation(taskId00Partitions);
+
+        Mockito.verify(stateUpdater, never()).remove(task.id());
+
+        taskManager.tryToCompleteRestoration(time.milliseconds(), null);
+
+        Mockito.verify(task, never()).closeClean();
+    }
+
+    private TaskManager setupForRevocation(final Set<Task> tasksInStateUpdater,
+                                           final Set<Task> removedTasks) {
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, true);
+        when(stateUpdater.getTasks()).thenReturn(tasksInStateUpdater);
+        when(stateUpdater.drainRemovedTasks()).thenReturn(removedTasks);
+        expect(consumer.assignment()).andReturn(emptySet()).anyTimes();
+        consumer.resume(anyObject());
+        expectLastCall().anyTimes();
+        replay(consumer);
+
+        return taskManager;
+    }
+
     @Test
     public void 
shouldCloseActiveUnassignedSuspendedTasksWhenClosingRevokedTasks() {
         final StateMachineTask task00 = new StateMachineTask(taskId00, 
taskId00Partitions, true);

Reply via email to