loicgreffier commented on code in PR #16093:
URL: https://github.com/apache/kafka/pull/16093#discussion_r1678171866


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java:
##########
@@ -34,36 +40,106 @@
 import org.apache.kafka.test.StreamsTestUtils;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
 
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Properties;
 
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE;
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
+@ExtendWith(MockitoExtension.class)
 public class ProcessorNodeTest {
+    private static final String TOPIC = "topic";
+    private static final int PARTITION = 0;
+    private static final Long OFFSET = 0L;
+    private static final Long TIMESTAMP = 0L;
+    private static final TaskId TASK_ID = new TaskId(0, 0);
+    private static final String NAME = "name";
+    private static final String KEY = "key";
+    private static final String VALUE = "value";
 
     @Test
     public void shouldThrowStreamsExceptionIfExceptionCaughtDuringInit() {
         final ProcessorNode<Object, Object, Object, Object> node =
-            new ProcessorNode<>("name", new ExceptionalProcessor(), Collections.emptySet());
+            new ProcessorNode<>(NAME, new ExceptionalProcessor(), Collections.emptySet());
         assertThrows(StreamsException.class, () -> node.init(null));
     }
 
     @Test
     public void shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() {
         final ProcessorNode<Object, Object, Object, Object> node =
-            new ProcessorNode<>("name", new ExceptionalProcessor(), Collections.emptySet());
+            new ProcessorNode<>(NAME, new ExceptionalProcessor(), Collections.emptySet());
         assertThrows(StreamsException.class, () -> node.init(null));
     }
 
+    @Test
+    public void shouldThrowStreamsExceptionWhenProcessingMarkedAsFail() {
+        final ProcessorNode<Object, Object, Object, Object> node =
+            new ProcessorNode<>(NAME, new ProcessingExceptionProcessor(), Collections.emptySet());
+
+        final InternalProcessorContext<Object, Object> internalProcessorContext = mockInternalProcessorContext();
+        node.setProcessingExceptionHandler(new ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL, internalProcessorContext));
+        node.init(internalProcessorContext);
+
+        final FailedProcessingException failedProcessingException = assertThrows(FailedProcessingException.class,
+            () -> node.process(new Record<>(KEY, VALUE, TIMESTAMP)));
+
+        assertEquals("java.lang.RuntimeException: Processing exception should be caught and handled by the processing exception handler.",
+            failedProcessingException.getMessage());

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to