GitHub user JoshRosen opened a pull request:
https://github.com/apache/spark/pull/16866
[SPARK-19529] TransportClientFactory.createClient() shouldn't call awaitUninterruptibly()
## What changes were proposed in this pull request?
This patch replaces a single `awaitUninterruptibly()` call with a plain
`await()` call in Spark's common network layer in order to fix a bug which may
cause tasks to be uncancellable.
In Spark's Netty RPC layer, `TransportClientFactory.createClient()` calls
`awaitUninterruptibly()` on a Netty future while waiting for a connection to be
established. This creates a problem when a Spark task is interrupted while
blocking in this call (which can happen in the event of a slow connection that
will eventually time out), and it harms task cancellation when
`interruptOnCancel = true`.
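At the Netty level the difference is small. Here is a minimal, hypothetical sketch of the before/after (names and structure are simplified and do not match the real `TransportClientFactory` code exactly):
```java
import java.io.IOException;
import java.net.InetSocketAddress;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;

class ConnectSketch {
  static void connect(Bootstrap bootstrap, InetSocketAddress address,
                      long connectionTimeoutMs)
      throws IOException, InterruptedException {
    ChannelFuture cf = bootstrap.connect(address);
    // Before the patch: cf.awaitUninterruptibly(connectionTimeoutMs), which
    // records an interrupt but keeps blocking until the timeout elapses.
    // After the patch: await() throws InterruptedException right away,
    // letting a killed task unblock instead of waiting out the timeout.
    if (!cf.await(connectionTimeoutMs)) {
      throw new IOException("Connecting to " + address + " timed out");
    }
  }
}
```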
As an example of the impact of this problem, I experienced significant
numbers of uncancellable "zombie tasks" on a production cluster where several
tasks were blocked trying to connect to a dead shuffle server and then
continued running as zombies after I cancelled the associated Spark stage. The
zombie tasks ran for several minutes with the following stack:
```
java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:460)
io.netty.util.concurrent.DefaultPromise.await0(DefaultPromise.java:607)
io.netty.util.concurrent.DefaultPromise.awaitUninterruptibly(DefaultPromise.java:301)
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:224)
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
=> holding Monitor(java.lang.Object@1849476028)
org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:105)
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:114)
org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:169)
org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:350)
org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:286)
org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:120)
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:45)
org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:169)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
[...]
```
As far as I can tell, `awaitUninterruptibly()` might have been used in
order to avoid having to declare that methods throw `InterruptedException`
(this code is written in Java, where `InterruptedException` is a checked
exception that must be declared). This patch simply replaces it with a regular,
interruptible `await()` call.
This required several interface changes to declare a new checked exception
(these are internal interfaces, though, and this change doesn't significantly
impact binary compatibility).
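Concretely, the interface change amounts to adding `InterruptedException` to the `throws` clause of a few internal callbacks. A hedged sketch of the shape of that change (the real interface, `RetryingBlockFetcher.BlockFetchStarter`, takes additional parameters):
```java
import java.io.IOException;

// Illustrative shape of the interface change only; the parameter list is
// simplified relative to the actual Spark interfaces.
interface BlockFetchStarter {
  // Previously declared only IOException; adding InterruptedException lets
  // the interruptible await() inside createClient() propagate interrupts
  // up to callers such as RetryingBlockFetcher.
  void createAndStart(String[] blockIds) throws IOException, InterruptedException;
}
```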
An alternative approach would be to wrap `InterruptedException` into
`IOException` in order to avoid having to change interfaces. The problem with
this approach is that the `network-shuffle` project's `RetryingBlockFetcher`
code treats `IOException`s as transient failures when deciding whether to
retry fetches, so throwing a wrapped `IOException` might cause an interrupted
shuffle fetch to be retried, further prolonging the lifetime of a cancelled
zombie task.
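For reference, the rejected alternative would have looked roughly like the following hypothetical sketch; because the retry logic keys on `IOException`, the wrapped interrupt would be indistinguishable from a transient network failure:
```java
import java.io.IOException;

import io.netty.channel.ChannelFuture;

class WrappingSketch {
  // Rejected approach: wrap the interrupt instead of declaring it.
  static void awaitConnected(ChannelFuture cf, long timeoutMs) throws IOException {
    try {
      if (!cf.await(timeoutMs)) {
        throw new IOException("Connecting timed out");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();  // preserve the interrupt status
      // Problem: RetryingBlockFetcher treats IOException as transient and
      // may retry the fetch, prolonging the cancelled task's lifetime.
      throw new IOException("Interrupted while waiting for connection", e);
    }
  }
}
```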
## How was this patch tested?
Manually.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/JoshRosen/spark SPARK-19529
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/16866.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #16866
----
commit c1c4553e32826453ed39eaaefd1cd92ef0e36382
Author: Josh Rosen <[email protected]>
Date: 2017-02-09T07:25:29Z
Use await() instead of awaitUninterruptibly()
----