BTW this is tracked and resolved as https://issues.apache.org/jira/browse/KAFKA-4300.

On Thu, Oct 13, 2016 at 1:17 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Thanks Frank for reporting the bug, and many thanks to Damian for the
> quick catch!
>
> On Thu, Oct 13, 2016 at 12:30 PM, Frank Lyaruu <flya...@gmail.com> wrote:
>
>> The issue seems to be gone. Amazing work, thanks...!
>>
>> On Thu, Oct 13, 2016 at 6:56 PM, Damian Guy <damian....@gmail.com> wrote:
>>
>> > Hi, i believe i found the problem. If possible could you please try with
>> > this: https://github.com/dguy/kafka/tree/cache-bug
>> >
>> > Thanks,
>> > Damian
>> >
>> > On Thu, 13 Oct 2016 at 17:46 Damian Guy <damian....@gmail.com> wrote:
>> >
>> > > Hi Frank,
>> > >
>> > > Thanks for reporting. Can you provide a sample of the join you are
>> > > running?
>> > >
>> > > Thanks,
>> > > Damian
>> > >
>> > > On Thu, 13 Oct 2016 at 16:10 Frank Lyaruu <flya...@gmail.com> wrote:
>> > >
>> > > Hi Kafka people,
>> > >
>> > > I'm joining a bunch of Kafka Topics using Kafka Streams, with the Kafka
>> > > 0.10.1 release candidate.
>> > >
>> > > It runs ok for a few thousand of messages, and then it dies with the
>> > > following exception:
>> > >
>> > > Exception in thread "StreamThread-1" java.lang.NullPointerException
>> > >     at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:194)
>> > >     at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:190)
>> > >     at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:121)
>> > >     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:147)
>> > >     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:134)
>> > >     at org.apache.kafka.streams.kstream.internals.KTableReduce$KTableAggregateValueGetter.get(KTableReduce.java:121)
>> > >     at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:77)
>> > >     at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:48)
>> > >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>> > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>> > >     at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:83)
>> > >     at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:73)
>> > >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>> > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>> > >     at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:83)
>> > >     at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:73)
>> > >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>> > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>> > >     at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:52)
>> > >     at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:49)
>> > >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>> > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>> > >     at org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:83)
>> > >     at org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:49)
>> > >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>> > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>> > >     at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:35)
>> > >     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.maybeForward(CachingKeyValueStore.java:97)
>> > >     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:34)
>> > >     at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:84)
>> > >     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:117)
>> > >     at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:196)
>> > >     at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:190)
>> > >     at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:121)
>> > >     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:187)
>> > >     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:182)
>> > >     at org.apache.kafka.streams.kstream.internals.KTableReduce$KTableReduceProcessor.process(KTableReduce.java:92)
>> > >     at org.apache.kafka.streams.kstream.internals.KTableReduce$KTableReduceProcessor.process(KTableReduce.java:52)
>> > >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>> > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>> > >     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
>> > >     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:177)
>> > >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:427)
>> > >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:235)
>> > >
>> > > I know this isn't a great bug report, as I can't seem to reproduce this in
>> > > a more sandboxed situation. Any tips / ideas for further steps?
>
> --
> -- Guozhang

--
-- Guozhang