Tomas Mi created KAFKA-9566: ------------------------------- Summary: ProcessorContextImpl#forward throws NullPointerException if invoked from DeserializationExceptionHandler Key: KAFKA-9566 URL: https://issues.apache.org/jira/browse/KAFKA-9566 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.2.0 Reporter: Tomas Mi
Hi, I am trying to implement custom DeserializationExceptionHandler which would forward an exception to downstream processor(s), but ProcessorContextImpl#forward throws a NullPointerException if invoked from this custom handler. Handler implementation: {code:title=MyDeserializationExceptionHandler.java} public class MyDeserializationExceptionHandler implements DeserializationExceptionHandler { @Override public void configure(Map<String, ?> configs) { } @Override public DeserializationHandlerResponse handle(ProcessorContext context, ConsumerRecord<byte[], byte[]> record, Exception exception) { context.forward(null, exception, To.child("error-processor")); return DeserializationHandlerResponse.CONTINUE; } } {code} Handler is wired as default deserialization exception handler: {code} private TopologyTestDriver initializeTestDriver(StreamsBuilder streamBuilder) { Topology topology = streamBuilder.build(); Properties props = new Properties(); props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "my-test-application"); props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); props.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); props.setProperty(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, MyDeserializationExceptionHandler.class.getName()); return new TopologyTestDriver(topology, props); } {code}  Exception stacktrace: {noformat} org.apache.kafka.streams.errors.StreamsException: Fatal user code error in deserialization error callback at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:76) at org.apache.kafka.streams.processor.internals.RecordQueue.maybeUpdateTimestamp(RecordQueue.java:160) at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:101) at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136) at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:742) at org.apache.kafka.streams.TopologyTestDriver.pipeInput(TopologyTestDriver.java:392) ... Caused by: java.lang.NullPointerException at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:165) at MyDeserializationExceptionHandler.handle(NewExceptionHandlerTest.java:204) at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:70) ... 33 more {noformat} Neither DeserializationExceptionHandler, nor ProcessorContext javadocs mention that ProcessorContext#forward(...) must not be invoked from DeserializationExceptionHandler, so I assume that this is a defect. -- This message was sent by Atlassian Jira (v8.3.4#803005)