GitHub user JoshRosen opened a pull request:
https://github.com/apache/spark/pull/12712
[SPARK-14930][SPARK-13693] Fix race condition in CheckpointWriter.stop()
CheckpointWriter.stop() is prone to a race condition: if one thread calls
`stop()` right as a checkpoint write task begins to execute, that write task
may become blocked when trying to access `fs`, the shared Hadoop FileSystem,
since both the `fs` getter and the `stop()` method synchronize on the same lock.
Here's a thread-dump excerpt which illustrates the problem:
```java
"pool-31-thread-1" #156 prio=5 os_prio=31 tid=0x00007fea02cd2000 nid=0x5c0b
waiting for monitor entry [0x000000013bc4c000]
java.lang.Thread.State: BLOCKED (on object monitor)
at
org.apache.spark.streaming.CheckpointWriter.org$apache$spark$streaming$CheckpointWriter$$fs(Checkpoint.scala:302)
- waiting to lock <0x00000007bf53ee78> (a
org.apache.spark.streaming.CheckpointWriter)
at
org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:224)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
"pool-1-thread-1-ScalaTest-running-MapWithStateSuite" #11 prio=5 os_prio=31
tid=0x00007fe9ff879800 nid=0x5703 waiting on condition [0x000000012e54c000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000007bf564568> (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at
java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1465)
at
org.apache.spark.streaming.CheckpointWriter.stop(Checkpoint.scala:291)
- locked <0x00000007bf53ee78> (a
org.apache.spark.streaming.CheckpointWriter)
at
org.apache.spark.streaming.scheduler.JobGenerator.stop(JobGenerator.scala:159)
- locked <0x00000007bf53ea90> (a
org.apache.spark.streaming.scheduler.JobGenerator)
at
org.apache.spark.streaming.scheduler.JobScheduler.stop(JobScheduler.scala:115)
- locked <0x00000007bf53d3f0> (a
org.apache.spark.streaming.scheduler.JobScheduler)
at
org.apache.spark.streaming.StreamingContext$$anonfun$stop$1.apply$mcV$sp(StreamingContext.scala:680)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1219)
at
org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:679)
- locked <0x00000007bf516a70> (a
org.apache.spark.streaming.StreamingContext)
at
org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:644)
- locked <0x00000007bf516a70> (a
org.apache.spark.streaming.StreamingContext)
[...]
```
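To see the problem without the full streaming stack, here is a hypothetical reduction of the pre-fix locking pattern (simplified class and method names, not the actual `Checkpoint.scala` source): the write task's `fs` accessor and `stop()` synchronize on the same monitor, and `stop()` keeps that monitor while waiting for the executor to drain.
```scala
import java.util.concurrent.{Executors, TimeUnit}

// Hypothetical reduction of the pre-fix pattern; NOT the real CheckpointWriter.
class RacyCheckpointWriter {
  private val executor = Executors.newFixedThreadPool(1)
  private var fs: AnyRef = _          // stands in for the shared Hadoop FileSystem

  // The getter synchronizes on `this`...
  private def fileSystem: AnyRef = synchronized {
    if (fs == null) fs = new Object
    fs
  }

  def write(bytes: Array[Byte]): Unit = executor.execute(new Runnable {
    override def run(): Unit = {
      val f = fileSystem              // write task BLOCKS here while stop() owns the monitor
      // ... write `bytes` to a checkpoint file via `f` ...
    }
  })

  // ...and stop() holds the same monitor while waiting for in-flight writes,
  // so the write task above cannot proceed until the wait gives up.
  def stop(): Unit = synchronized {
    executor.shutdown()
    executor.awaitTermination(10, TimeUnit.SECONDS) // timeout value is illustrative
  }
}
```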
We can fix this problem by having `stop` and `fs` be synchronized on
different locks: the synchronization on `stop` only needs to guard against
multiple threads calling `stop` at the same time, whereas the synchronization
on `fs` is only necessary for cross-thread visibility. There's only ever a
single active checkpoint writer thread at a time, so we don't need to guard
against concurrent access to `fs`. Thus, `fs` can simply become a `@volatile`
var, similar to `lastCheckpointTime`.
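A minimal sketch of the shape of that change (again a simplified stand-in rather than the actual patch; the class name, helper names, and timeout value are illustrative assumptions):
```scala
import java.util.concurrent.{Executors, TimeUnit}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Simplified stand-in for the fixed writer; NOT the actual Checkpoint.scala code.
class FixedCheckpointWriter(checkpointDir: String, hadoopConf: Configuration) {
  private val executor = Executors.newFixedThreadPool(1)

  // Only one write task runs at a time, so cross-thread visibility is enough:
  // a @volatile var replaces the synchronized getter.
  @volatile private var fs: FileSystem = _

  private def fileSystem: FileSystem = {
    if (fs == null) {
      fs = new Path(checkpointDir).getFileSystem(hadoopConf)
    }
    fs
  }

  def write(bytes: Array[Byte]): Unit = executor.execute(new Runnable {
    override def run(): Unit = {
      // No lock is taken here, so the write task can never block behind stop().
      val out = fileSystem.create(new Path(checkpointDir, "checkpoint"), true)
      try out.write(bytes) finally out.close()
    }
  })

  // stop() still synchronizes, but only to guard against concurrent stop()
  // calls; it no longer shares a lock with the fs accessor.
  def stop(): Unit = synchronized {
    executor.shutdown()
    executor.awaitTermination(10, TimeUnit.SECONDS) // timeout value is illustrative
  }
}
```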
This change should fix
[SPARK-13693](https://issues.apache.org/jira/browse/SPARK-13693), a flaky
`MapWithStateSuite` test suite which has recently been failing several times
per day. It also results in a huge test speedup: prior to this patch,
`MapWithStateSuite` took about 80 seconds to run, whereas it now runs in less
than 10 seconds. The `streaming` project's tests as a whole now run in ~220
seconds, down from ~354 seconds.
/cc @zsxwing and @tdas for review.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/JoshRosen/spark fix-checkpoint-writer-race
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/12712.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #12712
----
commit 61a388a62c3701dd305df12a6ff6af5a512e6cc7
Author: Josh Rosen <[email protected]>
Date: 2016-04-26T21:08:28Z
Fix race condition in CheckpointWriter shutdown.
----