cadonna commented on code in PR #13932:
URL: https://github.com/apache/kafka/pull/13932#discussion_r1396975782


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java:
##########
@@ -125,11 +122,11 @@ public class ProcessorStateManagerTest {
     private OffsetCheckpoint checkpoint;
     private StateDirectory stateDirectory;
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private StateStore store;

Review Comment:
   My IDE tells me that this field is never used.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java:
##########
@@ -361,41 +339,30 @@ public void shouldRecycleStoreAndReregisterChangelog() {
         
assertFalse(changelogReader.isPartitionRegistered(persistentStorePartition));
         assertThat(stateMgr.getStore(persistentStoreName), equalTo(store));
 
-        reset(context, store);
-        context.uninitialize();
-        expect(store.name()).andStubReturn(persistentStoreName);
-        replay(context, store);
+        when(store.name()).thenReturn(persistentStoreName);
 
         stateMgr.registerStateStores(singletonList(store), context);
 
-        verify(context, store);
+        verify(context, times(2)).uninitialize();
         
assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition));
     }
 
     @Test
     public void shouldClearStoreCache() {
         final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE);
-        reset(storeMetadata);
-        final CachingStore store = EasyMock.createMock(CachingStore.class);
-        
expect(storeMetadata.changelogPartition()).andStubReturn(persistentStorePartition);
-        expect(storeMetadata.store()).andStubReturn(store);
-        expect(store.name()).andStubReturn(persistentStoreName);
-        store.clearCache();

Review Comment:
   I am missing this verification with Mockito and I assume this is the most 
important one given the name of the test.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -1430,16 +1385,11 @@ public void 
shouldCloseAllTaskProducersOnCloseIfEosEnabled() throws InterruptedE
 
     @Test
     public void shouldShutdownTaskManagerOnClose() {
-        final Consumer<byte[], byte[]> consumer = 
EasyMock.createNiceMock(Consumer.class);
         final ConsumerGroupMetadata consumerGroupMetadata = 
mock(ConsumerGroupMetadata.class);
-        expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
-        
expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
-        EasyMock.replay(consumerGroupMetadata);
-        final TaskManager taskManager = 
EasyMock.createNiceMock(TaskManager.class);
-        
expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
-        taskManager.shutdown(true);
-        EasyMock.expectLastCall();
-        EasyMock.replay(taskManager, consumer);
+        when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+        
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+        final TaskManager taskManager = mock(TaskManager.class);
+        doNothing().when(taskManager).shutdown(true);

Review Comment:
   Please change this to verification instead of a stub.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -766,14 +750,10 @@ public void 
shouldNotEnforceRebalanceWhenCurrentlyRebalancing() throws Interrupt
             mockTime
         );
 
-        final Consumer<byte[], byte[]> mockConsumer = 
EasyMock.createNiceMock(Consumer.class);
-        
expect(mockConsumer.poll(anyObject())).andStubReturn(ConsumerRecords.empty());

Review Comment:
   You cannot remove this stub without a replacement. Without it, a poll on the 
