Re: Occasional NPE in NamedCache
Hi Frank,

Which version of Kafka are you running? The line numbers in the stack trace don't match up with what I am seeing on 0.10.1 or on trunk.

FYI - I created a JIRA for this here: https://issues.apache.org/jira/browse/KAFKA-4311

Thanks,
Damian

On Tue, 18 Oct 2016 at 15:52 Damian Guy wrote:

> Also, it'd be great if you could share your streams topology.
>
> Thanks,
> Damian
Re: Occasional NPE in NamedCache
Also, it'd be great if you could share your streams topology.

Thanks,
Damian

On Tue, 18 Oct 2016 at 15:48 Damian Guy wrote:

> Hi Frank,
>
> Are you able to reproduce this? I'll have a look into it, but it is not immediately clear how it could get into this state.
>
> Thanks,
> Damian
Re: Occasional NPE in NamedCache
Hi Frank,

Are you able to reproduce this? I'll have a look into it, but it is not immediately clear how it could get into this state.

Thanks,
Damian

On Tue, 18 Oct 2016 at 11:08 Frank Lyaruu wrote:

> I might have run into a related problem:
>
> [StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Failed to close state manager for StreamTask 0_0:
> org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to close state store addr-organization
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:342)
>     at org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:121)
>     at org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:341)
>     at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:322)
>     at org.apache.kafka.streams.processor.internals.StreamThread.closeAllStateManagers(StreamThread.java:338)
>     at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:299)
>     at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:262)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:245)
> Caused by: java.lang.IllegalStateException: Key found in dirty key set, but entry is null
>     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:112)
>     at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:100)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:111)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(CachingKeyValueStore.java:117)
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:340)
>     ... 7 more
>
> I haven't done much research and it is quite possible there is a bug on my side, but I don't think I should be seeing this.
Re: Occasional NPE in NamedCache
I might have run into a related problem:

[StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Failed to close state manager for StreamTask 0_0:
org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to close state store addr-organization
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:342)
    at org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:121)
    at org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:341)
    at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:322)
    at org.apache.kafka.streams.processor.internals.StreamThread.closeAllStateManagers(StreamThread.java:338)
    at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:299)
    at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:262)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:245)
Caused by: java.lang.IllegalStateException: Key found in dirty key set, but entry is null
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:112)
    at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:100)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:111)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(CachingKeyValueStore.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:340)
    ... 7 more

I haven't done much research and it is quite possible there is a bug on my side, but I don't think I should be seeing this.

On Thu, Oct 13, 2016 at 10:18 PM, Guozhang Wang wrote:

> BTW this is tracked and resolved as https://issues.apache.org/jira/browse/KAFKA-4300.
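The exception text in the 18 Oct trace, "Key found in dirty key set, but entry is null", reads like a consistency check between a set of dirty keys and the cache entries. The following is a minimal Java sketch of that kind of check, assuming a hypothetical `DirtyTrackingCache` class. It is not Kafka's `NamedCache`, and how the real cache reached this state is not established in this thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model only: DirtyTrackingCache is hypothetical and is NOT Kafka's NamedCache.
class DirtyTrackingCache {
    private final Map<String, String> entries = new HashMap<>();
    private final Set<String> dirtyKeys = new LinkedHashSet<>();

    void put(String key, String value) {
        entries.put(key, value);
        dirtyKeys.add(key); // written but not yet flushed
    }

    // Inconsistent removal: drops the entry but leaves its dirty marker behind.
    void removeEntryOnly(String key) {
        entries.remove(key);
    }

    // Consistent removal: entry and dirty marker are dropped together.
    void remove(String key) {
        entries.remove(key);
        dirtyKeys.remove(key);
    }

    // Returns the flushed values, checking the invariant for every dirty key.
    List<String> flush() {
        List<String> flushed = new ArrayList<>();
        for (String key : dirtyKeys) {
            String value = entries.get(key);
            if (value == null) {
                throw new IllegalStateException("Key found in dirty key set, but entry is null");
            }
            flushed.add(value);
        }
        dirtyKeys.clear();
        return flushed;
    }
}

class DirtyKeyDemo {
    public static void main(String[] args) {
        DirtyTrackingCache cache = new DirtyTrackingCache();
        cache.put("k1", "v1");
        cache.put("k2", "v2");
        cache.removeEntryOnly("k1"); // leaves "k1" in the dirty set
        try {
            cache.flush();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this toy, any code path that removes an entry without also clearing its dirty marker makes the next `flush()` fail, which is the general shape of the state Damian describes as not immediately clear how to reach.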
Re: Occasional NPE in NamedCache
BTW this is tracked and resolved as https://issues.apache.org/jira/browse/KAFKA-4300.

On Thu, Oct 13, 2016 at 1:17 PM, Guozhang Wang wrote:

> Thanks Frank for reporting the bug, and many thanks to Damian for the quick catch!
>
> On Thu, Oct 13, 2016 at 12:30 PM, Frank Lyaruu wrote:
>
>> The issue seems to be gone. Amazing work, thanks...!
Re: Occasional NPE in NamedCache
The issue seems to be gone. Amazing work, thanks...!

On Thu, Oct 13, 2016 at 6:56 PM, Damian Guy wrote:

> Hi, I believe I found the problem. If possible could you please try with this: https://github.com/dguy/kafka/tree/cache-bug
>
> Thanks,
> Damian
Re: Occasional NPE in NamedCache
Hi, I believe I found the problem. If possible could you please try with this: https://github.com/dguy/kafka/tree/cache-bug

Thanks,
Damian

On Thu, 13 Oct 2016 at 17:46 Damian Guy wrote:

> Hi Frank,
>
> Thanks for reporting. Can you provide a sample of the join you are running?
>
> Thanks,
> Damian
>
> On Thu, 13 Oct 2016 at 16:10 Frank Lyaruu wrote:
>
>> Hi Kafka people,
>>
>> I'm joining a bunch of Kafka Topics using Kafka Streams, with the Kafka 0.10.1 release candidate.
>>
>> It runs OK for a few thousand messages, and then it dies with the following exception:
>>
>> Exception in thread "StreamThread-1" java.lang.NullPointerException
>>     at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:194)
>>     at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:190)
>>     at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:121)
>>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:147)
>>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:134)
>>     at org.apache.kafka.streams.kstream.internals.KTableReduce$KTableAggregateValueGetter.get(KTableReduce.java:121)
>>     at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:77)
>>     at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:48)
>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>>     at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:83)
>>     at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:73)
>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>>     at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:83)
>>     at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:73)
>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>>     at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:52)
>>     at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:49)
>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>>     at org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:83)
>>     at org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:49)
>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>>     at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:35)
>>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.maybeForward(CachingKeyValueStore.java:97)
>>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:34)
>>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:84)
>>     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:117)
>>     at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:196)
>>     at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:190)
>>     at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:121)
>>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:187)
>>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:182)
>>     at org.apache.kafka.streams.kstream.internals.KTableReduce$KTableReduceProcessor.process(KTableReduce.java:92)
>>     at org.apache.kafka.streams.kstream.internals.KTableReduce$KTableReduceProcessor.process(KTableReduce.java:52)
Re: Occasional NPE in NamedCache
Hi Frank,

Thanks for reporting. Can you provide a sample of the join you are running?

Thanks,
Damian

On Thu, 13 Oct 2016 at 16:10 Frank Lyaruu wrote:

> Hi Kafka people,
>
> I'm joining a bunch of Kafka Topics using Kafka Streams, with the Kafka
> 0.10.1 release candidate.
>
> It runs OK for a few thousand messages, and then it dies with the
> following exception:
>
> Exception in thread "StreamThread-1" java.lang.NullPointerException
>     at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:194)
>     at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:190)
>     at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:121)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:147)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:134)
>     at org.apache.kafka.streams.kstream.internals.KTableReduce$KTableAggregateValueGetter.get(KTableReduce.java:121)
>     at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:77)
>     at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:48)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>     at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:83)
>     at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:73)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>     at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:83)
>     at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:73)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>     at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:52)
>     at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:49)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>     at org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:83)
>     at org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:49)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>     at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:35)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.maybeForward(CachingKeyValueStore.java:97)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:34)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:84)
>     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:117)
>     at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:196)
>     at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:190)
>     at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:121)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:187)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:182)
>     at org.apache.kafka.streams.kstream.internals.KTableReduce$KTableReduceProcessor.process(KTableReduce.java:92)
>     at org.apache.kafka.streams.kstream.internals.KTableReduce$KTableReduceProcessor.process(KTableReduce.java:52)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>     at