sebastienviale commented on code in PR #16093: URL: https://github.com/apache/kafka/pull/16093#discussion_r1650931604
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java: ########## @@ -99,30 +81,57 @@ public void shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() { public void shouldThrowStreamsExceptionWhenProcessingMarkedAsFail() { final ProcessorNode<Object, Object, Object, Object> node = new ProcessorNode<>("name", new ProcessingExceptionProcessor(), Collections.emptySet()); - node.setProcessingExceptionHandler(new LogAndFailProcessingExceptionHandler()); + node.setProcessingExceptionHandler(new ProcessingExceptionHandlerMockTest(ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL)); + + final InternalProcessorContext<Object, Object> internalProcessorContext = mockInternalProcessorContext(); node.init(internalProcessorContext); - assertThrows(StreamsException.class, () -> node.process(new Record<>("key", "value", 0))); + final StreamsException processingException = assertThrows(StreamsException.class, + () -> node.process(new Record<>("key", "value", 0))); + + assertEquals("Processing exception handler is set to fail upon" + + " a processing error. 
If you would rather have the streaming pipeline" + + " continue after a processing error, please set the " + + PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.", processingException.getMessage()); + + assertTrue(processingException.getCause() instanceof RuntimeException); + assertEquals("Processing exception should be caught and handled by the processing exception handler.", + processingException.getCause().getMessage()); } @Test public void shouldNotThrowStreamsExceptionWhenProcessingMarkedAsContinue() { final ProcessorNode<Object, Object, Object, Object> node = new ProcessorNode<>("name", new ProcessingExceptionProcessor(), Collections.emptySet()); - node.setProcessingExceptionHandler(new LogAndContinueProcessingExceptionHandler()); + node.setProcessingExceptionHandler(new ProcessingExceptionHandlerMockTest(ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE)); + + final InternalProcessorContext<Object, Object> internalProcessorContext = mockInternalProcessorContext(); node.init(internalProcessorContext); assertDoesNotThrow(() -> node.process(new Record<>("key", "value", 0))); } @Test + @SuppressWarnings("unchecked") public void shouldNotHandleStreamsExceptionAsProcessingException() { + final ProcessingExceptionHandler processingExceptionHandler = spy(ProcessingExceptionHandler.class); Review Comment: I changed this to mock(ProcessingExceptionHandler.class) so that the test can check whether the handle() method is called. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org