consumer will return `null`, and some tests fail because of that. You 
haven't seen them fail because `@RunWith(Parameterized.class)` was removed.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -2701,32 +2622,20 @@ public void 
shouldThrowTaskMigratedExceptionHandlingRevocation() {
     @SuppressWarnings("unchecked")
     public void shouldCatchHandleCorruptionOnTaskCorruptedExceptionPath() {
         final StreamsConfig config = new StreamsConfig(configProps(false));
-        final TaskManager taskManager = 
EasyMock.createNiceMock(TaskManager.class);
-        
expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
+        final TaskManager taskManager = mock(TaskManager.class);
         final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
         final ConsumerGroupMetadata consumerGroupMetadata = 
mock(ConsumerGroupMetadata.class);
-        consumer.subscribe((Collection<String>) anyObject(), anyObject());
-        EasyMock.expectLastCall().anyTimes();
-        consumer.unsubscribe();
-        EasyMock.expectLastCall().anyTimes();
-        expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
-        
expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
-        EasyMock.replay(consumerGroupMetadata);
+        doNothing().when(consumer).subscribe((Collection<String>) any(), 
any());

Review Comment:
   Please change this to verification instead of a stub or remove it if it was 
not verified before the migration.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -2920,26 +2801,18 @@ void runOnceWithoutProcessingThreads() {
 
         thread.setState(StreamThread.State.STARTING);
         thread.runLoop();
-
-        verify(taskManager);
     }
 
     @Test
     @SuppressWarnings("unchecked")
     public void 
shouldEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnActiveTask() {
         final StreamsConfig config = new StreamsConfig(configProps(true));
-
-        final TaskManager taskManager = 
EasyMock.createNiceMock(TaskManager.class);
-        
expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
+        final TaskManager taskManager = mock(TaskManager.class);
         final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
         final ConsumerGroupMetadata consumerGroupMetadata = 
mock(ConsumerGroupMetadata.class);
-        expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
-        
expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
-        consumer.subscribe((Collection<String>) anyObject(), anyObject());
-        EasyMock.expectLastCall().anyTimes();
-        consumer.unsubscribe();
-        EasyMock.expectLastCall().anyTimes();
-        EasyMock.replay(consumerGroupMetadata);
+        when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+        
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+        doNothing().when(consumer).subscribe((Collection<String>) any(), 
any());

Review Comment:
   
   
   Please change this to verification instead of a stub or remove it if it was 
not verified before the migration.
   



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java:
##########
@@ -311,48 +303,34 @@ public void shouldRestoreTimestampedStoreWithConverter() {
     @Test
     public void shouldUnregisterChangelogsDuringClose() {
         final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE);
-        reset(storeMetadata);
-        final StateStore store = EasyMock.createMock(StateStore.class);
-        
expect(storeMetadata.changelogPartition()).andStubReturn(persistentStorePartition);
-        expect(storeMetadata.store()).andStubReturn(store);
-        expect(store.name()).andStubReturn(persistentStoreName);
-
-        context.uninitialize();
-        store.init((StateStoreContext) context, store);
-        replay(storeMetadata, context, store);
+        final StateStore store = mock(StateStore.class);
+        when(store.name()).thenReturn(persistentStoreName);
 
         stateMgr.registerStateStores(singletonList(store), context);
-        verify(context, store);
+
+        verify(context).uninitialize();
+        verify(store).init((StateStoreContext) context, store);
 
         stateMgr.registerStore(store, noopStateRestoreCallback, null);
         
assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition));
 
-        reset(store);
-        expect(store.name()).andStubReturn(persistentStoreName);
-        store.close();
-        replay(store);
+        when(store.name()).thenReturn(persistentStoreName);

Review Comment:
   I think you do not need this. You have already specified the same stub at 
line 307. 



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -3094,34 +2941,27 @@ public void 
shouldNotCommitNonRunningNonRestoringTasks() {
         final TaskId taskId2 = new TaskId(0, 2);
         final TaskId taskId3 = new TaskId(0, 3);
 
-        expect(task1.state()).andReturn(Task.State.RUNNING).anyTimes();
-        expect(task1.id()).andReturn(taskId1).anyTimes();
-        expect(task2.state()).andReturn(Task.State.RESTORING).anyTimes();
-        expect(task2.id()).andReturn(taskId2).anyTimes();
-        expect(task3.state()).andReturn(Task.State.CREATED).anyTimes();
-        expect(task3.id()).andReturn(taskId3).anyTimes();
+        when(task1.state()).thenReturn(Task.State.RUNNING);
+        when(task2.state()).thenReturn(Task.State.RESTORING);
+        when(task3.state()).thenReturn(Task.State.CREATED);
 
-        expect(taskManager.allOwnedTasks()).andReturn(mkMap(
+        when(taskManager.allOwnedTasks()).thenReturn(mkMap(
             mkEntry(taskId1, task1),
             mkEntry(taskId2, task2),
             mkEntry(taskId3, task3)
-        )).anyTimes();
+        ));
 
         // expect not to try and commit task3, because it's not running.
-        expect(taskManager.commit(mkSet(task1, task2))).andReturn(2).times(1);

Review Comment:
   I think you also need to migrate the `times(1)` in the form of a verification.
   ```
   verify(taskManager, times(1)).commit(...);
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -3494,32 +3323,27 @@ private StreamThread setUpThread(final Properties 
streamsConfigProps) {
     }
 
     private TaskManager mockTaskManager(final Task runningTask) {
-        final TaskManager taskManager = 
EasyMock.createNiceMock(TaskManager.class);
+        final TaskManager taskManager = mock(TaskManager.class);
         final TaskId taskId = new TaskId(0, 0);
 
-        expect(runningTask.state()).andStubReturn(Task.State.RUNNING);
-        expect(runningTask.id()).andStubReturn(taskId);
-        
expect(taskManager.allOwnedTasks()).andStubReturn(Collections.singletonMap(taskId,
 runningTask));
-        
expect(taskManager.commit(Collections.singleton(runningTask))).andStubReturn(1);
+        when(runningTask.state()).thenReturn(Task.State.RUNNING);
+        
when(taskManager.allOwnedTasks()).thenReturn(Collections.singletonMap(taskId, 
runningTask));
         return taskManager;
     }
 
     private TaskManager mockTaskManagerPurge(final int numberOfPurges) {
         final Task runningTask = mock(Task.class);
         final TaskManager taskManager = mockTaskManager(runningTask);
 
-        taskManager.maybePurgeCommittedRecords();
-        EasyMock.expectLastCall().times(numberOfPurges);
-        EasyMock.replay(taskManager, runningTask);
+        doNothing().when(taskManager).maybePurgeCommittedRecords();
         return taskManager;
     }
 
     private TaskManager mockTaskManagerCommit(final int commits) {
         final Task runningTask = mock(Task.class);
         final TaskManager taskManager = mockTaskManager(runningTask);
 
-        
expect(taskManager.commit(Collections.singleton(runningTask))).andReturn(commits).times(1);

Review Comment:
   I think the `times(1)` needs to be transformed into a verification in Mockito.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -162,10 +154,14 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.when;
 
-@RunWith(Parameterized.class)
+@RunWith(MockitoJUnitRunner.StrictStubs.class)

Review Comment:
   I assume this happened during the rebase.
   
   You cannot simply replace `@RunWith(Parameterized.class)` with 
`@RunWith(MockitoJUnitRunner.StrictStubs.class)`. You need to keep 
`@RunWith(Parameterized.class)` and specify a rule like this:
   
   ```
   @Rule
   public MockitoRule rule = 
MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);
   ``` 
   
   Otherwise, the test class will not be run for all parameter values.
   
   If you do that, you will realize that there are some test failures. 



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -570,27 +563,25 @@ public void shouldAlsoPurgeWhenNothingGetsCommitted() {
         props.setProperty(StreamsConfig.REPARTITION_PURGE_INTERVAL_MS_CONFIG, 
Long.toString(purgeInterval));
 
         final StreamsConfig config = new StreamsConfig(props);
-        final Consumer<byte[], byte[]> consumer = 
EasyMock.createNiceMock(Consumer.class);
+        @SuppressWarnings("unchecked")
+        final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
         final ConsumerGroupMetadata consumerGroupMetadata = 
mock(ConsumerGroupMetadata.class);
-        expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
-        
expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
+        when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+        
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
         final TaskId taskId = new TaskId(0, 0);
         final Task runningTask = statelessTask(taskId)
             .inState(Task.State.RUNNING).build();
-        final TaskManager taskManager = 
EasyMock.createNiceMock(TaskManager.class);
-        
expect(taskManager.allOwnedTasks()).andStubReturn(Collections.singletonMap(taskId,
 runningTask));
-        
expect(taskManager.commit(Collections.singleton(runningTask))).andStubReturn(0);
-        taskManager.maybePurgeCommittedRecords();
-        EasyMock.replay(consumer, consumerGroupMetadata, taskManager);
+        final TaskManager taskManager = mock(TaskManager.class);
+        
when(taskManager.allOwnedTasks()).thenReturn(Collections.singletonMap(taskId, 
runningTask));
+        
when(taskManager.commit(Collections.singleton(runningTask))).thenReturn(0);
+        doNothing().when(taskManager).maybePurgeCommittedRecords();

Review Comment:
   Do you really need this stub? I think this is something that you need to 
verify with
   ```
   verify(taskManager).maybePurgeCommittedRecords();
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -698,14 +689,11 @@ public void 
shouldEnforceRebalanceWhenScheduledAndNotCurrentlyRebalancing() thro
             config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
             mockTime
         );
-        
-        final Consumer<byte[], byte[]> mockConsumer = 
EasyMock.createNiceMock(Consumer.class);
-        
expect(mockConsumer.poll(anyObject())).andStubReturn(ConsumerRecords.empty());
+
         final ConsumerGroupMetadata consumerGroupMetadata = 
mock(ConsumerGroupMetadata.class);
-        
expect(mockConsumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
-        
expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
-        EasyMock.replay(consumerGroupMetadata, mockConsumer);
-        final EasyMockConsumerClientSupplier mockClientSupplier = new 
EasyMockConsumerClientSupplier(mockConsumer);
+        when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+        
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+        final EasyMockConsumerClientSupplier mockClientSupplier = new 
EasyMockConsumerClientSupplier(consumer);

Review Comment:
   Could you please rename this class to something more generic? It seems weird 
to have an `EasyMockConsumerClientSupplier` that supplies a Mockito mock.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -1100,39 +1067,27 @@ public void shouldPurgeAfterPurgeInterval() {
 
         thread.setNow(mockTime.milliseconds());
         thread.maybeCommit();
-
-        verify(taskManager);
     }
 
     @Test
     public void shouldRecordCommitLatency() {
-        final Consumer<byte[], byte[]> consumer = 
EasyMock.createNiceMock(Consumer.class);
         final ConsumerGroupMetadata consumerGroupMetadata = 
mock(ConsumerGroupMetadata.class);
-        expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
-        
expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
-        expect(consumer.poll(anyObject())).andStubReturn(new 
ConsumerRecords<>(Collections.emptyMap()));
-        expect(consumer.assignment()).andStubReturn(emptySet());
-        final Task task = niceMock(Task.class);
-        expect(task.id()).andStubReturn(task1);
-        
expect(task.inputPartitions()).andStubReturn(Collections.singleton(t1p1));
-        expect(task.committedOffsets()).andStubReturn(Collections.emptyMap());
-        expect(task.highWaterMark()).andStubReturn(Collections.emptyMap());
+        when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+        
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+        when(consumer.poll(any())).thenReturn(new 
ConsumerRecords<>(Collections.emptyMap()));

Review Comment:
   nit:
   You could use `ConsumerRecords.empty()`.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -1496,9 +1443,7 @@ public long restore(final Map<TaskId, Task> tasks) {
             };
         }
 
-        taskManager.handleLostAll();
-
-        EasyMock.replay(taskManager, internalTopologyBuilder);
+        doNothing().when(taskManager).handleLostAll();

Review Comment:
   Please change this to verification instead of a stub.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java:
##########
@@ -361,41 +339,30 @@ public void shouldRecycleStoreAndReregisterChangelog() {
         
assertFalse(changelogReader.isPartitionRegistered(persistentStorePartition));
         assertThat(stateMgr.getStore(persistentStoreName), equalTo(store));
 
-        reset(context, store);
-        context.uninitialize();
-        expect(store.name()).andStubReturn(persistentStoreName);
-        replay(context, store);
+        when(store.name()).thenReturn(persistentStoreName);

Review Comment:
   Same here. This stub is already specified on line 329.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -1538,16 +1481,11 @@ public long restore(final Map<TaskId, Task> tasks) {
 
     @Test
     public void shouldShutdownTaskManagerOnCloseWithoutStart() {
-        final Consumer<byte[], byte[]> consumer = 
EasyMock.createNiceMock(Consumer.class);
         final ConsumerGroupMetadata consumerGroupMetadata = 
mock(ConsumerGroupMetadata.class);
-        expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
-        
expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
-        EasyMock.replay(consumerGroupMetadata);
-        final TaskManager taskManager = 
EasyMock.createNiceMock(TaskManager.class);
-        
expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
-        taskManager.shutdown(true);
-        EasyMock.expectLastCall();
-        EasyMock.replay(taskManager, consumer);
+        when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+        
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+        final TaskManager taskManager = mock(TaskManager.class);
+        doNothing().when(taskManager).shutdown(true);

Review Comment:
   Please change this to verification instead of a stub.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -203,6 +199,11 @@ public static Collection<Object[]> data() {
     @SuppressWarnings("unchecked")
     private final Consumer<byte[], byte[]> mainConsumer = (Consumer<byte[], 
byte[]>) Mockito.mock(Consumer.class);
 
+    @Mock
+    private Consumer<byte[], byte[]> consumer;
+
+    private StreamsMetadataState streamsMetadataState;
+

Review Comment:
   My IDE says that this is never used.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -1557,21 +1495,15 @@ public void 
shouldShutdownTaskManagerOnCloseWithoutStart() {
         thread = buildStreamThread(consumer, taskManager, config, 
topologyMetadata)
             .updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
         thread.shutdown();
-        verify(taskManager);
     }
 
     @Test
     public void shouldOnlyShutdownOnce() {
-        final Consumer<byte[], byte[]> consumer = 
EasyMock.createNiceMock(Consumer.class);
         final ConsumerGroupMetadata consumerGroupMetadata = 
mock(ConsumerGroupMetadata.class);
-        expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
-        
expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
-        EasyMock.replay(consumerGroupMetadata);
-        final TaskManager taskManager = 
EasyMock.createNiceMock(TaskManager.class);
-        
expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
-        taskManager.shutdown(true);
-        EasyMock.expectLastCall();
-        EasyMock.replay(taskManager, consumer);
+        when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+        
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+        final TaskManager taskManager = mock(TaskManager.class);
+        doNothing().when(taskManager).shutdown(true);

Review Comment:
   Please change this to verification instead of a stub.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -2842,45 +2737,31 @@ void runOnceWithoutProcessingThreads() {
         thread.setStreamsUncaughtExceptionHandler((e, b) -> 
exceptionHandlerInvoked.set(true));
         thread.run();
 
-        verify(taskManager);
         assertThat(exceptionHandlerInvoked.get(), is(true));
     }
 
     @Test
     @SuppressWarnings("unchecked")
     public void 
shouldCatchTaskMigratedExceptionOnOnTaskCorruptedExceptionPath() {
         final StreamsConfig config = new StreamsConfig(configProps(false));
-        final TaskManager taskManager = 
EasyMock.createNiceMock(TaskManager.class);
-        
expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
+        final TaskManager taskManager = mock(TaskManager.class);
         final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
         final ConsumerGroupMetadata consumerGroupMetadata = 
mock(ConsumerGroupMetadata.class);
-        expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
-        
expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
-        consumer.subscribe((Collection<String>) anyObject(), anyObject());
-        EasyMock.expectLastCall().anyTimes();
-        consumer.unsubscribe();
-        EasyMock.expectLastCall().anyTimes();
-        EasyMock.replay(consumerGroupMetadata);
+        when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+        
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+        doNothing().when(consumer).subscribe((Collection<String>) any(), 
any());
+        doNothing().when(consumer).unsubscribe();

Review Comment:
   Please change these to verifications instead of stubs or remove them if they 
were not verified before the migration.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -2701,32 +2622,20 @@ public void 
shouldThrowTaskMigratedExceptionHandlingRevocation() {
     @SuppressWarnings("unchecked")
     public void shouldCatchHandleCorruptionOnTaskCorruptedExceptionPath() {
         final StreamsConfig config = new StreamsConfig(configProps(false));
-        final TaskManager taskManager = 
EasyMock.createNiceMock(TaskManager.class);
-        
expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
+        final TaskManager taskManager = mock(TaskManager.class);
         final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
         final ConsumerGroupMetadata consumerGroupMetadata = 
mock(ConsumerGroupMetadata.class);
-        consumer.subscribe((Collection<String>) anyObject(), anyObject());
-        EasyMock.expectLastCall().anyTimes();
-        consumer.unsubscribe();

Review Comment:
   Is this verification not needed anymore?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -2765,41 +2674,27 @@ void runOnceWithoutProcessingThreads() {
         }.updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
 
         thread.run();
-
-        verify(taskManager);
     }
 
     @Test
     @SuppressWarnings("unchecked")
     public void 
shouldCatchTimeoutExceptionFromHandleCorruptionAndInvokeExceptionHandler() {
         final StreamsConfig config = new StreamsConfig(configProps(false));
-        final TaskManager taskManager = 
EasyMock.createNiceMock(TaskManager.class);
-        
expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
+        final TaskManager taskManager = mock(TaskManager.class);
         final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
         final ConsumerGroupMetadata consumerGroupMetadata = 
mock(ConsumerGroupMetadata.class);
-        expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
-        
expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
-        consumer.subscribe((Collection<String>) anyObject(), anyObject());
-        EasyMock.expectLastCall().anyTimes();
-        consumer.unsubscribe();
-        EasyMock.expectLastCall().anyTimes();
-        EasyMock.replay(consumerGroupMetadata);
+        when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+        
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+        doNothing().when(consumer).subscribe((Collection<String>) any(), 
any());
+        doNothing().when(consumer).unsubscribe();

Review Comment:
   Please change these to verifications instead of stubs or remove them if they 
were not verified before the migration.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -2997,27 +2863,19 @@ void runOnceWithoutProcessingThreads() {
 
         thread.setState(StreamThread.State.STARTING);
         thread.runLoop();
-
-        verify(taskManager);
-        verify(consumer);
     }
 
     @Test
     @SuppressWarnings("unchecked")
     public void 
shouldNotEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnInactiveTask() {
         final StreamsConfig config = new StreamsConfig(configProps(true));
-
-        final TaskManager taskManager = 
EasyMock.createNiceMock(TaskManager.class);
-        
expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
+        final TaskManager taskManager = mock(TaskManager.class);
+        
when(taskManager.producerClientIds()).thenReturn(Collections.emptySet());
         final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
         final ConsumerGroupMetadata consumerGroupMetadata = 
mock(ConsumerGroupMetadata.class);
-        expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
-        
expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
-        consumer.subscribe((Collection<String>) anyObject(), anyObject());
-        EasyMock.expectLastCall().anyTimes();
-        consumer.unsubscribe();
-        EasyMock.expectLastCall().anyTimes();
-        EasyMock.replay(consumerGroupMetadata);
+        when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+        
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+        doNothing().when(consumer).subscribe((Collection<String>) any(), 
any());

Review Comment:
   Please change this to verification instead of a stub.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -177,6 +174,9 @@ public class StreamThreadTest {
     @Mock
     private Consumer<byte[], byte[]> mainConsumer;
 
+    @Mock
+    private Consumer<byte[], byte[]> consumer;

Review Comment:
   I am wondering why we have two consumer mocks here. But this is not relevant 
for this PR since it seems it was there before.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -2997,27 +2863,19 @@ void runOnceWithoutProcessingThreads() {
 
         thread.setState(StreamThread.State.STARTING);
         thread.runLoop();
-
-        verify(taskManager);
-        verify(consumer);
     }
 
     @Test
     @SuppressWarnings("unchecked")
     public void 
shouldNotEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnInactiveTask() {
         final StreamsConfig config = new StreamsConfig(configProps(true));
-
-        final TaskManager taskManager = 
EasyMock.createNiceMock(TaskManager.class);
-        
expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
+        final TaskManager taskManager = mock(TaskManager.class);
+        
when(taskManager.producerClientIds()).thenReturn(Collections.emptySet());

Review Comment:
   I think you can remove this since it is the default return value and you 
removed those stubs in other places.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -3494,32 +3323,27 @@ private StreamThread setUpThread(final Properties 
streamsConfigProps) {
     }
 
     private TaskManager mockTaskManager(final Task runningTask) {
-        final TaskManager taskManager = 
EasyMock.createNiceMock(TaskManager.class);
+        final TaskManager taskManager = mock(TaskManager.class);
         final TaskId taskId = new TaskId(0, 0);
 
-        expect(runningTask.state()).andStubReturn(Task.State.RUNNING);
-        expect(runningTask.id()).andStubReturn(taskId);
-        
expect(taskManager.allOwnedTasks()).andStubReturn(Collections.singletonMap(taskId,
 runningTask));
-        
expect(taskManager.commit(Collections.singleton(runningTask))).andStubReturn(1);
+        when(runningTask.state()).thenReturn(Task.State.RUNNING);
+        
when(taskManager.allOwnedTasks()).thenReturn(Collections.singletonMap(taskId, 
runningTask));
         return taskManager;
     }
 
     private TaskManager mockTaskManagerPurge(final int numberOfPurges) {
         final Task runningTask = mock(Task.class);
         final TaskManager taskManager = mockTaskManager(runningTask);
 
-        taskManager.maybePurgeCommittedRecords();
-        EasyMock.expectLastCall().times(numberOfPurges);

Review Comment:
   This needs to be transformed into a verification in Mockito.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to