[ https://issues.apache.org/jira/browse/KAFKA-9566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax updated KAFKA-9566:
-----------------------------------
    Affects Version/s:     (was: 2.2.0)
                       1.0.0

> ProcessorContextImpl#forward throws NullPointerException if invoked from DeserializationExceptionHandler
> --------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-9566
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9566
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Tomas Mi
>            Priority: Major
>
> Hi, I am trying to implement a custom DeserializationExceptionHandler that
> forwards an exception to downstream processor(s), but
> ProcessorContextImpl#forward throws a NullPointerException if invoked from
> this custom handler.
> Handler implementation:
> {code:title=MyDeserializationExceptionHandler.java}
> public class MyDeserializationExceptionHandler implements DeserializationExceptionHandler {
>     @Override
>     public void configure(Map<String, ?> configs) {
>     }
>     @Override
>     public DeserializationHandlerResponse handle(ProcessorContext context, ConsumerRecord<byte[], byte[]> record, Exception exception) {
>         context.forward(null, exception, To.child("error-processor"));
>         return DeserializationHandlerResponse.CONTINUE;
>     }
> }
> {code}
> The handler is wired as the default deserialization exception handler:
> {code}
> private TopologyTestDriver initializeTestDriver(StreamsBuilder streamBuilder) {
>     Topology topology = streamBuilder.build();
>     Properties props = new Properties();
>     props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "my-test-application");
>     props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
>     props.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
>     props.setProperty(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, MyDeserializationExceptionHandler.class.getName());
>     return new TopologyTestDriver(topology, props);
> }
> {code}
>
> Exception stacktrace:
> {noformat}
> org.apache.kafka.streams.errors.StreamsException: Fatal user code error in deserialization error callback
>     at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:76)
>     at org.apache.kafka.streams.processor.internals.RecordQueue.maybeUpdateTimestamp(RecordQueue.java:160)
>     at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:101)
>     at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136)
>     at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:742)
>     at org.apache.kafka.streams.TopologyTestDriver.pipeInput(TopologyTestDriver.java:392)
>     ...
> Caused by: java.lang.NullPointerException
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:165)
>     at MyDeserializationExceptionHandler.handle(NewExceptionHandlerTest.java:204)
>     at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:70)
>     ... 33 more
> {noformat}
> Neither DeserializationExceptionHandler, nor ProcessorContext javadocs
> mention that ProcessorContext#forward(...) must not be invoked from
> DeserializationExceptionHandler, so I assume that this is a defect.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)