GitHub user caneGuy opened a pull request:
https://github.com/apache/spark/pull/18739
[WIP][SPARK-21539][CORE] Job should not be aborted when dynamic allocation
is enabled or spark.executor.instances is larger than the number currently
allocated by YARN
## What changes were proposed in this pull request?
For Spark on YARN: currently, when a TaskSet cannot run on any node or host,
blacklistedEverywhere becomes true in TaskSetManager#abortIfCompletelyBlacklisted
and the job is aborted. However, if dynamic allocation is enabled, we should
instead wait for YARN to allocate a new NodeManager so that the job can finish
successfully. We should also report the blacklisted nodes to YARN, so that it
does not assign containers on nodes already blacklisted by the TaskScheduler.
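A minimal sketch of that idea (hypothetical throughout: the config key is real,
but updateNodeBlacklist and the exact placement inside
abortIfCompletelyBlacklisted are assumptions, not the actual patch):

```scala
// Sketch only: inside TaskSetManager#abortIfCompletelyBlacklisted, after
// detecting that the pending task is blacklisted on every live executor.
if (blacklistedEverywhere) {
  if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
    // Do not abort: report the blacklisted nodes to the cluster manager so
    // YARN can avoid them, then wait for new executors to be allocated.
    // updateNodeBlacklist is a hypothetical hook; the real mechanism for
    // propagating node blacklists to YARN may differ.
    schedulerBackend.updateNodeBlacklist(blacklistedNodes)
  } else {
    abort(s"Task $indexInTaskSet cannot run anywhere due to node and " +
      "executor blacklist.")
  }
}
```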
How to reproduce?
1. Set up a YARN cluster with 5 nodes, and give one node (node1) much more
CPU and memory than the others, so that YARN keeps launching containers on it
even after it is blacklisted by the TaskScheduler.
2. Modify BlockManager#registerWithExternalShuffleServer so that registration
with the shuffle service always fails on node1; the added lines are marked
with "// added":

    val MAX_ATTEMPTS = conf.get(config.SHUFFLE_REGISTRATION_MAX_ATTEMPTS)
    val SLEEP_TIME_SECS = 5
    for (i <- 1 to MAX_ATTEMPTS) {
      try {
        if (shuffleServerId.host.equals("node1's address")) { // added
          throw new Exception                                 // added
        }                                                     // added
        // Synchronous and will throw an exception if we cannot connect.
        shuffleClient.asInstanceOf[ExternalShuffleClient].registerWithShuffleServer(
          shuffleServerId.host, shuffleServerId.port,
          shuffleServerId.executorId, shuffleConfig)
        return
      } catch {
        case e: Exception if i < MAX_ATTEMPTS =>
          logError(s"Failed to connect to external shuffle server, will retry " +
            s"${MAX_ATTEMPTS - i} more times after waiting $SLEEP_TIME_SECS seconds...", e)
          Thread.sleep(SLEEP_TIME_SECS * 1000)
        case NonFatal(e) =>
          throw new SparkException("Unable to register with external shuffle " +
            "server due to : " + e.getMessage, e)
      }
    }
3. Enable the external shuffle service (spark.shuffle.service.enabled=true)
and start the shuffle service on YARN.
YARN will then keep launching executors on node1, and they will keep failing
because they cannot register with the shuffle service, so the job is
eventually aborted.
Therefore we should not abort the job when dynamic allocation is enabled, and
we should update the node blacklist for YARN when a TaskSet is blacklisted
everywhere.
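On the YARN side, Hadoop's AMRMClient already exposes updateBlacklist, so
propagating the blacklist could be sketched as follows (the helper method and
its wiring into YarnAllocator are assumptions, not the actual patch):

```scala
import scala.collection.JavaConverters._

// Hypothetical helper inside YarnAllocator; amClient is the allocator's
// org.apache.hadoop.yarn.client.api.AMRMClient instance.
def updateYarnBlacklist(nodesToAdd: Set[String], nodesToRemove: Set[String]): Unit = {
  // AMRMClient#updateBlacklist(additions, removals) asks the ResourceManager
  // not to allocate further containers on the listed hosts.
  amClient.updateBlacklist(nodesToAdd.toSeq.asJava, nodesToRemove.toSeq.asJava)
}
```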
## How was this patch tested?
Tested in a production cluster and with unit tests.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/caneGuy/spark zhoukang/improve-abort
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/18739.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #18739
----
commit 47e0d8dd8e876a11d913f7d340dcab608df5c9fa
Author: zhoukang <[email protected]>
Date: 2017-07-26T12:41:43Z
[SPARK][CORE] Job should not be aborted when dynamic allocation is enabled
or spark.executor.instances larger then current allocated number by yarn
----