shashankhs11 commented on code in PR #20818:
URL: https://github.com/apache/kafka/pull/20818#discussion_r2492233742


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3041,218 +3040,272 @@ public void 
shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEo
 
     @Test
     public void shouldCommitAllNeededTasksOnHandleRevocation() {
-        final StateMachineTask task00 = new StateMachineTask(taskId00, 
taskId00Partitions, true, stateManager);
+        // revoked task that needs commit
+        final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+            .withInputPartitions(taskId00Partitions)
+            .inState(State.RUNNING)
+            .build();
         final Map<TopicPartition, OffsetAndMetadata> offsets00 = 
singletonMap(t1p0, new OffsetAndMetadata(0L, null));
-        task00.setCommittableOffsetsAndMetadata(offsets00);
-        task00.setCommitNeeded();
+        when(task00.commitNeeded()).thenReturn(true);
+        when(task00.prepareCommit(true)).thenReturn(offsets00);
 
-        final StateMachineTask task01 = new StateMachineTask(taskId01, 
taskId01Partitions, true, stateManager);
+        // non-revoked task that needs commit
+        final StreamTask task01 = statefulTask(taskId01, 
taskId01ChangelogPartitions)
+            .withInputPartitions(taskId01Partitions)
+            .inState(State.RUNNING)
+            .build();
         final Map<TopicPartition, OffsetAndMetadata> offsets01 = 
singletonMap(t1p1, new OffsetAndMetadata(1L, null));
-        task01.setCommittableOffsetsAndMetadata(offsets01);
-        task01.setCommitNeeded();
+        when(task01.commitNeeded()).thenReturn(true);
+        when(task01.prepareCommit(true)).thenReturn(offsets01);
 
-        final StateMachineTask task02 = new StateMachineTask(taskId02, 
taskId02Partitions, true, stateManager);
-        final Map<TopicPartition, OffsetAndMetadata> offsets02 = 
singletonMap(t1p2, new OffsetAndMetadata(2L, null));
-        task02.setCommittableOffsetsAndMetadata(offsets02);
+        // non-revoked task that does NOT need commit
+        final StreamTask task02 = statefulTask(taskId02, 
taskId02ChangelogPartitions)
+            .withInputPartitions(taskId02Partitions)
+            .inState(State.RUNNING)
+            .build();
+        when(task02.commitNeeded()).thenReturn(false);
 
-        final StateMachineTask task10 = new StateMachineTask(taskId10, 
taskId10Partitions, false, stateManager);
+        // standby task (should not be affected by revocation)
+        final StandbyTask task03 = standbyTask(taskId03, 
taskId03ChangelogPartitions)
+            .withInputPartitions(taskId03Partitions)
+            .inState(State.RUNNING)
+            .build();
 
         final Map<TopicPartition, OffsetAndMetadata> expectedCommittedOffsets 
= new HashMap<>();
         expectedCommittedOffsets.putAll(offsets00);
         expectedCommittedOffsets.putAll(offsets01);
 
-        final Map<TaskId, Set<TopicPartition>> assignmentActive = mkMap(
-            mkEntry(taskId00, taskId00Partitions),
-            mkEntry(taskId01, taskId01Partitions),
-            mkEntry(taskId02, taskId02Partitions)
-        );
-
-        final Map<TaskId, Set<TopicPartition>> assignmentStandby = mkMap(
-            mkEntry(taskId10, taskId10Partitions)
-        );
-        when(consumer.assignment()).thenReturn(assignment);
-
-        when(activeTaskCreator.createTasks(any(), eq(assignmentActive)))
-            .thenReturn(asList(task00, task01, task02));
-        when(standbyTaskCreator.createTasks(assignmentStandby))
-            .thenReturn(singletonList(task10));
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02, 
task03));
 
-        taskManager.handleAssignment(assignmentActive, assignmentStandby);
-        assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
-        assertThat(task00.state(), is(Task.State.RUNNING));
-        assertThat(task01.state(), is(Task.State.RUNNING));
-        assertThat(task02.state(), is(Task.State.RUNNING));
-        assertThat(task10.state(), is(Task.State.RUNNING));
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         taskManager.handleRevocation(taskId00Partitions);
 
