cadonna commented on code in PR #17209:
URL: https://github.com/apache/kafka/pull/17209#discussion_r1774661420


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1243,6 +1245,50 @@ public void 
shouldRetryInitializationWhenLockExceptionInStateUpdater() {
         verify(stateUpdater).add(task01);
     }
 
+    @Test
+    public void shouldRetryInitializationWhenCanNotInitializeTask() {
+        final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+                .withInputPartitions(taskId00Partitions)
+                .inState(State.RESTORING).build();

Review Comment:
   We use 4 spaces and not 8 for indentation.
   ```suggestion
               .withInputPartitions(taskId00Partitions)
               .inState(State.RESTORING).build();
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1243,6 +1245,50 @@ public void 
shouldRetryInitializationWhenLockExceptionInStateUpdater() {
         verify(stateUpdater).add(task01);
     }
 
+    @Test
+    public void shouldRetryInitializationWhenCanNotInitializeTask() {

Review Comment:
   The test is very good!
   I just have some formatting comments.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1243,6 +1245,50 @@ public void 
shouldRetryInitializationWhenLockExceptionInStateUpdater() {
         verify(stateUpdater).add(task01);
     }
 
+    @Test
+    public void shouldRetryInitializationWhenCanNotInitializeTask() {
+        final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+                .withInputPartitions(taskId00Partitions)
+                .inState(State.RESTORING).build();
+        final StandbyTask task01 = standbyTask(taskId01, 
taskId01ChangelogPartitions)
+                .withInputPartitions(taskId01Partitions)
+                .inState(State.RUNNING).build();

Review Comment:
   Please fix the indentation also here.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1243,6 +1245,50 @@ public void 
shouldRetryInitializationWhenLockExceptionInStateUpdater() {
         verify(stateUpdater).add(task01);
     }
 
+    @Test
+    public void shouldRetryInitializationWhenCanNotInitializeTask() {
+        final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+                .withInputPartitions(taskId00Partitions)
+                .inState(State.RESTORING).build();
+        final StandbyTask task01 = standbyTask(taskId01, 
taskId01ChangelogPartitions)
+                .withInputPartitions(taskId01Partitions)
+                .inState(State.RUNNING).build();
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        when(tasks.drainPendingTasksToInit()).thenReturn(mkSet(task00, 
task01));
+        doThrow(new LockException("Lock 
Exception!")).when(task00).initializeIfNeeded();
+        taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
+
+        taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
+
+        verify(task00).initializeIfNeeded();
+        verify(task01).initializeIfNeeded();
+        verify(tasks).addPendingTasksToInit(
+                argThat(tasksToInit -> tasksToInit.contains(task00) && 
!tasksToInit.contains(task01))
+        );
+        verify(stateUpdater, never()).add(task00);
+        verify(stateUpdater).add(task01);
+
+        taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
+
+        // initializeIfNeeded() has NOT been called this time
+        verify(task00, Mockito.times(1)).initializeIfNeeded();
+        verify(tasks, Mockito.times(2)).addPendingTasksToInit(
+                argThat(tasksToInit -> tasksToInit.contains(task00))
+        );
+        verify(stateUpdater, never()).add(task00);
+
+        time.sleep(10000);
+        // do not throw lock exception this time
+        doNothing().when(task00).initializeIfNeeded();
+        taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
+
+        verify(task00, Mockito.times(2)).initializeIfNeeded();
+        verify(tasks, Mockito.times(2)).addPendingTasksToInit(
+                argThat(tasksToInit -> tasksToInit.contains(task00))

Review Comment:
   Please fix the indentation



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1243,6 +1245,50 @@ public void 
shouldRetryInitializationWhenLockExceptionInStateUpdater() {
         verify(stateUpdater).add(task01);
     }
 
+    @Test
+    public void shouldRetryInitializationWhenCanNotInitializeTask() {
+        final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+                .withInputPartitions(taskId00Partitions)
+                .inState(State.RESTORING).build();
+        final StandbyTask task01 = standbyTask(taskId01, 
taskId01ChangelogPartitions)
+                .withInputPartitions(taskId01Partitions)
+                .inState(State.RUNNING).build();
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        when(tasks.drainPendingTasksToInit()).thenReturn(mkSet(task00, 
task01));
+        doThrow(new LockException("Lock 
Exception!")).when(task00).initializeIfNeeded();
+        taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
+
+        taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
+
+        verify(task00).initializeIfNeeded();
+        verify(task01).initializeIfNeeded();
+        verify(tasks).addPendingTasksToInit(
+                argThat(tasksToInit -> tasksToInit.contains(task00) && 
!tasksToInit.contains(task01))

Review Comment:
   Please fix the indentation.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1243,6 +1245,50 @@ public void 
shouldRetryInitializationWhenLockExceptionInStateUpdater() {
         verify(stateUpdater).add(task01);
     }
 
+    @Test
+    public void shouldRetryInitializationWhenCanNotInitializeTask() {
+        final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+                .withInputPartitions(taskId00Partitions)
+                .inState(State.RESTORING).build();
+        final StandbyTask task01 = standbyTask(taskId01, 
taskId01ChangelogPartitions)
+                .withInputPartitions(taskId01Partitions)
+                .inState(State.RUNNING).build();
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        when(tasks.drainPendingTasksToInit()).thenReturn(mkSet(task00, 
task01));
+        doThrow(new LockException("Lock 
Exception!")).when(task00).initializeIfNeeded();
+        taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
+
+        taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
+
+        verify(task00).initializeIfNeeded();

Review Comment:
   Could you add an inline comment here stating:
   ```java
   // task00 should not be initialized due to LockException, task01 should be 
initialized
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1243,6 +1245,50 @@ public void 
shouldRetryInitializationWhenLockExceptionInStateUpdater() {
         verify(stateUpdater).add(task01);
     }
 
+    @Test
+    public void shouldRetryInitializationWhenCanNotInitializeTask() {
+        final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+                .withInputPartitions(taskId00Partitions)
+                .inState(State.RESTORING).build();
+        final StandbyTask task01 = standbyTask(taskId01, 
taskId01ChangelogPartitions)
+                .withInputPartitions(taskId01Partitions)
+                .inState(State.RUNNING).build();
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        when(tasks.drainPendingTasksToInit()).thenReturn(mkSet(task00, 
task01));
+        doThrow(new LockException("Lock 
Exception!")).when(task00).initializeIfNeeded();
+        taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
+
+        taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
+
+        verify(task00).initializeIfNeeded();
+        verify(task01).initializeIfNeeded();
+        verify(tasks).addPendingTasksToInit(
+                argThat(tasksToInit -> tasksToInit.contains(task00) && 
!tasksToInit.contains(task01))
+        );
+        verify(stateUpdater, never()).add(task00);
+        verify(stateUpdater).add(task01);
+
+        taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);

Review Comment:
   Could you add a `time.sleep(5000)` before this call, please?
   Please add a new line between `time.sleep()` and `checkStateUpdater()`. 



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1243,6 +1245,50 @@ public void 
shouldRetryInitializationWhenLockExceptionInStateUpdater() {
         verify(stateUpdater).add(task01);
     }
 
+    @Test
+    public void shouldRetryInitializationWhenCanNotInitializeTask() {
+        final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+                .withInputPartitions(taskId00Partitions)
+                .inState(State.RESTORING).build();
+        final StandbyTask task01 = standbyTask(taskId01, 
taskId01ChangelogPartitions)
+                .withInputPartitions(taskId01Partitions)
+                .inState(State.RUNNING).build();
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        when(tasks.drainPendingTasksToInit()).thenReturn(mkSet(task00, 
task01));
+        doThrow(new LockException("Lock 
Exception!")).when(task00).initializeIfNeeded();
+        taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
+
+        taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
+
+        verify(task00).initializeIfNeeded();
+        verify(task01).initializeIfNeeded();
+        verify(tasks).addPendingTasksToInit(
+                argThat(tasksToInit -> tasksToInit.contains(task00) && 
!tasksToInit.contains(task01))
+        );
+        verify(stateUpdater, never()).add(task00);
+        verify(stateUpdater).add(task01);
+
+        taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
+
+        // initializeIfNeeded() has NOT been called this time
+        verify(task00, Mockito.times(1)).initializeIfNeeded();
+        verify(tasks, Mockito.times(2)).addPendingTasksToInit(
+                argThat(tasksToInit -> tasksToInit.contains(task00))

Review Comment:
   Please fix the indentation



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1243,6 +1245,50 @@ public void 
shouldRetryInitializationWhenLockExceptionInStateUpdater() {
         verify(stateUpdater).add(task01);
     }
 
+    @Test
+    public void shouldRetryInitializationWhenCanNotInitializeTask() {
+        final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+                .withInputPartitions(taskId00Partitions)
+                .inState(State.RESTORING).build();
+        final StandbyTask task01 = standbyTask(taskId01, 
taskId01ChangelogPartitions)
+                .withInputPartitions(taskId01Partitions)
+                .inState(State.RUNNING).build();
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        when(tasks.drainPendingTasksToInit()).thenReturn(mkSet(task00, 
task01));
+        doThrow(new LockException("Lock 
Exception!")).when(task00).initializeIfNeeded();
+        taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
+
+        taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
+
+        verify(task00).initializeIfNeeded();
+        verify(task01).initializeIfNeeded();
+        verify(tasks).addPendingTasksToInit(
+                argThat(tasksToInit -> tasksToInit.contains(task00) && 
!tasksToInit.contains(task01))
+        );
+        verify(stateUpdater, never()).add(task00);
+        verify(stateUpdater).add(task01);
+
+        taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
+
+        // initializeIfNeeded() has NOT been called this time
+        verify(task00, Mockito.times(1)).initializeIfNeeded();
+        verify(tasks, Mockito.times(2)).addPendingTasksToInit(

Review Comment:
   In this test class we use `times()` without `Mockito.`. Even in this test 
you use `never()` without `Mockito.`. Please remove the `Mockito.` to be 
consistent. 



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1243,6 +1245,50 @@ public void 
shouldRetryInitializationWhenLockExceptionInStateUpdater() {
         verify(stateUpdater).add(task01);
     }
 
+    @Test
+    public void shouldRetryInitializationWhenCanNotInitializeTask() {
+        final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+                .withInputPartitions(taskId00Partitions)
+                .inState(State.RESTORING).build();
+        final StandbyTask task01 = standbyTask(taskId01, 
taskId01ChangelogPartitions)
+                .withInputPartitions(taskId01Partitions)
+                .inState(State.RUNNING).build();
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        when(tasks.drainPendingTasksToInit()).thenReturn(mkSet(task00, 
task01));
+        doThrow(new LockException("Lock 
Exception!")).when(task00).initializeIfNeeded();
+        taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
+
+        taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
+
+        verify(task00).initializeIfNeeded();
+        verify(task01).initializeIfNeeded();
+        verify(tasks).addPendingTasksToInit(
+                argThat(tasksToInit -> tasksToInit.contains(task00) && 
!tasksToInit.contains(task01))
+        );
+        verify(stateUpdater, never()).add(task00);
+        verify(stateUpdater).add(task01);
+
+        taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
+
+        // initializeIfNeeded() has NOT been called this time
+        verify(task00, Mockito.times(1)).initializeIfNeeded();
+        verify(tasks, Mockito.times(2)).addPendingTasksToInit(
+                argThat(tasksToInit -> tasksToInit.contains(task00))
+        );
+        verify(stateUpdater, never()).add(task00);
+
+        time.sleep(10000);

Review Comment:
   I would add a new line after this line to highlight that time passed between 
the two initialization attempts.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1243,6 +1245,50 @@ public void 
shouldRetryInitializationWhenLockExceptionInStateUpdater() {
         verify(stateUpdater).add(task01);
     }
 
+    @Test
+    public void shouldRetryInitializationWhenCanNotInitializeTask() {

Review Comment:
   Could you please rename the test to 
`shouldRetryInitializationWithBackoffWhenInitializationFails`?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1243,6 +1245,50 @@ public void 
shouldRetryInitializationWhenLockExceptionInStateUpdater() {
         verify(stateUpdater).add(task01);
     }
 
+    @Test
+    public void shouldRetryInitializationWhenCanNotInitializeTask() {
+        final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+                .withInputPartitions(taskId00Partitions)
+                .inState(State.RESTORING).build();
+        final StandbyTask task01 = standbyTask(taskId01, 
taskId01ChangelogPartitions)
+                .withInputPartitions(taskId01Partitions)
+                .inState(State.RUNNING).build();
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        when(tasks.drainPendingTasksToInit()).thenReturn(mkSet(task00, 
task01));
+        doThrow(new LockException("Lock 
Exception!")).when(task00).initializeIfNeeded();
+        taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
+
+        taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
+
+        verify(task00).initializeIfNeeded();
+        verify(task01).initializeIfNeeded();
+        verify(tasks).addPendingTasksToInit(
+                argThat(tasksToInit -> tasksToInit.contains(task00) && 
!tasksToInit.contains(task01))
+        );
+        verify(stateUpdater, never()).add(task00);
+        verify(stateUpdater).add(task01);
+
+        taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
+
+        // initializeIfNeeded() has NOT been called this time

Review Comment:
   This inline comment is not really clear. Could you please change it to 
something like:
   ```java
   // task00 should not be initialized since the backoff period has not passed.
   ``` 



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1243,6 +1245,50 @@ public void 
shouldRetryInitializationWhenLockExceptionInStateUpdater() {
         verify(stateUpdater).add(task01);
     }
 
+    @Test
+    public void shouldRetryInitializationWhenCanNotInitializeTask() {
+        final StreamTask task00 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+                .withInputPartitions(taskId00Partitions)
+                .inState(State.RESTORING).build();
+        final StandbyTask task01 = standbyTask(taskId01, 
taskId01ChangelogPartitions)
+                .withInputPartitions(taskId01Partitions)
+                .inState(State.RUNNING).build();
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        when(tasks.drainPendingTasksToInit()).thenReturn(mkSet(task00, 
task01));
+        doThrow(new LockException("Lock 
Exception!")).when(task00).initializeIfNeeded();
+        taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
+
+        taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
+
+        verify(task00).initializeIfNeeded();
+        verify(task01).initializeIfNeeded();
+        verify(tasks).addPendingTasksToInit(
+                argThat(tasksToInit -> tasksToInit.contains(task00) && 
!tasksToInit.contains(task01))
+        );
+        verify(stateUpdater, never()).add(task00);
+        verify(stateUpdater).add(task01);
+
+        taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
+
+        // initializeIfNeeded() has NOT been called this time
+        verify(task00, Mockito.times(1)).initializeIfNeeded();
+        verify(tasks, Mockito.times(2)).addPendingTasksToInit(
+                argThat(tasksToInit -> tasksToInit.contains(task00))
+        );
+        verify(stateUpdater, never()).add(task00);
+
+        time.sleep(10000);
+        // do not throw lock exception this time

Review Comment:
   Could you change this comment to something like:
   ```java
   // task00 should call initialize since the backoff period has passed
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to