cadonna commented on code in PR #16093:
URL: https://github.com/apache/kafka/pull/16093#discussion_r1677593645


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java:
##########
@@ -185,6 +236,10 @@ public boolean isTerminalNode() {
         return children.isEmpty();
     }
 
+    public void setProcessingExceptionHandler(final ProcessingExceptionHandler 
processingExceptionHandler) {
+        this.processingExceptionHandler = processingExceptionHandler;
+    }

Review Comment:
   I try to avoid adding methods just for test setup. 
   Looking at the tests in `ProcessorNodeTest`, I do not see why you need this 
method. You can equally well pass the processing exception handler into 
`init()`.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java:
##########
@@ -34,36 +40,106 @@
 import org.apache.kafka.test.StreamsTestUtils;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
 
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Properties;
 
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE;
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
+@ExtendWith(MockitoExtension.class)
 public class ProcessorNodeTest {
+    private static final String TOPIC = "topic";
+    private static final int PARTITION = 0;
+    private static final Long OFFSET = 0L;
+    private static final Long TIMESTAMP = 0L;
+    private static final TaskId TASK_ID = new TaskId(0, 0);
+    private static final String NAME = "name";
+    private static final String KEY = "key";
+    private static final String VALUE = "value";
 
     @Test
     public void shouldThrowStreamsExceptionIfExceptionCaughtDuringInit() {
         final ProcessorNode<Object, Object, Object, Object> node =
-            new ProcessorNode<>("name", new ExceptionalProcessor(), 
Collections.emptySet());
+            new ProcessorNode<>(NAME, new ExceptionalProcessor(), 
Collections.emptySet());
         assertThrows(StreamsException.class, () -> node.init(null));
     }
 
     @Test
     public void shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() {
         final ProcessorNode<Object, Object, Object, Object> node =
-            new ProcessorNode<>("name", new ExceptionalProcessor(), 
Collections.emptySet());
+            new ProcessorNode<>(NAME, new ExceptionalProcessor(), 
Collections.emptySet());
         assertThrows(StreamsException.class, () -> node.init(null));
     }
 
+    @Test
+    public void shouldThrowStreamsExceptionWhenProcessingMarkedAsFail() {
+        final ProcessorNode<Object, Object, Object, Object> node =
+            new ProcessorNode<>(NAME, new ProcessingExceptionProcessor(), 
Collections.emptySet());
+
+        final InternalProcessorContext<Object, Object> 
internalProcessorContext = mockInternalProcessorContext();
+        node.setProcessingExceptionHandler(new 
ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL,
 internalProcessorContext));
+        node.init(internalProcessorContext);
+
+        final FailedProcessingException failedProcessingException = 
assertThrows(FailedProcessingException.class,
+            () -> node.process(new Record<>(KEY, VALUE, TIMESTAMP)));
+
+        assertEquals("java.lang.RuntimeException: Processing exception should 
be caught and handled by the processing exception handler.",
+            failedProcessingException.getMessage());
+
+        assertTrue(failedProcessingException.getCause() instanceof 
RuntimeException);
+        assertEquals("Processing exception should be caught and handled by the 
processing exception handler.",
+            failedProcessingException.getCause().getMessage());
+    }
+
+    @Test
+    public void shouldNotThrowStreamsExceptionWhenProcessingMarkedAsContinue() 
{
+        final ProcessorNode<Object, Object, Object, Object> node =
+            new ProcessorNode<>(NAME, new ProcessingExceptionProcessor(), 
Collections.emptySet());
+
+        final InternalProcessorContext<Object, Object> 
internalProcessorContext = mockInternalProcessorContext();
+        node.setProcessingExceptionHandler(new 
ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE,
 internalProcessorContext));
