sebastienviale commented on code in PR #16093:
URL: https://github.com/apache/kafka/pull/16093#discussion_r1650931604


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java:
##########
@@ -99,30 +81,57 @@ public void 
shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() {
     public void shouldThrowStreamsExceptionWhenProcessingMarkedAsFail() {
         final ProcessorNode<Object, Object, Object, Object> node =
             new ProcessorNode<>("name", new ProcessingExceptionProcessor(), 
Collections.emptySet());
-        node.setProcessingExceptionHandler(new 
LogAndFailProcessingExceptionHandler());
+        node.setProcessingExceptionHandler(new 
ProcessingExceptionHandlerMockTest(ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL));
+
+        final InternalProcessorContext<Object, Object> 
internalProcessorContext = mockInternalProcessorContext();
         node.init(internalProcessorContext);
 
-        assertThrows(StreamsException.class, () -> node.process(new 
Record<>("key", "value", 0)));
+        final StreamsException processingException = 
assertThrows(StreamsException.class,
+            () -> node.process(new Record<>("key", "value", 0)));
+
+        assertEquals("Processing exception handler is set to fail upon" +
+            " a processing error. If you would rather have the streaming 
pipeline" +
+            " continue after a processing error, please set the " +
+            PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.", 
processingException.getMessage());
+
+        assertTrue(processingException.getCause() instanceof RuntimeException);
+        assertEquals("Processing exception should be caught and handled by the 
processing exception handler.",
+            processingException.getCause().getMessage());
     }
 
     @Test
     public void shouldNotThrowStreamsExceptionWhenProcessingMarkedAsContinue() 
{
         final ProcessorNode<Object, Object, Object, Object> node =
             new ProcessorNode<>("name", new ProcessingExceptionProcessor(), 
Collections.emptySet());
-        node.setProcessingExceptionHandler(new 
LogAndContinueProcessingExceptionHandler());
+        node.setProcessingExceptionHandler(new 
ProcessingExceptionHandlerMockTest(ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE));
+
+        final InternalProcessorContext<Object, Object> 
internalProcessorContext = mockInternalProcessorContext();
         node.init(internalProcessorContext);
 
         assertDoesNotThrow(() -> node.process(new Record<>("key", "value", 
0)));
     }
 
     @Test
+    @SuppressWarnings("unchecked")
     public void shouldNotHandleStreamsExceptionAsProcessingException() {
+        final ProcessingExceptionHandler processingExceptionHandler = 
spy(ProcessingExceptionHandler.class);

Review Comment:
   I changed for mock(ProcessingExceptionHandler.class) to check if handle() 
method is called



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to