shashankhs11 commented on code in PR #20944:
URL: https://github.com/apache/kafka/pull/20944#discussion_r2548625110
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4360,55 +4326,85 @@ public void shouldHaveRemainingPartitionsUncleared() {
@Test
public void
shouldThrowTaskMigratedWhenAllTaskCloseExceptionsAreTaskMigrated() {
- final StateMachineTask migratedTask01 = new StateMachineTask(taskId01,
taskId01Partitions, false, stateManager) {
- @Override
- public void suspend() {
- super.suspend();
- throw new TaskMigratedException("t1 close exception", new
RuntimeException());
- }
- };
+ final StandbyTask migratedTask01 = standbyTask(taskId01,
taskId01ChangelogPartitions)
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId01Partitions)
+ .build();
+ final StandbyTask migratedTask02 = standbyTask(taskId02,
taskId02ChangelogPartitions)
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId02Partitions)
+ .build();
- final StateMachineTask migratedTask02 = new StateMachineTask(taskId02,
taskId02Partitions, false, stateManager) {
- @Override
- public void suspend() {
- super.suspend();
- throw new TaskMigratedException("t2 close exception", new
RuntimeException());
- }
- };
- taskManager.addTask(migratedTask01);
- taskManager.addTask(migratedTask02);
+ doThrow(new TaskMigratedException("t1 close exception", new
RuntimeException()))
+ .when(migratedTask01).suspend();
+ doThrow(new TaskMigratedException("t2 close exception", new
RuntimeException()))
+ .when(migratedTask02).suspend();
+
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+
+ when(stateUpdater.tasks()).thenReturn(Set.of(migratedTask01,
migratedTask02));
+
+ // mock futures for removing tasks from StateUpdater
+ final CompletableFuture<StateUpdater.RemovedTaskResult> future01 = new
CompletableFuture<>();
+ when(stateUpdater.remove(taskId01)).thenReturn(future01);
+ future01.complete(new StateUpdater.RemovedTaskResult(migratedTask01));
+
+ final CompletableFuture<StateUpdater.RemovedTaskResult> future02 = new
CompletableFuture<>();
+ when(stateUpdater.remove(taskId02)).thenReturn(future02);
+ future02.complete(new StateUpdater.RemovedTaskResult(migratedTask02));
final TaskMigratedException thrown = assertThrows(
TaskMigratedException.class,
() -> taskManager.handleAssignment(emptyMap(), emptyMap())
);
- // The task map orders tasks based on topic group id and partition, so
here
- // t1 should always be the first.
+
+ // iteration order here is non-deterministic due to hashset.
+ // The last exception encountered wins, so we accept either
+ // t1 or t2's exception message.
assertThat(
thrown.getMessage(),
- equalTo("t2 close exception; it means all tasks belonging to this
thread should be migrated.")
+ anyOf(
+ equalTo("t1 close exception; it means all tasks belonging to
this thread should be migrated."),
+ equalTo("t2 close exception; it means all tasks belonging to
this thread should be migrated.")
+ )
Review Comment:
The iteration order here with the new stateupdater code is
non-deterministic, whereas earlier it was deterministic due to the use of
`TreeSet`. I am not 100% sure if this was intentional and I think it may have
been an oversight while refactoring?
Should we update to use `TreeSet` instead? Please advise.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]