cadonna commented on code in PR #16093: URL: https://github.com/apache/kafka/pull/16093#discussion_r1650786044
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java: ########## @@ -99,30 +81,57 @@ public void shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() { public void shouldThrowStreamsExceptionWhenProcessingMarkedAsFail() { final ProcessorNode<Object, Object, Object, Object> node = new ProcessorNode<>("name", new ProcessingExceptionProcessor(), Collections.emptySet()); - node.setProcessingExceptionHandler(new LogAndFailProcessingExceptionHandler()); + node.setProcessingExceptionHandler(new ProcessingExceptionHandlerMockTest(ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL)); + + final InternalProcessorContext<Object, Object> internalProcessorContext = mockInternalProcessorContext(); node.init(internalProcessorContext); - assertThrows(StreamsException.class, () -> node.process(new Record<>("key", "value", 0))); + final StreamsException processingException = assertThrows(StreamsException.class, + () -> node.process(new Record<>("key", "value", 0))); + + assertEquals("Processing exception handler is set to fail upon" + + " a processing error. 
If you would rather have the streaming pipeline" + + " continue after a processing error, please set the " + + PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.", processingException.getMessage()); + + assertTrue(processingException.getCause() instanceof RuntimeException); + assertEquals("Processing exception should be caught and handled by the processing exception handler.", + processingException.getCause().getMessage()); } @Test public void shouldNotThrowStreamsExceptionWhenProcessingMarkedAsContinue() { final ProcessorNode<Object, Object, Object, Object> node = new ProcessorNode<>("name", new ProcessingExceptionProcessor(), Collections.emptySet()); - node.setProcessingExceptionHandler(new LogAndContinueProcessingExceptionHandler()); + node.setProcessingExceptionHandler(new ProcessingExceptionHandlerMockTest(ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE)); + + final InternalProcessorContext<Object, Object> internalProcessorContext = mockInternalProcessorContext(); node.init(internalProcessorContext); assertDoesNotThrow(() -> node.process(new Record<>("key", "value", 0))); } @Test + @SuppressWarnings("unchecked") public void shouldNotHandleStreamsExceptionAsProcessingException() { + final ProcessingExceptionHandler processingExceptionHandler = spy(ProcessingExceptionHandler.class); Review Comment: Please do not use spies! They are bad practice, because they do not really decouple the code under test from other code. Here a `mock(ProcessingExceptionHandler.class)` should be fine, since `ProcessingExceptionHandler` is an interface. 
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java: ########## @@ -99,30 +81,57 @@ public void shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() { public void shouldThrowStreamsExceptionWhenProcessingMarkedAsFail() { final ProcessorNode<Object, Object, Object, Object> node = new ProcessorNode<>("name", new ProcessingExceptionProcessor(), Collections.emptySet()); - node.setProcessingExceptionHandler(new LogAndFailProcessingExceptionHandler()); + node.setProcessingExceptionHandler(new ProcessingExceptionHandlerMockTest(ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL)); + + final InternalProcessorContext<Object, Object> internalProcessorContext = mockInternalProcessorContext(); node.init(internalProcessorContext); - assertThrows(StreamsException.class, () -> node.process(new Record<>("key", "value", 0))); + final StreamsException processingException = assertThrows(StreamsException.class, + () -> node.process(new Record<>("key", "value", 0))); + + assertEquals("Processing exception handler is set to fail upon" + + " a processing error. 
If you would rather have the streaming pipeline" + + " continue after a processing error, please set the " + + PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.", processingException.getMessage()); + + assertTrue(processingException.getCause() instanceof RuntimeException); + assertEquals("Processing exception should be caught and handled by the processing exception handler.", + processingException.getCause().getMessage()); } @Test public void shouldNotThrowStreamsExceptionWhenProcessingMarkedAsContinue() { final ProcessorNode<Object, Object, Object, Object> node = new ProcessorNode<>("name", new ProcessingExceptionProcessor(), Collections.emptySet()); - node.setProcessingExceptionHandler(new LogAndContinueProcessingExceptionHandler()); + node.setProcessingExceptionHandler(new ProcessingExceptionHandlerMockTest(ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE)); + + final InternalProcessorContext<Object, Object> internalProcessorContext = mockInternalProcessorContext(); node.init(internalProcessorContext); assertDoesNotThrow(() -> node.process(new Record<>("key", "value", 0))); } @Test + @SuppressWarnings("unchecked") public void shouldNotHandleStreamsExceptionAsProcessingException() { + final ProcessingExceptionHandler processingExceptionHandler = spy(ProcessingExceptionHandler.class); + final ProcessorNode<Object, Object, Object, Object> node = new ProcessorNode<>("name", new StreamsExceptionProcessor(), Collections.emptySet()); + node.setProcessingExceptionHandler(processingExceptionHandler); + + final InternalProcessorContext<Object, Object> internalProcessorContext = mock(InternalProcessorContext.class); + when(internalProcessorContext.taskId()).thenReturn(new TaskId(0, 0)); + when(internalProcessorContext.metrics()).thenReturn(new StreamsMetricsImpl(new Metrics(), "test-client", StreamsConfig.METRICS_LATEST, new MockTime())); node.init(internalProcessorContext); - assertThrows("Streams exception", StreamsException.class, + final 
StreamsException streamsException = assertThrows(StreamsException.class, () -> node.process(new Record<>("key", "value", 0))); + + assertEquals("Streams exception should not be caught and handled by the processing exception handler.", + streamsException.getMessage()); + Review Comment: nit: ```suggestion ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java: ########## @@ -63,6 +95,36 @@ public void shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() { assertThrows(StreamsException.class, () -> node.init(null)); } + @Test + public void shouldThrowStreamsExceptionWhenProcessingMarkedAsFail() { + final ProcessorNode<Object, Object, Object, Object> node = + new ProcessorNode<>("name", new ProcessingExceptionProcessor(), Collections.emptySet()); + node.setProcessingExceptionHandler(new LogAndFailProcessingExceptionHandler()); Review Comment: I am fine if you want to use `ProcessingExceptionHandlerMockTest`, but could you please rename it to `ProcessingExceptionHandlerMock` so that it is clear that it is a mock and not a test. Could you also use constants in `mockInternalProcessorContext()` for the return values of the stubs and correspondingly use the same constants in the assertions in `handle()` of `ProcessingExceptionHandlerMock`. It is not important that `context.topic()` returns `"topic"`; what matters is that it returns the same value as set in the processor context. 
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java: ########## @@ -63,6 +95,36 @@ public void shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() { assertThrows(StreamsException.class, () -> node.init(null)); } + @Test + public void shouldThrowStreamsExceptionWhenProcessingMarkedAsFail() { + final ProcessorNode<Object, Object, Object, Object> node = + new ProcessorNode<>("name", new ProcessingExceptionProcessor(), Collections.emptySet()); + node.setProcessingExceptionHandler(new LogAndFailProcessingExceptionHandler()); + node.init(internalProcessorContext); + + assertThrows(StreamsException.class, () -> node.process(new Record<>("key", "value", 0))); + } + + @Test + public void shouldNotThrowStreamsExceptionWhenProcessingMarkedAsContinue() { + final ProcessorNode<Object, Object, Object, Object> node = + new ProcessorNode<>("name", new ProcessingExceptionProcessor(), Collections.emptySet()); + node.setProcessingExceptionHandler(new LogAndContinueProcessingExceptionHandler()); + node.init(internalProcessorContext); + + assertDoesNotThrow(() -> node.process(new Record<>("key", "value", 0))); + } + + @Test + public void shouldNotHandleStreamsExceptionAsProcessingException() { + final ProcessorNode<Object, Object, Object, Object> node = + new ProcessorNode<>("name", new StreamsExceptionProcessor(), Collections.emptySet()); + node.init(internalProcessorContext); Review Comment: Sorry, I probably made a mistake while writing my comment. I meant to say that you should verify that `handle()` is NOT called. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org