Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/11612#discussion_r55907969
--- Diff:
yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
@@ -265,25 +265,52 @@ private[yarn] class YarnAllocator(
// For locality unmatched and locality free container requests,
cancel these container
// requests, since required locality preference has been changed,
recalculating using
// container placement strategy.
- val (localityMatched, localityUnMatched, localityFree) =
splitPendingAllocationsByLocality(
+ val (localRequests, staleRequests, anyHostRequests) =
splitPendingAllocationsByLocality(
hostToLocalTaskCounts, pendingAllocate)
- // Remove the outdated container request and recalculate the
requested container number
- localityUnMatched.foreach(amClient.removeContainerRequest)
- localityFree.foreach(amClient.removeContainerRequest)
- val updatedNumContainer = missing + localityUnMatched.size +
localityFree.size
+ // cancel "stale" requests for locations that are no longer needed
+ staleRequests.foreach { stale =>
+ amClient.removeContainerRequest(stale)
+ }
+ val cancelledContainers = staleRequests.size
+ logInfo(s"Canceled $cancelledContainers container requests (locality
no longer needed)")
+
+ // consider the number of new containers and cancelled stale
containers available
+ val availableContainers = missing + cancelledContainers
+
+ // to maximize locality, include requests with no locality
preference that can be cancelled
+ val potentialContainers = availableContainers + anyHostRequests.size
val containerLocalityPreferences =
containerPlacementStrategy.localityOfRequestedContainers(
- updatedNumContainer, numLocalityAwareTasks, hostToLocalTaskCounts,
- allocatedHostToContainersMap, localityMatched)
+ potentialContainers, numLocalityAwareTasks, hostToLocalTaskCounts,
+ allocatedHostToContainersMap, localRequests)
+
+ val newLocalityRequests = new mutable.ArrayBuffer[ContainerRequest]
+ containerLocalityPreferences.foreach {
+ case ContainerLocalityPreferences(nodes, racks) if nodes != null =>
+ newLocalityRequests.append(createContainerRequest(resource,
nodes, racks))
+ case _ =>
+ }
- for (locality <- containerLocalityPreferences) {
- val request = createContainerRequest(resource, locality.nodes,
locality.racks)
+ if (availableContainers >= newLocalityRequests.size) {
+ // more containers are available than needed for locality, fill in
requests for any host
+ for (i <- 0 until (availableContainers -
newLocalityRequests.size)) {
+ newLocalityRequests.append(createContainerRequest(resource,
null, null))
+ }
+ } else {
+ val numToCancel = newLocalityRequests.size - availableContainers
--- End diff --
Never mind — the check a few lines above takes care of that...
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]