cadonna commented on code in PR #13932: URL: https://github.com/apache/kafka/pull/13932#discussion_r1396975782
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java: ########## @@ -125,11 +122,11 @@ public class ProcessorStateManagerTest { private OffsetCheckpoint checkpoint; private StateDirectory stateDirectory; - @Mock(type = MockType.NICE) + @Mock private StateStore store; Review Comment: My IDE tells me that this field is never used. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java: ########## @@ -361,41 +339,30 @@ public void shouldRecycleStoreAndReregisterChangelog() { assertFalse(changelogReader.isPartitionRegistered(persistentStorePartition)); assertThat(stateMgr.getStore(persistentStoreName), equalTo(store)); - reset(context, store); - context.uninitialize(); - expect(store.name()).andStubReturn(persistentStoreName); - replay(context, store); + when(store.name()).thenReturn(persistentStoreName); stateMgr.registerStateStores(singletonList(store), context); - verify(context, store); + verify(context, times(2)).uninitialize(); assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition)); } @Test public void shouldClearStoreCache() { final ProcessorStateManager stateMgr = getStateManager(Task.TaskType.ACTIVE); - reset(storeMetadata); - final CachingStore store = EasyMock.createMock(CachingStore.class); - expect(storeMetadata.changelogPartition()).andStubReturn(persistentStorePartition); - expect(storeMetadata.store()).andStubReturn(store); - expect(store.name()).andStubReturn(persistentStoreName); - store.clearCache(); Review Comment: I am missing this verification with Mockito and I assume this is the most important one given the name of the test. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -1430,16 +1385,11 @@ public void shouldCloseAllTaskProducersOnCloseIfEosEnabled() throws InterruptedE @Test public void shouldShutdownTaskManagerOnClose() { - final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class); final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); - expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata); - expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty()); - EasyMock.replay(consumerGroupMetadata); - final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); - expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet()); - taskManager.shutdown(true); - EasyMock.expectLastCall(); - EasyMock.replay(taskManager, consumer); + when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata); + when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty()); + final TaskManager taskManager = mock(TaskManager.class); + doNothing().when(taskManager).shutdown(true); Review Comment: Please change this to verification instead of a stub. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -766,14 +750,10 @@ public void shouldNotEnforceRebalanceWhenCurrentlyRebalancing() throws Interrupt mockTime ); - final Consumer<byte[], byte[]> mockConsumer = EasyMock.createNiceMock(Consumer.class); - expect(mockConsumer.poll(anyObject())).andStubReturn(ConsumerRecords.empty()); Review Comment: You cannot remove this stub without replacement. In this way a poll on the consumer will return `null`. Some tests are failing because of that. You haven't seen failing them because of the removal of `@Parametrized`. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -2701,32 +2622,20 @@ public void shouldThrowTaskMigratedExceptionHandlingRevocation() { @SuppressWarnings("unchecked") public void shouldCatchHandleCorruptionOnTaskCorruptedExceptionPath() { final StreamsConfig config = new StreamsConfig(configProps(false)); - final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); - expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet()); + final TaskManager taskManager = mock(TaskManager.class); final Consumer<byte[], byte[]> consumer = mock(Consumer.class); final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); - consumer.subscribe((Collection<String>) anyObject(), anyObject()); - EasyMock.expectLastCall().anyTimes(); - consumer.unsubscribe(); - EasyMock.expectLastCall().anyTimes(); - expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata); - expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty()); - EasyMock.replay(consumerGroupMetadata); + doNothing().when(consumer).subscribe((Collection<String>) any(), any()); Review Comment: Please change this to verification instead of a stub or remove it if it was not verified before the migration. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -2920,26 +2801,18 @@ void runOnceWithoutProcessingThreads() { thread.setState(StreamThread.State.STARTING); thread.runLoop(); - - verify(taskManager); } @Test @SuppressWarnings("unchecked") public void shouldEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnActiveTask() { final StreamsConfig config = new StreamsConfig(configProps(true)); - - final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); - expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet()); + final TaskManager taskManager = mock(TaskManager.class); final Consumer<byte[], byte[]> consumer = mock(Consumer.class); final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); - expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata); - expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty()); - consumer.subscribe((Collection<String>) anyObject(), anyObject()); - EasyMock.expectLastCall().anyTimes(); - consumer.unsubscribe(); - EasyMock.expectLastCall().anyTimes(); - EasyMock.replay(consumerGroupMetadata); + when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata); + when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty()); + doNothing().when(consumer).subscribe((Collection<String>) any(), any()); Review Comment: Please change this to verification instead of a stub or remove it if it was not verified before the migration. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java: ########## @@ -311,48 +303,34 @@ public void shouldRestoreTimestampedStoreWithConverter() { @Test public void shouldUnregisterChangelogsDuringClose() { final ProcessorStateManager stateMgr = getStateManager(Task.TaskType.ACTIVE); - reset(storeMetadata); - final StateStore store = EasyMock.createMock(StateStore.class); - expect(storeMetadata.changelogPartition()).andStubReturn(persistentStorePartition); - expect(storeMetadata.store()).andStubReturn(store); - expect(store.name()).andStubReturn(persistentStoreName); - - context.uninitialize(); - store.init((StateStoreContext) context, store); - replay(storeMetadata, context, store); + final StateStore store = mock(StateStore.class); + when(store.name()).thenReturn(persistentStoreName); stateMgr.registerStateStores(singletonList(store), context); - verify(context, store); + + verify(context).uninitialize(); + verify(store).init((StateStoreContext) context, store); stateMgr.registerStore(store, noopStateRestoreCallback, null); assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition)); - reset(store); - expect(store.name()).andStubReturn(persistentStoreName); - store.close(); - replay(store); + when(store.name()).thenReturn(persistentStoreName); Review Comment: I think you do not need this. You have already specified the same stub at line 307. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -3094,34 +2941,27 @@ public void shouldNotCommitNonRunningNonRestoringTasks() { final TaskId taskId2 = new TaskId(0, 2); final TaskId taskId3 = new TaskId(0, 3); - expect(task1.state()).andReturn(Task.State.RUNNING).anyTimes(); - expect(task1.id()).andReturn(taskId1).anyTimes(); - expect(task2.state()).andReturn(Task.State.RESTORING).anyTimes(); - expect(task2.id()).andReturn(taskId2).anyTimes(); - expect(task3.state()).andReturn(Task.State.CREATED).anyTimes(); - expect(task3.id()).andReturn(taskId3).anyTimes(); + when(task1.state()).thenReturn(Task.State.RUNNING); + when(task2.state()).thenReturn(Task.State.RESTORING); + when(task3.state()).thenReturn(Task.State.CREATED); - expect(taskManager.allOwnedTasks()).andReturn(mkMap( + when(taskManager.allOwnedTasks()).thenReturn(mkMap( mkEntry(taskId1, task1), mkEntry(taskId2, task2), mkEntry(taskId3, task3) - )).anyTimes(); + )); // expect not to try and commit task3, because it's not running. - expect(taskManager.commit(mkSet(task1, task2))).andReturn(2).times(1); Review Comment: I think you also need to migrate the `times(1)` in form of a verification. ``` verify(taskManager, once()).commit(...); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -3494,32 +3323,27 @@ private StreamThread setUpThread(final Properties streamsConfigProps) { } private TaskManager mockTaskManager(final Task runningTask) { - final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); + final TaskManager taskManager = mock(TaskManager.class); final TaskId taskId = new TaskId(0, 0); - expect(runningTask.state()).andStubReturn(Task.State.RUNNING); - expect(runningTask.id()).andStubReturn(taskId); - expect(taskManager.allOwnedTasks()).andStubReturn(Collections.singletonMap(taskId, runningTask)); - expect(taskManager.commit(Collections.singleton(runningTask))).andStubReturn(1); + when(runningTask.state()).thenReturn(Task.State.RUNNING); + when(taskManager.allOwnedTasks()).thenReturn(Collections.singletonMap(taskId, runningTask)); return taskManager; } private TaskManager mockTaskManagerPurge(final int numberOfPurges) { final Task runningTask = mock(Task.class); final TaskManager taskManager = mockTaskManager(runningTask); - taskManager.maybePurgeCommittedRecords(); - EasyMock.expectLastCall().times(numberOfPurges); - EasyMock.replay(taskManager, runningTask); + doNothing().when(taskManager).maybePurgeCommittedRecords(); return taskManager; } private TaskManager mockTaskManagerCommit(final int commits) { final Task runningTask = mock(Task.class); final TaskManager taskManager = mockTaskManager(runningTask); - expect(taskManager.commit(Collections.singleton(runningTask))).andReturn(commits).times(1); Review Comment: I think the `times(1)` needs to be transformed in a verification in Mockito. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -162,10 +154,14 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.when; -@RunWith(Parameterized.class) +@RunWith(MockitoJUnitRunner.StrictStubs.class) Review Comment: I assume this happened during the rebase. You cannot simply replace `@RunWith(Parameterized.class)` with `@RunWith(MockitoJUnitRunner.StrictStubs.class)`. You need to keep `@RunWith(Parameterized.class)` and specify a rule like this: ``` @Rule public MockitoRule rule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS); ``` Otherwise, the test class will not be run for all parameter values. If you do that, you will realize that there are some test failures. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -570,27 +563,25 @@ public void shouldAlsoPurgeWhenNothingGetsCommitted() { props.setProperty(StreamsConfig.REPARTITION_PURGE_INTERVAL_MS_CONFIG, Long.toString(purgeInterval)); final StreamsConfig config = new StreamsConfig(props); - final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class); + @SuppressWarnings("unchecked") + final Consumer<byte[], byte[]> consumer = mock(Consumer.class); final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); - expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata); - expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty()); + when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata); + when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty()); final TaskId taskId = new TaskId(0, 0); final Task runningTask = statelessTask(taskId) .inState(Task.State.RUNNING).build(); - final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); - expect(taskManager.allOwnedTasks()).andStubReturn(Collections.singletonMap(taskId, runningTask)); - expect(taskManager.commit(Collections.singleton(runningTask))).andStubReturn(0); - taskManager.maybePurgeCommittedRecords(); - EasyMock.replay(consumer, consumerGroupMetadata, taskManager); + final TaskManager taskManager = mock(TaskManager.class); + when(taskManager.allOwnedTasks()).thenReturn(Collections.singletonMap(taskId, runningTask)); + when(taskManager.commit(Collections.singleton(runningTask))).thenReturn(0); + doNothing().when(taskManager).maybePurgeCommittedRecords(); Review Comment: Do you really need this stub? I think this is something that you need to verify with ``` verify(taskManager).maybePurgeCommittedRecords(); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -698,14 +689,11 @@ public void shouldEnforceRebalanceWhenScheduledAndNotCurrentlyRebalancing() thro config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG), mockTime ); - - final Consumer<byte[], byte[]> mockConsumer = EasyMock.createNiceMock(Consumer.class); - expect(mockConsumer.poll(anyObject())).andStubReturn(ConsumerRecords.empty()); + final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); - expect(mockConsumer.groupMetadata()).andStubReturn(consumerGroupMetadata); - expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty()); - EasyMock.replay(consumerGroupMetadata, mockConsumer); - final EasyMockConsumerClientSupplier mockClientSupplier = new EasyMockConsumerClientSupplier(mockConsumer); + when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata); + when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty()); + final EasyMockConsumerClientSupplier mockClientSupplier = new EasyMockConsumerClientSupplier(consumer); Review Comment: Could you please rename this class to something more generic? It seems weird to have an `EasyMockConsumerClientSupplier` that supplies a Mockito mock. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -1100,39 +1067,27 @@ public void shouldPurgeAfterPurgeInterval() { thread.setNow(mockTime.milliseconds()); thread.maybeCommit(); - - verify(taskManager); } @Test public void shouldRecordCommitLatency() { - final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class); final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); - expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata); - expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty()); - expect(consumer.poll(anyObject())).andStubReturn(new ConsumerRecords<>(Collections.emptyMap())); - expect(consumer.assignment()).andStubReturn(emptySet()); - final Task task = niceMock(Task.class); - expect(task.id()).andStubReturn(task1); - expect(task.inputPartitions()).andStubReturn(Collections.singleton(t1p1)); - expect(task.committedOffsets()).andStubReturn(Collections.emptyMap()); - expect(task.highWaterMark()).andStubReturn(Collections.emptyMap()); + when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata); + when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty()); + when(consumer.poll(any())).thenReturn(new ConsumerRecords<>(Collections.emptyMap())); Review Comment: nit: You could use `ConsumerRecords.empty()`. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -1496,9 +1443,7 @@ public long restore(final Map<TaskId, Task> tasks) { }; } - taskManager.handleLostAll(); - - EasyMock.replay(taskManager, internalTopologyBuilder); + doNothing().when(taskManager).handleLostAll(); Review Comment: Please change this to verification instead of a stub. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java: ########## @@ -361,41 +339,30 @@ public void shouldRecycleStoreAndReregisterChangelog() { assertFalse(changelogReader.isPartitionRegistered(persistentStorePartition)); assertThat(stateMgr.getStore(persistentStoreName), equalTo(store)); - reset(context, store); - context.uninitialize(); - expect(store.name()).andStubReturn(persistentStoreName); - replay(context, store); + when(store.name()).thenReturn(persistentStoreName); Review Comment: Same here. This stub is already specified on line 329. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -1538,16 +1481,11 @@ public long restore(final Map<TaskId, Task> tasks) { @Test public void shouldShutdownTaskManagerOnCloseWithoutStart() { - final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class); final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); - expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata); - expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty()); - EasyMock.replay(consumerGroupMetadata); - final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); - expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet()); - taskManager.shutdown(true); - EasyMock.expectLastCall(); - EasyMock.replay(taskManager, consumer); + when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata); + when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty()); + final TaskManager taskManager = mock(TaskManager.class); + doNothing().when(taskManager).shutdown(true); Review Comment: Please change this to verification instead of a stub. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -203,6 +199,11 @@ public static Collection<Object[]> data() { @SuppressWarnings("unchecked") private final Consumer<byte[], byte[]> mainConsumer = (Consumer<byte[], byte[]>) Mockito.mock(Consumer.class); + @Mock + private Consumer<byte[], byte[]> consumer; + + private StreamsMetadataState streamsMetadataState; + Review Comment: My IDE says that this is never used. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -1557,21 +1495,15 @@ public void shouldShutdownTaskManagerOnCloseWithoutStart() { thread = buildStreamThread(consumer, taskManager, config, topologyMetadata) .updateThreadMetadata(getSharedAdminClientId(CLIENT_ID)); thread.shutdown(); - verify(taskManager); } @Test public void shouldOnlyShutdownOnce() { - final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class); final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); - expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata); - expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty()); - EasyMock.replay(consumerGroupMetadata); - final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); - expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet()); - taskManager.shutdown(true); - EasyMock.expectLastCall(); - EasyMock.replay(taskManager, consumer); + when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata); + when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty()); + final TaskManager taskManager = mock(TaskManager.class); + doNothing().when(taskManager).shutdown(true); Review Comment: Please change this to verification instead of a stub. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -2842,45 +2737,31 @@ void runOnceWithoutProcessingThreads() { thread.setStreamsUncaughtExceptionHandler((e, b) -> exceptionHandlerInvoked.set(true)); thread.run(); - verify(taskManager); assertThat(exceptionHandlerInvoked.get(), is(true)); } @Test @SuppressWarnings("unchecked") public void shouldCatchTaskMigratedExceptionOnOnTaskCorruptedExceptionPath() { final StreamsConfig config = new StreamsConfig(configProps(false)); - final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); - expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet()); + final TaskManager taskManager = mock(TaskManager.class); final Consumer<byte[], byte[]> consumer = mock(Consumer.class); final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); - expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata); - expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty()); - consumer.subscribe((Collection<String>) anyObject(), anyObject()); - EasyMock.expectLastCall().anyTimes(); - consumer.unsubscribe(); - EasyMock.expectLastCall().anyTimes(); - EasyMock.replay(consumerGroupMetadata); + when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata); + when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty()); + doNothing().when(consumer).subscribe((Collection<String>) any(), any()); + doNothing().when(consumer).unsubscribe(); Review Comment: Please change these to verifications instead of stubs or remove them if they where not verified before the migration. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -2701,32 +2622,20 @@ public void shouldThrowTaskMigratedExceptionHandlingRevocation() { @SuppressWarnings("unchecked") public void shouldCatchHandleCorruptionOnTaskCorruptedExceptionPath() { final StreamsConfig config = new StreamsConfig(configProps(false)); - final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); - expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet()); + final TaskManager taskManager = mock(TaskManager.class); final Consumer<byte[], byte[]> consumer = mock(Consumer.class); final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); - consumer.subscribe((Collection<String>) anyObject(), anyObject()); - EasyMock.expectLastCall().anyTimes(); - consumer.unsubscribe(); Review Comment: Is this verification not needed anymore? ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -2765,41 +2674,27 @@ void runOnceWithoutProcessingThreads() { }.updateThreadMetadata(getSharedAdminClientId(CLIENT_ID)); thread.run(); - - verify(taskManager); } @Test @SuppressWarnings("unchecked") public void shouldCatchTimeoutExceptionFromHandleCorruptionAndInvokeExceptionHandler() { final StreamsConfig config = new StreamsConfig(configProps(false)); - final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); - expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet()); + final TaskManager taskManager = mock(TaskManager.class); final Consumer<byte[], byte[]> consumer = mock(Consumer.class); final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); - expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata); - expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty()); - consumer.subscribe((Collection<String>) anyObject(), anyObject()); - EasyMock.expectLastCall().anyTimes(); - consumer.unsubscribe(); - EasyMock.expectLastCall().anyTimes(); - EasyMock.replay(consumerGroupMetadata); + when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata); + when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty()); + doNothing().when(consumer).subscribe((Collection<String>) any(), any()); + doNothing().when(consumer).unsubscribe(); Review Comment: Please change these to verifications instead of stubs or remove them if they where not verified before the migration. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -2997,27 +2863,19 @@ void runOnceWithoutProcessingThreads() { thread.setState(StreamThread.State.STARTING); thread.runLoop(); - - verify(taskManager); - verify(consumer); } @Test @SuppressWarnings("unchecked") public void shouldNotEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnInactiveTask() { final StreamsConfig config = new StreamsConfig(configProps(true)); - - final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); - expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet()); + final TaskManager taskManager = mock(TaskManager.class); + when(taskManager.producerClientIds()).thenReturn(Collections.emptySet()); final Consumer<byte[], byte[]> consumer = mock(Consumer.class); final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); - expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata); - expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty()); - consumer.subscribe((Collection<String>) anyObject(), anyObject()); - EasyMock.expectLastCall().anyTimes(); - consumer.unsubscribe(); - EasyMock.expectLastCall().anyTimes(); - EasyMock.replay(consumerGroupMetadata); + when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata); + when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty()); + doNothing().when(consumer).subscribe((Collection<String>) any(), any()); Review Comment: Please change this to verification instead of a stub. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -177,6 +174,9 @@ public class StreamThreadTest { @Mock private Consumer<byte[], byte[]> mainConsumer; + @Mock + private Consumer<byte[], byte[]> consumer; Review Comment: I am wondering why we have two consumer mocks here. But this is not relevant for this PR since it seems it was there before. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -2997,27 +2863,19 @@ void runOnceWithoutProcessingThreads() { thread.setState(StreamThread.State.STARTING); thread.runLoop(); - - verify(taskManager); - verify(consumer); } @Test @SuppressWarnings("unchecked") public void shouldNotEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnInactiveTask() { final StreamsConfig config = new StreamsConfig(configProps(true)); - - final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); - expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet()); + final TaskManager taskManager = mock(TaskManager.class); + when(taskManager.producerClientIds()).thenReturn(Collections.emptySet()); Review Comment: I think you can remove this since it is the default return value and you removed those stubs in other places. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -3494,32 +3323,27 @@ private StreamThread setUpThread(final Properties streamsConfigProps) { } private TaskManager mockTaskManager(final Task runningTask) { - final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); + final TaskManager taskManager = mock(TaskManager.class); final TaskId taskId = new TaskId(0, 0); - expect(runningTask.state()).andStubReturn(Task.State.RUNNING); - expect(runningTask.id()).andStubReturn(taskId); - expect(taskManager.allOwnedTasks()).andStubReturn(Collections.singletonMap(taskId, runningTask)); - expect(taskManager.commit(Collections.singleton(runningTask))).andStubReturn(1); + when(runningTask.state()).thenReturn(Task.State.RUNNING); + when(taskManager.allOwnedTasks()).thenReturn(Collections.singletonMap(taskId, runningTask)); return taskManager; } private TaskManager mockTaskManagerPurge(final int numberOfPurges) { final Task runningTask = mock(Task.class); final TaskManager taskManager = mockTaskManager(runningTask); - taskManager.maybePurgeCommittedRecords(); - EasyMock.expectLastCall().times(numberOfPurges); Review Comment: This needs to be transformed in a verification in Mockito. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org