cadonna commented on code in PR #14716:
URL: https://github.com/apache/kafka/pull/14716#discussion_r1555453187


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -293,17 +288,12 @@ public void cleanup() throws IOException {
             task = null;
         }
         Utils.delete(BASE_DIR);
-        mockito.finishMocking();
     }
 
     @Test
-    public void shouldThrowLockExceptionIfFailedToLockStateDirectory() throws 
IOException {
-        stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
-        EasyMock.expect(stateDirectory.lock(taskId)).andReturn(false);
-        
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet());
-        stateManager.registerStore(stateStore, 
stateStore.stateRestoreCallback, null);
-        EasyMock.expectLastCall();
-        EasyMock.replay(stateDirectory, stateManager);
+    public void shouldThrowLockExceptionIfFailedToLockStateDirectory() {
+        stateDirectory = mock(StateDirectory.class);

Review Comment:
   I am wondering whether we leak resources, if we assign a mock to 
`stateDirectory` without closing the state directory before.
   In `setup()` an actual state directory is created with 
   ```
   stateDirectory = new StateDirectory(createConfig("100"), new MockTime(), 
true, false);  
   ```
   It has been there before this PR, but we should fix it.
   
   You can either close the state directory before the mock is assigned or 
remove the creation of the state directory from `setup()` and create it in each 
test method that uses it. 
   
   Same is true for the other occurrences in this test class.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -1912,42 +1823,32 @@ public void shouldThrowIfPostCommittingOnIllegalState() 
{
 
     @Test
     public void shouldSkipCheckpointingSuspendedCreatedTask() {
-        stateManager.checkpoint();
-        EasyMock.expectLastCall().andThrow(new AssertionError("Should not have 
tried to checkpoint"));
-        
EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
-        EasyMock.replay(stateManager, recordCollector);
-
         task = createStatefulTask(createConfig("100"), true);
         task.suspend();
         task.postCommit(true);
+
+        verify(stateManager, never()).checkpoint();
     }
 
     @Test
     public void shouldCheckpointForSuspendedTask() {
-        stateManager.checkpoint();
-        EasyMock.expectLastCall().once();
-        EasyMock.expect(stateManager.changelogOffsets())
-                .andReturn(singletonMap(partition1, 1L));
-        
EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
-        EasyMock.replay(stateManager, recordCollector);
+        when(stateManager.changelogOffsets())
+                .thenReturn(singletonMap(partition1, 1L));

Review Comment:
   nit:
   ```suggestion
           
when(stateManager.changelogOffsets()).thenReturn(singletonMap(partition1, 1L));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -411,25 +384,22 @@ public void seek(final TopicPartition partition, final 
long offset) {
 
         shouldNotSeek.set(new AssertionError("Should not seek"));
 
+        @SuppressWarnings("unchecked")
         final java.util.function.Consumer<Set<TopicPartition>> resetter =
-            EasyMock.mock(java.util.function.Consumer.class);
-        resetter.accept(Collections.singleton(partition1));
-        EasyMock.expectLastCall();
-        EasyMock.replay(resetter);
+            mock(java.util.function.Consumer.class);
+        doNothing().when(resetter).accept(Collections.singleton(partition1));

Review Comment:
   This should be a verification. However, there is an issue here. If I add it 
to the verifications with 
   ```java
   verify(resetter).accept(Collections.singleton(partition1));
   ```
   the test fails.
   The reason is that when ` accept()` is called, the argument is indeed 
`Collections.singleton(partition1)` but after the call the collection is 
cleared:
   
   
https://github.com/apache/kafka/blob/d144b7ee387308a59e52cbdabc7b66dd3b2926cc/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L955
   
   At the time the call is verified the argument changed. Apparently, Mockito 
stores the reference to the argument in the invocation.
   
   One way to solve this is the following:
   
   ```java
   final java.util.function.Consumer<Set<TopicPartition>> resetter =
               mock(java.util.function.Consumer.class);
   final Set<TopicPartition> partitionsAtCall = new HashSet<>();
   doAnswer(
       invocation -> {
           partitionsAtCall.addAll(invocation.getArgument(0));
           return null;
       }
   ).when(resetter).accept(Collections.singleton(partition1));
   
   task.initializeIfNeeded();
   task.completeRestoration(resetter);
   
   // because we mocked the `resetter` positions don't change
   assertThat(consumer.position(partition1), equalTo(5L));
   assertThat(consumer.position(partition2), equalTo(15L));
   assertThat(partitionsAtCall, equalTo(Collections.singleton(partition1)));
   ```
   You could also just keep your code and trust in `StrictStubs`. 
   
   Either way, please put a comment to explain why we cannot simply verify the 
call.
   
   



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -2229,24 +2070,17 @@ public void shouldThrowOnCloseCleanFlushError() {
         final double expectedCloseTaskMetric = 0.0;
         verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, 
metricName);
 
-        EasyMock.verify(stateManager);
-        EasyMock.reset(stateManager);
-        
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
-        EasyMock.replay(stateManager);
+        verify(stateManager).flush();
+        verify(stateManager).checkpoint();
+        verify(stateManager, never()).close();
     }
 
     @Test
     public void shouldThrowOnCloseCleanCheckpointError() {
         final long offset = 54300L;
-        EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap());
-        stateManager.checkpoint();
-        EasyMock.expectLastCall().andThrow(new 
ProcessorStateException("KABOOM!")).anyTimes();
-        stateManager.close();
-        EasyMock.expectLastCall().andThrow(new AssertionError("Close should 
not be called!")).anyTimes();
-        
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
-        EasyMock.expect(stateManager.changelogOffsets())
-                .andReturn(singletonMap(partition1, offset));
-        EasyMock.replay(recordCollector, stateManager);
+        doThrow(new 
ProcessorStateException("KABOOM!")).when(stateManager).checkpoint();
+        when(stateManager.changelogOffsets())
+                .thenReturn(singletonMap(partition1, offset));

Review Comment:
   nit:
   ```suggestion
           
when(stateManager.changelogOffsets()).thenReturn(singletonMap(partition1, 
offset));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -2265,36 +2099,22 @@ public void shouldThrowOnCloseCleanCheckpointError() {
         final double expectedCloseTaskMetric = 0.0;
         verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, 
metricName);
 
-        EasyMock.verify(stateManager);
-        EasyMock.reset(stateManager);
-        
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
-        stateManager.close();
-        EasyMock.expectLastCall();
-        EasyMock.replay(stateManager);
+        verify(stateManager, never()).close();
     }
 
     @Test
     public void shouldNotThrowFromStateManagerCloseInCloseDirty() {
-        stateManager.close();
-        EasyMock.expectLastCall().andThrow(new 
RuntimeException("KABOOM!")).anyTimes();
-        
EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
-        EasyMock.replay(stateManager);
+        doThrow(new RuntimeException("KABOOM!")).when(stateManager).close();
 
         task = createOptimizedStatefulTask(createConfig("100"), consumer);
         task.initializeIfNeeded();
 
         task.suspend();
         task.closeDirty();

Review Comment:
   Use the following to be more explicit:
   ```java
   assertDoesNotThrow(() -> task.closeDirty());
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -1958,64 +1859,58 @@ public void 
shouldNotCheckpointForSuspendedRunningTaskWithSmallProgress() {
 
         task.suspend();
         task.postCommit(false);
-        EasyMock.verify(stateManager);
+
+        verify(stateManager).checkpoint(); // checkpoint should only be called 
once

Review Comment:
   Is this comment necessary? If you want to make it more explicit use
   ```java
   verify(stateManager, times(1)).checkpoint();
   ```
   instead.
   
   But I think, `verify(stateManager).checkpoint()` is clear enough. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to