Github user lianhuiwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3765#discussion_r22933548
  
    --- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
    @@ -115,26 +107,35 @@ private[yarn] class YarnAllocator(
         new ThreadFactoryBuilder().setNameFormat("ContainerLauncher 
#%d").setDaemon(true).build())
       launcherPool.allowCoreThreadTimeOut(true)
     
    -  def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
    +  private val driverUrl = "akka.tcp://sparkDriver@%s:%s/user/%s".format(
    +    sparkConf.get("spark.driver.host"),
    +    sparkConf.get("spark.driver.port"),
    +    CoarseGrainedSchedulerBackend.ACTOR_NAME)
    +
    +  // For testing
    +  private val launchContainers = 
sparkConf.getBoolean("spark.yarn.launchContainers", true)
    +
    +  def getNumExecutorsRunning: Int = numExecutorsRunning
    +
    +  def getNumExecutorsFailed: Int = numExecutorsFailed
     
    -  def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
    +  /**
    +   * Number of container requests that have not yet been fulfilled.
    +   */
    +  def getNumPendingAllocate: Int = getNumPendingAtLocation(ANY_HOST)
    +
    +  /**
    +   * Number of container requests at the given location that have not yet 
been fulfilled.
    +   */
    +  private def getNumPendingAtLocation(location: String): Int =
    +    amClient.getMatchingRequests(RM_REQUEST_PRIORITY, location, 
resource).map(_.size).sum
     
       /**
        * Request as many executors from the ResourceManager as needed to reach 
the desired total.
        * This takes into account executors already running or pending.
        */
       def requestTotalExecutors(requestedTotal: Int): Unit = synchronized {
    -    val currentTotal = numPendingAllocate.get + numExecutorsRunning.get
    -    if (requestedTotal > currentTotal) {
    -      maxExecutors += (requestedTotal - currentTotal)
    -      // We need to call `allocateResources` here to avoid the following 
race condition:
    -      // If we request executors twice before `allocateResources` is 
called, then we will end up
    -      // double counting the number requested because `numPendingAllocate` 
is not updated yet.
    -      allocateResources()
    -    } else {
    -      logInfo(s"Not allocating more executors because there are already 
$currentTotal " +
    -        s"(application requested $requestedTotal total)")
    -    }
    +    maxExecutors = requestedTotal
    --- End diff --
    
    I think this should perhaps be `maxExecutors += requestedTotal`, because maxExecutors 
is the sum of the current executors, including both running and pending executors.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it enabled, or if the feature is enabled but not working,
please contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to