Repository: flink
Updated Branches:
  refs/heads/master 086acf681 -> 2a49eaaf3
[FLINK-3300] fix concurrency bug in YarnJobManager

Adds message passing between Hadoop's async resource manager client and
the YarnJobManager actor, so that the actor's state is only modified from
its own message-processing thread.

This closes #1561.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2a49eaaf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2a49eaaf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2a49eaaf

Branch: refs/heads/master
Commit: 2a49eaaf3c949864457aee0ffd99343a50ac7285
Parents: 086acf6
Author: Maximilian Michels <m...@apache.org>
Authored: Fri Jan 29 11:21:08 2016 +0100
Committer: Maximilian Michels <m...@apache.org>
Committed: Mon Feb 1 10:09:37 2016 +0100

----------------------------------------------------------------------
 .../org/apache/flink/yarn/YarnJobManager.scala | 425 +++++++++----------
 .../org/apache/flink/yarn/YarnMessages.scala   |  21 +-
 2 files changed, 231 insertions(+), 215 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2a49eaaf/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index 135f87e..92eb4be 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -180,6 +180,7 @@ class YarnJobManager(
         _ ! decorateMessage(JobManagerStopped)
       }
 
+      // Shut down the actor system and discard all queued messages
       context.system.shutdown()
 
     case RegisterApplicationClient =>
@@ -220,6 +221,85 @@ class YarnJobManager(
     case StartYarnSession(hadoopConfig, webServerPort) =>
       startYarnSession(hadoopConfig, webServerPort)
 
+
+    case YarnContainersAllocated(containers: JavaList[Container]) =>
+      val newlyAllocatedContainers = containers.asScala
+
+      newlyAllocatedContainers.foreach {
+        container => log.info(s"Got new container for allocation: ${container.getId}")
+      }
+
+      allocatedContainersList ++= containers.asScala
+      numPendingRequests = math.max(0, numPendingRequests - newlyAllocatedContainers.length)
+
+      allocateContainers()
+
+      if (runningContainers >= numTaskManagers && allocatedContainersList.nonEmpty) {
+        log.info(s"Flink has ${allocatedContainersList.size} allocated containers which " +
+          s"are not needed right now. Returning them.")
+        for (container <- allocatedContainersList) {
+          rmClientOption match {
+            case Some(client) => client.releaseAssignedContainer(container.getId)
+            case None =>
+          }
+        }
+        allocatedContainersList = List()
+      }
+
+
+    case YarnContainersCompleted(statuses: JavaList[ContainerStatus]) =>
+
+      val completedContainerStatuses = statuses.asScala
+      val idStatusMap = completedContainerStatuses
+        .map(status => (status.getContainerId, status)).toMap
+
+      completedContainerStatuses.foreach {
+        status => log.info(s"Container ${status.getContainerId} is completed " +
+          s"with diagnostics: ${status.getDiagnostics}")
+      }
+
+      // get failed containers (returned containers are also completed, so we have to
+      // distinguish whether they were running before)
+      val (completedContainers, remainingRunningContainers) = runningContainersList
+        .partition(idStatusMap contains _.getId)
+
+      completedContainers.foreach {
+        container =>
+          val status = idStatusMap(container.getId)
+          failedContainers += 1
+          runningContainers -= 1
+          log.info(s"Container ${status.getContainerId} was a running container. " +
+            s"Total failed containers $failedContainers.")
+          val detail = status.getExitStatus match {
+            case -103 => "Vmem limit exceeded";
+            case -104 => "Pmem limit exceeded";
+            case _ => ""
+          }
+          messageListener foreach {
+            _ ! decorateMessage(
+              YarnMessage(s"Diagnostics for containerID=${status.getContainerId} in " +
+                s"state=${status.getState}.\n${status.getDiagnostics} $detail")
+            )
+          }
+      }
+
+      runningContainersList = remainingRunningContainers
+
+      // maxFailedContainers == -1 means an infinite number of retries.
+      if (maxFailedContainers != -1 && failedContainers >= maxFailedContainers) {
+        val msg = s"Stopping YARN session because the number of failed " +
+          s"containers ($failedContainers) exceeded the maximum failed container " +
+          s"count ($maxFailedContainers). This number is controlled by " +
+          s"the '${ConfigConstants.YARN_MAX_FAILED_CONTAINERS}' configuration " +
+          s"setting. By default it's the number of requested containers."
+        log.error(msg)
+        self ! decorateMessage(StopYarnSession(FinalApplicationStatus.FAILED, msg))
+
+      }
+
+      allocateContainers()
+
     case jnf: JobNotFound =>
       log.warn(s"Job with ID ${jnf.jobID} not found in JobManager")
       if (stopWhenJobFinished == null) {
@@ -315,9 +395,6 @@ class YarnJobManager(
         FAST_YARN_HEARTBEAT_DELAY.toMillis.toInt,
         AMRMClientAsyncHandler)
 
-      // inject client into handler to adjust the heartbeat interval and make requests
-      AMRMClientAsyncHandler.setClient(rmClientAsync)
-
       rmClientAsync.init(conf)
       rmClientAsync.start()
 
@@ -531,16 +608,135 @@ class YarnJobManager(
     }
   }
 
+
   /**
-   * Heartbeats with the resource manager and handles container updates.
+   * Allocates new containers if necessary.
    */
-  object AMRMClientAsyncHandler extends AMRMClientAsync.CallbackHandler {
+  private def allocateContainers() : Unit = {
+
+    // check if we want to start some of our allocated containers.
+    if (runningContainers < numTaskManagers) {
+      val missingContainers = numTaskManagers - runningContainers
+      log.info(s"The user requested $numTaskManagers containers, $runningContainers " +
+        s"running. $missingContainers containers missing")
+
+      val numStartedContainers = startTMsInAllocatedContainers(missingContainers)
+
+      // if there are still containers missing, request them from YARN
+      val toAllocateFromYarn = Math.max(
+        missingContainers - numStartedContainers - numPendingRequests,
+        0)
+
+      if (toAllocateFromYarn > 0) {
+        val reallocate = flinkConfiguration
+          .getBoolean(ConfigConstants.YARN_REALLOCATE_FAILED_CONTAINERS, true)
+        log.info(s"There are $missingContainers containers missing." +
+          s" $numPendingRequests are already requested. " +
+          s"Requesting $toAllocateFromYarn additional container(s) from YARN. " +
+          s"Reallocation of failed containers is enabled=$reallocate " +
+          s"('${ConfigConstants.YARN_REALLOCATE_FAILED_CONTAINERS}')")
+        // there are still containers missing. Request them from YARN
+        if (reallocate) {
+          for (i <- 1 to toAllocateFromYarn) {
+            val containerRequest = getContainerRequest(memoryPerTaskManager)
+            rmClientOption match {
+              case Some(client) => client.addContainerRequest(containerRequest)
+              case None =>
+            }
+            numPendingRequests += 1
+            log.info("Requested additional container from YARN. Pending requests " +
+              s"$numPendingRequests.")
+          }
+        }
+      }
+    }
+  }
+
+  /** Starts min(numTMsToStart, allocatedContainersList.size) TaskManagers in the available
+    * allocated containers. The number of successfully started TaskManagers is returned.
+    *
+    * @param numTMsToStart Number of TaskManagers to start if enough allocated containers are
+    *                      available. If not, then all allocated containers are used.
+    * @return Number of successfully started TaskManagers
+    */
+  private def startTMsInAllocatedContainers(numTMsToStart: Int): Int = {
+    // not enough containers running
+    if (allocatedContainersList.nonEmpty) {
+      log.info(s"${allocatedContainersList.size} containers already allocated by YARN. " +
+        "Starting...")
+
+      nmClientOption match {
+        case Some(nmClient) =>
+          containerLaunchContext match {
+            case Some(ctx) =>
+              val (containersToBeStarted, remainingContainers) = allocatedContainersList
+                .splitAt(numTMsToStart)
+
+              val startedContainers = containersToBeStarted.flatMap {
+                container =>
+                  try {
+                    nmClient.startContainer(container, ctx)
+                    val message = s"Launching container (${container.getId} on host " +
+                      s"${container.getNodeId.getHost})."
+                    log.info(message)
+
+                    messageListener foreach {
+                      _ ! decorateMessage(YarnMessage(message))
+                    }
+
+                    Some(container)
+                  } catch {
+                    case e: YarnException =>
+                      log.error(s"Exception while starting YARN " +
+                        s"container ${container.getId} on " +
+                        s"host ${container.getNodeId.getHost}", e)
+                      None
+                  }
+              }
+
+              runningContainers += startedContainers.length
+              runningContainersList :::= startedContainers
+
+              allocatedContainersList = remainingContainers
+
+              val heartbeatInterval =
+                if (runningContainers < numTaskManagers) {
+                  FAST_YARN_HEARTBEAT_DELAY
                } else {
+                  YARN_HEARTBEAT_DELAY
+                }
 
-    /*
-     * Asynchronous client to make requests to the RM.
-     * Must be set via setClient(..) before its service is started.
-     */
-    private var client : AMRMClientAsync[ContainerRequest] = null
 
+              rmClientOption match {
+                case Some(client) => client.setHeartbeatInterval(heartbeatInterval.toMillis.toInt)
+                case None =>
+              }
+
+              startedContainers.length
+            case None =>
+              log.error("The ContainerLaunchContext was not set.")
+              self ! decorateMessage(
+                StopYarnSession(
+                  FinalApplicationStatus.FAILED,
+                  "Fatal error in AM: The ContainerLaunchContext was not set."))
+              0
+          }
+        case None =>
+          log.error("The NMClient was not set.")
+          self ! decorateMessage(
+            StopYarnSession(
+              FinalApplicationStatus.FAILED,
+              "Fatal error in AM: The NMClient was not set."))
+          0
+      }
+    } else {
+      0
+    }
+  }
+
+  /**
+   * Heartbeats with the resource manager and informs the actor of container updates.
+   */
+  object AMRMClientAsyncHandler extends AMRMClientAsync.CallbackHandler {
 
     override def onError(e: Throwable): Unit = {
       self ! decorateMessage(
@@ -563,213 +759,16 @@ class YarnJobManager(
     }
 
     override def onContainersCompleted(statuses: JavaList[ContainerStatus]): Unit = {
-
-      val completedContainerStatuses = statuses.asScala
-      val idStatusMap = completedContainerStatuses
-        .map(status => (status.getContainerId, status)).toMap
-
-      completedContainerStatuses.foreach {
-        status => log.info(s"Container ${status.getContainerId} is completed " +
-          s"with diagnostics: ${status.getDiagnostics}")
-      }
-
-      // get failed containers (returned containers are also completed, so we have to
-      // distinguish if it was running before).
-      val (completedContainers, remainingRunningContainers) = runningContainersList
-        .partition(idStatusMap contains _.getId)
-
-      completedContainers.foreach {
-        container =>
-          val status = idStatusMap(container.getId)
-          failedContainers += 1
-          runningContainers -= 1
-          log.info(s"Container ${status.getContainerId} was a running container. " +
-            s"Total failed containers $failedContainers.")
-          val detail = status.getExitStatus match {
-            case -103 => "Vmem limit exceeded";
-            case -104 => "Pmem limit exceeded";
-            case _ => ""
-          }
-          messageListener foreach {
-            _ ! decorateMessage(
-              YarnMessage(s"Diagnostics for containerID=${status.getContainerId} in " +
-                s"state=${status.getState}.\n${status.getDiagnostics} $detail")
-            )
-          }
-      }
-
-      runningContainersList = remainingRunningContainers
-
-      // maxFailedContainers == -1 is infinite number of retries.
-      if (maxFailedContainers != -1 && failedContainers >= maxFailedContainers) {
-        val msg = s"Stopping YARN session because the number of failed " +
-          s"containers ($failedContainers) exceeded the maximum failed container " +
-          s"count ($maxFailedContainers). This number is controlled by " +
-          s"the '${ConfigConstants.YARN_MAX_FAILED_CONTAINERS}' configuration " +
-          s"setting. By default its the number of requested containers"
-        log.error(msg)
-        self ! decorateMessage(StopYarnSession(FinalApplicationStatus.FAILED, msg))
-
-      }
-
-      allocateContainers()
-
+      self ! decorateMessage(
+        YarnContainersCompleted(statuses)
+      )
     }
 
     override def onContainersAllocated(containers: JavaList[Container]): Unit = {
-
-      val newlyAllocatedContainers = containers.asScala
-
-      newlyAllocatedContainers.foreach {
-        container => log.info(s"Got new container for allocation: ${container.getId}")
-      }
-
-      allocatedContainersList ++= containers.asScala
-      numPendingRequests = math.max(0, numPendingRequests - newlyAllocatedContainers.length)
-
-      allocateContainers()
-
-      if (runningContainers >= numTaskManagers && allocatedContainersList.nonEmpty) {
-        log.info(s"Flink has ${allocatedContainersList.size} allocated containers which " +
-          s"are not needed right now. Returning them")
-        for (container <- allocatedContainersList) {
-          client.releaseAssignedContainer(container.getId)
-        }
-        allocatedContainersList = List()
-      }
-    }
-
-    /**
-     * Allocates new containers if necessary.
-     */
-    private def allocateContainers() : Unit = {
-
-      // check if we want to start some of our allocated containers.
-      if (runningContainers < numTaskManagers) {
-        val missingContainers = numTaskManagers - runningContainers
-        log.info(s"The user requested $numTaskManagers containers, $runningContainers " +
-          s"running. $missingContainers containers missing")
-
-        val numStartedContainers = startTMsInAllocatedContainers(missingContainers)
-
-        // if there are still containers missing, request them from YARN
-        val toAllocateFromYarn = Math.max(
-          missingContainers - numStartedContainers - numPendingRequests,
-          0)
-
-        if (toAllocateFromYarn > 0) {
-          val reallocate = flinkConfiguration
-            .getBoolean(ConfigConstants.YARN_REALLOCATE_FAILED_CONTAINERS, true)
-          log.info(s"There are $missingContainers containers missing." +
-            s" $numPendingRequests are already requested. " +
-            s"Requesting $toAllocateFromYarn additional container(s) from YARN. " +
-            s"Reallocation of failed containers is enabled=$reallocate " +
-            s"('${ConfigConstants.YARN_REALLOCATE_FAILED_CONTAINERS}')")
-          // there are still containers missing. Request them from YARN
-          if (reallocate) {
-            for (i <- 1 to toAllocateFromYarn) {
-              val containerRequest = getContainerRequest(memoryPerTaskManager)
-              client.addContainerRequest(containerRequest)
-              numPendingRequests += 1
-              log.info("Requested additional container from YARN. Pending requests " +
-                s"$numPendingRequests.")
-            }
-          }
-        }
-      }
-    }
-
-    /** Starts min(numTMsToStart, allocatedContainersList.size) TaskManager in the available
-      * allocated containers. The number of successfully started TaskManagers is returned.
-      *
-      * @param numTMsToStart Number of TaskManagers to start if enough allocated containers are
-      *                      available. If not, then all allocated containers are used
-      * @return Number of successfully started TaskManagers
-      */
-    private def startTMsInAllocatedContainers(numTMsToStart: Int): Int = {
-      // not enough containers running
-      if (allocatedContainersList.nonEmpty) {
-        log.info(s"${allocatedContainersList.size} containers already allocated by YARN. " +
-          "Starting...")
-
-        nmClientOption match {
-          case Some(nmClient) =>
-            containerLaunchContext match {
-              case Some(ctx) =>
-                val (containersToBeStarted, remainingContainers) = allocatedContainersList
-                  .splitAt(numTMsToStart)
-
-                val startedContainers = containersToBeStarted.flatMap {
-                  container =>
-                    try {
-                      nmClient.startContainer(container, ctx)
-                      val message = s"Launching container (${container.getId} on host " +
-                        s"${container.getNodeId.getHost})."
-                      log.info(message)
-
-                      messageListener foreach {
-                        _ ! decorateMessage(YarnMessage(message))
-                      }
-
-                      Some(container)
-                    } catch {
-                      case e: YarnException =>
-                        log.error(s"Exception while starting YARN " +
-                          s"container ${container.getId} on " +
-                          s"host ${container.getNodeId.getHost}", e)
-                        None
-                    }
-                }
-
-                runningContainers += startedContainers.length
-                runningContainersList :::= startedContainers
-
-                allocatedContainersList = remainingContainers
-
-                if (runningContainers < numTaskManagers) {
-                  setHeartbeatRate(FAST_YARN_HEARTBEAT_DELAY)
-                } else {
-                  setHeartbeatRate(YARN_HEARTBEAT_DELAY)
-                }
-
-                startedContainers.length
-              case None =>
-                log.error("The ContainerLaunchContext was not set.")
-                self ! decorateMessage(
-                  StopYarnSession(
-                    FinalApplicationStatus.FAILED,
-                    "Fatal error in AM: The ContainerLaunchContext was not set."))
-                0
-            }
-          case None =>
-            log.error("The NMClient was not set.")
-            self ! decorateMessage(
-              StopYarnSession(
-                FinalApplicationStatus.FAILED,
-                "Fatal error in AM: The NMClient was not set."))
-            0
-        }
-      } else {
-        0
-      }
-    }
-
-    /**
-     * Adjusts the heartbeat interval of the asynchronous client.
-     * @param interval The interval between the heartbeats.
-     */
-    private def setHeartbeatRate(interval : FiniteDuration): Unit = {
-      client.setHeartbeatInterval(interval.toMillis.toInt)
-    }
-
-    /**
-     * Register the client with the CallbackHandler. Must be called before the client is started.
-     * @param clientAsync The AMRM client to make requests with.
-     */
-    def setClient(clientAsync: AMRMClientAsync[ContainerRequest]) = {
-      client = clientAsync
+      self ! decorateMessage(
+        YarnContainersAllocated(containers)
+      )
     }
-
   }
 }


http://git-wip-us.apache.org/repos/asf/flink/blob/2a49eaaf/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala
index 75b9b6f..7a35590 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala
@@ -18,12 +18,12 @@
 
 package org.apache.flink.yarn
 
-import java.util.{UUID, Date}
+import java.util.{List => JavaList, UUID, Date}
 
 import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.messages.RequiresLeaderSessionID
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
+import org.apache.hadoop.yarn.api.records.{ContainerStatus, Container, FinalApplicationStatus}
 
 import scala.concurrent.duration.{Deadline, FiniteDuration}
 
@@ -40,8 +40,25 @@ object YarnMessages {
 
   case object JobManagerStopped
 
+  /**
+   * Entry point to start a new YarnSession.
+   * @param config The configuration to start the YarnSession with.
+   * @param webServerPort The port of the web server to bind to.
+   */
   case class StartYarnSession(config: Configuration, webServerPort: Int)
 
+  /**
+   * Callback from the async resource manager client when containers were allocated.
+   * @param containers List of containers which were allocated.
+   */
+  case class YarnContainersAllocated(containers: JavaList[Container])
+
+  /**
+   * Callback from the async resource manager client when containers have completed.
+   * @param statuses List of the completed containers' statuses.
+   */
+  case class YarnContainersCompleted(statuses: JavaList[ContainerStatus])
+
   /** Triggers the registration of the ApplicationClient to the YarnJobManager
    *
    * @param jobManagerAkkaURL JobManager's Akka URL
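
----------------------------------------------------------------------

The pattern behind the fix, in brief: AMRMClientAsync invokes its
CallbackHandler methods on the client's own heartbeat thread, so mutating
the JobManager's bookkeeping (runningContainers, allocatedContainersList,
numPendingRequests, ...) inside the callbacks raced with the actor thread.
After this commit, the handler only forwards immutable messages
(YarnContainersAllocated / YarnContainersCompleted) into the actor's
mailbox, and all state changes happen on the actor's single
message-processing thread. The following minimal, self-contained Scala
sketch illustrates that pattern; Container, ContainersAllocated,
AllocationHandler and JobManagerLike are hypothetical stand-ins for the
Hadoop and Flink types, not the real APIs.

import java.util.{List => JavaList}

import akka.actor.{Actor, ActorRef, ActorSystem, Props}

import scala.collection.JavaConverters._

// Hypothetical stand-ins so the sketch is self-contained.
case class Container(id: Int)
case class ContainersAllocated(containers: JavaList[Container])

// In the real setup this runs on the AMRM client's heartbeat thread.
// It owns no mutable state; it only forwards a message to the actor.
class AllocationHandler(jobManager: ActorRef) {
  def onContainersAllocated(containers: JavaList[Container]): Unit = {
    jobManager ! ContainersAllocated(containers)
  }
}

// All mutable bookkeeping lives in the actor, which processes messages
// one at a time, so no further synchronization is needed.
class JobManagerLike extends Actor {
  private var allocatedContainers: List[Container] = Nil

  override def receive: Receive = {
    case ContainersAllocated(containers) =>
      allocatedContainers ++= containers.asScala
      println(s"actor now tracks ${allocatedContainers.size} container(s)")
  }
}

object MessagePassingSketch extends App {
  val system = ActorSystem("sketch")
  val jobManager = system.actorOf(Props[JobManagerLike], "jobManager")
  val handler = new AllocationHandler(jobManager)

  // In the real code this call arrives on the Hadoop callback thread.
  handler.onContainersAllocated(java.util.Arrays.asList(Container(1), Container(2)))

  Thread.sleep(500)
  system.shutdown() // Akka 2.3-era API, matching the commit; newer Akka uses terminate()
}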