cadonna commented on code in PR #15261:
URL: https://github.com/apache/kafka/pull/15261#discussion_r1512305606


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2016,13 +2015,14 @@ public void shouldReleaseLockForUnassignedTasksAfterRebalance() throws Exception
         assertThat(taskManager.lockedTaskDirectories(), is(mkSet(taskId00, taskId01, taskId02)));
 
         handleAssignment(taskId00Assignment, taskId01Assignment, emptyMap());
-        reset(consumer);
-        expectConsumerAssignmentPaused(consumer);
-        replay(consumer);
 
         taskManager.handleRebalanceComplete();
         assertThat(taskManager.lockedTaskDirectories(), is(mkSet(taskId00, taskId01)));
         verify(stateDirectory);
+
+        final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));

Review Comment:
   I see `singleton(new TopicPartition("assignment", 0))` added around 45 times 
in this PR. I assume this is because `handleAssignment()` uses it internally. I 
propose defining a `static final` field named `assignment` or similar that 
can be reused. Alternatively, the assignment could be passed to 
`handleAssignment()`. To limit the changes in this PR, you should probably go 
for the former option, the field.
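
   A minimal sketch of the field option. The nested `TopicPartition` class is only a stand-in for `org.apache.kafka.common.TopicPartition` so that the snippet compiles without Kafka on the classpath, and the names `SharedAssignmentSketch` and `ASSIGNMENT` are hypothetical, not taken from the PR:

```java
import java.util.Collections;
import java.util.Objects;
import java.util.Set;

public class SharedAssignmentSketch {

    // Stand-in for org.apache.kafka.common.TopicPartition (assumption for this sketch only).
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(final String topic, final int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(final Object o) {
            if (!(o instanceof TopicPartition)) {
                return false;
            }
            final TopicPartition other = (TopicPartition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    // Defined once at class level; each test reuses it instead of
    // repeating singleton(new TopicPartition("assignment", 0)).
    static final Set<TopicPartition> ASSIGNMENT =
        Collections.singleton(new TopicPartition("assignment", 0));

    public static void main(final String[] args) {
        // Any test method can refer to the shared constant.
        System.out.println(ASSIGNMENT.contains(new TopicPartition("assignment", 0)));
    }
}
```

   In the real test class the field would use Kafka's own `TopicPartition`, and the roughly 45 local declarations would be replaced by references to it.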



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2210,6 +2210,9 @@ public void shouldComputeOffsetSumForStandbyTask() throws Exception {
         restoringTask.setChangelogOffsets(changelogOffsets);
 
         assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums));
+
+        final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
+        Mockito.verify(mockitoConsumer).resume(assignment);

Review Comment:
   With EasyMock you need to call `verify()` to verify a call. I do not see a 
call to `verify()` in the old code. That tells me that verifying the call to 
`consumer.resume()` was not relevant in the old code of this test. 
Additionally, the goal of this test is to verify the result of 
`taskManager.getTaskOffsetSums()`. For that you need some tasks assigned, so 
part of the setup of this test is to install an assignment so that you get 
tasks assigned. With `Mockito.verify(mockitoConsumer).resume(assignment)` you 
would be testing a rather unimportant part of the setup of this test.
   To sum up, I have the impression that you verify more in the migration than 
was verified in the old code, and that you added a rather unimportant 
verification.
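
   To illustrate the distinction between setup interactions and the behavior under test without depending on a mocking library, here is a small sketch with a hand-written stub. Everything in it (`StubConsumer`, `offsetSum`, the partition name) is hypothetical and much simpler than `TaskManager`; it only shows a test that asserts on the result and leaves the incidental `resume()` call unverified:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

public class SetupVersusVerificationSketch {

    // Hypothetical minimal stand-in for the consumer; not Kafka's Consumer interface.
    static class StubConsumer {
        final List<Set<String>> resumed = new ArrayList<>();

        Set<String> assignment() {
            return Collections.singleton("assignment-0");
        }

        void resume(final Set<String> partitions) {
            resumed.add(partitions); // recorded, but the test below never inspects it
        }
    }

    // Hypothetical unit under test: touches the consumer as part of setup,
    // then returns the value the test actually cares about.
    static long offsetSum(final StubConsumer consumer, final List<Long> changelogOffsets) {
        consumer.resume(consumer.assignment()); // incidental setup interaction
        long sum = 0L;
        for (final long offset : changelogOffsets) {
            sum += offset;
        }
        return sum;
    }

    public static void main(final String[] args) {
        final StubConsumer consumer = new StubConsumer();
        final long actual = offsetSum(consumer, Arrays.asList(5L, 10L));

        // Assert only on the result; the resume() call is setup, so it is not verified.
        if (actual != 15L) {
            throw new AssertionError("expected 15 but got " + actual);
        }
        System.out.println("offset sum = " + actual);
    }
}
```

   The Mockito counterpart of this idea would be to keep the stubbing needed for setup and leave out `Mockito.verify(...)` for calls the original EasyMock test never verified.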



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2332,19 +2335,12 @@ public void shouldCloseActiveUnassignedSuspendedTasksWhenClosingRevokedTasks() {
         task00.setCommittableOffsetsAndMetadata(offsets);
 
         // first `handleAssignment`
-        expectRestoreToBeCompleted(consumer);
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
-        expectLastCall();
-
-        // `handleRevocation`
-        consumer.commitSync(offsets);
-        expectLastCall();
+        final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
+        when(mockitoConsumer.assignment()).thenReturn(assignment);
 
-        // second `handleAssignment`
-        consumer.commitSync(offsets);
-        expectLastCall();
+        when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
 
-        replay(consumer);
+        taskManager.setMainConsumer(mockitoConsumer);

Review Comment:
   Fair enough! Waiting for your follow-up PR then.


