Hi, I've upgraded our 0.11-based stream application to the trunk version,
and I get an intermittent NPE. It is quite a big topology, and I haven't
succeeded in reproducing it on a simpler topology.
It builds the topology, starts Kafka Streams, runs for about 20 seconds, and
then it terminates.
It seems that the 'currentNode' in the ProcessorContext is null.

Does this ring a bell for anyone?

[lowlevel-develop-generation-7aa-lowlevel-e23137ae-d94b-4f17-a684-995320fd426d-StreamThread-12]
ERROR org.apache.kafka.streams.processor.internals.AssignedStreamsTasks -
stream-thread
[lowlevel-develop-generation-7aa-lowlevel-e23137ae-d94b-4f17-a684-995320fd426d-StreamThread-12]
Failed to process stream task 0_0 due to the following error:
java.lang.NullPointerException
    at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:114)
    at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
    at
com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.forwardMessage(OneToManyGroupedProcessor.java:125)
    at
com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.forwardJoin(OneToManyGroupedProcessor.java:101)
    at
com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:70)
    at
com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:1)
    at
org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
    at
org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
    at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
    at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
    at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
    at
com.dexels.kafka.streams.remotejoin.PreJoinProcessor.process(PreJoinProcessor.java:25)
    at
com.dexels.kafka.streams.remotejoin.PreJoinProcessor.process(PreJoinProcessor.java:1)
    at
org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
    at
org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
    at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
    at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
    at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
    at
com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:48)
    at
com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:1)
    at
org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
    at
org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
    at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
    at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
    at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
    at
com.dexels.kafka.streams.remotejoin.XmlTransformerProcessor.process(XmlTransformerProcessor.java:52)
    at
com.dexels.kafka.streams.remotejoin.XmlTransformerProcessor.process(XmlTransformerProcessor.java:1)
    at
org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
    at
org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
    at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
    at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
    at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
    at
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
    at
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:288)
    at
org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
    at
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:409)
    at
org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:952)
    at
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:827)
    at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
    at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)

Reply via email to