Can you show the related code from OneToManyGroupedProcessor? Thanks.

On Mon, Jun 18, 2018 at 4:29 AM, Frank Lyaruu <flya...@gmail.com> wrote:

> Hi, I've upgraded our 0.11-based streams application to the trunk version,
> and I get an intermittent NPE. It is quite a big topology, and I haven't
> succeeded in reproducing it on a simpler topology.
> It builds the topology, starts Kafka Streams, runs for about 20 s, and then
> it terminates.
> It seems that the 'currentNode' in the ProcessorContext is null.
>
> Does this ring a bell for anyone?
>
> [lowlevel-develop-generation-7aa-lowlevel-e23137ae-d94b-4f17-a684-995320fd426d-StreamThread-12]
> ERROR org.apache.kafka.streams.processor.internals.AssignedStreamsTasks -
> stream-thread
> [lowlevel-develop-generation-7aa-lowlevel-e23137ae-d94b-4f17-a684-995320fd426d-StreamThread-12]
> Failed to process stream task 0_0 due to the following error:
> java.lang.NullPointerException
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:114)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
>     at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.forwardMessage(OneToManyGroupedProcessor.java:125)
>     at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.forwardJoin(OneToManyGroupedProcessor.java:101)
>     at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:70)
>     at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:1)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
>     at com.dexels.kafka.streams.remotejoin.PreJoinProcessor.process(PreJoinProcessor.java:25)
>     at com.dexels.kafka.streams.remotejoin.PreJoinProcessor.process(PreJoinProcessor.java:1)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
>     at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:48)
>     at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:1)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
>     at com.dexels.kafka.streams.remotejoin.XmlTransformerProcessor.process(XmlTransformerProcessor.java:52)
>     at com.dexels.kafka.streams.remotejoin.XmlTransformerProcessor.process(XmlTransformerProcessor.java:1)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:288)
>     at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
>     at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:409)
>     at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:952)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:827)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)