cadonna commented on code in PR #16093:
URL: https://github.com/apache/kafka/pull/16093#discussion_r1650786044


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java:
##########
@@ -99,30 +81,57 @@ public void 
shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() {
     public void shouldThrowStreamsExceptionWhenProcessingMarkedAsFail() {
         final ProcessorNode<Object, Object, Object, Object> node =
             new ProcessorNode<>("name", new ProcessingExceptionProcessor(), 
Collections.emptySet());
-        node.setProcessingExceptionHandler(new 
LogAndFailProcessingExceptionHandler());
+        node.setProcessingExceptionHandler(new 
ProcessingExceptionHandlerMockTest(ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL));
+
+        final InternalProcessorContext<Object, Object> 
internalProcessorContext = mockInternalProcessorContext();
         node.init(internalProcessorContext);
 
-        assertThrows(StreamsException.class, () -> node.process(new 
Record<>("key", "value", 0)));
+        final StreamsException processingException = 
assertThrows(StreamsException.class,
+            () -> node.process(new Record<>("key", "value", 0)));
+
+        assertEquals("Processing exception handler is set to fail upon" +
+            " a processing error. If you would rather have the streaming 
pipeline" +
+            " continue after a processing error, please set the " +
+            PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.", 
processingException.getMessage());
+
+        assertTrue(processingException.getCause() instanceof RuntimeException);
+        assertEquals("Processing exception should be caught and handled by the 
processing exception handler.",
+            processingException.getCause().getMessage());
     }
 
     @Test
     public void shouldNotThrowStreamsExceptionWhenProcessingMarkedAsContinue() 
{
         final ProcessorNode<Object, Object, Object, Object> node =
             new ProcessorNode<>("name", new ProcessingExceptionProcessor(), 
Collections.emptySet());
-        node.setProcessingExceptionHandler(new 
LogAndContinueProcessingExceptionHandler());
+        node.setProcessingExceptionHandler(new 
ProcessingExceptionHandlerMockTest(ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE));
+
+        final InternalProcessorContext<Object, Object> 
internalProcessorContext = mockInternalProcessorContext();
         node.init(internalProcessorContext);
 
         assertDoesNotThrow(() -> node.process(new Record<>("key", "value", 
0)));
     }
 
     @Test
+    @SuppressWarnings("unchecked")
     public void shouldNotHandleStreamsExceptionAsProcessingException() {
+        final ProcessingExceptionHandler processingExceptionHandler = 
spy(ProcessingExceptionHandler.class);

Review Comment:
   Please do not use spies! They are bad practice, because they do not really 
decouple the code to test from other code. Here a 
`mock(ProcessingExceptionHandler.class)` should be fine since 
`ProcessingExceptionHandler` is an interface.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java:
##########
@@ -99,30 +81,57 @@ public void 
shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() {
     public void shouldThrowStreamsExceptionWhenProcessingMarkedAsFail() {
         final ProcessorNode<Object, Object, Object, Object> node =
             new ProcessorNode<>("name", new ProcessingExceptionProcessor(), 
Collections.emptySet());
-        node.setProcessingExceptionHandler(new 
LogAndFailProcessingExceptionHandler());
+        node.setProcessingExceptionHandler(new 
ProcessingExceptionHandlerMockTest(ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL));
+
+        final InternalProcessorContext<Object, Object> 
internalProcessorContext = mockInternalProcessorContext();
         node.init(internalProcessorContext);
 
-        assertThrows(StreamsException.class, () -> node.process(new 
Record<>("key", "value", 0)));
+        final StreamsException processingException = 
assertThrows(StreamsException.class,
+            () -> node.process(new Record<>("key", "value", 0)));
+
+        assertEquals("Processing exception handler is set to fail upon" +
+            " a processing error. If you would rather have the streaming 
pipeline" +
+            " continue after a processing error, please set the " +
+            PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.", 
processingException.getMessage());
+
+        assertTrue(processingException.getCause() instanceof RuntimeException);
+        assertEquals("Processing exception should be caught and handled by the 
processing exception handler.",
+            processingException.getCause().getMessage());
     }
 
     @Test
     public void shouldNotThrowStreamsExceptionWhenProcessingMarkedAsContinue() 
{
         final ProcessorNode<Object, Object, Object, Object> node =
             new ProcessorNode<>("name", new ProcessingExceptionProcessor(), 
Collections.emptySet());
-        node.setProcessingExceptionHandler(new 
LogAndContinueProcessingExceptionHandler());
+        node.setProcessingExceptionHandler(new 
ProcessingExceptionHandlerMockTest(ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE));
+
+        final InternalProcessorContext<Object, Object> 
internalProcessorContext = mockInternalProcessorContext();
         node.init(internalProcessorContext);
 
         assertDoesNotThrow(() -> node.process(new Record<>("key", "value", 
0)));
     }
 
     @Test
+    @SuppressWarnings("unchecked")
     public void shouldNotHandleStreamsExceptionAsProcessingException() {
+        final ProcessingExceptionHandler processingExceptionHandler = 
spy(ProcessingExceptionHandler.class);
+
         final ProcessorNode<Object, Object, Object, Object> node =
             new ProcessorNode<>("name", new StreamsExceptionProcessor(), 
Collections.emptySet());
+        node.setProcessingExceptionHandler(processingExceptionHandler);
+
+        final InternalProcessorContext<Object, Object> 
internalProcessorContext = mock(InternalProcessorContext.class);
+        when(internalProcessorContext.taskId()).thenReturn(new TaskId(0, 0));
+        when(internalProcessorContext.metrics()).thenReturn(new 
StreamsMetricsImpl(new Metrics(), "test-client", StreamsConfig.METRICS_LATEST, 
new MockTime()));
         node.init(internalProcessorContext);
 
-        assertThrows("Streams exception", StreamsException.class,
+        final StreamsException streamsException = 
assertThrows(StreamsException.class,
             () -> node.process(new Record<>("key", "value", 0)));
+
+        assertEquals("Streams exception should not be caught and handled by 
the processing exception handler.",
+            streamsException.getMessage());
+

Review Comment:
   nit:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java:
##########
@@ -63,6 +95,36 @@ public void 
shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() {
         assertThrows(StreamsException.class, () -> node.init(null));
     }
 
+    @Test
+    public void shouldThrowStreamsExceptionWhenProcessingMarkedAsFail() {
+        final ProcessorNode<Object, Object, Object, Object> node =
+            new ProcessorNode<>("name", new ProcessingExceptionProcessor(), 
Collections.emptySet());
+        node.setProcessingExceptionHandler(new 
LogAndFailProcessingExceptionHandler());

Review Comment:
   I am fine if you want to use `ProcessingExceptionHandlerMockTest` but could 
you please rename it to `ProcessingExceptionHandlerMock` so that it is clear 
that it is a mock and not a test. 
   Could you also use constants in `mockInternalProcessorContext()` for the 
returning values of the stubs and correspondingly use the same constants in the 
assertions in `handle()` of `ProcessingExceptionHandlerMock`. It is not 
important that `context.topic()` returns `"topic"`, it is important that it 
returns the same value as set in the processor context.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java:
##########
@@ -63,6 +95,36 @@ public void 
shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() {
         assertThrows(StreamsException.class, () -> node.init(null));
     }
 
+    @Test
+    public void shouldThrowStreamsExceptionWhenProcessingMarkedAsFail() {
+        final ProcessorNode<Object, Object, Object, Object> node =
+            new ProcessorNode<>("name", new ProcessingExceptionProcessor(), 
Collections.emptySet());
+        node.setProcessingExceptionHandler(new 
LogAndFailProcessingExceptionHandler());
+        node.init(internalProcessorContext);
+
+        assertThrows(StreamsException.class, () -> node.process(new 
Record<>("key", "value", 0)));
+    }
+
+    @Test
+    public void shouldNotThrowStreamsExceptionWhenProcessingMarkedAsContinue() 
{
+        final ProcessorNode<Object, Object, Object, Object> node =
+            new ProcessorNode<>("name", new ProcessingExceptionProcessor(), 
Collections.emptySet());
+        node.setProcessingExceptionHandler(new 
LogAndContinueProcessingExceptionHandler());
+        node.init(internalProcessorContext);
+
+        assertDoesNotThrow(() -> node.process(new Record<>("key", "value", 
0)));
+    }
+
+    @Test
+    public void shouldNotHandleStreamsExceptionAsProcessingException() {
+        final ProcessorNode<Object, Object, Object, Object> node =
+            new ProcessorNode<>("name", new StreamsExceptionProcessor(), 
Collections.emptySet());
+        node.init(internalProcessorContext);

Review Comment:
   Sorry, I probably made a mistake while writing my comment. I wanted to say 
to verify that `handle()` is NOT called.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to