clolov commented on code in PR #13932:
URL: https://github.com/apache/kafka/pull/13932#discussion_r1397383304


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -203,6 +199,11 @@ public static Collection<Object[]> data() {
     @SuppressWarnings("unchecked")
     private final Consumer<byte[], byte[]> mainConsumer = (Consumer<byte[], 
byte[]>) Mockito.mock(Consumer.class);
 
+    @Mock
+    private Consumer<byte[], byte[]> consumer;
+
+    private StreamsMetadataState streamsMetadataState;
+

Review Comment:
   Yup - removed



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -698,14 +689,11 @@ public void 
shouldEnforceRebalanceWhenScheduledAndNotCurrentlyRebalancing() thro
             config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
             mockTime
         );
-        
-        final Consumer<byte[], byte[]> mockConsumer = 
EasyMock.createNiceMock(Consumer.class);
-        
expect(mockConsumer.poll(anyObject())).andStubReturn(ConsumerRecords.empty());
+
         final ConsumerGroupMetadata consumerGroupMetadata = 
mock(ConsumerGroupMetadata.class);
-        
expect(mockConsumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
-        
expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
-        EasyMock.replay(consumerGroupMetadata, mockConsumer);
-        final EasyMockConsumerClientSupplier mockClientSupplier = new 
EasyMockConsumerClientSupplier(mockConsumer);
+        when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+        
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+        final EasyMockConsumerClientSupplier mockClientSupplier = new 
EasyMockConsumerClientSupplier(consumer);

Review Comment:
   Yup - renamed!



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -3094,34 +2941,27 @@ public void 
shouldNotCommitNonRunningNonRestoringTasks() {
         final TaskId taskId2 = new TaskId(0, 2);
         final TaskId taskId3 = new TaskId(0, 3);
 
-        expect(task1.state()).andReturn(Task.State.RUNNING).anyTimes();
-        expect(task1.id()).andReturn(taskId1).anyTimes();
-        expect(task2.state()).andReturn(Task.State.RESTORING).anyTimes();
-        expect(task2.id()).andReturn(taskId2).anyTimes();
-        expect(task3.state()).andReturn(Task.State.CREATED).anyTimes();
-        expect(task3.id()).andReturn(taskId3).anyTimes();
+        when(task1.state()).thenReturn(Task.State.RUNNING);
+        when(task2.state()).thenReturn(Task.State.RESTORING);
+        when(task3.state()).thenReturn(Task.State.CREATED);
 
-        expect(taskManager.allOwnedTasks()).andReturn(mkMap(
+        when(taskManager.allOwnedTasks()).thenReturn(mkMap(
             mkEntry(taskId1, task1),
             mkEntry(taskId2, task2),
             mkEntry(taskId3, task3)
-        )).anyTimes();
+        ));
 
         // expect not to try and commit task3, because it's not running.
-        expect(taskManager.commit(mkSet(task1, task2))).andReturn(2).times(1);

Review Comment:
   Done!



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java:
##########
@@ -361,41 +339,30 @@ public void shouldRecycleStoreAndReregisterChangelog() {
         
assertFalse(changelogReader.isPartitionRegistered(persistentStorePartition));
         assertThat(stateMgr.getStore(persistentStoreName), equalTo(store));
 
-        reset(context, store);
-        context.uninitialize();
-        expect(store.name()).andStubReturn(persistentStoreName);
-        replay(context, store);
+        when(store.name()).thenReturn(persistentStoreName);
 
         stateMgr.registerStateStores(singletonList(store), context);
 
-        verify(context, store);
+        verify(context, times(2)).uninitialize();
         
assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition));
     }
 
     @Test
     public void shouldClearStoreCache() {
         final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE);
-        reset(storeMetadata);
-        final CachingStore store = EasyMock.createMock(CachingStore.class);
-        
expect(storeMetadata.changelogPartition()).andStubReturn(persistentStorePartition);
-        expect(storeMetadata.store()).andStubReturn(store);
-        expect(store.name()).andStubReturn(persistentStoreName);
-        store.clearCache();

