[ 
https://issues.apache.org/jira/browse/KAFKA-9566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17039444#comment-17039444
 ] 

Tomas Mi commented on KAFKA-9566:
---------------------------------

Wouldn't it be possible to support this use case instead of throwing an 
UnsupportedOperationException? As I understand it, the issue relates to 
missing child processors (or an only partly initialized ProcessorContext), but 
during the message deserialization phase I think the child processors should 
already be known, since that should happen in the source processor. I am 
trying to implement exception handling while maintaining the exactly-once 
delivery guarantee, and this looks like an ideal way to do it. Otherwise, do 
you know of another way to achieve the same?
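
One alternative that might work, offered only as a rough sketch and not as 
anything the Kafka documentation prescribes: catch the failure inside the 
deserializer itself and turn it into an ordinary value, so the error travels 
through the topology as a normal record and can be routed to an error 
processor or topic downstream. The example below is plain Java with no Kafka 
dependency; SafeDecoding, Result and safe(...) are hypothetical names, and the 
Function<byte[], T> stands in for a real 
org.apache.kafka.common.serialization.Deserializer.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class SafeDecoding {

    /** Holds either a decoded value or the failure plus the raw bytes. */
    public static final class Result<T> {
        private final T value;
        private final Exception error;
        private final byte[] raw;

        private Result(T value, Exception error, byte[] raw) {
            this.value = value;
            this.error = error;
            this.raw = raw;
        }

        public static <T> Result<T> ok(T value) {
            return new Result<>(value, null, null);
        }

        public static <T> Result<T> failed(Exception error, byte[] raw) {
            return new Result<>(null, error, raw);
        }

        public boolean isOk() { return error == null; }
        public T value() { return value; }
        public Exception error() { return error; }
        public byte[] raw() { return raw; }
    }

    /** Wraps a decoder so that runtime exceptions become Result values. */
    public static <T> Function<byte[], Result<T>> safe(Function<byte[], T> decoder) {
        return bytes -> {
            try {
                return Result.ok(decoder.apply(bytes));
            } catch (RuntimeException e) {
                // Keep the raw payload so it can be forwarded for inspection.
                return Result.failed(e, bytes);
            }
        };
    }

    public static void main(String[] args) {
        // Stand-in decoder: parses the payload as a UTF-8 encoded integer.
        Function<byte[], Integer> parseInt =
                bytes -> Integer.parseInt(new String(bytes, StandardCharsets.UTF_8));
        Function<byte[], Result<Integer>> safeParse = safe(parseInt);

        Result<Integer> good = safeParse.apply("42".getBytes(StandardCharsets.UTF_8));
        Result<Integer> bad = safeParse.apply("oops".getBytes(StandardCharsets.UTF_8));

        System.out.println(good.isOk() + " " + good.value());
        System.out.println(bad.isOk() + " " + bad.error().getClass().getSimpleName());
    }
}
```

In a Streams application the same idea would presumably live in a custom 
Deserializer that returns such a Result, with the topology splitting records 
on isOk() after the source. Since the failed record is then processed like any 
other, it should fall under the same processing guarantee as the rest of the 
topology, though I have not verified that.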

> ProcessorContextImpl#forward throws NullPointerException if invoked from 
> DeserializationExceptionHandler
> --------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-9566
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9566
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Tomas Mi
>            Priority: Major
>
> Hi, I am trying to implement a custom DeserializationExceptionHandler which 
> would forward an exception to downstream processor(s), but 
> ProcessorContextImpl#forward throws a NullPointerException if invoked from 
> this custom handler.
> Handler implementation:
> {code:title=MyDeserializationExceptionHandler.java}
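> import java.util.Map;
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
> import org.apache.kafka.streams.processor.ProcessorContext;
> import org.apache.kafka.streams.processor.To;
>
> public class MyDeserializationExceptionHandler implements DeserializationExceptionHandler {
>     @Override
>     public void configure(Map<String, ?> configs) {
>     }
>     @Override
>     public DeserializationHandlerResponse handle(ProcessorContext context,
>             ConsumerRecord<byte[], byte[]> record, Exception exception) {
>         // This forward call is where the NullPointerException is thrown.
>         context.forward(null, exception, To.child("error-processor"));
>         return DeserializationHandlerResponse.CONTINUE;
>     }
> }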
> {code}
> Handler is wired as default deserialization exception handler:
> {code}
>     private TopologyTestDriver initializeTestDriver(StreamsBuilder streamBuilder) {
>         Topology topology = streamBuilder.build();
>         Properties props = new Properties();
>         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "my-test-application");
>         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
>         props.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
>         props.setProperty(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
>                 MyDeserializationExceptionHandler.class.getName());
>         return new TopologyTestDriver(topology, props);
>     }
> {code}
>  
> Exception stacktrace:
> {noformat}
> org.apache.kafka.streams.errors.StreamsException: Fatal user code error in deserialization error callback
>     at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:76)
>     at org.apache.kafka.streams.processor.internals.RecordQueue.maybeUpdateTimestamp(RecordQueue.java:160)
>     at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:101)
>     at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136)
>     at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:742)
>     at org.apache.kafka.streams.TopologyTestDriver.pipeInput(TopologyTestDriver.java:392)
>     ...
> Caused by: java.lang.NullPointerException
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:165)
>     at MyDeserializationExceptionHandler.handle(NewExceptionHandlerTest.java:204)
>     at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:70)
>     ... 33 more
> {noformat}
> Neither the DeserializationExceptionHandler nor the ProcessorContext javadocs 
> mention that ProcessorContext#forward(...) must not be invoked from a 
> DeserializationExceptionHandler, so I assume that this is a defect.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
