JoshRosen opened a new pull request, #37260:
URL: https://github.com/apache/spark/pull/37260
### What changes were proposed in this pull request?
This PR fixes a race condition in `RocksDBLoader.loadLibrary()`, which can
occur if the thread which calls that method is interrupted.
One of our jobs experienced a failure in `RocksDBLoader`:
```
Caused by: java.lang.IllegalThreadStateException
at java.lang.Thread.start(Thread.java:708)
	at org.apache.spark.sql.execution.streaming.state.RocksDBLoader$.loadLibrary(RocksDBLoader.scala:51)
```
After investigation, we determined that this was caused by task
cancellation/interruption: if the task that starts loading the RocksDB library
is interrupted, another task may then try to start the load and crash with the
`IllegalThreadStateException`:
- Although the `loadLibraryThread` child thread is uninterruptible, the
task thread which calls `loadLibrary()` is still interruptible.
- Let's say we have two tasks, A and B, both of which will call
`RocksDBLoader.loadLibrary()`.
- Say that Task A wins the race to perform the load: it enters the
`synchronized` block in `loadLibrary()`, starts the `loadLibraryThread`, then
blocks in the `loadLibraryThread.join()` call.
- If Task A is interrupted, an `InterruptedException` will be thrown and Task A
will exit the `synchronized` block in `loadLibrary()`.
- At this point, Task B enters the `synchronized` block of its own
`loadLibrary()` call and sees that `exception == null` (because the
`loadLibraryThread` started by Task A is still running), so Task B calls
`loadLibraryThread.start()` and hits the `IllegalThreadStateException` because
it tries to start an already-started thread.
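
The last step relies on a basic JVM rule: a `java.lang.Thread` object can be started at most once, and any later `start()` call throws `IllegalThreadStateException`. The standalone snippet below (hypothetical names, not Spark code) is a minimal sketch of just that rule, independent of RocksDB or task interruption.

```scala
// Hypothetical standalone illustration (not Spark code): a Thread object may be
// started only once, which is the error seen in the stack trace above.
object DoubleStart {
  // Returns the error raised by a second start() on the same Thread, if any.
  def secondStartError(): Option[Throwable] = {
    val worker = new Thread(() => ())
    worker.start() // first start: the thread leaves the NEW state
    val result: Option[Throwable] =
      try { worker.start(); None } // second start on the same Thread object
      catch { case e: IllegalThreadStateException => Some(e) }
    worker.join()
    result
  }
}
```

Because the thread leaves `Thread.State.NEW` as soon as it is first started, checking for `NEW` is enough to tell whether `start()` is still allowed.
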
This PR fixes this issue by adding code to check `loadLibraryThread`'s state
before calling `start()`: if the thread has already been started then we will
skip the `start()` call and proceed directly to the `join()`. I also modified
the logging so that we can detect when this case occurs.
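
The guarded-start idea can be sketched with a simplified, self-contained model. This is not Spark's actual `RocksDBLoader`: `SketchLoader`, `RaceReplay`, and the `guarded` flag are hypothetical names, a plain `Thread` stands in for Spark's `UninterruptibleThread`, a `CountDownLatch` stands in for the slow native library load, and the logging changes are not modeled. `RaceReplay.replay(guarded = false)` is meant to reproduce the `IllegalThreadStateException`, while `replay(guarded = true)` lets Task B wait for the in-flight load and return normally.

```scala
import java.util.concurrent.CountDownLatch

// Hypothetical, simplified model of the loader (not Spark's code).
class SketchLoader(guarded: Boolean) {
  val loadCanFinish = new CountDownLatch(1)
  // null = load still pending; None = loaded OK; Some(e) = load failed.
  @volatile var exception: Option[Throwable] = null

  private val loadThread = new Thread(() =>
    try {
      loadCanFinish.await() // stand-in for the slow native library load
      exception = None
    } catch { case e: Throwable => exception = Some(e) }
  )

  def loadLibrary(): Unit = synchronized {
    if (exception == null) {
      // Guarded start: skip start() if an earlier (interrupted) caller already
      // started the thread, and just wait for that in-flight load.
      if (!guarded || loadThread.getState == Thread.State.NEW) {
        loadThread.start()
      }
      loadThread.join() // throws InterruptedException if this caller is interrupted
    }
    exception.foreach(throw _)
  }
}

object RaceReplay {
  // Replays the Task A / Task B interleaving; returns Task B's error, if any.
  def replay(guarded: Boolean): Option[Throwable] = {
    val loader = new SketchLoader(guarded)
    val taskA = new Thread(() =>
      try loader.loadLibrary() catch { case _: InterruptedException => () }
    )
    taskA.start()
    // Task A reaches WAITING once it is parked in join() inside the synchronized block.
    while (taskA.isAlive && taskA.getState != Thread.State.WAITING) Thread.sleep(10)
    taskA.interrupt() // simulate task cancellation
    taskA.join()

    var taskBError: Option[Throwable] = None
    val taskB = new Thread(() =>
      try loader.loadLibrary() catch { case e: Throwable => taskBError = Some(e) }
    )
    taskB.start()
    // Wait until Task B is either parked in join() or has already failed.
    while (taskB.isAlive && taskB.getState != Thread.State.WAITING) Thread.sleep(10)
    loader.loadCanFinish.countDown() // let the original load complete
    taskB.join()
    taskBError
  }
}
```

Polling for `Thread.State.WAITING` instead of sleeping for a fixed time is one sketch-level way to pin the interleaving; it assumes `join()` parks the caller in `WAITING`, and whether this would be robust enough for Spark's CI is untested.
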
### Why are the changes needed?
Fix a bug that can lead to task or job failures.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
I reproduced the original race condition by adding a `Thread.sleep(10000)`
to `loadLibraryThread.run()` (so that the load wouldn't complete instantly), then
ran the following test:
```scala
test("multi-threaded RocksDBLoader calls with interruption") {
  // Assumes `exception` and `loadLibraryThread` are accessible from this test.
  val taskThread = new Thread("interruptible Task Thread 1") {
    override def run(): Unit = {
      RocksDBLoader.loadLibrary()
    }
  }
  taskThread.start()
  // Give the thread time to enter the `loadLibrary()` call:
  Thread.sleep(1000)
  taskThread.interrupt()
  // Check that the load hasn't finished:
  assert(RocksDBLoader.exception == null)
  assert(RocksDBLoader.loadLibraryThread.getState != Thread.State.NEW)
  // Simulate the second task thread starting the load:
  RocksDBLoader.loadLibrary()
  // The load should finish successfully:
  assert(RocksDBLoader.exception.isEmpty)
}
```
This test failed prior to my changes and succeeds afterwards.
I'd rather not commit this test because I'm concerned about flakiness and
false negatives: to ensure that the test would have failed before my change, we
would need to carefully control the thread interleaving. This code rarely
changes and is relatively simple, so I think the ROI of writing and committing
a reliable test is low.