[ https://issues.apache.org/jira/browse/KAFKA-16355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17952553#comment-17952553 ]
Mickael Maison commented on KAFKA-16355: ---------------------------------------- Since we've not heard from [~ksolves.kafka] in a while, I'll unassign them, so if anybody else is interested they can pick it up. > ConcurrentModificationException in > InMemoryTimeOrderedKeyValueBuffer.evictWhile > ------------------------------------------------------------------------------- > > Key: KAFKA-16355 > URL: https://issues.apache.org/jira/browse/KAFKA-16355 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 3.5.1 > Reporter: Mickael Maison > Assignee: Ksolves India Limited > Priority: Major > > While a Streams application was restoring its state after an outage, it hit > the following: > org.apache.kafka.streams.errors.StreamsException: Exception caught in > process. taskId=0_16, processor=KSTREAM-SOURCE-0000000000, topic=<TOPIC>, > partition=16, offset=454875695, > stacktrace=java.util.ConcurrentModificationException > at java.base/java.util.TreeMap$PrivateEntryIterator.remove(TreeMap.java:1507) > at > org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.evictWhile(InMemoryTimeOrderedKeyValueBuffer.java:423) > at > org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessorSupplier$KTableSuppressProcessor.enforceConstraints(KTableSuppressProcessorSupplier.java:178) > at > org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessorSupplier$KTableSuppressProcessor.process(KTableSuppressProcessorSupplier.java:165) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) > at > org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45) > at > org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$setFlushListener$4(MeteredWindowStore.java:181) > at > org.apache.kafka.streams.state.internals.CachingWindowStore.putAndMaybeForward(CachingWindowStore.java:124) > at > org.apache.kafka.streams.state.internals.CachingWindowStore.lambda$initInternal$0(CachingWindowStore.java:99) > at > org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:158) > at > org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:252) > at > org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:302) > at > org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:179) > at > org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:173) > at > org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:47) > at > org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$put$5(MeteredWindowStore.java:201) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872) > at > org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:200) > at > org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$WindowStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:201) > at > org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:138) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:215) > at > org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:159) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:215) > at > org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:159) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) > at > org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84) > at > org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:810) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872) > at > org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:810) > at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:741) > at > org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97) > at > org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78) > at > org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1747) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:807) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579) > at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:767) > at > org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97) > at > org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78) > at > org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1747) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:807) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579) > That specific application is running 3.5.1, but looking at > InMemoryTimeOrderedKeyValueBuffer.evictWhile(), it seems the code has not > changed much since then so it may still happen. -- This message was sent by Atlassian Jira (v8.20.10#820010)