[ https://issues.apache.org/jira/browse/KAFKA-9566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17039444#comment-17039444 ]
Tomas Mi commented on KAFKA-9566:
---------------------------------

Wouldn't it be possible to support this use case instead of throwing UnsupportedOperationException? As I understand it, the issue stems from missing child processors (or an only partly initialized ProcessorContext), but during the message deserialization phase I think the child processors should already be known, since deserialization happens in the source processor. I am trying to implement exception handling while maintaining the exactly-once delivery guarantee, and this looks like an ideal way to do it. Otherwise, do you know of another way to achieve the same?

> ProcessorContextImpl#forward throws NullPointerException if invoked from DeserializationExceptionHandler
> --------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-9566
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9566
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Tomas Mi
>            Priority: Major
>
> Hi, I am trying to implement a custom DeserializationExceptionHandler that
> forwards an exception to downstream processor(s), but
> ProcessorContextImpl#forward throws a NullPointerException when invoked from
> this custom handler.
> Handler implementation:
> {code:title=MyDeserializationExceptionHandler.java}
> public class MyDeserializationExceptionHandler implements DeserializationExceptionHandler {
>     @Override
>     public void configure(Map<String, ?> configs) {
>     }
>
>     @Override
>     public DeserializationHandlerResponse handle(ProcessorContext context,
>                                                  ConsumerRecord<byte[], byte[]> record,
>                                                  Exception exception) {
>         context.forward(null, exception, To.child("error-processor"));
>         return DeserializationHandlerResponse.CONTINUE;
>     }
> }
> {code}
> Handler is wired as default deserialization exception handler:
> {code}
> private TopologyTestDriver initializeTestDriver(StreamsBuilder streamBuilder) {
>     Topology topology = streamBuilder.build();
>     Properties props = new Properties();
>     props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "my-test-application");
>     props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
>     props.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
>     props.setProperty(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
>                       MyDeserializationExceptionHandler.class.getName());
>     return new TopologyTestDriver(topology, props);
> }
> {code}
>
> Exception stacktrace:
> {noformat}
> org.apache.kafka.streams.errors.StreamsException: Fatal user code error in deserialization error callback
>     at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:76)
>     at org.apache.kafka.streams.processor.internals.RecordQueue.maybeUpdateTimestamp(RecordQueue.java:160)
>     at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:101)
>     at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136)
>     at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:742)
>     at org.apache.kafka.streams.TopologyTestDriver.pipeInput(TopologyTestDriver.java:392)
>     ...
> Caused by: java.lang.NullPointerException
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:165)
>     at MyDeserializationExceptionHandler.handle(NewExceptionHandlerTest.java:204)
>     at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:70)
>     ... 33 more
> {noformat}
> Neither DeserializationExceptionHandler, nor ProcessorContext javadocs
> mention that ProcessorContext#forward(...) must not be invoked from
> DeserializationExceptionHandler, so I assume that this is a defect.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)