Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3652#discussion_r22066685
  
    --- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
    @@ -498,26 +494,160 @@ private[yarn] abstract class YarnAllocator(
        *
        * @param count Number of containers to allocate.
        *              If zero, should still contact RM (as a heartbeat).
    -   * @param pending Number of containers pending allocate. Only used on 
alpha.
        * @return Response to the allocation request.
        */
    -  protected def allocateContainers(count: Int, pending: Int): 
YarnAllocateResponse
    +  private def allocateContainers(count: Int): AllocateResponse = {
    +    addResourceRequests(count)
     
    -  /** Called to release a previously allocated container. */
    -  protected def releaseContainer(container: Container): Unit
    +    // We have already set the container request. Poll the ResourceManager 
for a response.
    +    // This doubles as a heartbeat if there are no pending container 
requests.
    +    val progressIndicator = 0.1f
    +    amClient.allocate(progressIndicator)
    +  }
     
    -  /**
    -   * Defines the interface for an allocate response from the RM. This is 
needed since the alpha
    -   * and stable interfaces differ here in ways that cannot be fixed using 
other routes.
    -   */
    -  protected trait YarnAllocateResponse {
    +  private def createRackResourceRequests(
    +      hostContainers: ArrayBuffer[ContainerRequest])
    +    : ArrayBuffer[ContainerRequest] = {
    +    // Generate modified racks and new set of hosts under it before 
issuing requests.
    +    val rackToCounts = new HashMap[String, Int]()
     
    -    def getAllocatedContainers(): JList[Container]
    +    for (container <- hostContainers) {
    +      val candidateHost = container.getNodes.last
    +      assert(YarnSparkHadoopUtil.ANY_HOST != candidateHost)
    +
    +      val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
    +      if (rack != null) {
    +        var count = rackToCounts.getOrElse(rack, 0)
    +        count += 1
    +        rackToCounts.put(rack, count)
    +      }
    +    }
     
    -    def getAvailableResources(): Resource
    +    val requestedContainers = new 
ArrayBuffer[ContainerRequest](rackToCounts.size)
    +    for ((rack, count) <- rackToCounts) {
    +      requestedContainers ++= createResourceRequests(
    +        AllocationType.RACK,
    +        rack,
    +        count,
    +        RM_REQUEST_PRIORITY)
    +    }
     
    -    def getCompletedContainersStatuses(): JList[ContainerStatus]
    +    requestedContainers
    +  }
     
    +  private def addResourceRequests(numExecutors: Int): Unit = {
    +    val containerRequests: List[ContainerRequest] =
    +      if (numExecutors <= 0) {
    +        logDebug("numExecutors: " + numExecutors)
    +        List()
    +      } else if (preferredHostToCount.isEmpty) {
    +        logDebug("host preferences is empty")
    +        createResourceRequests(
    +          AllocationType.ANY,
    +          resource = null,
    +          numExecutors,
    +          RM_REQUEST_PRIORITY).toList
    +      } else {
    +        // Request for all hosts in preferred nodes and for numExecutors -
    +        // candidates.size, request by default allocation policy.
    +        val hostContainerRequests = new 
ArrayBuffer[ContainerRequest](preferredHostToCount.size)
    +        for ((candidateHost, candidateCount) <- preferredHostToCount) {
    +          val requiredCount = candidateCount - 
allocatedContainersOnHost(candidateHost)
    +
    +          if (requiredCount > 0) {
    +            hostContainerRequests ++= createResourceRequests(
    +              AllocationType.HOST,
    +              candidateHost,
    +              requiredCount,
    +              RM_REQUEST_PRIORITY)
    +          }
    +        }
    +        val rackContainerRequests: List[ContainerRequest] = 
createRackResourceRequests(
    +          hostContainerRequests).toList
    +
    +        val anyContainerRequests = createResourceRequests(
    +          AllocationType.ANY,
    +          resource = null,
    +          numExecutors,
    +          RM_REQUEST_PRIORITY)
    +
    +        val containerRequestBuffer = new ArrayBuffer[ContainerRequest](
    +          hostContainerRequests.size + rackContainerRequests.size + 
anyContainerRequests.size)
    +
    +        containerRequestBuffer ++= hostContainerRequests
    +        containerRequestBuffer ++= rackContainerRequests
    +        containerRequestBuffer ++= anyContainerRequests
    +        containerRequestBuffer.toList
    +      }
    +
    +    for (request <- containerRequests) {
    +      amClient.addContainerRequest(request)
    +    }
    +
    +    for (request <- containerRequests) {
    +      val nodes = request.getNodes
    +      val hostStr = if (nodes == null || nodes.isEmpty) {
    +        "Any"
    +      } else {
    +        nodes.last
    +      }
    +      logInfo("Container request (host: %s, priority: %s, capability: 
%s".format(
    +        hostStr,
    +        request.getPriority().getPriority,
    +        request.getCapability))
    +    }
    +  }
    +
    +  private def createResourceRequests(
    +      requestType: AllocationType.AllocationType,
    +      resource: String,
    +      numExecutors: Int,
    +      priority: Int)
    +    : ArrayBuffer[ContainerRequest] = {
    --- End diff --
    
    I would bump this up the previous line


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to