Re: Occasional NPE in NamedCache

2016-10-18 Thread Damian Guy
Hi Frank,

Which version of kafka are you running? The line numbers in the stack trace
don't match up with what i am seeing on 0.10.1 or on trunk.

FYI - I created a JIRA for this here:
https://issues.apache.org/jira/browse/KAFKA-4311

Thanks,
Damian

On Tue, 18 Oct 2016 at 15:52 Damian Guy  wrote:

> Also, it'd be great if you could share your streams topology.
>
> Thanks,
> Damian
>
> On Tue, 18 Oct 2016 at 15:48 Damian Guy  wrote:
>
> Hi Frank,
>
> Are you able to reproduce this? I'll have a look into it, but it is not
> immediately clear how it could get into this state.
>
> Thanks,
> Damian
>
>
> On Tue, 18 Oct 2016 at 11:08 Frank Lyaruu  wrote:
>
> I might have run into a related problem:
>
> [StreamThread-1] ERROR
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [StreamThread-1] Failed to close state manager for StreamTask 0_0:
> org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed
> to close state store addr-organization
>
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:342)
> at
>
> org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:121)
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:341)
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:322)
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.closeAllStateManagers(StreamThread.java:338)
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:299)
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:262)
>
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:245)
> Caused by: java.lang.IllegalStateException: Key found in dirty key set, but
> entry is null
>
> at
>
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:112)
> at
>
> org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:100)
> at
>
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:111)
> at
>
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(CachingKeyValueStore.java:117)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:340)
>
> ... 7 more
>
> I haven't done much research and it is quite possible there is a bug on my
> side, but I don't think I should be seeing this.
>
>
>
> On Thu, Oct 13, 2016 at 10:18 PM, Guozhang Wang 
> wrote:
>
> > BTW this is tracked and resolved as
> > https://issues.apache.org/jira/browse/KAFKA-4300.
> >
> > On Thu, Oct 13, 2016 at 1:17 PM, Guozhang Wang 
> wrote:
> >
> > > Thanks Frank for reporting the bug, and many thanks to Damian for the
> > > quick catch!
> > >
> > > On Thu, Oct 13, 2016 at 12:30 PM, Frank Lyaruu 
> > wrote:
> > >
> > >> The issue seems to be gone. Amazing work, thanks...!
> > >>
> > >> On Thu, Oct 13, 2016 at 6:56 PM, Damian Guy 
> > wrote:
> > >>
> > >> > Hi, i believe i found the problem. If possible could you please try
> > with
> > >> > this: https://github.com/dguy/kafka/tree/cache-bug
> > >> >
> > >> > Thanks,
> > >> > Damian
> > >> >
> > >> > On Thu, 13 Oct 2016 at 17:46 Damian Guy 
> wrote:
> > >> >
> > >> > > Hi Frank,
> > >> > >
> > >> > > Thanks for reporting. Can you provide a sample of the join you are
> > >> > > running?
> > >> > >
> > >> > > Thanks,
> > >> > > Damian
> > >> > >
> > >> > > On Thu, 13 Oct 2016 at 16:10 Frank Lyaruu 
> > wrote:
> > >> > >
> > >> > > Hi Kafka people,
> > >> > >
> > >> > > I'm joining a bunch of Kafka Topics using Kafka Streams, with the
> > >> Kafka
> > >> > > 0.10.1 release candidate.
> > >> > >
> > >> > > It runs ok for a few thousand of messages, and then it dies with
> the
> > >> > > following exception:
> > >> > >
> > >> > > Exception in thread "StreamThread-1"
> java.lang.NullPointerException
> > >> > > at
> > >> > >
> > >> > > org.apache.kafka.streams.state.internals.NamedCache.
> > >> > evict(NamedCache.java:194)
> > >> > > at
> > >> > >
> > >> > > org.apache.kafka.streams.state.internals.ThreadCache.
> > >> > maybeEvict(ThreadCache.java:190)
> > >> > > at
> > >> > >
> > >> > > org.apache.kafka.streams.state.internals.ThreadCache.
> > >> > put(ThreadCache.java:121)
> > >> > > at
> > >> > >
> > >> > > org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(
> > >> > CachingKeyValueStore.java:147)
> > >> > > at
> > >> > >
> > >> > > org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(
> > >> > CachingKeyValueStore.java:134)
> > >> > > at
> > >> > >
> > >> > > org.apache.kafka.streams.kstream.internals.KTableReduce$
> > >> 

