loicgreffier commented on code in PR #16093: URL: https://github.com/apache/kafka/pull/16093#discussion_r1678188796
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java: ########## @@ -34,36 +40,106 @@ import org.apache.kafka.test.StreamsTestUtils; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; import java.util.Properties; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE; -import static org.hamcrest.CoreMatchers.containsString; -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +@ExtendWith(MockitoExtension.class) public class ProcessorNodeTest { + private static final String TOPIC = "topic"; + private static final int PARTITION = 0; + private static final Long OFFSET = 0L; + private static final Long TIMESTAMP = 0L; + private static final TaskId TASK_ID = new TaskId(0, 0); + private static final String NAME = "name"; + private static final String KEY = "key"; + private static final String VALUE = "value"; @Test public void shouldThrowStreamsExceptionIfExceptionCaughtDuringInit() { final ProcessorNode<Object, Object, Object, Object> node = - new ProcessorNode<>("name", new ExceptionalProcessor(), Collections.emptySet()); + new ProcessorNode<>(NAME, new ExceptionalProcessor(), Collections.emptySet()); assertThrows(StreamsException.class, () -> node.init(null)); } @Test public void shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() { final ProcessorNode<Object, Object, Object, Object> node = - new ProcessorNode<>("name", new ExceptionalProcessor(), Collections.emptySet()); + new ProcessorNode<>(NAME, new ExceptionalProcessor(), Collections.emptySet()); assertThrows(StreamsException.class, () -> node.init(null)); } + @Test + public void shouldThrowStreamsExceptionWhenProcessingMarkedAsFail() { + final ProcessorNode<Object, Object, Object, Object> node = + new ProcessorNode<>(NAME, new ProcessingExceptionProcessor(), Collections.emptySet()); + + final InternalProcessorContext<Object, Object> internalProcessorContext = mockInternalProcessorContext(); + node.setProcessingExceptionHandler(new ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL, internalProcessorContext)); + node.init(internalProcessorContext); + + final FailedProcessingException failedProcessingException = assertThrows(FailedProcessingException.class, + () -> node.process(new Record<>(KEY, VALUE, TIMESTAMP))); + + assertEquals("java.lang.RuntimeException: Processing exception should be caught and handled by the processing exception handler.", + failedProcessingException.getMessage()); + + assertTrue(failedProcessingException.getCause() instanceof RuntimeException); + assertEquals("Processing exception should be caught and handled by the processing exception handler.", + failedProcessingException.getCause().getMessage()); + } + + @Test + public void shouldNotThrowStreamsExceptionWhenProcessingMarkedAsContinue() { + final ProcessorNode<Object, Object, Object, Object> node = + new ProcessorNode<>(NAME, new ProcessingExceptionProcessor(), Collections.emptySet()); + + final InternalProcessorContext<Object, Object> internalProcessorContext = mockInternalProcessorContext(); + node.setProcessingExceptionHandler(new ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE, internalProcessorContext)); + node.init(internalProcessorContext); + + assertDoesNotThrow(() -> node.process(new Record<>(KEY, VALUE, TIMESTAMP))); + } + + @Test + @SuppressWarnings("unchecked") + public void shouldNotHandleInternalFailedProcessingException() { + final ProcessingExceptionHandler processingExceptionHandler = mock(ProcessingExceptionHandler.class); + + final ProcessorNode<Object, Object, Object, Object> node = + new ProcessorNode<>(NAME, new FailedProcessingExceptionProcessor(), Collections.emptySet()); + node.setProcessingExceptionHandler(processingExceptionHandler); + + final InternalProcessorContext<Object, Object> internalProcessorContext = mock(InternalProcessorContext.class); Review Comment: Fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org