GitHub user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/655#discussion_r13227671
  
    --- Diff: yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala ---
    @@ -105,278 +96,222 @@ private[yarn] class YarnAllocationHandler(
     
       def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
     
    -  def isResourceConstraintSatisfied(container: Container): Boolean = {
    -    container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
    -  }
    -
       def releaseContainer(container: Container) {
         val containerId = container.getId
    -    pendingReleaseContainers.put(containerId, true)
         amClient.releaseAssignedContainer(containerId)
       }
     
    +  /**
    +   * Heartbeat to the ResourceManager. Passes along any ContainerRequests we've added to the
    +   * AMRMClient. If there are no pending requests, lets the ResourceManager know we're still alive.
    +   */
       def allocateResources() {
    -    // We have already set the container request. Poll the ResourceManager for a response.
    -    // This doubles as a heartbeat if there are no pending container requests.
         val progressIndicator = 0.1f
         val allocateResponse = amClient.allocate(progressIndicator)
     
         val allocatedContainers = allocateResponse.getAllocatedContainers()
         if (allocatedContainers.size > 0) {
    -      var numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * allocatedContainers.size)
    -
    -      if (numPendingAllocateNow < 0) {
    -        numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * numPendingAllocateNow)
    -      }
    -
           logDebug("""
             Allocated containers: %d
             Current executor count: %d
             Containers released: %s
    -        Containers to-be-released: %s
             Cluster resources: %s
    +               """.format(
    +        allocatedContainers.size,
    +        numExecutorsRunning.get(),
    +        releasedContainerList,
    +        allocateResponse.getAvailableResources))
    +
    +      handleAllocatedContainers(allocatedContainers)
    +    }
    +
    +    val completedContainers = allocateResponse.getCompletedContainersStatuses()
    +    if (completedContainers.size > 0) {
    +      logDebug("Completed %d containers".format(completedContainers.size))
    +
    +      processCompletedContainers(completedContainers)
    +
    +      logDebug("""
    +        Finished processing %d completed containers.
    +        Current number of executors running: %d,
    +        releasedContainerList: %s,
             """.format(
    -          allocatedContainers.size,
    +          completedContainers.size,
               numExecutorsRunning.get(),
    -          releasedContainerList,
    -          pendingReleaseContainers,
    -          allocateResponse.getAvailableResources))
    -
    -      val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()
    -
    -      for (container <- allocatedContainers) {
    -        if (isResourceConstraintSatisfied(container)) {
    -          // Add the accepted `container` to the host's list of already accepted,
    -          // allocated containers
    -          val host = container.getNodeId.getHost
    -          val containersForHost = hostToContainers.getOrElseUpdate(host,
    -            new ArrayBuffer[Container]())
    -          containersForHost += container
    -        } else {
    -          // Release container, since it doesn't satisfy resource constraints.
    -          releaseContainer(container)
    -        }
    +          releasedContainerList))
    +    }
    +  }
    +
    +  def handleAllocatedContainers(allocatedContainers: Seq[Container]) {
    +    val numPendingAllocateNow = numPendingAllocate.addAndGet(-allocatedContainers.size)
    +
    +    if (numPendingAllocateNow < 0) {
    +      numPendingAllocate.addAndGet(-numPendingAllocateNow)
    +    }
    +
    +    val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)
    +
    +    // Match incoming requests by host
    +    val remainingAfterHostMatches = new ArrayBuffer[Container]()
    +    for (allocatedContainer <- allocatedContainers) {
    +      matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,
    +        containersToUse, remainingAfterHostMatches)
    +    }
    +
    +    // Match remaining by rack
    +    val remainingAfterRackMatches = new ArrayBuffer[Container]()
    +    for (allocatedContainer <- remainingAfterHostMatches) {
    +      val rack = RackResolver.resolve(conf, allocatedContainer.getNodeId.getHost).getNetworkLocation
    +      matchContainerToRequest(allocatedContainer, rack, containersToUse,
    +        remainingAfterRackMatches)
    +    }
    +
    +    // Assign remaining that are neither node-local nor rack-local
    +    val remainingAfterOffRackMatches = new ArrayBuffer[Container]()
    +    for (allocatedContainer <- remainingAfterRackMatches) {
    +      matchContainerToRequest(allocatedContainer, "*", containersToUse,
    +        remainingAfterOffRackMatches)
    +    }
    +
    +    if (!remainingAfterOffRackMatches.isEmpty) {
    +      logWarning("Received containers that did not satisfy resource constraints: "
    +        + remainingAfterOffRackMatches)
    +      for (container <- remainingAfterOffRackMatches) {
    +        amClient.releaseAssignedContainer(container.getId)
           }
    +    }
     
    -       // Find the appropriate containers to use.
    -      // TODO: Cleanup this group-by...
    -      val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
    -      val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
    -      val offRackContainers = new HashMap[String, ArrayBuffer[Container]]()
    -
    -      for (candidateHost <- hostToContainers.keySet) {
    -        val maxExpectedHostCount = preferredHostToCount.getOrElse(candidateHost, 0)
    -        val requiredHostCount = maxExpectedHostCount - allocatedContainersOnHost(candidateHost)
    -
    -        val remainingContainersOpt = hostToContainers.get(candidateHost)
    -        assert(remainingContainersOpt.isDefined)
    -        var remainingContainers = remainingContainersOpt.get
    -
    -        if (requiredHostCount >= remainingContainers.size) {
    -          // Since we have <= required containers, add all remaining containers to
    -          // `dataLocalContainers`.
    -          dataLocalContainers.put(candidateHost, remainingContainers)
    -          // There are no more free containers remaining.
    -          remainingContainers = null
    -        } else if (requiredHostCount > 0) {
    -          // Container list has more containers than we need for data locality.
    -          // Split the list into two: one based on the data local container count,
    -          // (`remainingContainers.size` - `requiredHostCount`), and the other to hold remaining
    -          // containers.
    -          val (dataLocal, remaining) = remainingContainers.splitAt(
    -            remainingContainers.size - requiredHostCount)
    -          dataLocalContainers.put(candidateHost, dataLocal)
    -
    -          // Invariant: remainingContainers == remaining
    -
    -          // YARN has a nasty habit of allocating a ton of containers on a host - discourage this.
    -          // Add each container in `remaining` to list of containers to release. If we have an
    -          // insufficient number of containers, then the next allocation cycle will reallocate
    -          // (but won't treat it as data local).
    -          // TODO(harvey): Rephrase this comment some more.
    -          for (container <- remaining) releaseContainer(container)
    -          remainingContainers = null
    -        }
    +    runAllocatedContainers(containersToUse)
    +
    +    logDebug("""
    +        Finished allocating %s containers (from %s originally).
    +        Current number of executors running: %d,
    +        releasedContainerList: %s,
    +             """.format(
    +      containersToUse,
    +      allocatedContainers,
    +      numExecutorsRunning.get(),
    +      releasedContainerList))
    +  }
    +
    +  def runAllocatedContainers(containersToUse: ArrayBuffer[Container]) {
    +    for (container <- containersToUse) {
    +      val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet()
    +      val executorHostname = container.getNodeId.getHost
    +      val containerId = container.getId
    +
    +      val executorMemoryWithOverhead = (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
    +      assert(container.getResource.getMemory >= executorMemoryWithOverhead)
    +
    +      if (numExecutorsRunningNow > maxExecutors) {
    +        logInfo("""Ignoring container %s at host %s, since we already have the required number of
    +            containers.""".format(containerId, executorHostname))
    +        amClient.releaseAssignedContainer(container.getId)
    +        numExecutorsRunning.decrementAndGet()
    +      } else {
    +        val executorId = executorIdCounter.incrementAndGet().toString
    +        val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
    +          sparkConf.get("spark.driver.host"),
    +          sparkConf.get("spark.driver.port"),
    +          CoarseGrainedSchedulerBackend.ACTOR_NAME)
    +
    +        logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
    +
    +        val rack = RackResolver.resolve(conf, executorHostname).getNetworkLocation
    +        allocatedHostToContainersMap.synchronized {
    +          val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
    +            new HashSet[ContainerId]())
    +
    +          containerSet += containerId
    +          allocatedContainerToHostMap.put(containerId, executorHostname)
     
    -        // For rack local containers
    -        if (remainingContainers != null) {
    -          val rack = YarnAllocationHandler.lookupRack(conf, candidateHost)
               if (rack != null) {
    -            val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0)
    -            val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) -
    -              rackLocalContainers.getOrElse(rack, List()).size
    -
    -            if (requiredRackCount >= remainingContainers.size) {
    -              // Add all remaining containers to to `dataLocalContainers`.
    -              dataLocalContainers.put(rack, remainingContainers)
    -              remainingContainers = null
    -            } else if (requiredRackCount > 0) {
    -              // Container list has more containers that we need for data locality.
    -              // Split the list into two: one based on the data local container count,
    -              // (`remainingContainers.size` - `requiredHostCount`), and the other to hold remaining
    -              // containers.
    -              val (rackLocal, remaining) = remainingContainers.splitAt(
    -                remainingContainers.size - requiredRackCount)
    -              val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack,
    -                new ArrayBuffer[Container]())
    -
    -              existingRackLocal ++= rackLocal
    -
    -              remainingContainers = remaining
    -            }
    +            allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
               }
             }
    -
    -        if (remainingContainers != null) {
    -          // Not all containers have been consumed - add them to the list of off-rack containers.
    -          offRackContainers.put(candidateHost, remainingContainers)
    -        }
    +        logInfo("Launching ExecutorRunnable. driverUrl: %s,  executorHostname: %s".format(
    --- End diff --
    
    Two spaces after the comma here: `driverUrl: %s,  executorHostname`
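    
    Presumably a single space was intended. Assuming the format arguments (not visible in this hunk) are the `driverUrl` and `executorHostname` values defined just above, the fixed call would look roughly like this sketch:
    
        logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format(
          driverUrl, executorHostname))
    
    Scala string interpolation, e.g. s"Launching ExecutorRunnable. driverUrl: $driverUrl, executorHostname: $executorHostname", would also sidestep this kind of placeholder/spacing slip, but that is only a style suggestion.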