Re: Occasional NPE in NamedCache

2016-10-18 Thread Damian Guy
Also, it'd be great if you could share your streams topology.

Thanks,
Damian

On Tue, 18 Oct 2016 at 15:48 Damian Guy  wrote:

> Hi Frank,
>
> Are you able to reproduce this? I'll have a look into it, but it is not
> immediately clear how it could get into this state.
>
> Thanks,
> Damian
>
>
> On Tue, 18 Oct 2016 at 11:08 Frank Lyaruu  wrote:
>
> I might have run into a related problem:
>
> [StreamThread-1] ERROR
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [StreamThread-1] Failed to close state manager for StreamTask 0_0:
> org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed
> to close state store addr-organization
>
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:342)
> at
>
> org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:121)
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:341)
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:322)
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.closeAllStateManagers(StreamThread.java:338)
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:299)
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:262)
>
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:245)
> Caused by: java.lang.IllegalStateException: Key found in dirty key set, but
> entry is null
>
> at
>
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:112)
> at
>
> org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:100)
> at
>
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:111)
> at
>
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(CachingKeyValueStore.java:117)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:340)
>
> ... 7 more
>
> I haven't done much research and it is quite possible there is a bug on my
> side, but I don't think I should be seeing this.
>
>
>
> On Thu, Oct 13, 2016 at 10:18 PM, Guozhang Wang 
> wrote:
>
> > BTW this is tracked and resolved as
> > https://issues.apache.org/jira/browse/KAFKA-4300.
> >
> > On Thu, Oct 13, 2016 at 1:17 PM, Guozhang Wang 
> wrote:
> >
> > > Thanks Frank for reporting the bug, and many thanks to Damian for the
> > > quick catch!
> > >
> > > On Thu, Oct 13, 2016 at 12:30 PM, Frank Lyaruu 
> > wrote:
> > >
> > >> The issue seems to be gone. Amazing work, thanks...!
> > >>
> > >> On Thu, Oct 13, 2016 at 6:56 PM, Damian Guy 
> > wrote:
> > >>
> > >> > Hi, i believe i found the problem. If possible could you please try
> > with
> > >> > this: https://github.com/dguy/kafka/tree/cache-bug
> > >> >
> > >> > Thanks,
> > >> > Damian
> > >> >
> > >> > On Thu, 13 Oct 2016 at 17:46 Damian Guy 
> wrote:
> > >> >
> > >> > > Hi Frank,
> > >> > >
> > >> > > Thanks for reporting. Can you provide a sample of the join you are
> > >> > > running?
> > >> > >
> > >> > > Thanks,
> > >> > > Damian
> > >> > >
> > >> > > On Thu, 13 Oct 2016 at 16:10 Frank Lyaruu 
> > wrote:
> > >> > >
> > >> > > Hi Kafka people,
> > >> > >
> > >> > > I'm joining a bunch of Kafka Topics using Kafka Streams, with the
> > >> Kafka
> > >> > > 0.10.1 release candidate.
> > >> > >
> > >> > > It runs ok for a few thousand of messages, and then it dies with
> the
> > >> > > following exception:
> > >> > >
> > >> > > Exception in thread "StreamThread-1"
> java.lang.NullPointerException
> > >> > > at
> > >> > >
> > >> > > org.apache.kafka.streams.state.internals.NamedCache.
> > >> > evict(NamedCache.java:194)
> > >> > > at
> > >> > >
> > >> > > org.apache.kafka.streams.state.internals.ThreadCache.
> > >> > maybeEvict(ThreadCache.java:190)
> > >> > > at
> > >> > >
> > >> > > org.apache.kafka.streams.state.internals.ThreadCache.
> > >> > put(ThreadCache.java:121)
> > >> > > at
> > >> > >
> > >> > > org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(
> > >> > CachingKeyValueStore.java:147)
> > >> > > at
> > >> > >
> > >> > > org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(
> > >> > CachingKeyValueStore.java:134)
> > >> > > at
> > >> > >
> > >> > > org.apache.kafka.streams.kstream.internals.KTableReduce$
> > >> > KTableAggregateValueGetter.get(KTableReduce.java:121)
> > >> > > at
> > >> > >
> > >> > > org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$
> > >> > KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:77)
> > >> > > at
> > >> > >
> > >> > > org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$

