lucasbru commented on code in PR #15117:
URL: https://github.com/apache/kafka/pull/15117#discussion_r1441719248


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -950,6 +963,12 @@ private void handleRestoredTasksFromStateUpdater(final 
long now,
                 closeTaskClean(task, tasksToCloseDirty, taskExceptions);
             } else if (tasks.removePendingTaskToAddBack(task.id())) {
                 stateUpdater.add(task);
+            } else if ((inputPartitions = 
tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(task.id())) != 
null) {
+                if (closeTaskClean(task, tasksToCloseDirty, taskExceptions)) {
+                    task.revive();
+                    task.updateInputPartitions(inputPartitions, 
topologyMetadata.nodeToSourceTopics(task.id()));

Review Comment:
   Done



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1164,9 +1243,12 @@ public void 
shouldHandleMultipleRemovedTasksFromStateUpdater() {
         final StreamTask taskToUpdateInputPartitions = statefulTask(taskId03, 
taskId03ChangelogPartitions)
             .inState(State.RESTORING)
             .withInputPartitions(taskId03Partitions).build();
+        final StreamTask taskToCloseReviveAndUpdateInputPartitions = 
statefulTask(taskId04, taskId04ChangelogPartitions)
+                .inState(State.RESTORING)
+                .withInputPartitions(taskId04Partitions).build();

Review Comment:
   Done



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1127,6 +1175,36 @@ public void 
shouldUpdateInputPartitionsOfTasksRemovedFromStateUpdater() {
         Mockito.verify(stateUpdater).add(task01);
     }
 
+    @Test
+    public void 
shouldCloseReviveAndUpdateInputPartitionsOfTasksRemovedFromStateUpdater() {
+        final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)

Review Comment:
   Done



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -916,6 +923,12 @@ private void handleRemovedTasksFromStateUpdater() {
                 stateUpdater.add(task);
             } else if (tasks.removePendingTaskToCloseClean(task.id())) {
                 closeTaskClean(task, tasksToCloseDirty, taskExceptions);
+            } else if ((inputPartitions = 
tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(task.id())) != 
null) {
+                if (closeTaskClean(task, tasksToCloseDirty, taskExceptions)) {
+                    task.revive();
+                    task.updateInputPartitions(inputPartitions, 
topologyMetadata.nodeToSourceTopics(task.id()));
+                    stateUpdater.add(task);

Review Comment:
   Good point! I was supposed to call `addTaskToStateUpdater` here instead.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1127,6 +1175,36 @@ public void 
shouldUpdateInputPartitionsOfTasksRemovedFromStateUpdater() {
         Mockito.verify(stateUpdater).add(task01);
     }
 
+    @Test
+    public void 
shouldCloseReviveAndUpdateInputPartitionsOfTasksRemovedFromStateUpdater() {
+        final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+                .withInputPartitions(taskId00Partitions)
+                .inState(State.RESTORING).build();
+        final StandbyTask task01 = standbyTask(taskId01, 
taskId01ChangelogPartitions)
+                .withInputPartitions(taskId01Partitions)
+                .inState(State.RUNNING).build();
+        when(stateUpdater.hasRemovedTasks()).thenReturn(true);
+        when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(task00, 
task01));
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        when(tasks.removePendingTaskToRecycle(any())).thenReturn(null);
+        
when(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(task00.id())).thenReturn(taskId02Partitions);
+        
when(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(task01.id())).thenReturn(taskId03Partitions);

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to