Copilot commented on code in PR #20889:
URL: https://github.com/apache/kafka/pull/20889#discussion_r2534874432
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4203,107 +4175,85 @@ public void shouldNotFailOnTimeoutException() {
final AtomicReference<TimeoutException> timeoutException = new
AtomicReference<>();
timeoutException.set(new TimeoutException("Skip me!"));
- final StateMachineTask task00 = new StateMachineTask(taskId00,
taskId00Partitions, true, stateManager);
- task00.transitionTo(State.RESTORING);
- task00.transitionTo(State.RUNNING);
- final StateMachineTask task01 = new StateMachineTask(taskId01,
taskId01Partitions, true, stateManager) {
- @Override
- public boolean process(final long wallClockTime) {
- final TimeoutException exception = timeoutException.get();
- if (exception != null) {
- throw exception;
- }
- return true;
- }
- };
- task01.transitionTo(State.RESTORING);
- task01.transitionTo(State.RUNNING);
- final StateMachineTask task02 = new StateMachineTask(taskId02,
taskId02Partitions, true, stateManager);
- task02.transitionTo(State.RESTORING);
- task02.transitionTo(State.RUNNING);
+ final StreamTask task00 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId00Partitions)
+ .build();
+ // throws TimeoutException on first call, then processes 2 records
+ final StreamTask task01 = statefulTask(taskId01,
taskId01ChangelogPartitions)
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId01Partitions)
+ .build();
+ final StreamTask task02 = statefulTask(taskId02,
taskId02ChangelogPartitions)
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId02Partitions)
+ .build();
- taskManager.addTask(task00);
- taskManager.addTask(task01);
- taskManager.addTask(task02);
+ when(task00.process(anyLong()))
+ .thenReturn(true)
+ .thenReturn(true)
+ .thenReturn(false);
- task00.addRecords(
- t1p0,
- Arrays.asList(
- getConsumerRecord(t1p0, 0L),
- getConsumerRecord(t1p0, 1L)
- )
- );
- task01.addRecords(
- t1p1,
- Arrays.asList(
- getConsumerRecord(t1p1, 0L),
- getConsumerRecord(t1p1, 1L)
- )
- );
- task02.addRecords(
- t1p2,
- Arrays.asList(
- getConsumerRecord(t1p2, 0L),
- getConsumerRecord(t1p2, 1L)
- )
- );
+ when(task01.process(anyLong()))
+ .thenThrow(timeoutException.get()) // throws TimeoutException
+ .thenReturn(true)
+ .thenReturn(true)
+ .thenReturn(false);
Review Comment:
The mock behavior is evaluated at setup time, so `timeoutException.get()`
will always return the TimeoutException. Setting `timeoutException.set(null)`
on line 4218 won't change the mock's behavior. The second call to
`taskManager.process(1, time)` will still throw the TimeoutException instead of
processing records.
Consider using
`thenThrow(...).thenReturn(true).thenReturn(true).thenReturn(false)` without
the AtomicReference, or reconfigure the mock after clearing the timeout.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4203,107 +4175,85 @@ public void shouldNotFailOnTimeoutException() {
final AtomicReference<TimeoutException> timeoutException = new
AtomicReference<>();
timeoutException.set(new TimeoutException("Skip me!"));
- final StateMachineTask task00 = new StateMachineTask(taskId00,
taskId00Partitions, true, stateManager);
- task00.transitionTo(State.RESTORING);
- task00.transitionTo(State.RUNNING);
- final StateMachineTask task01 = new StateMachineTask(taskId01,
taskId01Partitions, true, stateManager) {
- @Override
- public boolean process(final long wallClockTime) {
- final TimeoutException exception = timeoutException.get();
- if (exception != null) {
- throw exception;
- }
- return true;
- }
- };
- task01.transitionTo(State.RESTORING);
- task01.transitionTo(State.RUNNING);
- final StateMachineTask task02 = new StateMachineTask(taskId02,
taskId02Partitions, true, stateManager);
- task02.transitionTo(State.RESTORING);
- task02.transitionTo(State.RUNNING);
+ final StreamTask task00 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId00Partitions)
+ .build();
+ // throws TimeoutException on first call, then processes 2 records
+ final StreamTask task01 = statefulTask(taskId01,
taskId01ChangelogPartitions)
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId01Partitions)
+ .build();
+ final StreamTask task02 = statefulTask(taskId02,
taskId02ChangelogPartitions)
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId02Partitions)
+ .build();
- taskManager.addTask(task00);
- taskManager.addTask(task01);
- taskManager.addTask(task02);
+ when(task00.process(anyLong()))
+ .thenReturn(true)
+ .thenReturn(true)
+ .thenReturn(false);
- task00.addRecords(
- t1p0,
- Arrays.asList(
- getConsumerRecord(t1p0, 0L),
- getConsumerRecord(t1p0, 1L)
- )
- );
- task01.addRecords(
- t1p1,
- Arrays.asList(
- getConsumerRecord(t1p1, 0L),
- getConsumerRecord(t1p1, 1L)
- )
- );
- task02.addRecords(
- t1p2,
- Arrays.asList(
- getConsumerRecord(t1p2, 0L),
- getConsumerRecord(t1p2, 1L)
- )
- );
+ when(task01.process(anyLong()))
+ .thenThrow(timeoutException.get()) // throws TimeoutException
+ .thenReturn(true)
+ .thenReturn(true)
+ .thenReturn(false);
+
+ when(task02.process(anyLong()))
+ .thenReturn(true)
+ .thenReturn(true)
+ .thenReturn(false);
+
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ when(tasks.activeTasks()).thenReturn(Set.of(task00, task01, task02));
+
+ final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
// should only process 2 records, because task01 throws
TimeoutException
assertThat(taskManager.process(1, time), is(2));
- assertThat(task01.timeout, equalTo(time.milliseconds()));
+ verify(task01).maybeInitTaskTimeoutOrThrow(anyLong(),
any(TimeoutException.class));
- // retry without error
+ // retry without error - clear the timeout and update the mock
timeoutException.set(null);
assertThat(taskManager.process(1, time), is(3));
Review Comment:
Setting `timeoutException.set(null)` doesn't update the mock's behavior. The
mock's `thenThrow()` at line 4198 was configured at setup time with the
exception object and will continue to throw it. The test expects the retry on
line 4219 to succeed, but it will still encounter the TimeoutException.
The test needs to either reconfigure the mock after line 4218 or avoid using
the AtomicReference approach entirely.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]