[ 
https://issues.apache.org/jira/browse/KAFKA-10616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17217261#comment-17217261
 ] 

A. Sophie Blee-Goldman commented on KAFKA-10616:
------------------------------------------------

By the way, we should actually fix this in 2.6 as well. There's no 
IllegalStateException killing the thread in that branch, but the underlying 
issue is still there: flushing records downstream to closed processors. AFAICT 
the only effect this has in the DSL operators is that these records would not 
be recorded in any metrics, but there could be more severe consequences if a 
PAPI user does something more interesting in the processor's close() method.
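
For illustration only (the class, file path, and names below are hypothetical, 
not taken from any affected application), a PAPI processor along these lines 
shows why forwarding into a closed processor can be worse than a missed metric 
update: once close() has released a resource that process() relies on, any 
record flushed downstream afterwards fails on that released resource.

{code:java}
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

// Hypothetical PAPI processor: writes each record to an audit file and
// releases the writer in close(). If the caching layer flushes records into
// this processor after close() has already run, process() fails because the
// writer is gone -- a worse outcome than a skipped metric update.
public class AuditLogProcessor implements Processor<String, String> {

    private ProcessorContext context;
    private BufferedWriter auditLog;

    @Override
    public void init(final ProcessorContext context) {
        this.context = context;
        try {
            this.auditLog = Files.newBufferedWriter(Paths.get("/tmp/audit.log"));
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void process(final String key, final String value) {
        try {
            auditLog.write(key + "=" + value);  // throws if close() already ran
            auditLog.newLine();
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
        context.forward(key, value);
    }

    @Override
    public void close() {
        try {
            auditLog.close();  // the writer is unusable from this point on
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
{code}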

It's possible we'll need a separate PR for the 2.6 branch, depending on what 
the fix for 2.7/trunk looks like. [~sagarrao], if we do need a separate PR then 
feel free to pick it up.

> StreamThread killed by "IllegalStateException: The processor is already 
> closed"
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-10616
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10616
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: A. Sophie Blee-Goldman
>            Assignee: Guozhang Wang
>            Priority: Blocker
>             Fix For: 2.7.0
>
>
> Application is hitting "java.lang.IllegalStateException: The processor is 
> already closed". Over the course of about a day, this exception killed 21/100 
> of the queries (StreamThreads). The (slightly trimmed) stacktrace:
>  
> {code:java}
> java.lang.RuntimeException: Caught an exception while closing caching window store for store Aggregate-Aggregate-Materialize
>     at org.apache.kafka.streams.state.internals.ExceptionUtils.throwSuppressed(ExceptionUtils.java:39)
>     at org.apache.kafka.streams.state.internals.CachingWindowStore.close(CachingWindowStore.java:432)
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:527)
>     at org.apache.kafka.streams.processor.internals.StreamTask.closeDirty(StreamTask.java:499)
>     at org.apache.kafka.streams.processor.internals.TaskManager.handleLostAll(TaskManager.java:626)
>     …
> Caused by: java.lang.IllegalStateException: The processor is already closed
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.throwIfClosed(ProcessorNode.java:172)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:178)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:214)
>     at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)
>     at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:28)
>     at org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$setFlushListener$1(MeteredWindowStore.java:110)
>     at org.apache.kafka.streams.state.internals.CachingWindowStore.putAndMaybeForward(CachingWindowStore.java:118)
>     at org.apache.kafka.streams.state.internals.CachingWindowStore.lambda$initInternal$0(CachingWindowStore.java:93)
>     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
>     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:109)
>     at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:116)
>     at org.apache.kafka.streams.state.internals.CachingWindowStore.lambda$close$1(CachingWindowStore.java:427)
>     at org.apache.kafka.streams.state.internals.ExceptionUtils.executeAll(ExceptionUtils.java:28)
>     at org.apache.kafka.streams.state.internals.CachingWindowStore.close(CachingWindowStore.java:426)
> {code}
>  
> I'm guessing we close the topology before closing the state stores, so 
> records that get flushed during the caching store's close() will run into an 
> already-closed processor. During a clean close we should always flush before 
> closing anything (during prepareCommit()), but since this was a 
> handleLostAll() we would just skip right to suspend() and close the topology.
> Presumably the right thing to do here is to flush the caches before closing 
> anything during a dirty close, as sketched below.
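>
> As a rough sketch of that ordering difference (the method names below are 
> illustrative stand-ins, not the real StreamTask internals):
>
> {code:java}
> // Sketch only: placeholder methods, not the actual task-closing code.
> public class TaskCloseOrderingSketch {
>
>     // Clean close: prepareCommit() flushes the caches while the downstream
>     // processors are still open, so nothing is forwarded after close().
>     void closeClean() {
>         flushCaches();       // happens as part of prepareCommit()
>         closeTopology();     // processors' close() runs after the flush
>         closeStateStores();  // caches are already empty
>     }
>
>     // Dirty close as reported (e.g. via handleLostAll()): the topology is
>     // closed first, so the cache flush inside the store's close() forwards
>     // records into already-closed processors.
>     void closeDirtyCurrent() {
>         closeTopology();
>         closeStateStores();  // CachingWindowStore.close() flushes here
>     }
>
>     // Proposed: flush the caches before closing anything, even on the
>     // dirty-close path.
>     void closeDirtyProposed() {
>         flushCaches();
>         closeTopology();
>         closeStateStores();
>     }
>
>     // No-op placeholders so the sketch compiles.
>     private void flushCaches() { }
>     private void closeTopology() { }
>     private void closeStateStores() { }
> }
> {code}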



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
