loicgreffier commented on code in PR #16093: URL: https://github.com/apache/kafka/pull/16093#discussion_r1678174474
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java: ########## @@ -34,36 +40,106 @@ import org.apache.kafka.test.StreamsTestUtils; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; import java.util.Properties; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE; -import static org.hamcrest.CoreMatchers.containsString; -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +@ExtendWith(MockitoExtension.class) public class ProcessorNodeTest { + private static final String TOPIC = "topic"; + private static final int PARTITION = 0; + private static final Long OFFSET = 0L; + private static final Long TIMESTAMP = 0L; + private static final TaskId TASK_ID = new TaskId(0, 0); + private static final String NAME = "name"; + private static final String KEY = "key"; + private static final String VALUE = "value"; @Test public void shouldThrowStreamsExceptionIfExceptionCaughtDuringInit() { final ProcessorNode<Object, Object, Object, Object> node = - new ProcessorNode<>("name", new ExceptionalProcessor(), Collections.emptySet()); + new ProcessorNode<>(NAME, new ExceptionalProcessor(), Collections.emptySet()); assertThrows(StreamsException.class, () -> node.init(null)); } @Test public void shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() { final ProcessorNode<Object, Object, Object, Object> node = - new ProcessorNode<>("name", new ExceptionalProcessor(), Collections.emptySet()); + new ProcessorNode<>(NAME, new ExceptionalProcessor(), Collections.emptySet()); assertThrows(StreamsException.class, () -> node.init(null)); } + @Test + public void shouldThrowStreamsExceptionWhenProcessingMarkedAsFail() { Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org