clolov commented on code in PR #13932: URL: https://github.com/apache/kafka/pull/13932#discussion_r1397383304
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -203,6 +199,11 @@ public static Collection<Object[]> data() { @SuppressWarnings("unchecked") private final Consumer<byte[], byte[]> mainConsumer = (Consumer<byte[], byte[]>) Mockito.mock(Consumer.class); + @Mock + private Consumer<byte[], byte[]> consumer; + + private StreamsMetadataState streamsMetadataState; + Review Comment: Yup - removed ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -698,14 +689,11 @@ public void shouldEnforceRebalanceWhenScheduledAndNotCurrentlyRebalancing() thro config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG), mockTime ); - - final Consumer<byte[], byte[]> mockConsumer = EasyMock.createNiceMock(Consumer.class); - expect(mockConsumer.poll(anyObject())).andStubReturn(ConsumerRecords.empty()); + final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); - expect(mockConsumer.groupMetadata()).andStubReturn(consumerGroupMetadata); - expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty()); - EasyMock.replay(consumerGroupMetadata, mockConsumer); - final EasyMockConsumerClientSupplier mockClientSupplier = new EasyMockConsumerClientSupplier(mockConsumer); + when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata); + when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty()); + final EasyMockConsumerClientSupplier mockClientSupplier = new EasyMockConsumerClientSupplier(consumer); Review Comment: Yup - renamed! 
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -3094,34 +2941,27 @@ public void shouldNotCommitNonRunningNonRestoringTasks() { final TaskId taskId2 = new TaskId(0, 2); final TaskId taskId3 = new TaskId(0, 3); - expect(task1.state()).andReturn(Task.State.RUNNING).anyTimes(); - expect(task1.id()).andReturn(taskId1).anyTimes(); - expect(task2.state()).andReturn(Task.State.RESTORING).anyTimes(); - expect(task2.id()).andReturn(taskId2).anyTimes(); - expect(task3.state()).andReturn(Task.State.CREATED).anyTimes(); - expect(task3.id()).andReturn(taskId3).anyTimes(); + when(task1.state()).thenReturn(Task.State.RUNNING); + when(task2.state()).thenReturn(Task.State.RESTORING); + when(task3.state()).thenReturn(Task.State.CREATED); - expect(taskManager.allOwnedTasks()).andReturn(mkMap( + when(taskManager.allOwnedTasks()).thenReturn(mkMap( mkEntry(taskId1, task1), mkEntry(taskId2, task2), mkEntry(taskId3, task3) - )).anyTimes(); + )); // expect not to try and commit task3, because it's not running. - expect(taskManager.commit(mkSet(task1, task2))).andReturn(2).times(1); Review Comment: Done! 
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java: ########## @@ -361,41 +339,30 @@ public void shouldRecycleStoreAndReregisterChangelog() { assertFalse(changelogReader.isPartitionRegistered(persistentStorePartition)); assertThat(stateMgr.getStore(persistentStoreName), equalTo(store)); - reset(context, store); - context.uninitialize(); - expect(store.name()).andStubReturn(persistentStoreName); - replay(context, store); + when(store.name()).thenReturn(persistentStoreName); stateMgr.registerStateStores(singletonList(store), context); - verify(context, store); + verify(context, times(2)).uninitialize(); assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition)); } @Test public void shouldClearStoreCache() { final ProcessorStateManager stateMgr = getStateManager(Task.TaskType.ACTIVE); - reset(storeMetadata); - final CachingStore store = EasyMock.createMock(CachingStore.class); - expect(storeMetadata.changelogPartition()).andStubReturn(persistentStorePartition); - expect(storeMetadata.store()).andStubReturn(store); - expect(store.name()).andStubReturn(persistentStoreName); - store.clearCache(); Review Comment: This is my miss, apologies. Added as a verification. 
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -2842,45 +2737,31 @@ void runOnceWithoutProcessingThreads() { thread.setStreamsUncaughtExceptionHandler((e, b) -> exceptionHandlerInvoked.set(true)); thread.run(); - verify(taskManager); assertThat(exceptionHandlerInvoked.get(), is(true)); } @Test @SuppressWarnings("unchecked") public void shouldCatchTaskMigratedExceptionOnOnTaskCorruptedExceptionPath() { final StreamsConfig config = new StreamsConfig(configProps(false)); - final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); - expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet()); + final TaskManager taskManager = mock(TaskManager.class); final Consumer<byte[], byte[]> consumer = mock(Consumer.class); final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); - expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata); - expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty()); - consumer.subscribe((Collection<String>) anyObject(), anyObject()); - EasyMock.expectLastCall().anyTimes(); - consumer.unsubscribe(); - EasyMock.expectLastCall().anyTimes(); - EasyMock.replay(consumerGroupMetadata); + when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata); + when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty()); + doNothing().when(consumer).subscribe((Collection<String>) any(), any()); + doNothing().when(consumer).unsubscribe(); Review Comment: Done! 
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -570,27 +563,25 @@ public void shouldAlsoPurgeWhenNothingGetsCommitted() { props.setProperty(StreamsConfig.REPARTITION_PURGE_INTERVAL_MS_CONFIG, Long.toString(purgeInterval)); final StreamsConfig config = new StreamsConfig(props); - final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class); + @SuppressWarnings("unchecked") + final Consumer<byte[], byte[]> consumer = mock(Consumer.class); final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); - expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata); - expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty()); + when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata); + when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty()); final TaskId taskId = new TaskId(0, 0); final Task runningTask = statelessTask(taskId) .inState(Task.State.RUNNING).build(); - final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); - expect(taskManager.allOwnedTasks()).andStubReturn(Collections.singletonMap(taskId, runningTask)); - expect(taskManager.commit(Collections.singleton(runningTask))).andStubReturn(0); - taskManager.maybePurgeCommittedRecords(); - EasyMock.replay(consumer, consumerGroupMetadata, taskManager); + final TaskManager taskManager = mock(TaskManager.class); + when(taskManager.allOwnedTasks()).thenReturn(Collections.singletonMap(taskId, runningTask)); + when(taskManager.commit(Collections.singleton(runningTask))).thenReturn(0); + doNothing().when(taskManager).maybePurgeCommittedRecords(); Review Comment: Okay, verified in the latest revision! 
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java: ########## @@ -125,11 +122,11 @@ public class ProcessorStateManagerTest { private OffsetCheckpoint checkpoint; private StateDirectory stateDirectory; - @Mock(type = MockType.NICE) + @Mock private StateStore store; Review Comment: Mine does as well - removed ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -2701,32 +2622,20 @@ public void shouldThrowTaskMigratedExceptionHandlingRevocation() { @SuppressWarnings("unchecked") public void shouldCatchHandleCorruptionOnTaskCorruptedExceptionPath() { final StreamsConfig config = new StreamsConfig(configProps(false)); - final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); - expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet()); + final TaskManager taskManager = mock(TaskManager.class); final Consumer<byte[], byte[]> consumer = mock(Consumer.class); final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); - consumer.subscribe((Collection<String>) anyObject(), anyObject()); - EasyMock.expectLastCall().anyTimes(); - consumer.unsubscribe(); Review Comment: According to Mockito it is never invoked ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java: ########## @@ -311,48 +303,34 @@ public void shouldRestoreTimestampedStoreWithConverter() { @Test public void shouldUnregisterChangelogsDuringClose() { final ProcessorStateManager stateMgr = getStateManager(Task.TaskType.ACTIVE); - reset(storeMetadata); - final StateStore store = EasyMock.createMock(StateStore.class); - expect(storeMetadata.changelogPartition()).andStubReturn(persistentStorePartition); - expect(storeMetadata.store()).andStubReturn(store); - expect(store.name()).andStubReturn(persistentStoreName); - - context.uninitialize(); - store.init((StateStoreContext) 
context, store); - replay(storeMetadata, context, store); + final StateStore store = mock(StateStore.class); + when(store.name()).thenReturn(persistentStoreName); stateMgr.registerStateStores(singletonList(store), context); - verify(context, store); + + verify(context).uninitialize(); + verify(store).init((StateStoreContext) context, store); stateMgr.registerStore(store, noopStateRestoreCallback, null); assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition)); - reset(store); - expect(store.name()).andStubReturn(persistentStoreName); - store.close(); - replay(store); + when(store.name()).thenReturn(persistentStoreName); Review Comment: Yup - removed ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -2920,26 +2801,18 @@ void runOnceWithoutProcessingThreads() { thread.setState(StreamThread.State.STARTING); thread.runLoop(); - - verify(taskManager); } @Test @SuppressWarnings("unchecked") public void shouldEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnActiveTask() { final StreamsConfig config = new StreamsConfig(configProps(true)); - - final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); - expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet()); + final TaskManager taskManager = mock(TaskManager.class); final Consumer<byte[], byte[]> consumer = mock(Consumer.class); final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); - expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata); - expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty()); - consumer.subscribe((Collection<String>) anyObject(), anyObject()); - EasyMock.expectLastCall().anyTimes(); - consumer.unsubscribe(); - EasyMock.expectLastCall().anyTimes(); - EasyMock.replay(consumerGroupMetadata); + when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata); + 
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty()); + doNothing().when(consumer).subscribe((Collection<String>) any(), any()); Review Comment: Done! ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -2997,27 +2863,19 @@ void runOnceWithoutProcessingThreads() { thread.setState(StreamThread.State.STARTING); thread.runLoop(); - - verify(taskManager); - verify(consumer); } @Test @SuppressWarnings("unchecked") public void shouldNotEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnInactiveTask() { final StreamsConfig config = new StreamsConfig(configProps(true)); - - final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); - expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet()); + final TaskManager taskManager = mock(TaskManager.class); + when(taskManager.producerClientIds()).thenReturn(Collections.emptySet()); final Consumer<byte[], byte[]> consumer = mock(Consumer.class); final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); - expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata); - expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty()); - consumer.subscribe((Collection<String>) anyObject(), anyObject()); - EasyMock.expectLastCall().anyTimes(); - consumer.unsubscribe(); - EasyMock.expectLastCall().anyTimes(); - EasyMock.replay(consumerGroupMetadata); + when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata); + when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty()); + doNothing().when(consumer).subscribe((Collection<String>) any(), any()); Review Comment: Done! 
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -3494,32 +3323,27 @@ private StreamThread setUpThread(final Properties streamsConfigProps) { } private TaskManager mockTaskManager(final Task runningTask) { - final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); + final TaskManager taskManager = mock(TaskManager.class); final TaskId taskId = new TaskId(0, 0); - expect(runningTask.state()).andStubReturn(Task.State.RUNNING); - expect(runningTask.id()).andStubReturn(taskId); - expect(taskManager.allOwnedTasks()).andStubReturn(Collections.singletonMap(taskId, runningTask)); - expect(taskManager.commit(Collections.singleton(runningTask))).andStubReturn(1); + when(runningTask.state()).thenReturn(Task.State.RUNNING); + when(taskManager.allOwnedTasks()).thenReturn(Collections.singletonMap(taskId, runningTask)); return taskManager; } private TaskManager mockTaskManagerPurge(final int numberOfPurges) { final Task runningTask = mock(Task.class); final TaskManager taskManager = mockTaskManager(runningTask); - taskManager.maybePurgeCommittedRecords(); - EasyMock.expectLastCall().times(numberOfPurges); Review Comment: Done! 
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -2997,27 +2863,19 @@ void runOnceWithoutProcessingThreads() { thread.setState(StreamThread.State.STARTING); thread.runLoop(); - - verify(taskManager); - verify(consumer); } @Test @SuppressWarnings("unchecked") public void shouldNotEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnInactiveTask() { final StreamsConfig config = new StreamsConfig(configProps(true)); - - final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); - expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet()); + final TaskManager taskManager = mock(TaskManager.class); + when(taskManager.producerClientIds()).thenReturn(Collections.emptySet()); Review Comment: Done! ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -3494,32 +3323,27 @@ private StreamThread setUpThread(final Properties streamsConfigProps) { } private TaskManager mockTaskManager(final Task runningTask) { - final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); + final TaskManager taskManager = mock(TaskManager.class); final TaskId taskId = new TaskId(0, 0); - expect(runningTask.state()).andStubReturn(Task.State.RUNNING); - expect(runningTask.id()).andStubReturn(taskId); - expect(taskManager.allOwnedTasks()).andStubReturn(Collections.singletonMap(taskId, runningTask)); - expect(taskManager.commit(Collections.singleton(runningTask))).andStubReturn(1); + when(runningTask.state()).thenReturn(Task.State.RUNNING); + when(taskManager.allOwnedTasks()).thenReturn(Collections.singletonMap(taskId, runningTask)); return taskManager; } private TaskManager mockTaskManagerPurge(final int numberOfPurges) { final Task runningTask = mock(Task.class); final TaskManager taskManager = mockTaskManager(runningTask); - taskManager.maybePurgeCommittedRecords(); - EasyMock.expectLastCall().times(numberOfPurges); - 
EasyMock.replay(taskManager, runningTask); + doNothing().when(taskManager).maybePurgeCommittedRecords(); return taskManager; } private TaskManager mockTaskManagerCommit(final int commits) { final Task runningTask = mock(Task.class); final TaskManager taskManager = mockTaskManager(runningTask); - expect(taskManager.commit(Collections.singleton(runningTask))).andReturn(commits).times(1); Review Comment: Addressed in subsequent commits -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org