Re: Occasional NPE in NamedCache

2016-10-18 Thread Damian Guy
Hi Frank,

Are you able to reproduce this? I'll have a look into it, but it is not
immediately clear how it could get into this state.

Thanks,
Damian


On Tue, 18 Oct 2016 at 11:08 Frank Lyaruu  wrote:

> I might have run into a related problem:
>
> [StreamThread-1] ERROR
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [StreamThread-1] Failed to close state manager for StreamTask 0_0:
> org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed
> to close state store addr-organization
>
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:342)
> at
>
> org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:121)
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:341)
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:322)
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.closeAllStateManagers(StreamThread.java:338)
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:299)
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:262)
>
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:245)
> Caused by: java.lang.IllegalStateException: Key found in dirty key set, but
> entry is null
>
> at
>
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:112)
> at
>
> org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:100)
> at
>
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:111)
> at
>
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(CachingKeyValueStore.java:117)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:340)
>
> ... 7 more
>
> I haven't done much research and it is quite possible there is a bug on my
> side, but I don't think I should be seeing this.
>
>
>
> On Thu, Oct 13, 2016 at 10:18 PM, Guozhang Wang 
> wrote:
>
> > BTW this is tracked and resolved as
> > https://issues.apache.org/jira/browse/KAFKA-4300.
> >
> > On Thu, Oct 13, 2016 at 1:17 PM, Guozhang Wang 
> wrote:
> >
> > > Thanks Frank for reporting the bug, and many thanks to Damian for the
> > > quick catch!
> > >
> > > On Thu, Oct 13, 2016 at 12:30 PM, Frank Lyaruu 
> > wrote:
> > >
> > >> The issue seems to be gone. Amazing work, thanks...!
> > >>
> > >> On Thu, Oct 13, 2016 at 6:56 PM, Damian Guy 
> > wrote:
> > >>
> > >> > Hi, i believe i found the problem. If possible could you please try
> > with
> > >> > this: https://github.com/dguy/kafka/tree/cache-bug
> > >> >
> > >> > Thanks,
> > >> > Damian
> > >> >
> > >> > On Thu, 13 Oct 2016 at 17:46 Damian Guy 
> wrote:
> > >> >
> > >> > > Hi Frank,
> > >> > >
> > >> > > Thanks for reporting. Can you provide a sample of the join you are
> > >> > > running?
> > >> > >
> > >> > > Thanks,
> > >> > > Damian
> > >> > >
> > >> > > On Thu, 13 Oct 2016 at 16:10 Frank Lyaruu 
> > wrote:
> > >> > >
> > >> > > Hi Kafka people,
> > >> > >
> > >> > > I'm joining a bunch of Kafka Topics using Kafka Streams, with the
> > >> Kafka
> > >> > > 0.10.1 release candidate.
> > >> > >
> > >> > > It runs ok for a few thousand of messages, and then it dies with
> the
> > >> > > following exception:
> > >> > >
> > >> > > Exception in thread "StreamThread-1"
> java.lang.NullPointerException
> > >> > > at
> > >> > >
> > >> > > org.apache.kafka.streams.state.internals.NamedCache.
> > >> > evict(NamedCache.java:194)
> > >> > > at
> > >> > >
> > >> > > org.apache.kafka.streams.state.internals.ThreadCache.
> > >> > maybeEvict(ThreadCache.java:190)
> > >> > > at
> > >> > >
> > >> > > org.apache.kafka.streams.state.internals.ThreadCache.
> > >> > put(ThreadCache.java:121)
> > >> > > at
> > >> > >
> > >> > > org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(
> > >> > CachingKeyValueStore.java:147)
> > >> > > at
> > >> > >
> > >> > > org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(
> > >> > CachingKeyValueStore.java:134)
> > >> > > at
> > >> > >
> > >> > > org.apache.kafka.streams.kstream.internals.KTableReduce$
> > >> > KTableAggregateValueGetter.get(KTableReduce.java:121)
> > >> > > at
> > >> > >
> > >> > > org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$
> > >> > KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:77)
> > >> > > at
> > >> > >
> > >> > > org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$
> > >> > KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:48)
> > >> > > at
> > >> > >
> > >> > >
> 