-        assertThat(task00.commitNeeded, is(false));
-        assertThat(task00.commitPrepared, is(true));
-        assertThat(task01.commitNeeded, is(false));
-        assertThat(task01.commitPrepared, is(true));
-        assertThat(task02.commitPrepared, is(false));
-        assertThat(task10.commitPrepared, is(false));
+        // both tasks needing commit had prepareCommit called
+        verify(task00).prepareCommit(true);
+        verify(task01).prepareCommit(true);
+        verify(task02, never()).prepareCommit(anyBoolean());
+        verify(task03, never()).prepareCommit(anyBoolean());
 
         verify(consumer).commitSync(expectedCommittedOffsets);
+
+        // revoked task suspended
+        verify(task00).suspend();
+        verify(task00).postCommit(true);
+
+        // non-revoked task with commit was also post-committed (but not suspended)
+        verify(task01).postCommit(false);
+        verify(task01, never()).suspend();
+
+        // task02 and task03 should not be affected
+        verify(task02, never()).postCommit(anyBoolean());
+        verify(task02, never()).suspend();
+        verify(task03, never()).postCommit(anyBoolean());
+        verify(task03, never()).suspend();
     }
 
     @Test
     public void shouldNotCommitIfNoRevokedTasksNeedCommitting() {
-        final StateMachineTask task00 = new StateMachineTask(taskId00, 
taskId00Partitions, true, stateManager);
-
-        final StateMachineTask task01 = new StateMachineTask(taskId01, 
taskId01Partitions, true, stateManager);
-        task01.setCommitNeeded();
+        // task00 being revoked, no commit needed
+        final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+            .withInputPartitions(taskId00Partitions)
+            .inState(State.RUNNING)
+            .build();
 
-        final StateMachineTask task02 = new StateMachineTask(taskId02, 
taskId02Partitions, true, stateManager);
+        // task01 NOT being revoked, commit needed
+        final StreamTask task01 = statefulTask(taskId01, 
taskId01ChangelogPartitions)
+            .withInputPartitions(taskId01Partitions)
+            .inState(State.RUNNING)
+            .build();
 
-        final Map<TaskId, Set<TopicPartition>> assignmentActive = mkMap(
-            mkEntry(taskId00, taskId00Partitions),
-            mkEntry(taskId01, taskId01Partitions),
-            mkEntry(taskId02, taskId02Partitions)
-        );
+        // task02 NOT being revoked, no commit needed
+        final StreamTask task02 = statefulTask(taskId02, 
taskId02ChangelogPartitions)
+            .withInputPartitions(taskId02Partitions)
+            .inState(State.RUNNING)
+            .build();
 
-        when(consumer.assignment()).thenReturn(assignment);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02));
 
-        when(activeTaskCreator.createTasks(any(), eq(assignmentActive)))
-            .thenReturn(asList(task00, task01, task02));
+        when(task00.commitNeeded()).thenReturn(false);
+        when(task01.commitNeeded()).thenReturn(true); // only task01 needs commit
+        when(task02.commitNeeded()).thenReturn(false);
 
-        taskManager.handleAssignment(assignmentActive, Collections.emptyMap());
-        assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
-        assertThat(task00.state(), is(Task.State.RUNNING));
-        assertThat(task01.state(), is(Task.State.RUNNING));
-        assertThat(task02.state(), is(Task.State.RUNNING));
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         taskManager.handleRevocation(taskId00Partitions);
 
-        assertThat(task00.commitPrepared, is(false));
-        assertThat(task01.commitPrepared, is(false));
-        assertThat(task02.commitPrepared, is(false));
+        verify(task00, never()).prepareCommit(anyBoolean());
+        verify(task01, never()).prepareCommit(anyBoolean());
+        verify(task02, never()).prepareCommit(anyBoolean());
+
+        verify(task00).suspend();
+        verify(task01, never()).suspend();
+        verify(task02, never()).suspend();
     }
 
     @Test
     public void shouldNotCommitIfNoRevokedTasksNeedCommittingWithEOSv2() {

Review Comment:
   Done in commit ff6b3b0.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to