Tomas Mi created KAFKA-9566:
-------------------------------

             Summary: ProcessorContextImpl#forward throws NullPointerException if invoked from DeserializationExceptionHandler
                 Key: KAFKA-9566
                 URL: https://issues.apache.org/jira/browse/KAFKA-9566
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.2.0
            Reporter: Tomas Mi


Hi, I am trying to implement a custom DeserializationExceptionHandler that 
forwards the exception to downstream processor(s), but 
ProcessorContextImpl#forward throws a NullPointerException when invoked from 
this custom handler.

Handler implementation:
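{code:title=MyDeserializationExceptionHandler.java}
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;

public class MyDeserializationExceptionHandler implements DeserializationExceptionHandler {

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed.
    }

    @Override
    public DeserializationHandlerResponse handle(ProcessorContext context,
                                                 ConsumerRecord<byte[], byte[]> record,
                                                 Exception exception) {
        // Forward the exception to the "error-processor" child node.
        // This is the call that throws the NullPointerException.
        context.forward(null, exception, To.child("error-processor"));
        return DeserializationHandlerResponse.CONTINUE;
    }
}
{code}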

The handler is wired in as the default deserialization exception handler:
{code}
    private TopologyTestDriver initializeTestDriver(StreamsBuilder streamBuilder) {
        Topology topology = streamBuilder.build();
        Properties props = new Properties();
        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "my-test-application");
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        props.setProperty(
            StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
            MyDeserializationExceptionHandler.class.getName());
        return new TopologyTestDriver(topology, props);
    }
{code}
 
Exception stacktrace:
{noformat}
org.apache.kafka.streams.errors.StreamsException: Fatal user code error in deserialization error callback
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:76)
    at org.apache.kafka.streams.processor.internals.RecordQueue.maybeUpdateTimestamp(RecordQueue.java:160)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:101)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:742)
    at org.apache.kafka.streams.TopologyTestDriver.pipeInput(TopologyTestDriver.java:392)
    ...

Caused by: java.lang.NullPointerException
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:165)
    at MyDeserializationExceptionHandler.handle(NewExceptionHandlerTest.java:204)
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:70)
    ... 33 more
{noformat}

Neither the DeserializationExceptionHandler nor the ProcessorContext javadocs 
mention that ProcessorContext#forward(...) must not be invoked from a 
DeserializationExceptionHandler, so I assume this is a defect.
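
For illustration only, the failure mode can be modelled without Kafka on the 
classpath. The sketch below rests on an assumption that I have not verified 
against the Kafka source: that forward() resolves the target child through a 
"current node" which is only set while a record is being processed, and is 
therefore still null when the handler runs during deserialization. SketchNode, 
SketchContext and HandlerNpeSketch are hypothetical stand-ins, not Kafka classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a processor node; not a Kafka class.
class SketchNode {
    final String name;
    final Map<String, SketchNode> children = new HashMap<>();
    final List<Object> received = new ArrayList<>();

    SketchNode(String name) {
        this.name = name;
    }
}

// Hypothetical stand-in for a processor context; not a Kafka class.
class SketchContext {
    // Assumption: only set while a node is actively processing a record.
    SketchNode currentNode;

    void forward(Object value, String childName) {
        // Dereferences currentNode, so this throws a NullPointerException
        // whenever no node is active.
        SketchNode child = currentNode.children.get(childName);
        child.received.add(value);
    }
}

class HandlerNpeSketch {
    // Calls forward() the way the handler does: before any node is active.
    static boolean forwardFailsWithoutCurrentNode() {
        SketchContext context = new SketchContext();
        try {
            context.forward(new RuntimeException("bad bytes"), "error-processor");
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    // Calls forward() while a node is active, as during normal processing.
    static int forwardedCountWithCurrentNode() {
        SketchContext context = new SketchContext();
        SketchNode source = new SketchNode("source");
        SketchNode errors = new SketchNode("error-processor");
        source.children.put(errors.name, errors);
        context.currentNode = source;
        context.forward(new RuntimeException("bad bytes"), "error-processor");
        return errors.received.size();
    }
}
```

If that assumption holds, the handler would need either an active node at the 
time it is called or a documented restriction on calling forward() from it.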




--
This message was sent by Atlassian Jira
(v8.3.4#803005)
