[jira] [Commented] (KAFKA-10616) StreamThread killed by "IllegalStateException: The processor is already closed"

2020-10-20 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17217813#comment-17217813
 ] 

Guozhang Wang commented on KAFKA-10616:
---

I think this is a long-lurking bug introduced by
https://github.com/apache/kafka/pull/9083.

Generally speaking, before we close the topology we should always try to flush
the state store manager (strictly speaking we do not need to flush the whole
store, only the cache). I have this fix piggy-backed in my other PR
https://github.com/apache/kafka/pull/8988, but that PR has been dragging on for
a long time and I have to rebase it every 3-4 days, so I do not expect it to be
merged soon.

What I can do is extract that fix alone from the PR and merge it for 2.7 /
2.6.
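A minimal sketch of the ordering described here: flush the cache while downstream processors are still open, then close the topology, then close the stores. This is a toy model in plain Java. `ToyProcessor`, `ToyCache` and `closeDirtyFlushingFirst` are hypothetical names, not Kafka Streams internals, and the real fix may be structured differently.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model (hypothetical classes, not Kafka Streams internals) of the close ordering:
// flush the cache while downstream processors are still open.
public class FlushBeforeCloseSketch {

    // Stand-in for a downstream processor node that rejects records once closed.
    static final class ToyProcessor {
        private boolean closed = false;
        final List<String> received = new ArrayList<>();

        void process(String key, String value) {
            if (closed) {
                throw new IllegalStateException("The processor is already closed");
            }
            received.add(key + "=" + value);
        }

        void close() {
            closed = true;
        }
    }

    // Stand-in for a caching store: buffers writes and forwards them downstream on flush.
    static final class ToyCache {
        private final Map<String, String> dirty = new LinkedHashMap<>();
        private final ToyProcessor downstream;

        ToyCache(ToyProcessor downstream) {
            this.downstream = downstream;
        }

        void put(String key, String value) {
            dirty.put(key, value);
        }

        void flush() {
            for (Map.Entry<String, String> e : dirty.entrySet()) {
                downstream.process(e.getKey(), e.getValue());
            }
            dirty.clear();
        }

        void close() {
            flush(); // closing the store flushes whatever is still buffered
        }
    }

    // Dirty close with the flush moved first: flush, close topology, close stores.
    static List<String> closeDirtyFlushingFirst() {
        ToyProcessor processor = new ToyProcessor();
        ToyCache cache = new ToyCache(processor);
        cache.put("a", "1");
        cache.put("b", "2");

        cache.flush();     // 1. flush the cache while the processor is still open
        processor.close(); // 2. close the topology
        cache.close();     // 3. close the store: nothing is left to forward, so no exception
        return processor.received;
    }

    public static void main(String[] args) {
        System.out.println(closeDirtyFlushingFirst()); // prints [a=1, b=2]
    }
}
```

Flushing only the cache (not the whole store) is enough in this model, because the cache is the only component that forwards records downstream on close.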

> StreamThread killed by "IllegalStateException: The processor is already 
> closed"
> ---
>
> Key: KAFKA-10616
> URL: https://issues.apache.org/jira/browse/KAFKA-10616
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Guozhang Wang
>Priority: Blocker
> Fix For: 2.7.0
>
>
> Application is hitting "java.lang.IllegalStateException: The processor is 
> already closed". Over the course of about a day, this exception killed 21/100 
> of the queries (StreamThreads). The (slightly trimmed) stacktrace:
>  
> {code:java}
> java.lang.RuntimeException: Caught an exception while closing caching window store for store Aggregate-Aggregate-Materialize
>     at org.apache.kafka.streams.state.internals.ExceptionUtils.throwSuppressed(ExceptionUtils.java:39)
>     at org.apache.kafka.streams.state.internals.CachingWindowStore.close(CachingWindowStore.java:432)
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:527)
>     at org.apache.kafka.streams.processor.internals.StreamTask.closeDirty(StreamTask.java:499)
>     at org.apache.kafka.streams.processor.internals.TaskManager.handleLostAll(TaskManager.java:626)
>     …
> Caused by: java.lang.IllegalStateException: The processor is already closed
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.throwIfClosed(ProcessorNode.java:172)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:178)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:214)
>     at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)
>     at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:28)
>     at org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$setFlushListener$1(MeteredWindowStore.java:110)
>     at org.apache.kafka.streams.state.internals.CachingWindowStore.putAndMaybeForward(CachingWindowStore.java:118)
>     at org.apache.kafka.streams.state.internals.CachingWindowStore.lambda$initInternal$0(CachingWindowStore.java:93)
>     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
>     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:109)
>     at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:116)
>     at org.apache.kafka.streams.state.internals.CachingWindowStore.lambda$close$1(CachingWindowStore.java:427)
>     at org.apache.kafka.streams.state.internals.ExceptionUtils.executeAll(ExceptionUtils.java:28)
>     at org.apache.kafka.streams.state.internals.CachingWindowStore.close(CachingWindowStore.java:426)
> {code}
>  
> I'm guessing we close the topology before closing the state stores, so
> records that get flushed during the caching store's close() will run into an
> already-closed processor. During a clean close we always flush before closing
> anything (during prepareCommit()), but since this was a handleLostAll() we
> skipped straight to suspend() and closed the topology.
> Presumably the right thing to do here is to flush the caches before closing
> anything during a dirty close.
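Here is a toy reproduction of the suspected ordering. It assumes, as the description guesses, that a dirty close shuts down the topology before the caching store, and that the store's close() flushes buffered records downstream. The classes below are hypothetical stand-ins in plain Java, not the real `ProcessorNode` or `CachingWindowStore`.

```java
import java.util.ArrayList;
import java.util.List;

// Toy reproduction (hypothetical classes, not Kafka Streams internals) of the suspected
// dirty-close ordering: the topology is closed before the caching store, and the store's
// close() flushes buffered records into the already-closed processor.
public class DirtyCloseOrderingSketch {

    // Stand-in for ProcessorNode: throws once closed, like throwIfClosed() in the trace.
    static final class ToyProcessor {
        private boolean closed = false;
        final List<String> received = new ArrayList<>();

        void process(String record) {
            if (closed) {
                throw new IllegalStateException("The processor is already closed");
            }
            received.add(record);
        }

        void close() {
            closed = true;
        }
    }

    // Stand-in for a caching store whose close() flushes buffered records downstream.
    static final class ToyCachingStore {
        private final List<String> buffered = new ArrayList<>();
        private final ToyProcessor downstream;

        ToyCachingStore(ToyProcessor downstream) {
            this.downstream = downstream;
        }

        void put(String record) {
            buffered.add(record);
        }

        void close() {
            for (String record : buffered) {
                downstream.process(record);
            }
            buffered.clear();
        }
    }

    // Suspected buggy order during a dirty close: topology first, stores second.
    // Returns the exception message, or null if closing succeeded.
    static String closeTopologyThenStore() {
        ToyProcessor processor = new ToyProcessor();
        ToyCachingStore store = new ToyCachingStore(processor);
        store.put("record-1");
        processor.close();
        try {
            store.close();
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(closeTopologyThenStore()); // prints The processor is already closed
    }
}
```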



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10616) StreamThread killed by "IllegalStateException: The processor is already closed"

2020-10-19 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17217271#comment-17217271
 ] 

Sagar Rao commented on KAFKA-10616:
---

Alright, thanks [~ableegoldman]. Let me know if we need a separate PR for
this. Would we need to create another issue for that?



[jira] [Commented] (KAFKA-10616) StreamThread killed by "IllegalStateException: The processor is already closed"

2020-10-19 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17217261#comment-17217261
 ] 

A. Sophie Blee-Goldman commented on KAFKA-10616:


By the way, we should actually fix this in 2.6 as well. There's no 
IllegalStateException killing the thread in that branch, but the underlying 
issue is still there: flushing records downstream to closed processors. AFAICT 
the only effect this has in the DSL operators is that these records would not 
be recorded in any metrics, but there could be more severe consequences if a 
PAPI user does something more interesting in the processor's close() method.

It's possible we'll need a separate PR for the 2.6 branch, depending on what 
the fix for 2.7/trunk looks like. [~sagarrao], if we do need a separate PR,
feel free to pick it up.
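This sketch shows why the 2.6 behavior is still a problem even without the exception. It uses a hypothetical processor in plain Java, not the actual Processor API, that publishes a summary in close(). Any record delivered after the processor is closed is silently left out of that summary. `SummarizingProcessor` and `runScenario` are illustrative names only. The summary stands in for the metrics mentioned above, or for whatever a PAPI user does in close().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical processor in plain Java (not the Kafka Streams Processor API). Like a PAPI
// processor that does real work in close(), it publishes a final summary when closed.
// It has no closed-state guard, modeling a branch where nothing throws on late records.
public class LateRecordSketch {

    static final class SummarizingProcessor {
        private long count = 0;
        final List<Long> publishedSummaries = new ArrayList<>();

        void process(String record) {
            count++; // no guard: a record arriving after close() is silently accepted
        }

        void close() {
            publishedSummaries.add(count); // the final summary is published once, here
        }

        long currentCount() {
            return count;
        }
    }

    // Topology closed first, then a cache flush delivers one more record.
    // Returns {published summary, records actually processed}.
    static long[] runScenario() {
        SummarizingProcessor p = new SummarizingProcessor();
        p.process("a");
        p.close();
        p.process("b"); // late record, e.g. flushed from a cache during a dirty close
        return new long[] {p.publishedSummaries.get(0), p.currentCount()};
    }

    public static void main(String[] args) {
        long[] result = runScenario();
        System.out.println("published=" + result[0] + " actual=" + result[1]);
        // prints "published=1 actual=2": the late record never reaches the published summary
    }
}
```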



[jira] [Commented] (KAFKA-10616) StreamThread killed by "IllegalStateException: The processor is already closed"

2020-10-19 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17217258#comment-17217258
 ] 

A. Sophie Blee-Goldman commented on KAFKA-10616:


I spoke with [~guozhang] earlier and he said he already has a fix for this
that he had to implement in some other work when he ran into the same issue.
He's just going to cherry-pick it into a new PR for this. Sorry, I forgot to
assign the ticket to him.



[jira] [Commented] (KAFKA-10616) StreamThread killed by "IllegalStateException: The processor is already closed"

2020-10-19 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17217208#comment-17217208
 ] 

Sagar Rao commented on KAFKA-10616:
---

Hey [~ableegoldman], can I take this one up?