Re: Occasional NPE in NamedCache

2016-10-18 Thread Frank Lyaruu
I might have run into a related problem:

[StreamThread-1] ERROR
org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
[StreamThread-1] Failed to close state manager for StreamTask 0_0:
org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed
to close state store addr-organization

at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:342)
at
org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:121)
at
org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:341)
at
org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:322)
at
org.apache.kafka.streams.processor.internals.StreamThread.closeAllStateManagers(StreamThread.java:338)
at
org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:299)
at
org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:262)

at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:245)
Caused by: java.lang.IllegalStateException: Key found in dirty key set, but
entry is null

at
org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:112)
at
org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:100)
at
org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:111)
at
org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(CachingKeyValueStore.java:117)
at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:340)

... 7 more

I haven't done much research and it is quite possible there is a bug on my
side, but I don't think I should be seeing this.



On Thu, Oct 13, 2016 at 10:18 PM, Guozhang Wang  wrote:

> BTW this is tracked and resolved as
> https://issues.apache.org/jira/browse/KAFKA-4300.
>
> On Thu, Oct 13, 2016 at 1:17 PM, Guozhang Wang  wrote:
>
> > Thanks Frank for reporting the bug, and many thanks to Damian for the
> > quick catch!
> >
> > On Thu, Oct 13, 2016 at 12:30 PM, Frank Lyaruu 
> wrote:
> >
> >> The issue seems to be gone. Amazing work, thanks...!
> >>
> >> On Thu, Oct 13, 2016 at 6:56 PM, Damian Guy 
> wrote:
> >>
> >> > Hi, i believe i found the problem. If possible could you please try
> with
> >> > this: https://github.com/dguy/kafka/tree/cache-bug
> >> >
> >> > Thanks,
> >> > Damian
> >> >
> >> > On Thu, 13 Oct 2016 at 17:46 Damian Guy  wrote:
> >> >
> >> > > Hi Frank,
> >> > >
> >> > > Thanks for reporting. Can you provide a sample of the join you are
> >> > > running?
> >> > >
> >> > > Thanks,
> >> > > Damian
> >> > >
> >> > > On Thu, 13 Oct 2016 at 16:10 Frank Lyaruu 
> wrote:
> >> > >
> >> > > Hi Kafka people,
> >> > >
> >> > > I'm joining a bunch of Kafka Topics using Kafka Streams, with the
> >> Kafka
> >> > > 0.10.1 release candidate.
> >> > >
> >> > > It runs ok for a few thousand of messages, and then it dies with the
> >> > > following exception:
> >> > >
> >> > > Exception in thread "StreamThread-1" java.lang.NullPointerException
> >> > > at
> >> > >
> >> > > org.apache.kafka.streams.state.internals.NamedCache.
> >> > evict(NamedCache.java:194)
> >> > > at
> >> > >
> >> > > org.apache.kafka.streams.state.internals.ThreadCache.
> >> > maybeEvict(ThreadCache.java:190)
> >> > > at
> >> > >
> >> > > org.apache.kafka.streams.state.internals.ThreadCache.
> >> > put(ThreadCache.java:121)
> >> > > at
> >> > >
> >> > > org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(
> >> > CachingKeyValueStore.java:147)
> >> > > at
> >> > >
> >> > > org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(
> >> > CachingKeyValueStore.java:134)
> >> > > at
> >> > >
> >> > > org.apache.kafka.streams.kstream.internals.KTableReduce$
> >> > KTableAggregateValueGetter.get(KTableReduce.java:121)
> >> > > at
> >> > >
> >> > > org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$
> >> > KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:77)
> >> > > at
> >> > >
> >> > > org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$
> >> > KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:48)
> >> > > at
> >> > >
> >> > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> >> > ProcessorNode.java:82)
> >> > > at
> >> > >
> >> > > org.apache.kafka.streams.processor.internals.
> >> > ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> >> > > at
> >> > >
> >> > > org.apache.kafka.streams.kstream.internals.KTableFilter$
> >> > KTableFilterProcessor.process(KTableFilter.java:83)
> >> > > at
> >> > >
> >> > > org.apache.kafka.streams.kstream.internals.KTableFilter$
> >> > KTableFilterProcessor.process(KTableFilter.java:73)
> >> > > at
> >> > >
> >> > 