+        node.init(internalProcessorContext);
+
+        assertDoesNotThrow(() -> node.process(new Record<>(KEY, VALUE, 
TIMESTAMP)));
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldNotHandleInternalFailedProcessingException() {
+        final ProcessingExceptionHandler processingExceptionHandler = 
mock(ProcessingExceptionHandler.class);
+
+        final ProcessorNode<Object, Object, Object, Object> node =
+            new ProcessorNode<>(NAME, new 
FailedProcessingExceptionProcessor(), Collections.emptySet());
+        node.setProcessingExceptionHandler(processingExceptionHandler);
+
+        final InternalProcessorContext<Object, Object> 
internalProcessorContext = mock(InternalProcessorContext.class);

Review Comment:
   Why do you not use `mockInternalProcessorContext()` here?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java:
##########
@@ -174,6 +199,32 @@ public void process(final Record<KIn, VIn> record) {
                     keyClass,
                     valueClass),
                 e);
+        } catch (final FailedProcessingException | TaskCorruptedException e) {

Review Comment:
   Are you sure you do not also need to consider `TaskMigratedException` here? 
I think you do. Could you please also check other exceptions?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java:
##########
@@ -34,36 +40,106 @@
 import org.apache.kafka.test.StreamsTestUtils;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
 
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Properties;
 
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE;
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
+@ExtendWith(MockitoExtension.class)
 public class ProcessorNodeTest {
+    private static final String TOPIC = "topic";
+    private static final int PARTITION = 0;
+    private static final Long OFFSET = 0L;
+    private static final Long TIMESTAMP = 0L;
+    private static final TaskId TASK_ID = new TaskId(0, 0);
+    private static final String NAME = "name";
+    private static final String KEY = "key";
+    private static final String VALUE = "value";
 
     @Test
     public void shouldThrowStreamsExceptionIfExceptionCaughtDuringInit() {
         final ProcessorNode<Object, Object, Object, Object> node =
-            new ProcessorNode<>("name", new ExceptionalProcessor(), 
Collections.emptySet());
+            new ProcessorNode<>(NAME, new ExceptionalProcessor(), 
Collections.emptySet());
         assertThrows(StreamsException.class, () -> node.init(null));
     }
 
     @Test
     public void shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() {
         final ProcessorNode<Object, Object, Object, Object> node =
-            new ProcessorNode<>("name", new ExceptionalProcessor(), 
Collections.emptySet());
+            new ProcessorNode<>(NAME, new ExceptionalProcessor(), 
Collections.emptySet());
         assertThrows(StreamsException.class, () -> node.init(null));
     }
 
+    @Test
+    public void shouldThrowStreamsExceptionWhenProcessingMarkedAsFail() {
+        final ProcessorNode<Object, Object, Object, Object> node =
+            new ProcessorNode<>(NAME, new ProcessingExceptionProcessor(), 
Collections.emptySet());
+
+        final InternalProcessorContext<Object, Object> 
internalProcessorContext = mockInternalProcessorContext();
+        node.setProcessingExceptionHandler(new 
ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL,
 internalProcessorContext));
+        node.init(internalProcessorContext);
+
+        final FailedProcessingException failedProcessingException = 
assertThrows(FailedProcessingException.class,
+            () -> node.process(new Record<>(KEY, VALUE, TIMESTAMP)));
+
+        assertEquals("java.lang.RuntimeException: Processing exception should 
be caught and handled by the processing exception handler.",
+            failedProcessingException.getMessage());
+
+        assertTrue(failedProcessingException.getCause() instanceof 
RuntimeException);
+        assertEquals("Processing exception should be caught and handled by the 
processing exception handler.",
+            failedProcessingException.getCause().getMessage());
+    }
+
+    @Test
+    public void shouldNotThrowStreamsExceptionWhenProcessingMarkedAsContinue() 
{

Review Comment:
   Please rename to 
`shouldNotThrowWhenProcessingExceptionHandlerResponsesWithContinue` or similar.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java:
##########
@@ -34,36 +40,106 @@
 import org.apache.kafka.test.StreamsTestUtils;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
 
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Properties;
 
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE;
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
+@ExtendWith(MockitoExtension.class)
 public class ProcessorNodeTest {
+    private static final String TOPIC = "topic";
+    private static final int PARTITION = 0;
+    private static final Long OFFSET = 0L;
+    private static final Long TIMESTAMP = 0L;
+    private static final TaskId TASK_ID = new TaskId(0, 0);
+    private static final String NAME = "name";
+    private static final String KEY = "key";
+    private static final String VALUE = "value";
 
     @Test
     public void shouldThrowStreamsExceptionIfExceptionCaughtDuringInit() {
         final ProcessorNode<Object, Object, Object, Object> node =
-            new ProcessorNode<>("name", new ExceptionalProcessor(), 
Collections.emptySet());
+            new ProcessorNode<>(NAME, new ExceptionalProcessor(), 
Collections.emptySet());
         assertThrows(StreamsException.class, () -> node.init(null));
     }
 
     @Test
     public void shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() {
         final ProcessorNode<Object, Object, Object, Object> node =
-            new ProcessorNode<>("name", new ExceptionalProcessor(), 
Collections.emptySet());
+            new ProcessorNode<>(NAME, new ExceptionalProcessor(), 
Collections.emptySet());
         assertThrows(StreamsException.class, () -> node.init(null));
     }
 
+    @Test
+    public void shouldThrowStreamsExceptionWhenProcessingMarkedAsFail() {
+        final ProcessorNode<Object, Object, Object, Object> node =
+            new ProcessorNode<>(NAME, new ProcessingExceptionProcessor(), 
Collections.emptySet());
+
+        final InternalProcessorContext<Object, Object> 
internalProcessorContext = mockInternalProcessorContext();
+        node.setProcessingExceptionHandler(new 
ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL,
 internalProcessorContext));
+        node.init(internalProcessorContext);
+
+        final FailedProcessingException failedProcessingException = 
assertThrows(FailedProcessingException.class,
+            () -> node.process(new Record<>(KEY, VALUE, TIMESTAMP)));
+
+        assertEquals("java.lang.RuntimeException: Processing exception should 
be caught and handled by the processing exception handler.",
+            failedProcessingException.getMessage());
+
+        assertTrue(failedProcessingException.getCause() instanceof 
RuntimeException);
+        assertEquals("Processing exception should be caught and handled by the 
processing exception handler.",
+            failedProcessingException.getCause().getMessage());
+    }
+
+    @Test
+    public void shouldNotThrowStreamsExceptionWhenProcessingMarkedAsContinue() 
{
+        final ProcessorNode<Object, Object, Object, Object> node =
+            new ProcessorNode<>(NAME, new ProcessingExceptionProcessor(), 
Collections.emptySet());
+
+        final InternalProcessorContext<Object, Object> 
internalProcessorContext = mockInternalProcessorContext();
+        node.setProcessingExceptionHandler(new 
ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE,
 internalProcessorContext));
+        node.init(internalProcessorContext);
+
+        assertDoesNotThrow(() -> node.process(new Record<>(KEY, VALUE, 
TIMESTAMP)));
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldNotHandleInternalFailedProcessingException() {
+        final ProcessingExceptionHandler processingExceptionHandler = 
mock(ProcessingExceptionHandler.class);
+
+        final ProcessorNode<Object, Object, Object, Object> node =
+            new ProcessorNode<>(NAME, new 
FailedProcessingExceptionProcessor(), Collections.emptySet());
+        node.setProcessingExceptionHandler(processingExceptionHandler);
+
+        final InternalProcessorContext<Object, Object> 
internalProcessorContext = mock(InternalProcessorContext.class);
+        when(internalProcessorContext.taskId()).thenReturn(TASK_ID);
+        when(internalProcessorContext.metrics()).thenReturn(new 
StreamsMetricsImpl(new Metrics(), "test-client", StreamsConfig.METRICS_LATEST, 
new MockTime()));
+        node.init(internalProcessorContext);
+
+        final FailedProcessingException failedProcessingException = 
assertThrows(FailedProcessingException.class,
+            () -> node.process(new Record<>(KEY, VALUE, TIMESTAMP)));
+
+        assertEquals("java.lang.RuntimeException: FailedProcessingException 
should not be caught and handled by the processing exception handler.",
+            failedProcessingException.getMessage());

Review Comment:
   I would prefer to verify the cause and the message of the cause. I am not 
sure that the message format is standardized in Java when an exception has no 
message of its own and uses the message of its cause instead. Different Java 
implementations might produce slightly different formats, which would fail the test. 



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java:
##########
@@ -34,36 +40,106 @@
 import org.apache.kafka.test.StreamsTestUtils;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
 
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Properties;
 
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE;
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
+@ExtendWith(MockitoExtension.class)
 public class ProcessorNodeTest {
+    private static final String TOPIC = "topic";
+    private static final int PARTITION = 0;
+    private static final Long OFFSET = 0L;
+    private static final Long TIMESTAMP = 0L;
+    private static final TaskId TASK_ID = new TaskId(0, 0);
+    private static final String NAME = "name";
+    private static final String KEY = "key";
+    private static final String VALUE = "value";
 
     @Test
     public void shouldThrowStreamsExceptionIfExceptionCaughtDuringInit() {
         final ProcessorNode<Object, Object, Object, Object> node =
-            new ProcessorNode<>("name", new ExceptionalProcessor(), 
Collections.emptySet());
+            new ProcessorNode<>(NAME, new ExceptionalProcessor(), 
Collections.emptySet());
         assertThrows(StreamsException.class, () -> node.init(null));
     }
 
     @Test
     public void shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() {
         final ProcessorNode<Object, Object, Object, Object> node =
-            new ProcessorNode<>("name", new ExceptionalProcessor(), 
Collections.emptySet());
+            new ProcessorNode<>(NAME, new ExceptionalProcessor(), 
Collections.emptySet());
         assertThrows(StreamsException.class, () -> node.init(null));
     }
 
+    @Test
+    public void shouldThrowStreamsExceptionWhenProcessingMarkedAsFail() {

Review Comment:
   Could you please rename this method to 
`shouldThrowFailedProcessingExceptionWhenProcessingExceptionHandlerResponsesWithFail()`?
 Or something similar that mentions the correct exception and when it is thrown.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java:
##########
@@ -34,36 +40,106 @@
 import org.apache.kafka.test.StreamsTestUtils;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
 
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Properties;
 
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE;
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
+@ExtendWith(MockitoExtension.class)
 public class ProcessorNodeTest {
+    private static final String TOPIC = "topic";
+    private static final int PARTITION = 0;
+    private static final Long OFFSET = 0L;
+    private static final Long TIMESTAMP = 0L;
+    private static final TaskId TASK_ID = new TaskId(0, 0);
+    private static final String NAME = "name";
+    private static final String KEY = "key";
+    private static final String VALUE = "value";
 
     @Test
     public void shouldThrowStreamsExceptionIfExceptionCaughtDuringInit() {
         final ProcessorNode<Object, Object, Object, Object> node =
-            new ProcessorNode<>("name", new ExceptionalProcessor(), 
Collections.emptySet());
+            new ProcessorNode<>(NAME, new ExceptionalProcessor(), 
Collections.emptySet());
         assertThrows(StreamsException.class, () -> node.init(null));
     }
 
     @Test
     public void shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() {
         final ProcessorNode<Object, Object, Object, Object> node =
-            new ProcessorNode<>("name", new ExceptionalProcessor(), 
Collections.emptySet());
+            new ProcessorNode<>(NAME, new ExceptionalProcessor(), 
Collections.emptySet());
         assertThrows(StreamsException.class, () -> node.init(null));
     }
 
+    @Test
+    public void shouldThrowStreamsExceptionWhenProcessingMarkedAsFail() {
+        final ProcessorNode<Object, Object, Object, Object> node =
+            new ProcessorNode<>(NAME, new ProcessingExceptionProcessor(), 
Collections.emptySet());
+
+        final InternalProcessorContext<Object, Object> 
internalProcessorContext = mockInternalProcessorContext();
+        node.setProcessingExceptionHandler(new 
ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL,
 internalProcessorContext));
+        node.init(internalProcessorContext);
+
+        final FailedProcessingException failedProcessingException = 
assertThrows(FailedProcessingException.class,
+            () -> node.process(new Record<>(KEY, VALUE, TIMESTAMP)));
+
+        assertEquals("java.lang.RuntimeException: Processing exception should 
be caught and handled by the processing exception handler.",
+            failedProcessingException.getMessage());

Review Comment:
   I would remove this since it duplicates the verifications below. I think it is 
enough to verify the cause and the message of the cause, as you do below. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to