Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/6394#discussion_r32243556
  
    --- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
    @@ -225,12 +240,89 @@ private[yarn] class YarnAllocator(
           logInfo(s"Will request $missing executor containers, each with 
${resource.getVirtualCores} " +
             s"cores and ${resource.getMemory} MB memory including 
$memoryOverhead MB overhead")
     
    -      for (i <- 0 until missing) {
    -        val request = createContainerRequest(resource)
    -        amClient.addContainerRequest(request)
    -        val nodes = request.getNodes
    -        val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.last
    -        logInfo(s"Container request (host: $hostStr, capability: $resource)")
    +      // Calculate the number of executors we expect to satisfy all the locality-preferred tasks
    +      val localityAwareTaskCores = localityAwarePendingTaskNum * CPUS_PER_TASK
    +      val expectedLocalityAwareContainerNum =
    +        (localityAwareTaskCores + resource.getVirtualCores - 1) / resource.getVirtualCores
    +
    +      // Calculate the expected container distribution according to the preferred locality ratio
    +      // and the existing container distribution
    +      val totalPreferredLocalities = preferredLocalityToCounts.values.sum
    +      val expectedLocalityToContainerNum = preferredLocalityToCounts.map { case (host, count) =>
    +        val expectedCount =
    +          count.toDouble * expectedLocalityAwareContainerNum / totalPreferredLocalities
    +        val existedCount = allocatedHostToContainersMap.get(host).map(s => s.size).getOrElse(0)
    +        if (expectedCount > existedCount) {
    +          // Request the shortfall, since the existing containers cannot fully satisfy the
    +          // expected number of containers
    +          (host, (expectedCount - existedCount).ceil.toInt)
    +        } else {
    +          // If the existing containers can fully satisfy the expected number of
    +          // containers, set the required container count to 0
    +          (host, 0)
    +        }
    +      }
    +      // The newly calculated number of locality-required containers, excluding requests that
    +      // have already been satisfied by the current containers.
    +      val updatedLocalityAwareContainerNum = expectedLocalityToContainerNum.values.sum
    +
    +      // The number of containers to allocate, divided into two groups: one with node locality,
    +      // and the other without locality preference.
    +      var requiredLocalityFreeContainerNum: Int = 0
    +      var requiredLocalityAwareContainerNum: Int = 0
    +
    +      if (updatedLocalityAwareContainerNum == 0) {
    +        // If the currently allocated executors can satisfy all the locality-preferred tasks,
    +        // allocate the new containers with no locality preference
    +        requiredLocalityFreeContainerNum = missing
    +      } else {
    +        if (updatedLocalityAwareContainerNum >= missing) {
    +          // If the newly requested containers still cannot satisfy all the locality-preferred
    +          // tasks, allocate all the new containers with locality preference
    +          requiredLocalityAwareContainerNum = missing
    +        } else {
    +          // If only part of the newly requested containers is needed to satisfy the
    +          // locality-preferred tasks, allocate that part with locality preference,
    +          // and the rest with no locality preference
    +          requiredLocalityAwareContainerNum = updatedLocalityAwareContainerNum
    +          requiredLocalityFreeContainerNum = missing - updatedLocalityAwareContainerNum
    +        }
    +      }
    +
    +      if (requiredLocalityFreeContainerNum > 0) {
    +        for (i <- 0 until requiredLocalityFreeContainerNum) {
    +          val request = createContainerRequest(resource, null, null)
    +          amClient.addContainerRequest(request)
    +          val nodes = request.getNodes
    +          val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.last
    +          logInfo(s"Container request (host: $hostStr, capability: $resource)")
    +        }
    +      }
    +
    +      if (requiredLocalityAwareContainerNum > 0) {
    +        val largestRatio = expectedLocalityToContainerNum.values.max
    +        // Scale the preferred-locality ratios to the number of required locality-aware
    +        // containers; the scaled values drive the host selection below.
    +        var preferredLocalityRatio = expectedLocalityToContainerNum.mapValues { ratio =>
    +          val adjustedRatio = ratio.toDouble * requiredLocalityAwareContainerNum / largestRatio
    +          adjustedRatio.ceil.toInt
    +        }
    +
    +        for (i <- 0 until requiredLocalityAwareContainerNum) {
    +          // Only keep the hosts whose ratio is still larger than 0, which means they can
    +          // still be assigned new container requests.
    +          val hosts = preferredLocalityRatio.filter(_._2 > 0).keys.toArray
    +          val racks = hosts.map(h => RackResolver.resolve(conf, h).getNetworkLocation).toSet
    +          val request = createContainerRequest(resource, hosts, racks.toArray)
    +          amClient.addContainerRequest(request)
    +          val nodes = request.getNodes
    +          val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.last
    +          logInfo(s"Container request (host: $hostStr, capability: $resource)")
    +
    +          // Each time a host is used, subtract 1 from its ratio. Once a host's ratio reaches 0,
    +          // all of its required containers have been requested, so it will not be allocated
    +          // again. See SPARK-4352 for details.
    +          preferredLocalityRatio = preferredLocalityRatio.mapValues(i => i - 1)
    --- End diff --
    
    hi @jerryshao I am really new to this code base, and I know you are still working on it,
    but I have a pretty tough time following what is going on -- can I suggest some refactoring
    to make this more readable?
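
    For instance, here is my attempt to trace the expected-distribution math with some made-up
    numbers (the values are mine, not from the PR) -- please correct me if I've misread it:

    ```scala
    // Toy trace: 20 pending locality-aware tasks preferring host1: 15 / host2: 5,
    // 1 core per task, 4-core executors, and 1 container already running on host1.
    val expectedContainers = (20 * 1 + 4 - 1) / 4        // ceil(20 / 4) = 5
    val expectedHost1 = 15.0 * expectedContainers / 20   // 3.75
    val expectedHost2 = 5.0 * expectedContainers / 20    // 1.25
    val requiredHost1 = (expectedHost1 - 1).ceil.toInt   // 3, after the existing container
    val requiredHost2 = (expectedHost2 - 0).ceil.toInt   // 2
    // so updatedLocalityAwareContainerNum = 3 + 2 = 5
    ```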
    
    (a) can we break out some smaller methods here?  maybe one for the body of this `for` loop
    ... or perhaps up to the enclosing `if (requiredLocalityAwareContainerNum > 0)`.  Maybe
    there should even be another method which includes that and the previous
    `if (requiredLocalityFreeContainerNum > 0)`.
    (b) can you give those methods a higher-level description of the algorithm?  I'd really
    like some description here that is not just a reference to a JIRA -- that is just a bit of
    an unnecessary burden on somebody that is looking at this, and the discussion goes back and
    forth a bit on the JIRA, so there isn't even a great summary there.  Even just a general
    description of the approach you have would be helpful, and the more nitty-gritty details
    could be left to the unit tests, which leads me to:
    (c) it would be really great if these new methods could be refactored so they have no
    dependence on yarn, and so they could have their own unit tests, which would help explain
    what is going on more.  The discussion between you & sandy on the jira was really about an
    abstract resource allocation problem -- it would be great to have that part cleanly
    separated and tested, to make it easier to follow.  (And it might make it easier to change
    the strategy down the road if there are alternatives to try out / improvements to be made.)
    I've put a rough sketch of what I mean below.
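
    To make (a)-(c) concrete, here's a very rough sketch of the separation I have in mind.
    All the names and signatures are placeholders, just to illustrate -- not a real proposal:

    ```scala
    /**
     * Pure helper with no yarn dependence: given the pending locality-aware task
     * demand and what is already allocated, decide how many containers to request
     * per host, and which hosts to name on each request. (Placeholder sketch.)
     */
    object ContainerPlacementStrategy {

      /** Containers each host still needs, after accounting for existing ones. */
      def expectedHostToContainerCount(
          localityAwarePendingTasks: Int,
          cpusPerTask: Int,
          coresPerContainer: Int,
          hostToLocalTaskCount: Map[String, Int],
          allocatedHostToContainerCount: Map[String, Int]): Map[String, Int] = {
        val totalCores = localityAwarePendingTasks * cpusPerTask
        val expectedContainers = (totalCores + coresPerContainer - 1) / coresPerContainer
        val totalPreferred = hostToLocalTaskCount.values.sum
        hostToLocalTaskCount.map { case (host, count) =>
          val expected = count.toDouble * expectedContainers / totalPreferred
          val existing = allocatedHostToContainerCount.getOrElse(host, 0)
          host -> math.max((expected - existing).ceil.toInt, 0)
        }
      }

      /** The round-robin host list for each locality-aware request, as in the loop above. */
      def hostsPerRequest(requiredByHost: Map[String, Int], numRequests: Int): Seq[Array[String]] = {
        val largest = requiredByHost.values.max
        var ratio = requiredByHost.map { case (host, r) =>
          host -> (r.toDouble * numRequests / largest).ceil.toInt
        }
        (0 until numRequests).map { _ =>
          // Hosts with a positive ratio can still take another request; decrement all.
          val hosts = ratio.filter(_._2 > 0).keys.toArray
          ratio = ratio.map { case (host, r) => host -> (r - 1) }
          hosts
        }
      }
    }
    ```

    Then a unit test can exercise the abstract allocation problem directly, with no AM client
    in sight, e.g.:

    ```scala
    val required = ContainerPlacementStrategy.expectedHostToContainerCount(
      20, 1, 4, Map("host1" -> 15, "host2" -> 5), Map("host1" -> 1))
    assert(required == Map("host1" -> 3, "host2" -> 2))
    ```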

