[ https://issues.apache.org/jira/browse/KAFKA-10262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sophie Blee-Goldman updated KAFKA-10262:
----------------------------------------
    Description: 
As explicitly stated in the StateDirectory javadocs, "This class is not 
thread-safe."

Despite this, a single StateDirectory is shared among all the StreamThreads of 
a client. Some of the more "dangerous" methods are indeed synchronized, but 
others are not. For example, the innocent-sounding #directoryForTask is not 
thread-safe and is called in a number of places. We call it during task 
creation, and we call it during task closure (through StateDirectory#lock). 
It's not uncommon for one thread to be closing a task while another is creating 
it after a rebalance.
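A minimal sketch of one way to make such a shared instance safe: serialize the check-then-create step with a lock on the instance itself. SharedStateDirDemo, SharedStateDir and failures are hypothetical names used only for illustration; this is not the real StateDirectory and not the fix that was eventually applied. Several threads call directoryForTask on the same object, as StreamThreads do, and the synchronized modifier ensures only one of them runs the exists()/mkdir() pair at a time.

{code:java}
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class SharedStateDirDemo {
    // Hypothetical stand-in for StateDirectory (NOT the real Kafka class).
    // A single instance is shared by all worker threads.
    static class SharedStateDir {
        private final File stateDir;

        SharedStateDir(File stateDir) {
            this.stateDir = stateDir;
        }

        // synchronized: the exists()/mkdir() pair runs as one unit per instance,
        // so no other thread can create the directory between this thread's
        // exists() check and its mkdir() call.
        synchronized File directoryForTask(String taskId) {
            File taskDir = new File(stateDir, taskId);
            if (!taskDir.exists() && !taskDir.mkdir()) {
                throw new IllegalStateException(
                        "task directory [" + taskDir + "] doesn't exist and couldn't be created");
            }
            return taskDir;
        }
    }

    // Starts `threads` threads that call directoryForTask(taskId) on the same
    // shared instance at roughly the same moment; returns how many calls threw.
    static int failures(SharedStateDir dir, String taskId, int threads) {
        CountDownLatch start = new CountDownLatch(1);
        AtomicInteger failed = new AtomicInteger();
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(() -> {
                try {
                    start.await(); // release all threads together to maximize contention
                    dir.directoryForTask(taskId);
                } catch (InterruptedException | IllegalStateException e) {
                    failed.incrementAndGet();
                }
            });
            workers.add(t);
            t.start();
        }
        start.countDown();
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return failed.get();
    }

    public static void main(String[] args) throws IOException {
        File root = Files.createTempDirectory("state").toFile();
        int failed = failures(new SharedStateDir(root), "1_0", 8);
        System.out.println("failed threads: " + failed);
    }
}
{code}

Locking the whole method is the simplest option, but it serializes every caller on that instance; whether that cost matters for StateDirectory is a question this sketch does not try to answer.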

In fact, we saw exactly that happen in our test application. This ultimately 
led to the following exception:

 
{code:java}
org.apache.kafka.streams.errors.ProcessorStateException: task directory [/mnt/run/streams/state/stream-soak-test/1_0] doesn't exist and couldn't be created
    at org.apache.kafka.streams.processor.internals.StateDirectory.directoryForTask(StateDirectory.java:112)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:187)
    at org.apache.kafka.streams.processor.internals.StandbyTaskCreator.createTasks(StandbyTaskCreator.java:85)
    at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:337)
{code}
 

The exception arises from this line in StateDirectory#directoryForTask:
{code:java}
if (hasPersistentStores && !taskDir.exists() && !taskDir.mkdir()) 
{code}
Presumably, if taskDir did not exist when the two threads entered this method, 
both saw exists() return false and both attempted to create the directory. 
Whichever thread got there first succeeded; mkdir() then returned false for the 
other, which ultimately threw the above ProcessorStateException.
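A minimal sketch of a lock-free alternative for this particular step, assuming it is acceptable to treat "the directory already exists" as success: java.nio.file.Files#createDirectories does not throw when the directory already exists, so a thread that loses the race gets the directory back instead of an exception. RaceTolerantDirs, directoryForTask and failures here are hypothetical helpers, not Kafka's API and not its actual fix.

{code:java}
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class RaceTolerantDirs {
    // Hypothetical helper (NOT Kafka's API). There is no separate exists() check:
    // createDirectories succeeds whether this thread created the directory or a
    // concurrent thread got there first.
    static File directoryForTask(File stateDir, String taskId) {
        Path taskDir = stateDir.toPath().resolve(taskId);
        try {
            return Files.createDirectories(taskDir).toFile();
        } catch (IOException e) {
            // Genuine failures (permissions, a regular file in the way) still surface.
            throw new UncheckedIOException("task directory [" + taskDir + "] couldn't be created", e);
        }
    }

    // Starts `threads` threads that request the same task directory at roughly
    // the same moment; returns how many calls threw.
    static int failures(File stateDir, String taskId, int threads) {
        CountDownLatch start = new CountDownLatch(1);
        AtomicInteger failed = new AtomicInteger();
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(() -> {
                try {
                    start.await(); // release all threads together to maximize contention
                    directoryForTask(stateDir, taskId);
                } catch (InterruptedException | UncheckedIOException e) {
                    failed.incrementAndGet();
                }
            });
            workers.add(t);
            t.start();
        }
        start.countDown();
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return failed.get();
    }

    public static void main(String[] args) throws IOException {
        File root = Files.createTempDirectory("state").toFile();
        System.out.println("failed threads: " + failures(root, "1_0", 8));
    }
}
{code}

One behavioral difference to keep in mind: unlike File#mkdir, Files#createDirectories also creates any missing parent directories, which may or may not be desirable for a state directory.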

I've only confirmed that this affects 2.6 so far, but the unsafe methods are 
present in earlier versions as well. It's possible that something we changed 
during "The Refactor" made this race condition easier to hit.

  was:
As explicitly stated in the StateDirectory javadocs, "This class is not 
thread-safe."

Despite this, a single StateDirectory is shared among all the StreamThreads of 
a client. Some of the more "dangerous" methods are indeed synchronized, but 
others are not. For example, the innocent-sounding #directoryForTask is not 
thread-safe and is called in a number of places. We call it during task 
creation, and we call it during task closure when we check 
`directoryForTaskIsEmpty`. It's not uncommon for one thread to be closing a 
task while another is creating it after a rebalance.

In fact, we saw exactly that happen in our test application. This ultimately 
led to the following exception:

 
{code:java}
org.apache.kafka.streams.errors.ProcessorStateException: task directory [/mnt/run/streams/state/stream-soak-test/1_0] doesn't exist and couldn't be created
    at org.apache.kafka.streams.processor.internals.StateDirectory.directoryForTask(StateDirectory.java:112)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:187)
    at org.apache.kafka.streams.processor.internals.StandbyTaskCreator.createTasks(StandbyTaskCreator.java:85)
    at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:337)
{code}
 

The exception arises from this line in StateDirectory#directoryForTask:
{code:java}
if (hasPersistentStores && !taskDir.exists() && !taskDir.mkdir()) 
{code}
Presumably, if taskDir did not exist when the two threads entered this method, 
both saw exists() return false and both attempted to create the directory. 
Whichever thread got there first succeeded; mkdir() then returned false for the 
other, which ultimately threw the above ProcessorStateException.

I've only confirmed that this affects 2.6 so far, but the unsafe methods are 
present in earlier versions as well. It's possible that something we changed 
during "The Refactor" made this race condition easier to hit.


> StateDirectory is not thread-safe
> ---------------------------------
>
>                 Key: KAFKA-10262
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10262
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.0
>            Reporter: Sophie Blee-Goldman
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
