[jira] [Commented] (KAFKA-10616) StreamThread killed by "IllegalStateException: The processor is already closed"
[ https://issues.apache.org/jira/browse/KAFKA-10616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17217813#comment-17217813 ]

Guozhang Wang commented on KAFKA-10616:
---------------------------------------

I think this is a long-lurking bug caused by https://github.com/apache/kafka/pull/9083. Generally speaking, before we close the topology we should always try to flush the state store manager (we do not actually need to flush the whole store, just the cache). I have this fix piggy-backed in my other PR, https://github.com/apache/kafka/pull/8988, but that one has been dragging on for a long time and every 3-4 days I have to rebase it again, so I do not expect it to be merged soon. What I can do is extract that fix from the PR and merge it separately for 2.7 / 2.6.

> StreamThread killed by "IllegalStateException: The processor is already closed"
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-10616
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10616
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: A. Sophie Blee-Goldman
>            Assignee: Guozhang Wang
>            Priority: Blocker
>             Fix For: 2.7.0
>
>
> The application is hitting "java.lang.IllegalStateException: The processor is already closed". Over the course of about a day, this exception killed 21 of 100 queries (StreamThreads). The (slightly trimmed) stacktrace:
>
> {code:java}
> java.lang.RuntimeException: Caught an exception while closing caching window store for store Aggregate-Aggregate-Materialize
>     at org.apache.kafka.streams.state.internals.ExceptionUtils.throwSuppressed(ExceptionUtils.java:39)
>     at org.apache.kafka.streams.state.internals.CachingWindowStore.close(CachingWindowStore.java:432)
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:527)
>     at org.apache.kafka.streams.processor.internals.StreamTask.closeDirty(StreamTask.java:499)
>     at org.apache.kafka.streams.processor.internals.TaskManager.handleLostAll(TaskManager.java:626)
>     …
> Caused by: java.lang.IllegalStateException: The processor is already closed
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.throwIfClosed(ProcessorNode.java:172)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:178)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:214)
>     at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)
>     at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:28)
>     at org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$setFlushListener$1(MeteredWindowStore.java:110)
>     at org.apache.kafka.streams.state.internals.CachingWindowStore.putAndMaybeForward(CachingWindowStore.java:118)
>     at org.apache.kafka.streams.state.internals.CachingWindowStore.lambda$initInternal$0(CachingWindowStore.java:93)
>     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
>     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:109)
>     at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:116)
>     at org.apache.kafka.streams.state.internals.CachingWindowStore.lambda$close$1(CachingWindowStore.java:427)
>     at org.apache.kafka.streams.state.internals.ExceptionUtils.executeAll(ExceptionUtils.java:28)
>     at org.apache.kafka.streams.state.internals.CachingWindowStore.close(CachingWindowStore.java:426)
> {code}
>
> I'm guessing we close the topology before closing the state stores, so records that get flushed during the caching store's close() run into an already-closed processor. During a clean close we always flush before closing anything (during prepareCommit()), but since this was a handleLostAll() we skip right to suspend() and closing the topology. Presumably the right thing to do here is to flush the caches before closing anything during a dirty close as well.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
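The ordering problem described in the ticket can be illustrated with a minimal sketch (hypothetical class and method names, not the actual Kafka Streams internals): a cache that forwards buffered records downstream on flush must be flushed while the downstream processor is still open, so a dirty close has to flush before closing the topology, just as the clean-close path does via prepareCommit().

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for a downstream processor node that rejects records after close().
class Processor {
    private boolean closed = false;

    void process(String record) {
        if (closed) {
            throw new IllegalStateException("The processor is already closed");
        }
    }

    void close() { closed = true; }
}

// Stand-in for a caching store whose close() implicitly flushes buffered
// records downstream (as CachingWindowStore.close() does via the cache).
class CachingStore {
    private final List<String> cache = new ArrayList<>();
    private final Processor downstream;

    CachingStore(Processor downstream) { this.downstream = downstream; }

    void put(String record) { cache.add(record); }

    // Flushing forwards buffered records downstream -- fails if the
    // downstream processor was already closed.
    void flush() {
        for (String r : cache) {
            downstream.process(r);
        }
        cache.clear();
    }

    void close() { flush(); }
}

public class CloseOrdering {
    public static void main(String[] args) {
        // Buggy dirty-close order: close the topology (processor) first,
        // then close the store -- its implicit flush hits a closed processor.
        Processor p1 = new Processor();
        CachingStore s1 = new CachingStore(p1);
        s1.put("record");
        p1.close();
        boolean failed = false;
        try {
            s1.close();
        } catch (IllegalStateException e) {
            failed = true;
        }
        System.out.println("buggy order failed: " + failed);

        // Fixed order: flush the cache while the processor is still open,
        // then close everything; the later store close has nothing to forward.
        Processor p2 = new Processor();
        CachingStore s2 = new CachingStore(p2);
        s2.put("record");
        s2.flush();
        p2.close();
        s2.close();
        System.out.println("fixed order ok");
    }
}
```

The fix direction both comments describe amounts to moving the flush step ahead of the topology close in the dirty-close path.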
[jira] [Commented] (KAFKA-10616) StreamThread killed by "IllegalStateException: The processor is already closed"
[ https://issues.apache.org/jira/browse/KAFKA-10616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17217271#comment-17217271 ]

Sagar Rao commented on KAFKA-10616:
-----------------------------------

Alright, thanks [~ableegoldman]. Let me know if we need a separate PR for this; we might also need another issue created for it?
[jira] [Commented] (KAFKA-10616) StreamThread killed by "IllegalStateException: The processor is already closed"
[ https://issues.apache.org/jira/browse/KAFKA-10616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17217261#comment-17217261 ]

A. Sophie Blee-Goldman commented on KAFKA-10616:
------------------------------------------------

By the way, we should actually fix this in 2.6 as well. There's no IllegalStateException killing the thread in that branch, but the underlying issue is still there: flushing records downstream to closed processors. AFAICT the only effect this has in the DSL operators is that these records would not be recorded in any metrics, but there could be more severe consequences if a PAPI user does something more interesting in the processor's close() method.

It's possible we'll need a separate PR for the 2.6 branch, depending on what the fix for 2.7/trunk looks like. [~sagarrao], if we do need a separate PR then feel free to pick it up.
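The PAPI risk mentioned in the comment above can be sketched as follows (hypothetical user code, not from the ticket): if a custom processor releases a resource in close(), a record forwarded to it afterwards operates on a released resource, so the failure mode is worse than just missing metrics.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical PAPI-style processor that owns a resource it releases in close().
class ResourceProcessor {
    // Stands in for an external resource such as a connection or file handle.
    private List<String> sink = new ArrayList<>();

    void process(String record) {
        // If close() already ran, sink is null and this fails with an NPE --
        // an unexpected crash (or worse, silent misbehavior) rather than a
        // clear lifecycle error.
        sink.add(record);
    }

    void close() {
        sink = null; // release the "resource"
    }
}

public class PapiCloseHazard {
    public static void main(String[] args) {
        ResourceProcessor p = new ResourceProcessor();
        p.process("before-close");
        p.close();
        try {
            p.process("after-close"); // record forwarded after close()
            System.out.println("no error");
        } catch (NullPointerException e) {
            System.out.println("record forwarded to closed processor failed");
        }
    }
}
```

This is why the 2.6 branch is still affected even though nothing there throws an IllegalStateException: the contract that no records arrive after close() is broken either way.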
[jira] [Commented] (KAFKA-10616) StreamThread killed by "IllegalStateException: The processor is already closed"
[ https://issues.apache.org/jira/browse/KAFKA-10616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17217258#comment-17217258 ]

A. Sophie Blee-Goldman commented on KAFKA-10616:
------------------------------------------------

I spoke with [~guozhang] earlier and he said he already has a fix for this, which he had to implement in some other work when he ran into the same issue. He's just going to cherry-pick it into a new PR for this ticket. Sorry, I forgot to assign the ticket to him.
[jira] [Commented] (KAFKA-10616) StreamThread killed by "IllegalStateException: The processor is already closed"
[ https://issues.apache.org/jira/browse/KAFKA-10616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17217208#comment-17217208 ]

Sagar Rao commented on KAFKA-10616:
-----------------------------------

Hey [~ableegoldman], can I take this one up?