GitHub user caneGuy opened a pull request:

    https://github.com/apache/spark/pull/18739

    [WIP][SPARK-21539][CORE] Job should not be aborted when dynamic allocation is enabled or spark.executor.instances is larger than the number currently allocated by YARN
    
    ## What changes were proposed in this pull request?
    
    For Spark on YARN: when a TaskSet cannot run on any node or host, `blacklistedEverywhere` becomes true in `TaskSetManager#abortIfCompletelyBlacklisted` and the job is aborted. However, if dynamic allocation is enabled, we should instead wait for YARN to allocate a new NodeManager so the job can complete successfully. We should also report the blacklisted nodes to YARN, so that it does not keep assigning containers on nodes the TaskScheduler has already blacklisted.
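    The abort decision this PR argues for can be sketched as follows. This is a minimal, self-contained illustration: `AbortDecision` and `shouldAbort` are invented names for this sketch, not Spark's actual internals.

```scala
// Hypothetical sketch of the proposed abort decision.
// All names here are illustrative, not real Spark APIs.
object AbortDecision {
  /** Returns true only when no replacement executors can ever arrive. */
  def shouldAbort(
      blacklistedEverywhere: Boolean,
      dynamicAllocationEnabled: Boolean,
      requestedExecutors: Int,
      allocatedExecutors: Int): Boolean = {
    // Abort only if the TaskSet is blacklisted everywhere AND
    // neither dynamic allocation nor a pending static request
    // (spark.executor.instances > currently allocated) can bring new nodes.
    blacklistedEverywhere &&
      !dynamicAllocationEnabled &&
      requestedExecutors <= allocatedExecutors
  }
}
```

    Under this sketch, a fully blacklisted TaskSet keeps waiting whenever YARN may still deliver fresh executors.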
    How to reproduce:
    1. Set up a YARN cluster with 5 nodes, and give one node (node1) much more CPU and memory than the others, so that YARN keeps launching containers on it even after the TaskScheduler has blacklisted it.
    2. Modify `BlockManager#registerWithExternalShuffleServer` as follows:
    
        val MAX_ATTEMPTS = conf.get(config.SHUFFLE_REGISTRATION_MAX_ATTEMPTS)
        val SLEEP_TIME_SECS = 5

        for (i <- 1 to MAX_ATTEMPTS) {
          try {
            // Fault injected for reproduction: always fail registration on node1.
            if (shuffleServerId.host.equals("node1's address")) {
              throw new Exception
            }
            // Synchronous and will throw an exception if we cannot connect.
            shuffleClient.asInstanceOf[ExternalShuffleClient].registerWithShuffleServer(
              shuffleServerId.host, shuffleServerId.port, shuffleServerId.executorId, shuffleConfig)
            return
          } catch {
            case e: Exception if i < MAX_ATTEMPTS =>
              logError(s"Failed to connect to external shuffle server, will retry ${MAX_ATTEMPTS - i}"
                + s" more times after waiting $SLEEP_TIME_SECS seconds...", e)
              Thread.sleep(SLEEP_TIME_SECS * 1000)
            case NonFatal(e) =>
              throw new SparkException("Unable to register with external shuffle server due to : " +
                e.getMessage, e)
          }
        }
    
    The injected fault is the check on node1's address above.
    3. Set `spark.shuffle.service.enabled=true` and start the external shuffle service for YARN.
    YARN will then keep launching executors on node1, but each one fails because shuffle-service registration never succeeds, and the job is eventually aborted.
    Therefore we should not abort when dynamic allocation is enabled, and we should update YARN's node blacklist when a TaskSet is blacklisted everywhere.
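    Reporting newly blacklisted nodes to YARN could look roughly like the following. This is a minimal sketch with invented names; the real change would feed the additions into the YARN allocator (e.g. Hadoop's `AMRMClient.updateBlacklist`, which takes blacklist additions and removals).

```scala
import scala.collection.mutable

// Illustrative sketch only: a tiny tracker for nodes the TaskScheduler has
// blacklisted, computing the delta that still needs reporting to YARN so
// containers are not placed there again. Not Spark's actual implementation.
class YarnBlacklistTracker {
  private val reported = mutable.Set.empty[String]

  /** Returns the newly blacklisted nodes not yet reported to YARN. */
  def nodesToReport(currentlyBlacklisted: Set[String]): Set[String] = {
    val additions = currentlyBlacklisted -- reported
    reported ++= additions
    additions
  }
}
```

    Keeping only the delta avoids re-sending the full blacklist to the ResourceManager on every scheduling round.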
    ## How was this patch tested?
    Tested in a production cluster, plus unit tests.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/caneGuy/spark zhoukang/improve-abort

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/18739.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #18739
    
----
commit 47e0d8dd8e876a11d913f7d340dcab608df5c9fa
Author: zhoukang <[email protected]>
Date:   2017-07-26T12:41:43Z

    [SPARK][CORE] Job should not be aborted when dynamic allocation is enabled 
or spark.executor.instances larger then current allocated number by yarn

----

