BTW, this is tracked and resolved as
https://issues.apache.org/jira/browse/KAFKA-4300.
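
For anyone finding this thread later: the NPE in the quoted stack trace below is thrown from NamedCache.evict while a cache flush (itself triggered by an eviction) forwards records through a chain of KTable reduce/filter/join processors. As a rough illustration only, a minimal 0.10.1 topology of that general shape might look like the sketch below; the topic names, store names, serdes, and value types are placeholders, since the actual topology was never posted in this thread.

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class KTableJoinSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-join-sketch");  // placeholder app id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder broker

        KStreamBuilder builder = new KStreamBuilder();

        // Two changelog topics read as KTables (placeholder topic and store names).
        KTable<String, String> left =
                builder.table(Serdes.String(), Serdes.String(), "topic-a", "store-a");
        KTable<String, String> right =
                builder.table(Serdes.String(), Serdes.String(), "topic-b", "store-b");

        // Re-key and reduce one side; this is the kind of step the KTableReduce frames come from.
        KTable<String, String> reduced = left
                .groupBy((key, value) -> KeyValue.pair(value, value), Serdes.String(), Serdes.String())
                .reduce((agg, v) -> v, (agg, v) -> agg, "reduced-store");

        // Filter and join, matching the KTableFilter and KTableKTableLeftJoin frames in the trace.
        KTable<String, String> joined = reduced
                .filter((key, value) -> value != null)
                .leftJoin(right, (v1, v2) -> v1 + "|" + v2);

        joined.to(Serdes.String(), Serdes.String(), "output-topic");  // placeholder output topic

        new KafkaStreams(builder, props).start();
    }
}

With record caching enabled (the 0.10.1 default, controlled by cache.max.bytes.buffering), every KTable store in a topology like this sits behind the ThreadCache/NamedCache layer that shows up in the trace.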

On Thu, Oct 13, 2016 at 1:17 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Thanks, Frank, for reporting the bug, and many thanks to Damian for the
> quick catch!
>
> On Thu, Oct 13, 2016 at 12:30 PM, Frank Lyaruu <flya...@gmail.com> wrote:
>
>> The issue seems to be gone. Amazing work, thanks...!
>>
>> On Thu, Oct 13, 2016 at 6:56 PM, Damian Guy <damian....@gmail.com> wrote:
>>
>> > Hi, I believe I found the problem. If possible, could you please try with
>> > this branch: https://github.com/dguy/kafka/tree/cache-bug
>> >
>> > Thanks,
>> > Damian
>> >
>> > On Thu, 13 Oct 2016 at 17:46 Damian Guy <damian....@gmail.com> wrote:
>> >
>> > > Hi Frank,
>> > >
>> > > Thanks for reporting. Can you provide a sample of the join you are
>> > > running?
>> > >
>> > > Thanks,
>> > > Damian
>> > >
>> > > On Thu, 13 Oct 2016 at 16:10 Frank Lyaruu <flya...@gmail.com> wrote:
>> > >
>> > > Hi Kafka people,
>> > >
>> > > I'm joining a bunch of Kafka topics using Kafka Streams, with the
>> > > Kafka 0.10.1 release candidate.
>> > >
>> > > It runs OK for a few thousand messages, and then it dies with the
>> > > following exception:
>> > >
>> > > Exception in thread "StreamThread-1" java.lang.NullPointerException
>> > >   at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:194)
>> > >   at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:190)
>> > >   at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:121)
>> > >   at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:147)
>> > >   at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:134)
>> > >   at org.apache.kafka.streams.kstream.internals.KTableReduce$KTableAggregateValueGetter.get(KTableReduce.java:121)
>> > >   at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:77)
>> > >   at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:48)
>> > >   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>> > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>> > >   at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:83)
>> > >   at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:73)
>> > >   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>> > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>> > >   at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:83)
>> > >   at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:73)
>> > >   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>> > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>> > >   at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:52)
>> > >   at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:49)
>> > >   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>> > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>> > >   at org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:83)
>> > >   at org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:49)
>> > >   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>> > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>> > >   at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:35)
>> > >   at org.apache.kafka.streams.state.internals.CachingKeyValueStore.maybeForward(CachingKeyValueStore.java:97)
>> > >   at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:34)
>> > >   at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:84)
>> > >   at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:117)
>> > >   at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:196)
>> > >   at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:190)
>> > >   at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:121)
>> > >   at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:187)
>> > >   at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:182)
>> > >   at org.apache.kafka.streams.kstream.internals.KTableReduce$KTableReduceProcessor.process(KTableReduce.java:92)
>> > >   at org.apache.kafka.streams.kstream.internals.KTableReduce$KTableReduceProcessor.process(KTableReduce.java:52)
>> > >   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>> > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>> > >   at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
>> > >   at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:177)
>> > >   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:427)
>> > >   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:235)
>> > >
>> > > I know this isn't a great bug report, as I can't seem to reproduce this
>> > > in a more sandboxed situation. Any tips / ideas for further steps?
>> > >
>> > >
>> >
>>
>
>
>
> --
> -- Guozhang
>



-- 
-- Guozhang