Re: Occasional NPE in NamedCache

2016-10-13 Thread Guozhang Wang
BTW this is tracked and resolved as
https://issues.apache.org/jira/browse/KAFKA-4300.

On Thu, Oct 13, 2016 at 1:17 PM, Guozhang Wang  wrote:

> Thanks Frank for reporting the bug, and many thanks to Damian for the
> quick catch!
>
> On Thu, Oct 13, 2016 at 12:30 PM, Frank Lyaruu  wrote:
>
>> The issue seems to be gone. Amazing work, thanks...!
>>
>> On Thu, Oct 13, 2016 at 6:56 PM, Damian Guy  wrote:
>>
>> > Hi, i believe i found the problem. If possible could you please try with
>> > this: https://github.com/dguy/kafka/tree/cache-bug
>> >
>> > Thanks,
>> > Damian
>> >
>> > On Thu, 13 Oct 2016 at 17:46 Damian Guy  wrote:
>> >
>> > > Hi Frank,
>> > >
>> > > Thanks for reporting. Can you provide a sample of the join you are
>> > > running?
>> > >
>> > > Thanks,
>> > > Damian
>> > >
>> > > On Thu, 13 Oct 2016 at 16:10 Frank Lyaruu  wrote:
>> > >
>> > > Hi Kafka people,
>> > >
>> > > I'm joining a bunch of Kafka Topics using Kafka Streams, with the
>> Kafka
>> > > 0.10.1 release candidate.
>> > >
>> > > It runs ok for a few thousand of messages, and then it dies with the
>> > > following exception:
>> > >
>> > > Exception in thread "StreamThread-1" java.lang.NullPointerException
>> > > at
>> > >
>> > > org.apache.kafka.streams.state.internals.NamedCache.
>> > evict(NamedCache.java:194)
>> > > at
>> > >
>> > > org.apache.kafka.streams.state.internals.ThreadCache.
>> > maybeEvict(ThreadCache.java:190)
>> > > at
>> > >
>> > > org.apache.kafka.streams.state.internals.ThreadCache.
>> > put(ThreadCache.java:121)
>> > > at
>> > >
>> > > org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(
>> > CachingKeyValueStore.java:147)
>> > > at
>> > >
>> > > org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(
>> > CachingKeyValueStore.java:134)
>> > > at
>> > >
>> > > org.apache.kafka.streams.kstream.internals.KTableReduce$
>> > KTableAggregateValueGetter.get(KTableReduce.java:121)
>> > > at
>> > >
>> > > org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$
>> > KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:77)
>> > > at
>> > >
>> > > org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$
>> > KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:48)
>> > > at
>> > >
>> > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
>> > ProcessorNode.java:82)
>> > > at
>> > >
>> > > org.apache.kafka.streams.processor.internals.
>> > ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>> > > at
>> > >
>> > > org.apache.kafka.streams.kstream.internals.KTableFilter$
>> > KTableFilterProcessor.process(KTableFilter.java:83)
>> > > at
>> > >
>> > > org.apache.kafka.streams.kstream.internals.KTableFilter$
>> > KTableFilterProcessor.process(KTableFilter.java:73)
>> > > at
>> > >
>> > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
>> > ProcessorNode.java:82)
>> > > at
>> > >
>> > > org.apache.kafka.streams.processor.internals.
>> > ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>> > > at
>> > >
>> > > org.apache.kafka.streams.kstream.internals.KTableFilter$
>> > KTableFilterProcessor.process(KTableFilter.java:83)
>> > > at
>> > >
>> > > org.apache.kafka.streams.kstream.internals.KTableFilter$
>> > KTableFilterProcessor.process(KTableFilter.java:73)
>> > > at
>> > >
>> > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
>> > ProcessorNode.java:82)
>> > > at
>> > >
>> > > org.apache.kafka.streams.processor.internals.
>> > ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>> > > at
>> > >
>> > > org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$
>> > KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:52)
>> > > at
>> > >
>> > > org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$
>> > KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:49)
>> > > at
>> > >
>> > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
>> > ProcessorNode.java:82)
>> > > at
>> > >
>> > > org.apache.kafka.streams.processor.internals.
>> > ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>> > > at
>> > >
>> > > org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$
>> > KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:83)
>> > > at
>> > >
>> > > org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$
>> > KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:49)
>> > > at
>> > >
>> > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
>> > ProcessorNode.java:82)
>> > > at
>> > >
>> > > org.apache.kafka.streams.processor.internals.
>> > ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>> > > at
>> > >
>> > > org.apache.kafka.streams.kstream.internals.ForwardingCacheFl
>> ushListener.
>> > apply(ForwardingCacheFlushListener.java:35)
>> > > 

