[ 
https://issues.apache.org/jira/browse/KAFKA-10262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10262:
------------------------------------
    Priority: Blocker  (was: Major)

> StateDirectory is not thread-safe
> ---------------------------------
>
>                 Key: KAFKA-10262
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10262
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.0
>            Reporter: Sophie Blee-Goldman
>            Assignee: Matthias J. Sax
>            Priority: Blocker
>
> As explicitly stated in the StateDirectory javadocs, "This class is not 
> thread-safe."
> Despite this, a single StateDirectory is shared among all the StreamThreads 
> of a client. Some of the more "dangerous" methods are indeed synchronized, 
> but others are not. For example, the innocent-sounding #directoryForTask is 
> not thread-safe and is called in a number of places. We call it during task 
> creation, and we call it during task closure (through StateDirectory#lock). 
> It's not uncommon for one thread to be closing a task while another is 
> creating it after a rebalance.
> In fact, we saw exactly that happen in our test application. This ultimately 
> led to the following exception:
>  
> {code:java}
> org.apache.kafka.streams.errors.ProcessorStateException: task directory [/mnt/run/streams/state/stream-soak-test/1_0] doesn't exist and couldn't be created
>     at org.apache.kafka.streams.processor.internals.StateDirectory.directoryForTask(StateDirectory.java:112)
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:187)
>     at org.apache.kafka.streams.processor.internals.StandbyTaskCreator.createTasks(StandbyTaskCreator.java:85)
>     at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:337)
> {code}
>  
> The exception arises from this line in StateDirectory#directoryForTask:
> {code:java}
> if (hasPersistentStores && !taskDir.exists() && !taskDir.mkdir()) 
> {code}
> Presumably, if the taskDir does not exist when two threads enter this 
> method, both pass the exists() check and both attempt to create the 
> directory. One of them gets there first; for the other, mkdir returns false 
> because the directory already exists, and that thread ultimately throws the 
> above ProcessorStateException.
> I've only confirmed that this affects 2.6 so far, but the unsafe methods are 
> present in earlier versions. It's possible we made the problem worse somehow 
> during "The Refactor" so that it's easier to hit this race condition.
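
To illustrate the check-then-act race described above, here is a minimal sketch of two ways the creation step could be made tolerant of a concurrent creator. The class `TaskDirs` and its methods are hypothetical names made up for this example; they are not part of Kafka, and this is not necessarily how the issue was resolved upstream.

```java
import java.io.File;

// Hypothetical helper for illustration only; not a Kafka class.
final class TaskDirs {
    private TaskDirs() { }

    // The pattern from the report: if two threads both observe
    // exists() == false, the slower thread's mkdir() returns false
    // and the caller treats that as a failure.
    static boolean racyCreateFailed(final File dir) {
        return !dir.exists() && !dir.mkdir();
    }

    // Tolerant variant: a false return from mkdir() only counts as a
    // failure if the directory is still missing afterwards.
    static boolean tolerantCreateFailed(final File dir) {
        return !dir.exists() && !dir.mkdir() && !dir.isDirectory();
    }

    // Alternative: serialize creation behind a single lock so that only
    // one thread at a time runs the check and the mkdir() call. The final
    // isDirectory() re-check also covers creation by another process.
    static synchronized File ensureDir(final File parent, final String name) {
        final File dir = new File(parent, name);
        if (!dir.isDirectory() && !dir.mkdir() && !dir.isDirectory()) {
            throw new IllegalStateException(
                "task directory [" + dir + "] doesn't exist and couldn't be created");
        }
        return dir;
    }
}
```

With either variant, a thread that loses the race sees the directory another thread just created and carries on instead of throwing. Calling `TaskDirs.ensureDir(parent, "1_0")` repeatedly, or from several threads, should return the same existing directory each time.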



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
