Repository: spark
Updated Branches:
  refs/heads/master cc6e53119 -> 1a88f20de


SPARK-4337. [YARN] Add ability to cancel pending requests

Author: Sandy Ryza <sa...@cloudera.com>

Closes #4141 from sryza/sandy-spark-4337 and squashes the following commits:

a98bd20 [Sandy Ryza] Andrew's comments
cdaab7f [Sandy Ryza] SPARK-4337. Add ability to cancel pending requests to YARN
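
In short: the allocator's maxExecutors cap becomes a targetNumExecutors goal,
and each allocation cycle reconciles outstanding container requests against
that goal, adding requests when below target and canceling pending ones when
above it. A minimal, dependency-free sketch of that accounting (the names
mirror the patch below, but this is an illustration, not the patched class):

object RequestDeltaSketch extends App {
  // missing > 0: add that many container requests.
  // missing < 0: cancel up to that many pending requests; never kill executors.
  def missing(targetNumExecutors: Int, numPendingAllocate: Int, numExecutorsRunning: Int): Int =
    targetNumExecutors - numPendingAllocate - numExecutorsRunning

  // Target lowered to 1 while 3 requests are pending and none are running:
  val m = missing(targetNumExecutors = 1, numPendingAllocate = 3, numExecutorsRunning = 0)
  println(s"missing = $m, to cancel = ${math.min(3, -m)}") // missing = -2, to cancel = 2
}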


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1a88f20d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1a88f20d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1a88f20d

Branch: refs/heads/master
Commit: 1a88f20de798030a7d5713bd267f612ba5617fca
Parents: cc6e531
Author: Sandy Ryza <sa...@cloudera.com>
Authored: Fri Feb 6 10:53:16 2015 -0800
Committer: Andrew Or <and...@databricks.com>
Committed: Fri Feb 6 10:53:16 2015 -0800

----------------------------------------------------------------------
 .../spark/deploy/yarn/YarnAllocator.scala       | 65 ++++++++++++--------
 .../spark/deploy/yarn/YarnAllocatorSuite.scala  | 54 ++++++++++++++--
 2 files changed, 89 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1a88f20d/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 0dbb615..12c62a6 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -69,8 +69,7 @@ private[yarn] class YarnAllocator(
   }
 
   // Visible for testing.
-  val allocatedHostToContainersMap =
-    new HashMap[String, collection.mutable.Set[ContainerId]]
+  val allocatedHostToContainersMap = new HashMap[String, collection.mutable.Set[ContainerId]]
   val allocatedContainerToHostMap = new HashMap[ContainerId, String]
 
  // Containers that we no longer care about. We've either already told the RM to release them or
@@ -84,7 +83,7 @@ private[yarn] class YarnAllocator(
   private var executorIdCounter = 0
   @volatile private var numExecutorsFailed = 0
 
-  @volatile private var maxExecutors = args.numExecutors
+  @volatile private var targetNumExecutors = args.numExecutors
 
  // Keep track of which container is running which executor to remove the executors later
   private val executorIdToContainer = new HashMap[String, Container]
@@ -133,10 +132,12 @@ private[yarn] class YarnAllocator(
    amClient.getMatchingRequests(RM_REQUEST_PRIORITY, location, resource).map(_.size).sum
 
   /**
-   * Request as many executors from the ResourceManager as needed to reach the desired total.
+   * Request as many executors from the ResourceManager as needed to reach the desired total. If
+   * the requested total is smaller than the current number of running executors, no executors will
+   * be killed.
    */
   def requestTotalExecutors(requestedTotal: Int): Unit = synchronized {
-    maxExecutors = requestedTotal
+    targetNumExecutors = requestedTotal
   }
 
   /**
@@ -147,8 +148,8 @@ private[yarn] class YarnAllocator(
       val container = executorIdToContainer.remove(executorId).get
       internalReleaseContainer(container)
       numExecutorsRunning -= 1
-      maxExecutors -= 1
-      assert(maxExecutors >= 0, "Allocator killed more executors than are allocated!")
+      targetNumExecutors -= 1
+      assert(targetNumExecutors >= 0, "Allocator killed more executors than are allocated!")
     } else {
       logWarning(s"Attempted to kill unknown executor $executorId!")
     }
@@ -163,15 +164,8 @@ private[yarn] class YarnAllocator(
   * This must be synchronized because variables read in this method are mutated by other methods.
    */
   def allocateResources(): Unit = synchronized {
-    val numPendingAllocate = getNumPendingAllocate
-    val missing = maxExecutors - numPendingAllocate - numExecutorsRunning
+    updateResourceRequests()
 
-    if (missing > 0) {
-      logInfo(s"Will request $missing executor containers, each with 
${resource.getVirtualCores} " +
-        s"cores and ${resource.getMemory} MB memory including $memoryOverhead 
MB overhead")
-    }
-
-    addResourceRequests(missing)
     val progressIndicator = 0.1f
    // Poll the ResourceManager. This doubles as a heartbeat if there are no pending container
     // requests.
@@ -201,15 +195,36 @@ private[yarn] class YarnAllocator(
   }
 
   /**
-   * Request numExecutors additional containers from YARN. Visible for testing.
+   * Update the set of container requests that we will sync with the RM based on the number of
+   * executors we have currently running and our target number of executors.
+   *
+   * Visible for testing.
    */
-  def addResourceRequests(numExecutors: Int): Unit = {
-    for (i <- 0 until numExecutors) {
-      val request = new ContainerRequest(resource, null, null, RM_REQUEST_PRIORITY)
-      amClient.addContainerRequest(request)
-      val nodes = request.getNodes
-      val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.last
-      logInfo("Container request (host: %s, capability: %s".format(hostStr, 
resource))
+  def updateResourceRequests(): Unit = {
+    val numPendingAllocate = getNumPendingAllocate
+    val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
+
+    if (missing > 0) {
+      logInfo(s"Will request $missing executor containers, each with 
${resource.getVirtualCores} " +
+        s"cores and ${resource.getMemory} MB memory including $memoryOverhead 
MB overhead")
+
+      for (i <- 0 until missing) {
+        val request = new ContainerRequest(resource, null, null, RM_REQUEST_PRIORITY)
+        amClient.addContainerRequest(request)
+        val nodes = request.getNodes
+        val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.last
+        logInfo(s"Container request (host: $hostStr, capability: $resource)")
+      }
+    } else if (missing < 0) {
+      val numToCancel = math.min(numPendingAllocate, -missing)
+      logInfo(s"Canceling requests for $numToCancel executor containers")
+
+      val matchingRequests = amClient.getMatchingRequests(RM_REQUEST_PRIORITY, ANY_HOST, resource)
+      if (!matchingRequests.isEmpty) {
+        matchingRequests.head.take(numToCancel).foreach(amClient.removeContainerRequest)
+      } else {
+        logWarning("Expected to find pending requests, but found none.")
+      }
     }
   }
 
@@ -266,7 +281,7 @@ private[yarn] class YarnAllocator(
    * containersToUse or remaining.
    *
    * @param allocatedContainer container that was given to us by YARN
-   * @location resource name, either a node, rack, or *
+   * @param location resource name, either a node, rack, or *
    * @param containersToUse list of containers that will be used
    * @param remaining list of containers that will not be used
    */
@@ -294,7 +309,7 @@ private[yarn] class YarnAllocator(
  private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
     for (container <- containersToUse) {
       numExecutorsRunning += 1
-      assert(numExecutorsRunning <= maxExecutors)
+      assert(numExecutorsRunning <= targetNumExecutors)
       val executorHostname = container.getNodeId.getHost
       val containerId = container.getId
       executorIdCounter += 1
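
A note on the AMRMClient calls the new cancellation branch relies on:
getMatchingRequests returns a list of collections of outstanding
ContainerRequests bucketed by (priority, location, capability), and any
request passed to removeContainerRequest is dropped from the next heartbeat.
A hedged, standalone sketch of that path (cancelPending and its parameters
are illustrative names, not part of the patch; assumes a started
AMRMClient[ContainerRequest]):

import scala.collection.JavaConverters._
import org.apache.hadoop.yarn.api.records.{Priority, Resource}
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

object CancelSketch {
  // Remove up to numToCancel pending requests at the given priority/capability.
  // "*" matches any host, like the patch's ANY_HOST constant.
  def cancelPending(
      amClient: AMRMClient[ContainerRequest],
      priority: Priority,
      capability: Resource,
      numToCancel: Int): Unit = {
    val matching = amClient.getMatchingRequests(priority, "*", capability)
    if (!matching.isEmpty) {
      matching.get(0).asScala.take(numToCancel).foreach(amClient.removeContainerRequest)
    }
  }
}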

http://git-wip-us.apache.org/repos/asf/spark/blob/1a88f20d/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 024b25f..3c224f1 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -107,8 +107,8 @@ class YarnAllocatorSuite extends FunSuite with Matchers with BeforeAndAfterEach
 
   test("single container allocated") {
     // request a single container and receive it
-    val handler = createAllocator()
-    handler.addResourceRequests(1)
+    val handler = createAllocator(1)
+    handler.updateResourceRequests()
     handler.getNumExecutorsRunning should be (0)
     handler.getNumPendingAllocate should be (1)
 
@@ -123,8 +123,8 @@ class YarnAllocatorSuite extends FunSuite with Matchers with BeforeAndAfterEach
 
   test("some containers allocated") {
     // request a few containers and receive some of them
-    val handler = createAllocator()
-    handler.addResourceRequests(4)
+    val handler = createAllocator(4)
+    handler.updateResourceRequests()
     handler.getNumExecutorsRunning should be (0)
     handler.getNumPendingAllocate should be (4)
 
@@ -144,7 +144,7 @@ class YarnAllocatorSuite extends FunSuite with Matchers with BeforeAndAfterEach
 
   test("receive more containers than requested") {
     val handler = createAllocator(2)
-    handler.addResourceRequests(2)
+    handler.updateResourceRequests()
     handler.getNumExecutorsRunning should be (0)
     handler.getNumPendingAllocate should be (2)
 
@@ -162,6 +162,50 @@ class YarnAllocatorSuite extends FunSuite with Matchers with BeforeAndAfterEach
     handler.allocatedHostToContainersMap.contains("host4") should be (false)
   }
 
+  test("decrease total requested executors") {
+    val handler = createAllocator(4)
+    handler.updateResourceRequests()
+    handler.getNumExecutorsRunning should be (0)
+    handler.getNumPendingAllocate should be (4)
+
+    handler.requestTotalExecutors(3)
+    handler.updateResourceRequests()
+    handler.getNumPendingAllocate should be (3)
+
+    val container = createContainer("host1")
+    handler.handleAllocatedContainers(Array(container))
+
+    handler.getNumExecutorsRunning should be (1)
+    handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
+    handler.allocatedHostToContainersMap.get("host1").get should contain (container.getId)
+
+    handler.requestTotalExecutors(2)
+    handler.updateResourceRequests()
+    handler.getNumPendingAllocate should be (1)
+  }
+
+  test("decrease total requested executors to less than currently running") {
+    val handler = createAllocator(4)
+    handler.updateResourceRequests()
+    handler.getNumExecutorsRunning should be (0)
+    handler.getNumPendingAllocate should be (4)
+
+    handler.requestTotalExecutors(3)
+    handler.updateResourceRequests()
+    handler.getNumPendingAllocate should be (3)
+
+    val container1 = createContainer("host1")
+    val container2 = createContainer("host2")
+    handler.handleAllocatedContainers(Array(container1, container2))
+
+    handler.getNumExecutorsRunning should be (2)
+
+    handler.requestTotalExecutors(1)
+    handler.updateResourceRequests()
+    handler.getNumPendingAllocate should be (0)
+    handler.getNumExecutorsRunning should be (2)
+  }
+
   test("memory exceeded diagnostic regexes") {
     val diagnostics =
       "Container 
[pid=12465,containerID=container_1412887393566_0003_01_000002] is running " +

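The two new tests above can be replayed as pure accounting, with no YARN
machinery; the sketch below walks the second one's numbers through a
paraphrase of updateResourceRequests() (counter names mirror the allocator's
fields; update() is a stand-in, not the real method):

object CancelArithmetic extends App {
  var target = 4   // targetNumExecutors
  var pending = 0  // outstanding container requests at the RM
  var running = 0  // executors already launched

  // One updateResourceRequests() step: add requests when short, cancel when over.
  def update(): Unit = {
    val missing = target - pending - running
    if (missing > 0) pending += missing
    else if (missing < 0) pending -= math.min(pending, -missing)
  }

  update();             assert(pending == 4 && running == 0)
  target = 3; update(); assert(pending == 3)
  pending -= 2; running += 2           // two containers arrive and start executors
  target = 1; update()
  assert(pending == 0 && running == 2) // pending request canceled, nothing killed
  println(s"pending=$pending running=$running")
}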
