Wei Chen created SPARK-21562: -------------------------------- Summary: Spark may request extra containers if the rpc between YARN and spark is too fast Key: SPARK-21562 URL: https://issues.apache.org/jira/browse/SPARK-21562 Project: Spark Issue Type: Bug Components: YARN Affects Versions: 2.2.0 Reporter: Wei Chen
hi huys, I find an interesting problem when spark tries to request containers from YARN. Here is the case: In YarnAllocator.scala 1. this function requests container from YARN only if there are executors are not be requested. {color:red}def updateResourceRequests(): Unit = { val pendingAllocate = getPendingAllocate val numPendingAllocate = pendingAllocate.size val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning if (missing > 0) { ...... } ..... }{color} 2. After the requested containers are allocated(granted through RPC), then it will update the pending queues {color:red}private def matchContainerToRequest( allocatedContainer: Container, location: String, containersToUse: ArrayBuffer[Container], remaining: ArrayBuffer[Container]): Unit = { ..... amClient.removeContainerRequest(containerRequest) //update pending queues ..... } {color} 3. After the allocated containers are launched, it will update the running queue {color:red}private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = { for (container <- containersToUse) { .... auncherPool.execute(new Runnable { override def run(): Unit = { try { new ExecutorRunnable( Some(container), conf, sparkConf, driverUrl, executorId, executorHostname, executorMemory, executorCores, appAttemptId.getApplicationId.toString, securityMgr, localResources ).run() logInfo(s"has launched $containerId") updateInternalState() //update running queues .... } }{color} However, in step 3 it will launch a thread to first launch ExecutorRunnable then update running queue. We found it would take almost 1 sec before the updating running queue function is called(updateInternalState()). So there would be an inconsistent situation here since the pending queue is updated but the running queue is not updated yet due to the launching thread does not reach updateInternalState() yet. If there is an RPC call to amClient.allocate() between this inconsistent interval, then more executors than targetNumExecutors would be requested. Here is an example: Initial: targetNumExecutors numPendingAllocate numExecutorsRunning 1 0 0 After first RPC call to amClient.allocate: targetNumExecutors numPendingAllocate numExecutorsRunning 1 1 0 After the first allocated container is granted by YARN targetNumExecutors numPendingAllocate numExecutorsRunning 1 0(is removed in step 2) 0 =====>if there is a RPC call here to amClient.allocate(), then more containers are requested, however this situation is caused by the inconsistent state. After the container is launched in step 3 targetNumExecutors numPendingAllocate numExecutorsRunning 1 0 1 ======================================================================= I found this problem because I am testng the feature if YARN's opportunisitc containers(e.g., allocation takes 100ms) which is much faster then guaranteed containers(e.g., allocateion takes almost 1s). I am not sure if I have a correct understanding. Appreciate anyone's help in this issue(correct me if I have miss understanding) Wei -- This message was sent by Atlassian JIRA (v6.4.14#64029) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org