Can you show the related code from OneToManyGroupedProcessor?

Thanks

On Mon, Jun 18, 2018 at 4:29 AM, Frank Lyaruu <flya...@gmail.com> wrote:

> Hi, I've upgraded our 0.11-based stream application to the trunk version,
> and I get an intermittent NPE. It is quite a big topology, and I haven't
> succeeded in reproducing it on a simpler topology.
> It builds the topology, starts Kafka Streams, runs for about 20 seconds,
> and then it terminates.
> It seems that the 'currentNode' in the ProcessorContext is null.
>
> Does this ring a bell for anyone?
>
> [lowlevel-develop-generation-7aa-lowlevel-e23137ae-d94b-
> 4f17-a684-995320fd426d-StreamThread-12]
> ERROR org.apache.kafka.streams.processor.internals.AssignedStreamsTasks -
> stream-thread
> [lowlevel-develop-generation-7aa-lowlevel-e23137ae-d94b-
> 4f17-a684-995320fd426d-StreamThread-12]
> Failed to process stream task 0_0 due to the following error:
> java.lang.NullPointerException
>     at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(
> ProcessorContextImpl.java:114)
>     at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(
> ProcessorContextImpl.java:90)
>     at
> com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.
> forwardMessage(OneToManyGroupedProcessor.java:125)
>     at
> com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.
> forwardJoin(OneToManyGroupedProcessor.java:101)
>     at
> com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.
> process(OneToManyGroupedProcessor.java:70)
>     at
> com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.
> process(OneToManyGroupedProcessor.java:1)
>     at
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(
> ProcessorNode.java:50)
>     at
> org.apache.kafka.streams.processor.internals.ProcessorNode.
> runAndMeasureLatency(ProcessorNode.java:244)
>     at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> ProcessorNode.java:133)
>     at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(
> ProcessorContextImpl.java:143)
>     at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(
> ProcessorContextImpl.java:126)
>     at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(
> ProcessorContextImpl.java:90)
>     at
> com.dexels.kafka.streams.remotejoin.PreJoinProcessor.
> process(PreJoinProcessor.java:25)
>     at
> com.dexels.kafka.streams.remotejoin.PreJoinProcessor.
> process(PreJoinProcessor.java:1)
>     at
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(
> ProcessorNode.java:50)
>     at
> org.apache.kafka.streams.processor.internals.ProcessorNode.
> runAndMeasureLatency(ProcessorNode.java:244)
>     at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> ProcessorNode.java:133)
>     at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(
> ProcessorContextImpl.java:143)
>     at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(
> ProcessorContextImpl.java:126)
>     at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(
> ProcessorContextImpl.java:90)
>     at
> com.dexels.kafka.streams.remotejoin.StoreProcessor.
> process(StoreProcessor.java:48)
>     at
> com.dexels.kafka.streams.remotejoin.StoreProcessor.
> process(StoreProcessor.java:1)
>     at
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(
> ProcessorNode.java:50)
>     at
> org.apache.kafka.streams.processor.internals.ProcessorNode.
> runAndMeasureLatency(ProcessorNode.java:244)
>     at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> ProcessorNode.java:133)
>     at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(
> ProcessorContextImpl.java:143)
>     at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(
> ProcessorContextImpl.java:126)
>     at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(
> ProcessorContextImpl.java:90)
>     at
> com.dexels.kafka.streams.remotejoin.XmlTransformerProcessor.process(
> XmlTransformerProcessor.java:52)
>     at
> com.dexels.kafka.streams.remotejoin.XmlTransformerProcessor.process(
> XmlTransformerProcessor.java:1)
>     at
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(
> ProcessorNode.java:50)
>     at
> org.apache.kafka.streams.processor.internals.ProcessorNode.
> runAndMeasureLatency(ProcessorNode.java:244)
>     at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> ProcessorNode.java:133)
>     at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(
> ProcessorContextImpl.java:143)
>     at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(
> ProcessorContextImpl.java:126)
>     at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(
> ProcessorContextImpl.java:90)
>     at
> org.apache.kafka.streams.processor.internals.
> SourceNode.process(SourceNode.java:87)
>     at
> org.apache.kafka.streams.processor.internals.
> StreamTask.process(StreamTask.java:288)
>     at
> org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(
> AssignedStreamsTasks.java:94)
>     at
> org.apache.kafka.streams.processor.internals.TaskManager.process(
> TaskManager.java:409)
>     at
> org.apache.kafka.streams.processor.internals.StreamThread.
> processAndMaybeCommit(StreamThread.java:952)
>     at
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(
> StreamThread.java:827)
>     at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:767)
>     at
> org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:736)
>

Reply via email to