cadonna commented on code in PR #17209:
URL: https://github.com/apache/kafka/pull/17209#discussion_r1774661420
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1243,6 +1245,50 @@ public void
shouldRetryInitializationWhenLockExceptionInStateUpdater() {
verify(stateUpdater).add(task01);
}
+ @Test
+ public void shouldRetryInitializationWhenCanNotInitializeTask() {
+ final StreamTask task00 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .withInputPartitions(taskId00Partitions)
+ .inState(State.RESTORING).build();
Review Comment:
We use 4 spaces and not 8 for indentation.
```suggestion
.withInputPartitions(taskId00Partitions)
.inState(State.RESTORING).build();
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1243,6 +1245,50 @@ public void
shouldRetryInitializationWhenLockExceptionInStateUpdater() {
verify(stateUpdater).add(task01);
}
+ @Test
+ public void shouldRetryInitializationWhenCanNotInitializeTask() {
Review Comment:
The test is very good!
I just have some formatting comments.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1243,6 +1245,50 @@ public void
shouldRetryInitializationWhenLockExceptionInStateUpdater() {
verify(stateUpdater).add(task01);
}
+ @Test
+ public void shouldRetryInitializationWhenCanNotInitializeTask() {
+ final StreamTask task00 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .withInputPartitions(taskId00Partitions)
+ .inState(State.RESTORING).build();
+ final StandbyTask task01 = standbyTask(taskId01,
taskId01ChangelogPartitions)
+ .withInputPartitions(taskId01Partitions)
+ .inState(State.RUNNING).build();
Review Comment:
Please fix the indentation also here.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1243,6 +1245,50 @@ public void
shouldRetryInitializationWhenLockExceptionInStateUpdater() {
verify(stateUpdater).add(task01);
}
+ @Test
+ public void shouldRetryInitializationWhenCanNotInitializeTask() {
+ final StreamTask task00 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .withInputPartitions(taskId00Partitions)
+ .inState(State.RESTORING).build();
+ final StandbyTask task01 = standbyTask(taskId01,
taskId01ChangelogPartitions)
+ .withInputPartitions(taskId01Partitions)
+ .inState(State.RUNNING).build();
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ when(tasks.drainPendingTasksToInit()).thenReturn(mkSet(task00,
task01));
+ doThrow(new LockException("Lock
Exception!")).when(task00).initializeIfNeeded();
+ taskManager =
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
+
+ taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
+
+ verify(task00).initializeIfNeeded();
+ verify(task01).initializeIfNeeded();
+ verify(tasks).addPendingTasksToInit(
+ argThat(tasksToInit -> tasksToInit.contains(task00) &&
!tasksToInit.contains(task01))
+ );
+ verify(stateUpdater, never()).add(task00);
+ verify(stateUpdater).add(task01);
+
+ taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
+
+ // initializeIfNeeded() has NOT been called this time
+ verify(task00, Mockito.times(1)).initializeIfNeeded();
+ verify(tasks, Mockito.times(2)).addPendingTasksToInit(
+ argThat(tasksToInit -> tasksToInit.contains(task00))
+ );
+ verify(stateUpdater, never()).add(task00);
+
+ time.sleep(10000);
+ // do not throw lock exception this time
+ doNothing().when(task00).initializeIfNeeded();
+ taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
+
+ verify(task00, Mockito.times(2)).initializeIfNeeded();
+ verify(tasks, Mockito.times(2)).addPendingTasksToInit(
+ argThat(tasksToInit -> tasksToInit.contains(task00))
Review Comment:
Please fix the indentation
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1243,6 +1245,50 @@ public void
shouldRetryInitializationWhenLockExceptionInStateUpdater() {
verify(stateUpdater).add(task01);
}
+ @Test
+ public void shouldRetryInitializationWhenCanNotInitializeTask() {
+ final StreamTask task00 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .withInputPartitions(taskId00Partitions)
+ .inState(State.RESTORING).build();
+ final StandbyTask task01 = standbyTask(taskId01,
taskId01ChangelogPartitions)
+ .withInputPartitions(taskId01Partitions)
+ .inState(State.RUNNING).build();
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ when(tasks.drainPendingTasksToInit()).thenReturn(mkSet(task00,
task01));
+ doThrow(new LockException("Lock
Exception!")).when(task00).initializeIfNeeded();
+ taskManager =
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
+
+ taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
+
+ verify(task00).initializeIfNeeded();
+ verify(task01).initializeIfNeeded();
+ verify(tasks).addPendingTasksToInit(
+ argThat(tasksToInit -> tasksToInit.contains(task00) &&
!tasksToInit.contains(task01))
Review Comment:
Please fix the indentation.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1243,6 +1245,50 @@ public void
shouldRetryInitializationWhenLockExceptionInStateUpdater() {
verify(stateUpdater).add(task01);
}
+ @Test
+ public void shouldRetryInitializationWhenCanNotInitializeTask() {
+ final StreamTask task00 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .withInputPartitions(taskId00Partitions)
+ .inState(State.RESTORING).build();
+ final StandbyTask task01 = standbyTask(taskId01,
taskId01ChangelogPartitions)
+ .withInputPartitions(taskId01Partitions)
+ .inState(State.RUNNING).build();
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ when(tasks.drainPendingTasksToInit()).thenReturn(mkSet(task00,
task01));
+ doThrow(new LockException("Lock
Exception!")).when(task00).initializeIfNeeded();
+ taskManager =
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
+
+ taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
+
+ verify(task00).initializeIfNeeded();
Review Comment:
Could you add an inline comment here stating:
```java
// task00 should not be initialized due to LockException, task01 should be
initialized
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1243,6 +1245,50 @@ public void
shouldRetryInitializationWhenLockExceptionInStateUpdater() {
verify(stateUpdater).add(task01);
}
+ @Test
+ public void shouldRetryInitializationWhenCanNotInitializeTask() {
+ final StreamTask task00 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .withInputPartitions(taskId00Partitions)
+ .inState(State.RESTORING).build();
+ final StandbyTask task01 = standbyTask(taskId01,
taskId01ChangelogPartitions)
+ .withInputPartitions(taskId01Partitions)
+ .inState(State.RUNNING).build();
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ when(tasks.drainPendingTasksToInit()).thenReturn(mkSet(task00,
task01));
+ doThrow(new LockException("Lock
Exception!")).when(task00).initializeIfNeeded();
+ taskManager =
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
+
+ taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
+
+ verify(task00).initializeIfNeeded();
+ verify(task01).initializeIfNeeded();
+ verify(tasks).addPendingTasksToInit(
+ argThat(tasksToInit -> tasksToInit.contains(task00) &&
!tasksToInit.contains(task01))
+ );
+ verify(stateUpdater, never()).add(task00);
+ verify(stateUpdater).add(task01);
+
+ taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
Review Comment:
Could you add a `time.sleep(5000)` before this call, please?
Please add a new line between `time.sleep()` and `checkStateUpdater()`.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1243,6 +1245,50 @@ public void
shouldRetryInitializationWhenLockExceptionInStateUpdater() {
verify(stateUpdater).add(task01);
}
+ @Test
+ public void shouldRetryInitializationWhenCanNotInitializeTask() {
+ final StreamTask task00 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .withInputPartitions(taskId00Partitions)
+ .inState(State.RESTORING).build();
+ final StandbyTask task01 = standbyTask(taskId01,
taskId01ChangelogPartitions)
+ .withInputPartitions(taskId01Partitions)
+ .inState(State.RUNNING).build();
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ when(tasks.drainPendingTasksToInit()).thenReturn(mkSet(task00,
task01));
+ doThrow(new LockException("Lock
Exception!")).when(task00).initializeIfNeeded();
+ taskManager =
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
+
+ taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
+
+ verify(task00).initializeIfNeeded();
+ verify(task01).initializeIfNeeded();
+ verify(tasks).addPendingTasksToInit(
+ argThat(tasksToInit -> tasksToInit.contains(task00) &&
!tasksToInit.contains(task01))
+ );
+ verify(stateUpdater, never()).add(task00);
+ verify(stateUpdater).add(task01);
+
+ taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
+
+ // initializeIfNeeded() has NOT been called this time
+ verify(task00, Mockito.times(1)).initializeIfNeeded();
+ verify(tasks, Mockito.times(2)).addPendingTasksToInit(
+ argThat(tasksToInit -> tasksToInit.contains(task00))
Review Comment:
Please fix the indentation
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1243,6 +1245,50 @@ public void
shouldRetryInitializationWhenLockExceptionInStateUpdater() {
verify(stateUpdater).add(task01);
}
+ @Test
+ public void shouldRetryInitializationWhenCanNotInitializeTask() {
+ final StreamTask task00 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .withInputPartitions(taskId00Partitions)
+ .inState(State.RESTORING).build();
+ final StandbyTask task01 = standbyTask(taskId01,
taskId01ChangelogPartitions)
+ .withInputPartitions(taskId01Partitions)
+ .inState(State.RUNNING).build();
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ when(tasks.drainPendingTasksToInit()).thenReturn(mkSet(task00,
task01));
+ doThrow(new LockException("Lock
Exception!")).when(task00).initializeIfNeeded();
+ taskManager =
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
+
+ taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
+
+ verify(task00).initializeIfNeeded();
+ verify(task01).initializeIfNeeded();
+ verify(tasks).addPendingTasksToInit(
+ argThat(tasksToInit -> tasksToInit.contains(task00) &&
!tasksToInit.contains(task01))
+ );
+ verify(stateUpdater, never()).add(task00);
+ verify(stateUpdater).add(task01);
+
+ taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
+
+ // initializeIfNeeded() has NOT been called this time
+ verify(task00, Mockito.times(1)).initializeIfNeeded();
+ verify(tasks, Mockito.times(2)).addPendingTasksToInit(
Review Comment:
In this test class we use `times()` without `Mockito.`. Even in this test
you use `never()` without `Mockito.`. Please remove the `Mockito.` to be
consistent.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1243,6 +1245,50 @@ public void
shouldRetryInitializationWhenLockExceptionInStateUpdater() {
verify(stateUpdater).add(task01);
}
+ @Test
+ public void shouldRetryInitializationWhenCanNotInitializeTask() {
+ final StreamTask task00 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .withInputPartitions(taskId00Partitions)
+ .inState(State.RESTORING).build();
+ final StandbyTask task01 = standbyTask(taskId01,
taskId01ChangelogPartitions)
+ .withInputPartitions(taskId01Partitions)
+ .inState(State.RUNNING).build();
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ when(tasks.drainPendingTasksToInit()).thenReturn(mkSet(task00,
task01));
+ doThrow(new LockException("Lock
Exception!")).when(task00).initializeIfNeeded();
+ taskManager =
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
+
+ taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
+
+ verify(task00).initializeIfNeeded();
+ verify(task01).initializeIfNeeded();
+ verify(tasks).addPendingTasksToInit(
+ argThat(tasksToInit -> tasksToInit.contains(task00) &&
!tasksToInit.contains(task01))
+ );
+ verify(stateUpdater, never()).add(task00);
+ verify(stateUpdater).add(task01);
+
+ taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
+
+ // initializeIfNeeded() has NOT been called this time
+ verify(task00, Mockito.times(1)).initializeIfNeeded();
+ verify(tasks, Mockito.times(2)).addPendingTasksToInit(
+ argThat(tasksToInit -> tasksToInit.contains(task00))
+ );
+ verify(stateUpdater, never()).add(task00);
+
+ time.sleep(10000);
Review Comment:
I would add a new line after this line to highlight that time passed between
the two initialization attempts.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1243,6 +1245,50 @@ public void
shouldRetryInitializationWhenLockExceptionInStateUpdater() {
verify(stateUpdater).add(task01);
}
+ @Test
+ public void shouldRetryInitializationWhenCanNotInitializeTask() {
Review Comment:
Could you please rename the test to
`shouldRetryInitializationWithBackoffWhenInitializationFails`?
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1243,6 +1245,50 @@ public void
shouldRetryInitializationWhenLockExceptionInStateUpdater() {
verify(stateUpdater).add(task01);
}
+ @Test
+ public void shouldRetryInitializationWhenCanNotInitializeTask() {
+ final StreamTask task00 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .withInputPartitions(taskId00Partitions)
+ .inState(State.RESTORING).build();
+ final StandbyTask task01 = standbyTask(taskId01,
taskId01ChangelogPartitions)
+ .withInputPartitions(taskId01Partitions)
+ .inState(State.RUNNING).build();
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ when(tasks.drainPendingTasksToInit()).thenReturn(mkSet(task00,
task01));
+ doThrow(new LockException("Lock
Exception!")).when(task00).initializeIfNeeded();
+ taskManager =
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
+
+ taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
+
+ verify(task00).initializeIfNeeded();
+ verify(task01).initializeIfNeeded();
+ verify(tasks).addPendingTasksToInit(
+ argThat(tasksToInit -> tasksToInit.contains(task00) &&
!tasksToInit.contains(task01))
+ );
+ verify(stateUpdater, never()).add(task00);
+ verify(stateUpdater).add(task01);
+
+ taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
+
+ // initializeIfNeeded() has NOT been called this time
Review Comment:
This inline comment is not really clear. Could you please change it to
something like:
```java
// task00 should not be initialized since the backoff period has not passed.
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1243,6 +1245,50 @@ public void
shouldRetryInitializationWhenLockExceptionInStateUpdater() {
verify(stateUpdater).add(task01);
}
+ @Test
+ public void shouldRetryInitializationWhenCanNotInitializeTask() {
+ final StreamTask task00 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .withInputPartitions(taskId00Partitions)
+ .inState(State.RESTORING).build();
+ final StandbyTask task01 = standbyTask(taskId01,
taskId01ChangelogPartitions)
+ .withInputPartitions(taskId01Partitions)
+ .inState(State.RUNNING).build();
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ when(tasks.drainPendingTasksToInit()).thenReturn(mkSet(task00,
task01));
+ doThrow(new LockException("Lock
Exception!")).when(task00).initializeIfNeeded();
+ taskManager =
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
+
+ taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
+
+ verify(task00).initializeIfNeeded();
+ verify(task01).initializeIfNeeded();
+ verify(tasks).addPendingTasksToInit(
+ argThat(tasksToInit -> tasksToInit.contains(task00) &&
!tasksToInit.contains(task01))
+ );
+ verify(stateUpdater, never()).add(task00);
+ verify(stateUpdater).add(task01);
+
+ taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
+
+ // initializeIfNeeded() has NOT been called this time
+ verify(task00, Mockito.times(1)).initializeIfNeeded();
+ verify(tasks, Mockito.times(2)).addPendingTasksToInit(
+ argThat(tasksToInit -> tasksToInit.contains(task00))
+ );
+ verify(stateUpdater, never()).add(task00);
+
+ time.sleep(10000);
+ // do not throw lock exception this time
Review Comment:
Could you change this comment to something like:
```java
// task00 should call initialize since the backoff period has passed
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]