[ https://issues.apache.org/jira/browse/KAFKA-10262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17156093#comment-17156093 ]

Matthias J. Sax commented on KAFKA-10262:
-----------------------------------------

{quote}I've only confirmed that this affects 2.6 so far, but the unsafe methods 
are present in earlier versions. It's possible we made the problem worse 
somehow during "The Refactor" so that it's easier to hit this race condition.
{quote}
I looked into this for the earlier versions, and there we call 
{{directoryForTask}} in only one place: the constructor of 
{{ProcessorStateManager}}, which is created when a task is created. Thus, there 
is no race condition, because a thread should always revoke a task before the 
newly assigned thread creates it.

The difference in 2.6 is that we call {{directoryForTask}} in several more 
places, which introduces the race condition.
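
As a minimal sketch (not the actual Kafka patch), one way to keep a check-then-act method safe once it is reached from several call sites on different threads is to serialize it on the shared instance. `SharedTaskDirs`, its `directoryForTask(String)` signature and `concurrentFailures` are hypothetical names; the sketch assumes, as in Streams, that every thread uses the same instance.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper, NOT Kafka's StateDirectory: one instance is shared by
// all worker threads, the way a single StateDirectory is shared by StreamThreads.
public class SharedTaskDirs {
    private final File stateDir;

    public SharedTaskDirs(File stateDir) {
        this.stateDir = stateDir;
    }

    // synchronized makes the exists()/mkdir() check-then-act atomic with respect
    // to other callers on this instance, so no thread can slip in between.
    public synchronized File directoryForTask(String taskId) {
        File taskDir = new File(stateDir, taskId);
        if (!taskDir.exists() && !taskDir.mkdir()) {
            throw new IllegalStateException(
                "task directory [" + taskDir + "] doesn't exist and couldn't be created");
        }
        return taskDir;
    }

    // Releases `threads` callers at once against the same task id and
    // returns how many of them threw.
    public static int concurrentFailures(int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            SharedTaskDirs dirs =
                new SharedTaskDirs(Files.createTempDirectory("state").toFile());
            CountDownLatch start = new CountDownLatch(1);
            List<Future<File>> results = new ArrayList<>();
            for (int i = 0; i < threads; i++) {
                results.add(pool.submit(() -> {
                    start.await(); // wait so all callers start together
                    return dirs.directoryForTask("1_0");
                }));
            }
            start.countDown();
            int failures = 0;
            for (Future<File> f : results) {
                try {
                    f.get();
                } catch (ExecutionException e) {
                    failures++; // the task threw inside directoryForTask
                }
            }
            return failures;
        } catch (IOException | InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("failures: " + concurrentFailures(8));
    }
}
```

The lock only helps if all callers go through the same object; a second instance pointing at the same state directory could still race. Coarse locking also serializes every call to the method, which is the price of its simplicity.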

> StateDirectory is not thread-safe
> ---------------------------------
>
>                 Key: KAFKA-10262
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10262
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.0
>            Reporter: Sophie Blee-Goldman
>            Assignee: Matthias J. Sax
>            Priority: Blocker
>
> As explicitly stated in the StateDirectory javadocs, "This class is not 
> thread-safe."
> Despite this, a single StateDirectory is shared among all the StreamThreads 
> of a client. Some of the more "dangerous" methods are indeed synchronized, 
> but others are not. For example, the innocent-sounding #directoryForTask is 
> not thread-safe and is called in a number of places. We call it during task 
> creation, and we call it during task closure (through StateDirectory#lock). 
> It's not uncommon for one thread to be closing a task while another is 
> creating it after a rebalance.
> In fact, we saw exactly that happen in our test application. This ultimately 
> led to the following exception:
>  
> {code:java}
> org.apache.kafka.streams.errors.ProcessorStateException: task directory [/mnt/run/streams/state/stream-soak-test/1_0] doesn't exist and couldn't be created
>     at org.apache.kafka.streams.processor.internals.StateDirectory.directoryForTask(StateDirectory.java:112)
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:187)
>     at org.apache.kafka.streams.processor.internals.StandbyTaskCreator.createTasks(StandbyTaskCreator.java:85)
>     at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:337)
> {code}
>  
> The exception arises from this line in StateDirectory#directoryForTask:
> {code:java}
> if (hasPersistentStores && !taskDir.exists() && !taskDir.mkdir()) 
> {code}
> Presumably, if taskDir did not exist when the two threads entered this 
> method, both of them attempted to create the directory. One got there first, 
> so the other's mkdir returned false and it ultimately threw the above 
> ProcessorStateException.
> I've only confirmed that this affects 2.6 so far, but the unsafe methods are 
> present in earlier versions. It's possible we made the problem worse somehow 
> during "The Refactor" so that it's easier to hit this race condition.
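
The interleaving described in the issue can be replayed deterministically on a single thread. This is a minimal sketch under stated assumptions: `MkdirRace`, `replay` and `replayInTempDir` are hypothetical names, and the `tolerant` flag illustrates one possible lock-free mitigation (treating an already existing directory as success after a failed `mkdir()`), not what Kafka shipped.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;

// Replays the two-thread interleaving from the issue on a single thread so the
// outcome is deterministic. Class and method names are hypothetical.
public class MkdirRace {

    // Returns {threadAOk, threadBOk}; "ok" means the call would NOT throw.
    // The naive condition mirrors the quoted check, which fails when
    // !taskDir.exists() && !taskDir.mkdir(). The tolerant variant also
    // accepts a directory that another thread created in the meantime.
    public static boolean[] replay(File taskDir, boolean tolerant) {
        // Step 1: both threads evaluate exists() before either creates the dir.
        boolean aSawExists = taskDir.exists();
        boolean bSawExists = taskDir.exists();
        // Step 2: thread B gets there first; its mkdir() succeeds.
        boolean bOk = bSawExists || taskDir.mkdir();
        // Step 3: thread A's mkdir() now returns false because the dir exists.
        boolean aOk = aSawExists || taskDir.mkdir()
                || (tolerant && taskDir.isDirectory());
        return new boolean[] {aOk, bOk};
    }

    // Convenience wrapper: runs the replay against a fresh temp directory.
    public static boolean[] replayInTempDir(boolean tolerant) {
        try {
            File root = Files.createTempDirectory("state").toFile();
            return replay(new File(root, "1_0"), tolerant);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] naive = replayInTempDir(false);
        boolean[] tolerant = replayInTempDir(true);
        // prints "naive: A=false B=true"
        System.out.println("naive: A=" + naive[0] + " B=" + naive[1]);
        // prints "tolerant: A=true B=true"
        System.out.println("tolerant: A=" + tolerant[0] + " B=" + tolerant[1]);
    }
}
```

`java.nio.file.Files.createDirectories` behaves in a similar spirit: it does not throw when the directory already exists, so it is another race-tolerant option for this step.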



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
