lucasbru commented on code in PR #20818:
URL: https://github.com/apache/kafka/pull/20818#discussion_r2490841720
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2872,88 +2872,87 @@ public void shouldAddNewActiveTasks() {
@Test
public void shouldNotCompleteRestorationIfTasksCannotInitialize() {
+ final StreamTask task00 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .withInputPartitions(taskId00Partitions)
+ .inState(State.CREATED)
+ .build();
+
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
final Map<TaskId, Set<TopicPartition>> assignment = mkMap(
- mkEntry(taskId00, taskId00Partitions),
- mkEntry(taskId01, taskId01Partitions)
+ mkEntry(taskId00, taskId00Partitions)
);
- final Task task00 = new StateMachineTask(taskId00, taskId00Partitions,
true, stateManager) {
- @Override
- public void initializeIfNeeded() {
- throw new LockException("can't lock");
- }
- };
- final Task task01 = new StateMachineTask(taskId01, taskId01Partitions,
true, stateManager) {
- @Override
- public void initializeIfNeeded() {
- throw new TimeoutException("timed out");
- }
- };
-
- when(activeTaskCreator.createTasks(any(),
eq(assignment))).thenReturn(asList(task00, task01));
+ when(activeTaskCreator.createTasks(any(), eq(assignment)))
+ .thenReturn(singletonList(task00));
taskManager.handleAssignment(assignment, emptyMap());
- assertThat(task00.state(), is(Task.State.CREATED));
- assertThat(task01.state(), is(Task.State.CREATED));
+ verify(tasks).addPendingTasksToInit(singletonList(task00));
- assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(false));
+ when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00));
+ final LockException lockException = new LockException("can't lock");
+ doThrow(lockException).when(task00).initializeIfNeeded();
+ when(tasks.hasPendingTasksToInit()).thenReturn(true);
- assertThat(task00.state(), is(Task.State.CREATED));
- assertThat(task01.state(), is(Task.State.CREATED));
- assertThat(
- taskManager.activeTaskMap(),
- Matchers.equalTo(mkMap(mkEntry(taskId00, task00),
mkEntry(taskId01, task01)))
+ final boolean restorationComplete =
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
+
+ assertFalse(restorationComplete);
+ verify(task00).initializeIfNeeded();
+ verify(tasks, times(2)).addPendingTasksToInit(
+ argThat(tasksToInit -> tasksToInit.contains(task00))
);
- assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
- verify(changeLogReader).enforceRestoreActive();
+ verify(stateUpdater, never()).add(task00);
verifyNoInteractions(consumer);
}
@Test
public void shouldNotCompleteRestorationIfTaskCannotCompleteRestoration() {
- final Map<TaskId, Set<TopicPartition>> assignment = mkMap(
- mkEntry(taskId00, taskId00Partitions)
- );
- final Task task00 = new StateMachineTask(taskId00, taskId00Partitions,
true, stateManager) {
- @Override
- public void completeRestoration(final
java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
- throw new TimeoutException("timeout!");
- }
- };
-
- when(activeTaskCreator.createTasks(any(),
eq(assignment))).thenReturn(singletonList(task00));
+ final StreamTask task00 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .withInputPartitions(taskId00Partitions)
+ .inState(State.RESTORING)
+ .build();
- taskManager.handleAssignment(assignment, emptyMap());
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
- assertThat(task00.state(), is(Task.State.CREATED));
+ when(stateUpdater.restoresActiveTasks()).thenReturn(true);
+
when(stateUpdater.drainRestoredActiveTasks(any(Duration.class))).thenReturn(Set.of(task00));
+ final TimeoutException timeoutException = new
TimeoutException("timeout!");
+ doThrow(timeoutException).when(task00).completeRestoration(any());
- assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(false));
+ final boolean restorationComplete =
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
- assertThat(task00.state(), is(Task.State.RESTORING));
- assertThat(
- taskManager.activeTaskMap(),
- Matchers.equalTo(mkMap(mkEntry(taskId00, task00)))
- );
- assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
- verify(changeLogReader).enforceRestoreActive();
- verifyNoInteractions(consumer);
Review Comment:
Why did you remove `verifyNoInteractions(consumer)`?
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2872,88 +2872,87 @@ public void shouldAddNewActiveTasks() {
@Test
public void shouldNotCompleteRestorationIfTasksCannotInitialize() {
Review Comment:
Wait. We should also handle `TimeoutException` in the production code. I
suppose you removed the `TimeoutException` case because the state updater
code path doesn't handle it.
We should fix that, because `initializeIfNeeded` may throw a
`TimeoutException`.
If we get a `TimeoutException` inside `addTasksToStateUpdater`, we should run
```
task.maybeInitTaskTimeoutOrThrow(nowMs, timeoutException);
tasks.addPendingTasksToInit(Collections.singleton(task));
updateOrCreateBackoffRecord(task.id(), nowMs);
```
Otherwise, if no `TimeoutException` is thrown, we should call
```
task.clearTaskTimeout();
```
after the initialization.
We should then bring back the `TimeoutException` case in this test.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3041,218 +3040,272 @@ public void
shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEo
@Test
public void shouldCommitAllNeededTasksOnHandleRevocation() {
- final StateMachineTask task00 = new StateMachineTask(taskId00,
taskId00Partitions, true, stateManager);
+ // revoked task that needs commit
+ final StreamTask task00 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .withInputPartitions(taskId00Partitions)
+ .inState(State.RUNNING)
+ .build();
final Map<TopicPartition, OffsetAndMetadata> offsets00 =
singletonMap(t1p0, new OffsetAndMetadata(0L, null));
- task00.setCommittableOffsetsAndMetadata(offsets00);
- task00.setCommitNeeded();
+ when(task00.commitNeeded()).thenReturn(true);
+ when(task00.prepareCommit(true)).thenReturn(offsets00);
- final StateMachineTask task01 = new StateMachineTask(taskId01,
taskId01Partitions, true, stateManager);
+ // non revoked task that needs commit
+ final StreamTask task01 = statefulTask(taskId01,
taskId01ChangelogPartitions)
+ .withInputPartitions(taskId01Partitions)
+ .inState(State.RUNNING)
+ .build();
final Map<TopicPartition, OffsetAndMetadata> offsets01 =
singletonMap(t1p1, new OffsetAndMetadata(1L, null));
- task01.setCommittableOffsetsAndMetadata(offsets01);
- task01.setCommitNeeded();
+ when(task01.commitNeeded()).thenReturn(true);
+ when(task01.prepareCommit(true)).thenReturn(offsets01);
- final StateMachineTask task02 = new StateMachineTask(taskId02,
taskId02Partitions, true, stateManager);
- final Map<TopicPartition, OffsetAndMetadata> offsets02 =
singletonMap(t1p2, new OffsetAndMetadata(2L, null));
- task02.setCommittableOffsetsAndMetadata(offsets02);
+ // non revoked task that does NOT need commit
+ final StreamTask task02 = statefulTask(taskId02,
taskId02ChangelogPartitions)
+ .withInputPartitions(taskId02Partitions)
+ .inState(State.RUNNING)
+ .build();
+ when(task02.commitNeeded()).thenReturn(false);
- final StateMachineTask task10 = new StateMachineTask(taskId10,
taskId10Partitions, false, stateManager);
+ // standby task (should not be affected by revocation)
+ final StandbyTask task03 = standbyTask(taskId03,
taskId03ChangelogPartitions)
+ .withInputPartitions(taskId03Partitions)
+ .inState(State.RUNNING)
+ .build();
final Map<TopicPartition, OffsetAndMetadata> expectedCommittedOffsets
= new HashMap<>();
expectedCommittedOffsets.putAll(offsets00);
expectedCommittedOffsets.putAll(offsets01);
- final Map<TaskId, Set<TopicPartition>> assignmentActive = mkMap(
- mkEntry(taskId00, taskId00Partitions),
- mkEntry(taskId01, taskId01Partitions),
- mkEntry(taskId02, taskId02Partitions)
- );
-
- final Map<TaskId, Set<TopicPartition>> assignmentStandby = mkMap(
- mkEntry(taskId10, taskId10Partitions)
- );
- when(consumer.assignment()).thenReturn(assignment);
-
- when(activeTaskCreator.createTasks(any(), eq(assignmentActive)))
- .thenReturn(asList(task00, task01, task02));
- when(standbyTaskCreator.createTasks(assignmentStandby))
- .thenReturn(singletonList(task10));
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02,
task03));
- taskManager.handleAssignment(assignmentActive, assignmentStandby);
- assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
- assertThat(task00.state(), is(Task.State.RUNNING));
- assertThat(task01.state(), is(Task.State.RUNNING));
- assertThat(task02.state(), is(Task.State.RUNNING));
- assertThat(task10.state(), is(Task.State.RUNNING));
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
taskManager.handleRevocation(taskId00Partitions);
- assertThat(task00.commitNeeded, is(false));
- assertThat(task00.commitPrepared, is(true));
- assertThat(task01.commitNeeded, is(false));
- assertThat(task01.commitPrepared, is(true));
- assertThat(task02.commitPrepared, is(false));
- assertThat(task10.commitPrepared, is(false));
+ // both tasks needing commit had prepareCommit called
+ verify(task00).prepareCommit(true);
+ verify(task01).prepareCommit(true);
+ verify(task02, never()).prepareCommit(anyBoolean());
+ verify(task03, never()).prepareCommit(anyBoolean());
verify(consumer).commitSync(expectedCommittedOffsets);
+
+ // revoked task suspended
+ verify(task00).suspend();
+ verify(task00).postCommit(true);
+
+ // non-revoked task with commit was also post-committed (but not
suspended)
+ verify(task01).postCommit(false);
+ verify(task01, never()).suspend();
+
+ // task02 and task03 should not be affected
+ verify(task02, never()).postCommit(anyBoolean());
+ verify(task02, never()).suspend();
+ verify(task03, never()).postCommit(anyBoolean());
+ verify(task03, never()).suspend();
}
@Test
public void shouldNotCommitIfNoRevokedTasksNeedCommitting() {
- final StateMachineTask task00 = new StateMachineTask(taskId00,
taskId00Partitions, true, stateManager);
-
- final StateMachineTask task01 = new StateMachineTask(taskId01,
taskId01Partitions, true, stateManager);
- task01.setCommitNeeded();
+ // task00 being revoked, no commit needed
+ final StreamTask task00 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .withInputPartitions(taskId00Partitions)
+ .inState(State.RUNNING)
+ .build();
- final StateMachineTask task02 = new StateMachineTask(taskId02,
taskId02Partitions, true, stateManager);
+ // task01 NOT being revoked, commit needed
+ final StreamTask task01 = statefulTask(taskId01,
taskId01ChangelogPartitions)
+ .withInputPartitions(taskId01Partitions)
+ .inState(State.RUNNING)
+ .build();
- final Map<TaskId, Set<TopicPartition>> assignmentActive = mkMap(
- mkEntry(taskId00, taskId00Partitions),
- mkEntry(taskId01, taskId01Partitions),
- mkEntry(taskId02, taskId02Partitions)
- );
+ // task02 NOT being revoked, no commit needed
+ final StreamTask task02 = statefulTask(taskId02,
taskId02ChangelogPartitions)
+ .withInputPartitions(taskId02Partitions)
+ .inState(State.RUNNING)
+ .build();
- when(consumer.assignment()).thenReturn(assignment);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02));
- when(activeTaskCreator.createTasks(any(), eq(assignmentActive)))
- .thenReturn(asList(task00, task01, task02));
+ when(task00.commitNeeded()).thenReturn(false);
+ when(task01.commitNeeded()).thenReturn(true); // only task01 needs
commit
+ when(task02.commitNeeded()).thenReturn(false);
- taskManager.handleAssignment(assignmentActive, Collections.emptyMap());
- assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
- assertThat(task00.state(), is(Task.State.RUNNING));
- assertThat(task01.state(), is(Task.State.RUNNING));
- assertThat(task02.state(), is(Task.State.RUNNING));
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
taskManager.handleRevocation(taskId00Partitions);
- assertThat(task00.commitPrepared, is(false));
- assertThat(task01.commitPrepared, is(false));
- assertThat(task02.commitPrepared, is(false));
+ verify(task00, never()).prepareCommit(anyBoolean());
+ verify(task01, never()).prepareCommit(anyBoolean());
+ verify(task02, never()).prepareCommit(anyBoolean());
+
+ verify(task00).suspend();
+ verify(task01, never()).suspend();
+ verify(task02, never()).suspend();
}
@Test
public void shouldNotCommitIfNoRevokedTasksNeedCommittingWithEOSv2() {
Review Comment:
Yes. We could consider parametrizing this test on `ProcessingMode`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]