Re: Occasional NPE in NamedCache

2016-10-13 Thread Frank Lyaruu
The issue seems to be gone. Amazing work, thanks...!

On Thu, Oct 13, 2016 at 6:56 PM, Damian Guy  wrote:

> Hi, i believe i found the problem. If possible could you please try with
> this: https://github.com/dguy/kafka/tree/cache-bug
>
> Thanks,
> Damian
>
> On Thu, 13 Oct 2016 at 17:46 Damian Guy  wrote:
>
> > Hi Frank,
> >
> > Thanks for reporting. Can you provide a sample of the join you are
> > running?
> >
> > Thanks,
> > Damian
> >
> > On Thu, 13 Oct 2016 at 16:10 Frank Lyaruu  wrote:
> >
> > Hi Kafka people,
> >
> > I'm joining a bunch of Kafka Topics using Kafka Streams, with the Kafka
> > 0.10.1 release candidate.
> >
> > It runs ok for a few thousand of messages, and then it dies with the
> > following exception:
> >
> > Exception in thread "StreamThread-1" java.lang.NullPointerException
> > at
> >
> > org.apache.kafka.streams.state.internals.NamedCache.
> evict(NamedCache.java:194)
> > at
> >
> > org.apache.kafka.streams.state.internals.ThreadCache.
> maybeEvict(ThreadCache.java:190)
> > at
> >
> > org.apache.kafka.streams.state.internals.ThreadCache.
> put(ThreadCache.java:121)
> > at
> >
> > org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(
> CachingKeyValueStore.java:147)
> > at
> >
> > org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(
> CachingKeyValueStore.java:134)
> > at
> >
> > org.apache.kafka.streams.kstream.internals.KTableReduce$
> KTableAggregateValueGetter.get(KTableReduce.java:121)
> > at
> >
> > org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$
> KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:77)
> > at
> >
> > org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$
> KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:48)
> > at
> >
> > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> ProcessorNode.java:82)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > at
> >
> > org.apache.kafka.streams.kstream.internals.KTableFilter$
> KTableFilterProcessor.process(KTableFilter.java:83)
> > at
> >
> > org.apache.kafka.streams.kstream.internals.KTableFilter$
> KTableFilterProcessor.process(KTableFilter.java:73)
> > at
> >
> > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> ProcessorNode.java:82)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > at
> >
> > org.apache.kafka.streams.kstream.internals.KTableFilter$
> KTableFilterProcessor.process(KTableFilter.java:83)
> > at
> >
> > org.apache.kafka.streams.kstream.internals.KTableFilter$
> KTableFilterProcessor.process(KTableFilter.java:73)
> > at
> >
> > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> ProcessorNode.java:82)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > at
> >
> > org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$
> KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:52)
> > at
> >
> > org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$
> KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:49)
> > at
> >
> > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> ProcessorNode.java:82)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > at
> >
> > org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$
> KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:83)
> > at
> >
> > org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$
> KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:49)
> > at
> >
> > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> ProcessorNode.java:82)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > at
> >
> > org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.
> apply(ForwardingCacheFlushListener.java:35)
> > at
> >
> > org.apache.kafka.streams.state.internals.CachingKeyValueStore.
> maybeForward(CachingKeyValueStore.java:97)
> > at
> >
> > org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$
> 000(CachingKeyValueStore.java:34)
> > at
> >
> > org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(
> CachingKeyValueStore.java:84)
> > at
> >
> > org.apache.kafka.streams.state.internals.NamedCache.
> flush(NamedCache.java:117)
> > at
> >
> > org.apache.kafka.streams.state.internals.NamedCache.
> evict(NamedCache.java:196)
> > at
> >
> > org.apache.kafka.streams.state.internals.ThreadCache.
> maybeEvict(ThreadCache.java:190)
> > at
> >
> > org.apache.kafka.streams.state.internals.ThreadCache.
> put(ThreadCache.java:121)
> > 

