Hello Nitay,

I have not yet heard of anyone else reporting something similar that would point to a bug. Maybe you could try to reproduce the issue by first starting a brand-new app on 2.5, then following the upgrade path (with config overrides) to 2.6, and seeing whether it is easily reproducible? If it is, please create a JIRA ticket.
Guozhang

On Wed, Jun 23, 2021 at 3:50 PM Nitay Kufert <nita...@is.com.invalid> wrote:

> Bumping for the off chance that during this time some sort of a bug was
> reported that might explain this behaviour.. I will feel more comfortable
> bumping our kafka versions this way :)
>
> On Wed, Feb 24, 2021 at 12:48 PM Nitay Kufert <nita...@ironsrc.com> wrote:
>
> > I guess it's possible but very unlikely because it works perfectly with
> > all the previous versions and the current one? (2.5.1)
> > Why did a change in the version introduce NULLS there?
> >
> > On Tue, Feb 23, 2021 at 9:16 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> >> Is it possible that the flattened values contain `null` and hence
> >> `_.split` throws?
> >>
> >> On Tue, Feb 23, 2021 at 8:23 AM Nitay Kufert <nita...@ironsrc.com> wrote:
> >>
> >> > Hey, missed your reply - but the code I've shared above the logs is the
> >> > code around those lines (removed some identifiers to make it a little bit
> >> > more generic):
> >> >
> >> > > inputStream.flatMapValues(_.split).peek((k, v) => {val _ = $k ->
> >> > > ${v.printForDebug}")}) # return type KStream[Windowed[String],
> >> > > SingleInputMessage]
> >> >
> >> > On Fri, Jan 29, 2021 at 9:01 AM Guozhang Wang <wangg...@gmail.com> wrote:
> >> >
> >> > > Could you share your code around
> >> > >
> >> > > com.app.consumer.Utils$.$anonfun$buildCountersStream$1(ServiceUtils.scala:91)
> >> > >
> >> > > That seems to be where NPE is thrown.
> >> > >
> >> > > On Wed, Jan 13, 2021 at 5:46 AM Nitay Kufert <nita...@ironsrc.com> wrote:
> >> > >
> >> > > > Hey,
> >> > > > *Without any code change*, just by bumping the kafka version from 2.5.1 to
> >> > > > 2.6.1 (clients only) - my stream application started throwing
> >> > > > NullPointerException (sometimes, not in a predictable pattern).
> >> > > > Maybe it's worth mentioning that I also removed the "UPGRADE_FROM" conf
> >> > > > that was forgotten there from the older versions.
> >> > > >
> >> > > > We are using Scala 2.12, and the line that throws this exception is using
> >> > > > flatMapValues:
> >> > > >
> >> > > > > inputStream.flatMapValues(_.split) # return type
> >> > > > > KStream[Windowed[String], SingleInputMessage]
> >> > > >
> >> > > > Where inputStream is of type: KStream[Windowed[String], InputMessage] and
> >> > > > the split method splits this InputMessage into several
> >> > > > SingleInputMessage messages (hence the flat - to avoid
> >> > > > List[SingleInputMessage]).
> >> > > >
> >> > > > The exception:
> >> > > >
> >> > > > > java.lang.NullPointerException: null Wrapped by:
> >> > > > > org.apache.kafka.streams.errors.StreamsException: Exception caught in
> >> > > > > process. taskId=2_2, processor=unique_input_message-repartition-source,
> >> > > > > topic=service-unique_input_message-repartition, partition=2,
> >> > > > > offset=318846738, stacktrace=java.lang.NullPointerException
> >> > > > >
> >> > > > > java.lang.NullPointerException: null
> >> > > > >   at com.app.consumer.Utils$.$anonfun$buildCountersStream$1(ServiceUtils.scala:91)
> >> > > > >   at org.apache.kafka.streams.scala.FunctionsCompatConversions$FlatValueMapperFromFunction$.$anonfun$asValueMapper$2(FunctionsCompatConversions.scala:62)
> >> > > > >   at org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$1(AbstractStream.java:105)
> >> > > > >   at org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:40)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> >> > > > >   at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> >> > > > >   at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> >> > > > >   at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> >> > > > >   at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> >> > > > >   at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> >> > > > >   at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:109)
> >> > > > >   at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:78)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> >> > > > >   at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> >> > > > >   at org.apache.kafka.streams.kstream.internals.SessionCacheFlushListener.apply(SessionCacheFlushListener.java:43)
> >> > > > >   at org.apache.kafka.streams.kstream.internals.SessionCacheFlushListener.apply(SessionCacheFlushListener.java:26)
> >> > > > >   at org.apache.kafka.streams.state.internals.MeteredSessionStore.lambda$setFlushListener$1(MeteredSessionStore.java:97)
> >> > > > >   at org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:98)
> >> > > > >   at org.apache.kafka.streams.state.internals.CachingSessionStore.lambda$initInternal$0(CachingSessionStore.java:76)
> >> > > > >   at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
> >> > > > >   at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
> >> > > > >   at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
> >> > > > >   at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
> >> > > > >   at org.apache.kafka.streams.state.internals.CachingSessionStore.put(CachingSessionStore.java:134)
> >> > > > >   at org.apache.kafka.streams.state.internals.CachingSessionStore.remove(CachingSessionStore.java:142)
> >> > > > >   at org.apache.kafka.streams.state.internals.MeteredSessionStore.lambda$remove$3(MeteredSessionStore.java:134)
> >> > > > >   at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > >   at org.apache.kafka.streams.state.internals.MeteredSessionStore.remove(MeteredSessionStore.java:131)
> >> > > > >   at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$SessionStoreReadWriteDecorator.remove(AbstractReadWriteDecorator.java:221)
> >> > > > >   at org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.process(KStreamSessionWindowAggregate.java:171)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> >> > > > >   at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> >> > > > >   at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:85)
> >> > > > >   at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:678)
> >> > > > >   at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > >   at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:678)
> >> > > > >   ... 4 common frames omitted
> >> > > > > Wrapped by: org.apache.kafka.streams.errors.StreamsException: Exception caught in
> >> > > > > process. taskId=2_2, processor=unique_input_message-repartition-source,
> >> > > > > topic=service-unique_input_message-repartition, partition=2,
> >> > > > > offset=318846738, stacktrace=java.lang.NullPointerException
> >> > > > >   at com.app.consumer.ServiceUtils$.$anonfun$buildCountersStream$1(ServiceUtils.scala:91)
> >> > > > >   at org.apache.kafka.streams.scala.FunctionsCompatConversions$FlatValueMapperFromFunction$.$anonfun$asValueMapper$2(FunctionsCompatConversions.scala:62)
> >> > > > >   at org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$1(AbstractStream.java:105)
> >> > > > >   at org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:40)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> >> > > > >   at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> >> > > > >   at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> >> > > > >   at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> >> > > > >   at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> >> > > > >   at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> >> > > > >   at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:109)
> >> > > > >   at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:78)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> >> > > > >   at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> >> > > > >   at org.apache.kafka.streams.kstream.internals.SessionCacheFlushListener.apply(SessionCacheFlushListener.java:43)
> >> > > > >   at org.apache.kafka.streams.kstream.internals.SessionCacheFlushListener.apply(SessionCacheFlushListener.java:26)
> >> > > > >   at org.apache.kafka.streams.state.internals.MeteredSessionStore.lambda$setFlushListener$1(MeteredSessionStore.java:97)
> >> > > > >   at org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:98)
> >> > > > >   at org.apache.kafka.streams.state.internals.CachingSessionStore.lambda$initInternal$0(CachingSessionStore.java:76)
> >> > > > >   at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
> >> > > > >   at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
> >> > > > >   at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
> >> > > > >   at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
> >> > > > >   at org.apache.kafka.streams.state.internals.CachingSessionStore.put(CachingSessionStore.java:134)
> >> > > > >   at org.apache.kafka.streams.state.internals.CachingSessionStore.remove(CachingSessionStore.java:142)
> >> > > > >   at org.apache.kafka.streams.state.internals.MeteredSessionStore.lambda$remove$3(MeteredSessionStore.java:134)
> >> > > > >   at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > >   at org.apache.kafka.streams.state.internals.MeteredSessionStore.remove(MeteredSessionStore.java:131)
> >> > > > >   at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$SessionStoreReadWriteDecorator.remove(AbstractReadWriteDecorator.java:221)
> >> > > > >   at org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.process(KStreamSessionWindowAggregate.java:171)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> >> > > > >   at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> >> > > > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> >> > > > >   at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:85)
> >> > > > >   at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:678)
> >> > > > >   at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > >   at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:678)
> >> > > > >   at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1034)
> >> > > > >   at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
> >> > > > >   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
> >> > > > >   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
> >> > > > >   at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:695)
> >> > > > >   at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1034)
> >> > > > >   at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
> >> > > > >   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
> >> > > > >   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
> >> > > >
> >> > > > * 2nd line of the exception is because we are using Scala
> >> > > > (FunctionsCompatConversions.scala:62)
> >> > > >
> >> > > > > implicit class FlatValueMapperFromFunction[V, VR](val f: V => Iterable[VR]) extends AnyVal {
> >> > > > >   def asValueMapper: ValueMapper[V, JIterable[VR]] = (value: V) => f(value).*asJava*
> >> > > > > }
> >> > > >
> >> > > > The main thing here is that we didn't change anything in the app code..
> >> > > > so i wonder if it's a new bug OR our implementation bug that somehow didn't
> >> > > > happen in 2.5.1 (or previous versions, since this logic is old)
> >> > > >
> >> > > > Thanks and let me know what else can help (i wish i knew how to reproduce,
> >> > > > it happened 6 times during the last day and no idea why.. i'll try to catch
> >> > > > it with logs)
> >> > > >
> >> > > > --
> >> > > >
> >> > > > Nitay Kufert
> >> > > > Backend Team Leader
> >> > > > [image: ironSource] <http://www.ironsrc.com>
> >> > > >
> >> > > > email nita...@ironsrc.com
> >> > > > mobile +972-54-5480021
> >> > > > fax +972-77-5448273
> >> > > > skype nitay.kufert.ssa
> >> > > > 121 Menachem Begin St., Tel Aviv, Israel
> >> > > > ironsrc.com <http://www.ironsrc.com>
> >> > > > [image: linkedin] <https://www.linkedin.com/company/ironsource>
> >> > > > [image: twitter] <https://twitter.com/ironsource>
> >> > > > [image: facebook] <https://www.facebook.com/ironSource>
> >> > > > [image: googleplus] <https://plus.google.com/+ironsrc>
> >> > > > This email (including any attachments) is for the sole use of the intended
> >> > > > recipient and may contain confidential information which may be protected
> >> > > > by legal privilege. If you are not the intended recipient, or the employee
> >> > > > or agent responsible for delivering it to the intended recipient, you are
> >> > > > hereby notified that any use, dissemination, distribution or copying of
> >> > > > this communication and/or its content is strictly prohibited. If you are
> >> > > > not the intended recipient, please immediately notify us by reply email or
> >> > > > by telephone, delete this email and destroy any copies. Thank you.
> >> > >
> >> > > --
> >> > > -- Guozhang
> >>
> >> --
> >> -- Guozhang

--
-- Guozhang
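
One way to test the hypothesis raised in the thread (that a `null` value reaches `_.split`) is to make the mapper null-safe and see whether the exception disappears. The following is only a minimal sketch under stated assumptions: `InputMessage`, `SingleInputMessage` and `split` are named in the thread, but their definitions below are hypothetical stand-ins, and `safeSplit` is a hypothetical helper that is not part of Kafka Streams. It has no Kafka dependency so that it can be run on its own.

```scala
// Hypothetical stand-ins for the types named in the thread; the real
// definitions are not shown there.
final case class SingleInputMessage(payload: String)

final case class InputMessage(payloads: List[String]) {
  // Splits one input message into several single messages.
  def split: List[SingleInputMessage] = payloads.map(SingleInputMessage(_))
}

object NullSafeSplit {
  // Option(null) evaluates to None, so a null value yields an empty
  // collection instead of a NullPointerException.
  def safeSplit(value: InputMessage): Iterable[SingleInputMessage] =
    Option(value).map(_.split).getOrElse(Nil)

  def main(args: Array[String]): Unit = {
    println(safeSplit(InputMessage(List("a", "b")))) // two single messages
    println(safeSplit(null))                         // empty, no exception
  }
}
```

In the topology from the thread this would presumably be used as `inputStream.flatMapValues(safeSplit _)` in place of `inputStream.flatMapValues(_.split)`. Logging the key whenever the value is `null` would additionally show which records trigger the problem.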