Github user jerryshao commented on a diff in the pull request:
https://github.com/apache/spark/pull/6394#discussion_r32283912
--- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
@@ -225,12 +240,89 @@ private[yarn] class YarnAllocator(
       logInfo(s"Will request $missing executor containers, each with ${resource.getVirtualCores} " +
         s"cores and ${resource.getMemory} MB memory including $memoryOverhead MB overhead")
-      for (i <- 0 until missing) {
-        val request = createContainerRequest(resource)
-        amClient.addContainerRequest(request)
-        val nodes = request.getNodes
-        val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.last
-        logInfo(s"Container request (host: $hostStr, capability: $resource)")
+      // Calculate the number of executors we expect to satisfy all the locality-preferred tasks
+      val localityAwareTaskCores = localityAwarePendingTaskNum * CPUS_PER_TASK
+      val expectedLocalityAwareContainerNum =
+        (localityAwareTaskCores + resource.getVirtualCores - 1) / resource.getVirtualCores
+
+      // Calculate the expected container distribution according to the preferred locality
+      // ratio and the existing container distribution
+      val totalPreferredLocalities = preferredLocalityToCounts.values.sum
+      val expectedLocalityToContainerNum = preferredLocalityToCounts.map { case (host, count) =>
+        val expectedCount =
+          count.toDouble * expectedLocalityAwareContainerNum / totalPreferredLocalities
+        val existedCount = allocatedHostToContainersMap.get(host).map(s => s.size).getOrElse(0)
+        if (expectedCount > existedCount) {
+          // The existing containers cannot fully satisfy the expected number of containers,
+          // so record how many more this host needs
+          (host, (expectedCount - existedCount).ceil.toInt)
+        } else {
+          // The existing containers already satisfy the expected number of containers,
+          // so no additional containers are required for this host
+          (host, 0)
+        }
+      }
+      // Newly calculated number of locality-required containers, excluding requests that
+      // have already been satisfied by the current containers.
+      val updatedLocalityAwareContainerNum = expectedLocalityToContainerNum.values.sum
+
+      // The number of containers to allocate, divided into two groups: one with node
+      // locality and the other without locality preference.
+      var requiredLocalityFreeContainerNum: Int = 0
+      var requiredLocalityAwareContainerNum: Int = 0
+
+      if (updatedLocalityAwareContainerNum == 0) {
+        // The currently allocated executors can satisfy all the locality-preferred tasks,
+        // so allocate the new containers with no locality preference
+        requiredLocalityFreeContainerNum = missing
+      } else {
+        if (updatedLocalityAwareContainerNum >= missing) {
+          // Even the full batch of newly requested containers cannot satisfy all the
+          // locality-preferred tasks, so allocate every new container with locality
+          // preference
+          requiredLocalityAwareContainerNum = missing
+        } else {
+          // Only part of the newly requested containers is needed for the locality-preferred
+          // tasks, so allocate that part with locality preference and the rest with no
+          // locality preference
+          requiredLocalityAwareContainerNum = updatedLocalityAwareContainerNum
+          requiredLocalityFreeContainerNum = missing - updatedLocalityAwareContainerNum
+        }
+      }
+
+      if (requiredLocalityFreeContainerNum > 0) {
+        for (i <- 0 until requiredLocalityFreeContainerNum) {
+          val request = createContainerRequest(resource, null, null)
+          amClient.addContainerRequest(request)
+          val nodes = request.getNodes
+          val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.last
+          logInfo(s"Container request (host: $hostStr, capability: $resource)")
+        }
+      }
+
+      if (requiredLocalityAwareContainerNum > 0) {
+        val largestRatio = expectedLocalityToContainerNum.values.max
+        // Scale the preferred-locality ratios against the number of locality-required
+        // containers; the scaled ratios drive the preferred-host calculation below.
+        var preferredLocalityRatio = expectedLocalityToContainerNum.mapValues { ratio =>
+          val adjustedRatio = ratio.toDouble * requiredLocalityAwareContainerNum / largestRatio
+          adjustedRatio.ceil.toInt
+        }
+
+        for (i <- 0 until requiredLocalityAwareContainerNum) {
+          // Only keep the hosts whose ratio is larger than 0; these hosts can still be
+          // assigned new container requests.
+          val hosts = preferredLocalityRatio.filter(_._2 > 0).keys.toArray
+          val racks = hosts.map(h => RackResolver.resolve(conf, h).getNetworkLocation).toSet
+          val request = createContainerRequest(resource, hosts, racks.toArray)
+          amClient.addContainerRequest(request)
+          val nodes = request.getNodes
+          val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.last
+          logInfo(s"Container request (host: $hostStr, capability: $resource)")
+
+          // Each time a host is used, subtract 1 from its ratio. Once a host's ratio
+          // reaches 0, its share of requests has been issued and it will not be assigned
+          // again. See SPARK-4352 for details.
+          preferredLocalityRatio = preferredLocalityRatio.mapValues(i => i - 1)
--- End diff --
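
To make the arithmetic in this hunk concrete, here is a minimal, self-contained Scala sketch of the same three steps: the expected container count, the per-host distribution, and the ratio-decrement request loop. All inputs (`cpusPerTask`, `vCoresPerContainer`, the task and container counts per host) are made-up illustrative values, not numbers from this PR, and the sketch assumes all locality-aware requests are actually issued (i.e. `missing` is large enough):

```scala
object LocalityAllocationSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical inputs, standing in for CPUS_PER_TASK, resource.getVirtualCores,
    // localityAwarePendingTaskNum, preferredLocalityToCounts and
    // allocatedHostToContainersMap in the PR.
    val cpusPerTask = 1
    val vCoresPerContainer = 4
    val localityAwarePendingTaskNum = 18
    val preferredLocalityToCounts = Map("host1" -> 12, "host2" -> 6)
    val allocatedHostToContainers = Map("host1" -> 1)

    // Step 1: ceiling division gives the containers needed to cover all
    // locality-preferred task cores: (18 * 1 + 4 - 1) / 4 = 5.
    val localityAwareTaskCores = localityAwarePendingTaskNum * cpusPerTask
    val expectedLocalityAwareContainerNum =
      (localityAwareTaskCores + vCoresPerContainer - 1) / vCoresPerContainer

    // Step 2: distribute the expected containers across hosts in proportion to
    // their task counts, discounting containers already running on each host.
    val totalPreferredLocalities = preferredLocalityToCounts.values.sum
    val expectedLocalityToContainerNum = preferredLocalityToCounts.map {
      case (host, count) =>
        val expected =
          count.toDouble * expectedLocalityAwareContainerNum / totalPreferredLocalities
        val existing = allocatedHostToContainers.getOrElse(host, 0)
        host -> math.max((expected - existing).ceil.toInt, 0)
    }
    println(s"expected per host: $expectedLocalityToContainerNum")
    // -> Map(host1 -> 3, host2 -> 2)

    // Step 3: scale the per-host counts so the largest equals the number of
    // requests to issue, then decrement every ratio after each request. Hosts
    // with larger shares stay positive longer, so they appear in more requests
    // (the SPARK-4352 scheme).
    val toRequest = expectedLocalityToContainerNum.values.sum
    val largestRatio = expectedLocalityToContainerNum.values.max
    var preferredLocalityRatio = expectedLocalityToContainerNum.map {
      case (host, ratio) =>
        host -> (ratio.toDouble * toRequest / largestRatio).ceil.toInt
    }
    for (i <- 0 until toRequest) {
      val hosts = preferredLocalityRatio.filter(_._2 > 0).keys.toSeq.sorted
      println(s"request $i prefers hosts: ${hosts.mkString(", ")}")
      preferredLocalityRatio = preferredLocalityRatio.map { case (h, r) => h -> (r - 1) }
    }
  }
}
```

Note that because each scaled ratio is rounded up with `ceil`, the ratios can sum to more than the number of requests; the loop simply stops after issuing `toRequest` requests, so hosts with larger shares remain eligible through the later requests.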
Thanks a lot @squito, I greatly appreciate your comments. I'm still working
on this; my first goal is to prove that the algorithm can work, and then I will
refactor the code to make it clearer. Sorry about the sparse comments in the
code, and thanks again for your review :smiley: .