Re: Occasional NPE in NamedCache

2016-10-13 Thread Damian Guy
Hi, i believe i found the problem. If possible could you please try with
this: https://github.com/dguy/kafka/tree/cache-bug

Thanks,
Damian

On Thu, 13 Oct 2016 at 17:46 Damian Guy  wrote:

> Hi Frank,
>
> Thanks for reporting. Can you provide a sample of the join you are
> running?
>
> Thanks,
> Damian
>
> On Thu, 13 Oct 2016 at 16:10 Frank Lyaruu  wrote:
>
> Hi Kafka people,
>
> I'm joining a bunch of Kafka Topics using Kafka Streams, with the Kafka
> 0.10.1 release candidate.
>
> It runs ok for a few thousand of messages, and then it dies with the
> following exception:
>
> Exception in thread "StreamThread-1" java.lang.NullPointerException
> at
>
> org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:194)
> at
>
> org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:190)
> at
>
> org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:121)
> at
>
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:147)
> at
>
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:134)
> at
>
> org.apache.kafka.streams.kstream.internals.KTableReduce$KTableAggregateValueGetter.get(KTableReduce.java:121)
> at
>
> org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:77)
> at
>
> org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:48)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> at
>
> org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:83)
> at
>
> org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:73)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> at
>
> org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:83)
> at
>
> org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:73)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> at
>
> org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:52)
> at
>
> org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:49)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> at
>
> org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:83)
> at
>
> org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:49)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> at
>
> org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:35)
> at
>
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.maybeForward(CachingKeyValueStore.java:97)
> at
>
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:34)
> at
>
> org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:84)
> at
>
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:117)
> at
>
> org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:196)
> at
>
> org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:190)
> at
>
> org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:121)
> at
>
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:187)
> at
>
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:182)
> at
>
> org.apache.kafka.streams.kstream.internals.KTableReduce$KTableReduceProcessor.process(KTableReduce.java:92)
> at
>
> org.apache.kafka.streams.kstream.internals.KTableReduce$KTableReduceProcessor.process(KTableReduce.java:52)
> at
>
> 

Re: Occasional NPE in NamedCache

2016-10-13 Thread Damian Guy
Hi Frank,

Thanks for reporting. Can you provide a sample of the join you are running?

Thanks,
Damian

On Thu, 13 Oct 2016 at 16:10 Frank Lyaruu  wrote:

> Hi Kafka people,
>
> I'm joining a bunch of Kafka Topics using Kafka Streams, with the Kafka
> 0.10.1 release candidate.
>
> It runs ok for a few thousand of messages, and then it dies with the
> following exception:
>
> Exception in thread "StreamThread-1" java.lang.NullPointerException
> at
>
> org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:194)
> at
>
> org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:190)
> at
>
> org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:121)
> at
>
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:147)
> at
>
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:134)
> at
>
> org.apache.kafka.streams.kstream.internals.KTableReduce$KTableAggregateValueGetter.get(KTableReduce.java:121)
> at
>
> org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:77)
> at
>
> org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:48)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> at
>
> org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:83)
> at
>
> org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:73)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> at
>
> org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:83)
> at
>
> org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:73)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> at
>
> org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:52)
> at
>
> org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:49)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> at
>
> org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:83)
> at
>
> org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:49)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> at
>
> org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:35)
> at
>
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.maybeForward(CachingKeyValueStore.java:97)
> at
>
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:34)
> at
>
> org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:84)
> at
>
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:117)
> at
>
> org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:196)
> at
>
> org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:190)
> at
>
> org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:121)
> at
>
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:187)
> at
>
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:182)
> at
>
> org.apache.kafka.streams.kstream.internals.KTableReduce$KTableReduceProcessor.process(KTableReduce.java:92)
> at
>
> org.apache.kafka.streams.kstream.internals.KTableReduce$KTableReduceProcessor.process(KTableReduce.java:52)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> at
>
>