Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/6394#discussion_r32243556
--- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
@@ -225,12 +240,89 @@ private[yarn] class YarnAllocator(
       logInfo(s"Will request $missing executor containers, each with ${resource.getVirtualCores} " +
         s"cores and ${resource.getMemory} MB memory including $memoryOverhead MB overhead")
-      for (i <- 0 until missing) {
-        val request = createContainerRequest(resource)
-        amClient.addContainerRequest(request)
-        val nodes = request.getNodes
-        val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.last
-        logInfo(s"Container request (host: $hostStr, capability: $resource)")
+      // Calculate the number of executors we expect to satisfy all the preferred locality tasks
+      val localityAwareTaskCores = localityAwarePendingTaskNum * CPUS_PER_TASK
+      val expectedLocalityAwareContainerNum =
+        (localityAwareTaskCores + resource.getVirtualCores - 1) / resource.getVirtualCores
+
+      // Calculate the expected container distribution according to the preferred locality ratio
+      // and the existing container distribution
+      val totalPreferredLocalities = preferredLocalityToCounts.values.sum
+      val expectedLocalityToContainerNum = preferredLocalityToCounts.map { case (host, count) =>
+        val expectedCount =
+          count.toDouble * expectedLocalityAwareContainerNum / totalPreferredLocalities
+        val existedCount = allocatedHostToContainersMap.get(host).map(s => s.size).getOrElse(0)
+        if (expectedCount > existedCount) {
+          // Get the actual container number if the existing containers cannot fully satisfy the
+          // expected number of containers
+          (host, (expectedCount - existedCount).ceil.toInt)
+        } else {
+          // If the current number of existing containers can fully satisfy the expected number
+          // of containers, set the required containers to 0
+          (host, 0)
+        }
+      }
+      // Newly calculated number of locality required containers, which excludes requests that
+      // have already been satisfied by current containers.
+      val updatedLocalityAwareContainerNum = expectedLocalityToContainerNum.values.sum
+
+      // The number of containers to allocate, divided into two groups: one with node locality,
+      // and the other without locality preference.
+      var requiredLocalityFreeContainerNum: Int = 0
+      var requiredLocalityAwareContainerNum: Int = 0
+
+      if (updatedLocalityAwareContainerNum == 0) {
+        // If the currently allocated executors can satisfy all the locality preferred tasks,
+        // allocate the new containers with no locality preference
+        requiredLocalityFreeContainerNum = missing
+      } else {
+        if (updatedLocalityAwareContainerNum >= missing) {
+          // If the newly requested containers cannot satisfy the locality preferred tasks,
+          // allocate all the new containers with locality preference
+          requiredLocalityAwareContainerNum = missing
+        } else {
+          // If part of the newly requested containers can satisfy the locality preferred tasks,
+          // allocate that part of the containers with locality preference, and the rest with no
+          // locality preference
+          requiredLocalityAwareContainerNum = updatedLocalityAwareContainerNum
+          requiredLocalityFreeContainerNum = missing - updatedLocalityAwareContainerNum
+        }
+      }
+
+      if (requiredLocalityFreeContainerNum > 0) {
+        for (i <- 0 until requiredLocalityFreeContainerNum) {
+          val request = createContainerRequest(resource, null, null)
+          amClient.addContainerRequest(request)
+          val nodes = request.getNodes
+          val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.last
+          logInfo(s"Container request (host: $hostStr, capability: $resource)")
+        }
+      }
+
+      if (requiredLocalityAwareContainerNum > 0) {
+        val largestRatio = expectedLocalityToContainerNum.values.max
+        // Round the ratio of preferred locality to the number of locality required containers,
+        // which is used for calculating the locality preferred hosts.
+        var preferredLocalityRatio = expectedLocalityToContainerNum.mapValues { ratio =>
+          val adjustedRatio = ratio.toDouble * requiredLocalityAwareContainerNum / largestRatio
+          adjustedRatio.ceil.toInt
+        }
+
+        for (i <- 0 until requiredLocalityAwareContainerNum) {
+          // Only keep the hosts whose ratio is larger than 0, which means the host can still be
+          // assigned a new container request.
+          val hosts = preferredLocalityRatio.filter(_._2 > 0).keys.toArray
+          val racks = hosts.map(h => RackResolver.resolve(conf, h).getNetworkLocation).toSet
+          val request = createContainerRequest(resource, hosts, racks.toArray)
+          amClient.addContainerRequest(request)
+          val nodes = request.getNodes
+          val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.last
+          logInfo(s"Container request (host: $hostStr, capability: $resource)")
+
+          // Each time a host is used, subtract 1. When a host's ratio reaches 0, all its
+          // required ratio is satisfied and it will not be allocated again.
+          // Details can be seen in SPARK-4352.
+          preferredLocalityRatio = preferredLocalityRatio.mapValues(i => i - 1)
--- End diff --
hi @jerryshao I am really new to this code base, and I know you are still working on it, but I have a pretty tough time following what is going on -- can I suggest some refactoring to make this more readable?

(a) can we break out some smaller methods here? maybe one for the body of this `for` loop ... or perhaps up to the enclosing `if (requiredLocalityAwareContainerNum > 0)`. Maybe there should even be another method which includes that and the previous `if (requiredLocalityFreeContainerNum > 0)`.

(b) can those methods get a higher-level description of the algorithm? I'd really like some description here that is not just a reference to a JIRA -- that is an unnecessary burden on somebody who is looking at this code, and the discussion goes back and forth a bit on the JIRA, so there isn't even a great summary there. Even a general description of the approach you took would be helpful, and the more nitty-gritty details could be left to the unit tests, which leads me to:

(c) it would be really great if these new methods could be refactored so they have no dependence on YARN, and so they can have their own unit tests, which would help explain what is going on. The discussion between you & Sandy on the JIRA was really about an abstract resource allocation problem -- it would be great to have that part cleanly separated and tested, to make it easier to follow. (It might also make it easier to change the strategy down the road if there are alternatives to try out / improvements to be made.)
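
To make (c) concrete, here is a rough, untested sketch of the kind of YARN-free helper I have in mind. Everything in it is hypothetical -- the class name `ContainerPlacementStrategy`, the method name `expectedHostToContainerCount`, and its parameters are placeholders, not a proposal for the final API:

    // Hypothetical sketch: the per-host locality math from this diff, pulled out of
    // YarnAllocator so it can be unit tested against plain maps, with no amClient or
    // RackResolver involved.
    class ContainerPlacementStrategy(coresPerContainer: Int, coresPerTask: Int) {

      /**
       * Given the number of pending tasks that prefer locality, the per-host preferred task
       * counts, and the containers already held per host, return how many new containers
       * should be requested on each host. Hosts whose preference is already covered get 0.
       */
      def expectedHostToContainerCount(
          numLocalityAwareTasks: Int,
          hostToLocalTaskCount: Map[String, Int],
          hostToAllocatedCount: Map[String, Int]): Map[String, Int] = {
        val totalLocalTasks = hostToLocalTaskCount.values.sum
        if (totalLocalTasks == 0) {
          Map.empty
        } else {
          // Containers needed to run every locality-preferring task, rounded up.
          val neededContainers =
            (numLocalityAwareTasks * coresPerTask + coresPerContainer - 1) / coresPerContainer
          hostToLocalTaskCount.map { case (host, count) =>
            val expected = count.toDouble * neededContainers / totalLocalTasks
            val existing = hostToAllocatedCount.getOrElse(host, 0)
            host -> math.max(0, (expected - existing).ceil.toInt)
          }
        }
      }
    }

A test for it could then live entirely outside the YARN layer, something like:

    val strategy = new ContainerPlacementStrategy(coresPerContainer = 4, coresPerTask = 1)
    val result = strategy.expectedHostToContainerCount(
      numLocalityAwareTasks = 8,
      hostToLocalTaskCount = Map("host1" -> 6, "host2" -> 2),
      hostToAllocatedCount = Map("host1" -> 1))
    assert(result == Map("host1" -> 1, "host2" -> 1))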