Review Comment:
   This is my miss, apologies. Added as a verification.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -2842,45 +2737,31 @@ void runOnceWithoutProcessingThreads() {
         thread.setStreamsUncaughtExceptionHandler((e, b) -> 
exceptionHandlerInvoked.set(true));
         thread.run();
 
-        verify(taskManager);
         assertThat(exceptionHandlerInvoked.get(), is(true));
     }
 
     @Test
     @SuppressWarnings("unchecked")
     public void 
shouldCatchTaskMigratedExceptionOnOnTaskCorruptedExceptionPath() {
         final StreamsConfig config = new StreamsConfig(configProps(false));
-        final TaskManager taskManager = 
EasyMock.createNiceMock(TaskManager.class);
-        
expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
+        final TaskManager taskManager = mock(TaskManager.class);
         final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
         final ConsumerGroupMetadata consumerGroupMetadata = 
mock(ConsumerGroupMetadata.class);
-        expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
-        
expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
-        consumer.subscribe((Collection<String>) anyObject(), anyObject());
-        EasyMock.expectLastCall().anyTimes();
-        consumer.unsubscribe();
-        EasyMock.expectLastCall().anyTimes();
-        EasyMock.replay(consumerGroupMetadata);
+        when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+        
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+        doNothing().when(consumer).subscribe((Collection<String>) any(), 
any());
+        doNothing().when(consumer).unsubscribe();

Review Comment:
   Done!



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -570,27 +563,25 @@ public void shouldAlsoPurgeWhenNothingGetsCommitted() {
         props.setProperty(StreamsConfig.REPARTITION_PURGE_INTERVAL_MS_CONFIG, 
Long.toString(purgeInterval));
 
         final StreamsConfig config = new StreamsConfig(props);
-        final Consumer<byte[], byte[]> consumer = 
EasyMock.createNiceMock(Consumer.class);
+        @SuppressWarnings("unchecked")
+        final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
         final ConsumerGroupMetadata consumerGroupMetadata = 
mock(ConsumerGroupMetadata.class);
-        expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
-        
expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
+        when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+        
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
         final TaskId taskId = new TaskId(0, 0);
         final Task runningTask = statelessTask(taskId)
             .inState(Task.State.RUNNING).build();
-        final TaskManager taskManager = 
EasyMock.createNiceMock(TaskManager.class);
-        
expect(taskManager.allOwnedTasks()).andStubReturn(Collections.singletonMap(taskId,
 runningTask));
-        
expect(taskManager.commit(Collections.singleton(runningTask))).andStubReturn(0);
-        taskManager.maybePurgeCommittedRecords();
-        EasyMock.replay(consumer, consumerGroupMetadata, taskManager);
+        final TaskManager taskManager = mock(TaskManager.class);
+        
when(taskManager.allOwnedTasks()).thenReturn(Collections.singletonMap(taskId, 
runningTask));
+        
when(taskManager.commit(Collections.singleton(runningTask))).thenReturn(0);
+        doNothing().when(taskManager).maybePurgeCommittedRecords();

Review Comment:
   Okay, verified in the latest revision!



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java:
##########
@@ -125,11 +122,11 @@ public class ProcessorStateManagerTest {
     private OffsetCheckpoint checkpoint;
     private StateDirectory stateDirectory;
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private StateStore store;

Review Comment:
   Mine does as well - removed



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -2701,32 +2622,20 @@ public void 
shouldThrowTaskMigratedExceptionHandlingRevocation() {
     @SuppressWarnings("unchecked")
     public void shouldCatchHandleCorruptionOnTaskCorruptedExceptionPath() {
         final StreamsConfig config = new StreamsConfig(configProps(false));
-        final TaskManager taskManager = 
EasyMock.createNiceMock(TaskManager.class);
-        
expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
+        final TaskManager taskManager = mock(TaskManager.class);
         final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
         final ConsumerGroupMetadata consumerGroupMetadata = 
mock(ConsumerGroupMetadata.class);
-        consumer.subscribe((Collection<String>) anyObject(), anyObject());
-        EasyMock.expectLastCall().anyTimes();
-        consumer.unsubscribe();

Review Comment:
   According to Mockito it is never invoked



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java:
##########
@@ -311,48 +303,34 @@ public void shouldRestoreTimestampedStoreWithConverter() {
     @Test
     public void shouldUnregisterChangelogsDuringClose() {
         final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE);
-        reset(storeMetadata);
-        final StateStore store = EasyMock.createMock(StateStore.class);
-        
expect(storeMetadata.changelogPartition()).andStubReturn(persistentStorePartition);
-        expect(storeMetadata.store()).andStubReturn(store);
-        expect(store.name()).andStubReturn(persistentStoreName);
-
-        context.uninitialize();
-        store.init((StateStoreContext) context, store);
-        replay(storeMetadata, context, store);
+        final StateStore store = mock(StateStore.class);
+        when(store.name()).thenReturn(persistentStoreName);
 
         stateMgr.registerStateStores(singletonList(store), context);
-        verify(context, store);
+
+        verify(context).uninitialize();
+        verify(store).init((StateStoreContext) context, store);
 
         stateMgr.registerStore(store, noopStateRestoreCallback, null);
         
assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition));
 
-        reset(store);
-        expect(store.name()).andStubReturn(persistentStoreName);
-        store.close();
-        replay(store);
+        when(store.name()).thenReturn(persistentStoreName);

Review Comment:
   Yup - removed



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -2920,26 +2801,18 @@ void runOnceWithoutProcessingThreads() {
 
         thread.setState(StreamThread.State.STARTING);
         thread.runLoop();
-
-        verify(taskManager);
     }
 
     @Test
     @SuppressWarnings("unchecked")
     public void 
shouldEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnActiveTask() {
         final StreamsConfig config = new StreamsConfig(configProps(true));
-
-        final TaskManager taskManager = 
EasyMock.createNiceMock(TaskManager.class);
-        
expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
+        final TaskManager taskManager = mock(TaskManager.class);
         final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
         final ConsumerGroupMetadata consumerGroupMetadata = 
mock(ConsumerGroupMetadata.class);
-        expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
-        
expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
-        consumer.subscribe((Collection<String>) anyObject(), anyObject());
-        EasyMock.expectLastCall().anyTimes();
-        consumer.unsubscribe();
-        EasyMock.expectLastCall().anyTimes();
-        EasyMock.replay(consumerGroupMetadata);
+        when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+        
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+        doNothing().when(consumer).subscribe((Collection<String>) any(), 
any());

Review Comment:
   Done!



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -2997,27 +2863,19 @@ void runOnceWithoutProcessingThreads() {
 
         thread.setState(StreamThread.State.STARTING);
         thread.runLoop();
-
-        verify(taskManager);
-        verify(consumer);
     }
 
     @Test
     @SuppressWarnings("unchecked")
     public void 
shouldNotEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnInactiveTask() {
         final StreamsConfig config = new StreamsConfig(configProps(true));
-
-        final TaskManager taskManager = 
EasyMock.createNiceMock(TaskManager.class);
-        
expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
+        final TaskManager taskManager = mock(TaskManager.class);
+        
when(taskManager.producerClientIds()).thenReturn(Collections.emptySet());
         final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
         final ConsumerGroupMetadata consumerGroupMetadata = 
mock(ConsumerGroupMetadata.class);
-        expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
-        
expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
-        consumer.subscribe((Collection<String>) anyObject(), anyObject());
-        EasyMock.expectLastCall().anyTimes();
-        consumer.unsubscribe();
-        EasyMock.expectLastCall().anyTimes();
-        EasyMock.replay(consumerGroupMetadata);
+        when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+        
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+        doNothing().when(consumer).subscribe((Collection<String>) any(), 
any());

Review Comment:
   Done!



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -3494,32 +3323,27 @@ private StreamThread setUpThread(final Properties 
streamsConfigProps) {
     }
 
     private TaskManager mockTaskManager(final Task runningTask) {
-        final TaskManager taskManager = 
EasyMock.createNiceMock(TaskManager.class);
+        final TaskManager taskManager = mock(TaskManager.class);
         final TaskId taskId = new TaskId(0, 0);
 
-        expect(runningTask.state()).andStubReturn(Task.State.RUNNING);
-        expect(runningTask.id()).andStubReturn(taskId);
-        
expect(taskManager.allOwnedTasks()).andStubReturn(Collections.singletonMap(taskId,
 runningTask));
-        
expect(taskManager.commit(Collections.singleton(runningTask))).andStubReturn(1);
+        when(runningTask.state()).thenReturn(Task.State.RUNNING);
+        
when(taskManager.allOwnedTasks()).thenReturn(Collections.singletonMap(taskId, 
runningTask));
         return taskManager;
     }
 
     private TaskManager mockTaskManagerPurge(final int numberOfPurges) {
         final Task runningTask = mock(Task.class);
         final TaskManager taskManager = mockTaskManager(runningTask);
 
-        taskManager.maybePurgeCommittedRecords();
-        EasyMock.expectLastCall().times(numberOfPurges);

Review Comment:
   Done!



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -2997,27 +2863,19 @@ void runOnceWithoutProcessingThreads() {
 
         thread.setState(StreamThread.State.STARTING);
         thread.runLoop();
-
-        verify(taskManager);
-        verify(consumer);
     }
 
     @Test
     @SuppressWarnings("unchecked")
     public void 
shouldNotEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnInactiveTask() {
         final StreamsConfig config = new StreamsConfig(configProps(true));
-
-        final TaskManager taskManager = 
EasyMock.createNiceMock(TaskManager.class);
-        
expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
+        final TaskManager taskManager = mock(TaskManager.class);
+        
when(taskManager.producerClientIds()).thenReturn(Collections.emptySet());

Review Comment:
   Done!



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -3494,32 +3323,27 @@ private StreamThread setUpThread(final Properties 
streamsConfigProps) {
     }
 
     private TaskManager mockTaskManager(final Task runningTask) {
-        final TaskManager taskManager = 
EasyMock.createNiceMock(TaskManager.class);
+        final TaskManager taskManager = mock(TaskManager.class);
         final TaskId taskId = new TaskId(0, 0);
 
-        expect(runningTask.state()).andStubReturn(Task.State.RUNNING);
-        expect(runningTask.id()).andStubReturn(taskId);
-        
expect(taskManager.allOwnedTasks()).andStubReturn(Collections.singletonMap(taskId,
 runningTask));
-        
expect(taskManager.commit(Collections.singleton(runningTask))).andStubReturn(1);
+        when(runningTask.state()).thenReturn(Task.State.RUNNING);
+        
when(taskManager.allOwnedTasks()).thenReturn(Collections.singletonMap(taskId, 
runningTask));
         return taskManager;
     }
 
     private TaskManager mockTaskManagerPurge(final int numberOfPurges) {
         final Task runningTask = mock(Task.class);
         final TaskManager taskManager = mockTaskManager(runningTask);
 
-        taskManager.maybePurgeCommittedRecords();
-        EasyMock.expectLastCall().times(numberOfPurges);
-        EasyMock.replay(taskManager, runningTask);
+        doNothing().when(taskManager).maybePurgeCommittedRecords();
         return taskManager;
     }
 
     private TaskManager mockTaskManagerCommit(final int commits) {
         final Task runningTask = mock(Task.class);
         final TaskManager taskManager = mockTaskManager(runningTask);
 
-        
expect(taskManager.commit(Collections.singleton(runningTask))).andReturn(commits).times(1);

Review Comment:
   Addressed in subsequent commits



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to