Re: Attempting to put a clean entry for key [...] into NamedCache [...] when it already contains a dirty entry for the same key
Hi Mathieu,

If you are happy to share your code privately, it would help. At the moment I'm struggling to see how we can get into this situation, so I think your topology would be useful.

Thanks,
Damian

On Mon, 5 Dec 2016 at 16:34 Mathieu Fenniak wrote:

> Hi Damian,
>
> Yes... I can see how most of the stack trace is rather meaningless.
> Unfortunately I don't have a minimal test case, and I don't want to burden
> you by dumping the entire application. (I could share it privately, if
> you'd like.)
>
> Based upon the stack trace, the relevant pieces involved are a multi-table
> join (KTable [leftJoin KTable]*20) that accumulates pieces of data into a
> map; one of the joined tables is an aggregation (KTable -> filter ->
> groupBy -> aggregate "TimesheetNonBillableHours") that would have the
> affected cache.
>
> Mathieu
Re: Attempting to put a clean entry for key [...] into NamedCache [...] when it already contains a dirty entry for the same key
Hi Damian,

Yes... I can see how most of the stack trace is rather meaningless. Unfortunately I don't have a minimal test case, and I don't want to burden you by dumping the entire application. (I could share it privately, if you'd like.)

Based upon the stack trace, the relevant pieces involved are a multi-table join (KTable [leftJoin KTable]*20) that accumulates pieces of data into a map; one of the joined tables is an aggregation (KTable -> filter -> groupBy -> aggregate "TimesheetNonBillableHours") that would have the affected cache.

Mathieu

On Mon, Dec 5, 2016 at 8:36 AM, Damian Guy wrote:

> Hi Mathieu,
>
> I'm trying to make sense of the rather long stack trace in the gist you
> provided. Can you possibly share your streams topology with us?
>
> Thanks,
> Damian
Re: Attempting to put a clean entry for key [...] into NamedCache [...] when it already contains a dirty entry for the same key
Hi Mathieu,

I'm trying to make sense of the rather long stack trace in the gist you provided. Can you possibly share your streams topology with us?

Thanks,
Damian

On Mon, 5 Dec 2016 at 14:14 Mathieu Fenniak wrote:

> Hi Eno,
>
> This exception occurred w/ trunk @ e43bbce (current as-of Saturday). I was
> bit by KAFKA-4311 (I believe) when trying to upgrade to 0.10.1.0, so with
> that issue now resolved I thought I'd check trunk out to see if any other
> issues remain.
>
> Mathieu
Re: Attempting to put a clean entry for key [...] into NamedCache [...] when it already contains a dirty entry for the same key
Hi Eno,

This exception occurred w/ trunk @ e43bbce (current as-of Saturday). I was bit by KAFKA-4311 (I believe) when trying to upgrade to 0.10.1.0, so with that issue now resolved I thought I'd check trunk out to see if any other issues remain.

Mathieu

On Sun, Dec 4, 2016 at 12:37 AM, Eno Thereska wrote:

> Hi Mathieu,
>
> What version of Kafka are you using? There was recently a fix that went
> into trunk, just checking if you're using an older version.
> (to make forward progress you can turn the cache off, like this:
> streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> )
>
> Thanks
> Eno
Re: Attempting to put a clean entry for key [...] into NamedCache [...] when it already contains a dirty entry for the same key
Hi Mathieu,

What version of Kafka are you using? There was recently a fix that went into trunk, just checking if you're using an older version.

(to make forward progress you can turn the cache off, like this:

    streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

)

Thanks
Eno

> On 4 Dec 2016, at 03:47, Mathieu Fenniak wrote:
>
> Hey all,
>
> I've just been running a quick test of my kafka-streams application on the
> latest Kafka trunk (@e43bbce), and came across this error. I was wondering
> if anyone has seen this error before, have any thoughts on what might cause
> it, or can suggest a direction to investigate it further.
>
> Full exception:
> https://gist.github.com/mfenniak/509fb82dfcfda79a21cfc1b07dafa89c
>
> java.lang.IllegalStateException: Attempting to put a clean entry for key
> [urn:replicon-tenant:strprc971e3ca9:timesheet:97c0ce25-e039-4e8b-9f2c-d43f0668b755]
> into NamedCache [0_0-TimesheetNonBillableHours] when it already contains a
> dirty entry for the same key
>     at org.apache.kafka.streams.state.internals.NamedCache.put(NamedCache.java:124)
>     at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:120)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:146)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:133)
>     at org.apache.kafka.streams.kstream.internals.KTableAggregate$KTableAggregateValueGetter.get(KTableAggregate.java:128)
>     at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:81)
>     at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:54)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>     ... more ...
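
For anyone wanting to try the workaround Eno suggests, here is a minimal sketch of what it might look like in a configuration helper. It is not taken from Mathieu's application: the class name, the helper method, and the application id and bootstrap address are hypothetical. The key is written as the string "cache.max.bytes.buffering", which is the value of StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, so that the sketch compiles without kafka-streams on the classpath; in a real application you would normally use the constant.

```java
import java.util.Properties;

public class DisableStreamsCache {
    // String value of StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
    // spelled out here only to keep the sketch free of Kafka dependencies.
    static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering";

    // Hypothetical helper: returns a copy of the given settings with the
    // record cache size set to 0, leaving the caller's Properties untouched.
    static Properties withCacheDisabled(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.put(CACHE_MAX_BYTES_BUFFERING, 0);
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        // Placeholder values, assumed for illustration only.
        base.put("application.id", "example-app");
        base.put("bootstrap.servers", "localhost:9092");

        Properties streamsConfiguration = withCacheDisabled(base);
        System.out.println(streamsConfiguration);
    }
}
```

The resulting Properties would then be passed to the KafkaStreams constructor as usual. Setting the cache size to 0 only sidesteps the exception; it does not address whatever leaves the dirty entry in the NamedCache.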