Repository: flink
Updated Branches:
  refs/heads/master 086acf681 -> 2a49eaaf3


[FLINK-3300] fix concurrency bug in YarnJobManager

Adds message passing between Hadoop's async resource manager client and
the YarnJobManager actor: the client's callbacks now forward container
updates as actor messages instead of mutating the JobManager's state
directly from the callback threads.

This closes #1561.
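
For context, the pattern used here is the usual "the actor owns its state"
fix: the AMRMClientAsync callback handler runs on Hadoop's heartbeat thread,
so it must not touch the JobManager's fields itself and instead only forwards
each callback as a message. A minimal, self-contained sketch of that idea
follows; the class and message names are illustrative only, not the actual
Flink sources (those live in YarnJobManager.scala and YarnMessages.scala):

  import java.util.{List => JavaList}

  import akka.actor.ActorRef
  import org.apache.hadoop.yarn.api.records.{Container, ContainerStatus, NodeReport}
  import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync

  // Hypothetical messages mirroring YarnContainersAllocated/-Completed.
  case class ContainersAllocated(containers: JavaList[Container])
  case class ContainersCompleted(statuses: JavaList[ContainerStatus])

  // Hypothetical forwarder, analogous to AMRMClientAsyncHandler after this commit.
  class ContainerCallbackForwarder(jobManager: ActorRef)
    extends AMRMClientAsync.CallbackHandler {

    // Invoked on the AMRM client's own thread: never mutate actor state here,
    // only hand the payload over to the actor's mailbox.
    override def onContainersAllocated(containers: JavaList[Container]): Unit =
      jobManager ! ContainersAllocated(containers)

    override def onContainersCompleted(statuses: JavaList[ContainerStatus]): Unit =
      jobManager ! ContainersCompleted(statuses)

    // The remaining callbacks are no-ops in this sketch.
    override def onShutdownRequest(): Unit = ()
    override def onNodesUpdated(updatedNodes: JavaList[NodeReport]): Unit = ()
    override def onError(e: Throwable): Unit = ()
    override def getProgress: Float = 0.0f
  }

The actor's receive block can then update runningContainers,
allocatedContainersList, and friends without any locking, because the
mailbox serializes the container updates with all other JobManager messages.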


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2a49eaaf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2a49eaaf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2a49eaaf

Branch: refs/heads/master
Commit: 2a49eaaf3c949864457aee0ffd99343a50ac7285
Parents: 086acf6
Author: Maximilian Michels <m...@apache.org>
Authored: Fri Jan 29 11:21:08 2016 +0100
Committer: Maximilian Michels <m...@apache.org>
Committed: Mon Feb 1 10:09:37 2016 +0100

----------------------------------------------------------------------
 .../org/apache/flink/yarn/YarnJobManager.scala  | 425 +++++++++----------
 .../org/apache/flink/yarn/YarnMessages.scala    |  21 +-
 2 files changed, 231 insertions(+), 215 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2a49eaaf/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index 135f87e..92eb4be 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -180,6 +180,7 @@ class YarnJobManager(
         _ ! decorateMessage(JobManagerStopped)
       }
 
+      // Shutdown and discard all queued messages
       context.system.shutdown()
 
     case RegisterApplicationClient =>
@@ -220,6 +221,85 @@ class YarnJobManager(
     case StartYarnSession(hadoopConfig, webServerPort) =>
       startYarnSession(hadoopConfig, webServerPort)
 
+
+    case YarnContainersAllocated(containers: JavaList[Container]) =>
+      val newlyAllocatedContainers = containers.asScala
+
+      newlyAllocatedContainers.foreach {
+        container => log.info(s"Got new container for allocation: 
${container.getId}")
+      }
+
+      allocatedContainersList ++= containers.asScala
+      numPendingRequests = math.max(0, numPendingRequests - newlyAllocatedContainers.length)
+
+      allocateContainers()
+
+      if (runningContainers >= numTaskManagers && allocatedContainersList.nonEmpty) {
+        log.info(s"Flink has ${allocatedContainersList.size} allocated containers which " +
+          s"are not needed right now. Returning them")
+        for (container <- allocatedContainersList) {
+          rmClientOption match {
+            case Some(client) => client.releaseAssignedContainer(container.getId)
+            case None =>
+          }
+        }
+        allocatedContainersList = List()
+      }
+
+
+    case YarnContainersCompleted(statuses: JavaList[ContainerStatus]) =>
+
+      val completedContainerStatuses = statuses.asScala
+      val idStatusMap = completedContainerStatuses
+        .map(status => (status.getContainerId, status)).toMap
+
+      completedContainerStatuses.foreach {
+        status => log.info(s"Container ${status.getContainerId} is completed " 
+
+          s"with diagnostics: ${status.getDiagnostics}")
+      }
+
+      // get failed containers (returned containers are also completed, so we have to
+      // distinguish if it was running before).
+      val (completedContainers, remainingRunningContainers) = runningContainersList
+        .partition(idStatusMap contains _.getId)
+
+      completedContainers.foreach {
+        container =>
+          val status = idStatusMap(container.getId)
+          failedContainers += 1
+          runningContainers -= 1
+          log.info(s"Container ${status.getContainerId} was a running 
container. " +
+            s"Total failed containers $failedContainers.")
+          val detail = status.getExitStatus match {
+            case -103 => "Vmem limit exceeded";
+            case -104 => "Pmem limit exceeded";
+            case _ => ""
+          }
+          messageListener foreach {
+            _ ! decorateMessage(
+              YarnMessage(s"Diagnostics for 
containerID=${status.getContainerId} in " +
+                s"state=${status.getState}.\n${status.getDiagnostics} $detail")
+            )
+          }
+      }
+
+      runningContainersList = remainingRunningContainers
+
+      // maxFailedContainers == -1 is infinite number of retries.
+      if (maxFailedContainers != -1 && failedContainers >= maxFailedContainers) {
+        val msg = s"Stopping YARN session because the number of failed " +
+          s"containers ($failedContainers) exceeded the maximum failed container " +
+          s"count ($maxFailedContainers). This number is controlled by " +
+          s"the '${ConfigConstants.YARN_MAX_FAILED_CONTAINERS}' configuration " +
+          s"setting. By default its the number of requested containers"
+        log.error(msg)
+        self ! decorateMessage(StopYarnSession(FinalApplicationStatus.FAILED, msg))
+
+      }
+
+      allocateContainers()
+
+
     case jnf: JobNotFound =>
       log.warn(s"Job with ID ${jnf.jobID} not found in JobManager")
       if (stopWhenJobFinished == null) {
@@ -315,9 +395,6 @@ class YarnJobManager(
         FAST_YARN_HEARTBEAT_DELAY.toMillis.toInt,
         AMRMClientAsyncHandler)
 
-      // inject client into handler to adjust the heartbeat interval and make requests
-      AMRMClientAsyncHandler.setClient(rmClientAsync)
-
       rmClientAsync.init(conf)
       rmClientAsync.start()
 
@@ -531,16 +608,135 @@ class YarnJobManager(
     }
   }
 
+
   /**
-    * Heartbeats with the resource manager and handles container updates.
+    * Allocates new containers if necessary.
     */
-  object AMRMClientAsyncHandler extends AMRMClientAsync.CallbackHandler {
+  private def allocateContainers() : Unit = {
+
+    // check if we want to start some of our allocated containers.
+    if (runningContainers < numTaskManagers) {
+      val missingContainers = numTaskManagers - runningContainers
+      log.info(s"The user requested $numTaskManagers containers, 
$runningContainers " +
+        s"running. $missingContainers containers missing")
+
+      val numStartedContainers = startTMsInAllocatedContainers(missingContainers)
+
+      // if there are still containers missing, request them from YARN
+      val toAllocateFromYarn = Math.max(
+        missingContainers - numStartedContainers - numPendingRequests,
+        0)
+
+      if (toAllocateFromYarn > 0) {
+        val reallocate = flinkConfiguration
+          .getBoolean(ConfigConstants.YARN_REALLOCATE_FAILED_CONTAINERS, true)
+        log.info(s"There are $missingContainers containers missing." +
+          s" $numPendingRequests are already requested. " +
+          s"Requesting $toAllocateFromYarn additional container(s) from YARN. 
" +
+          s"Reallocation of failed containers is enabled=$reallocate " +
+          s"('${ConfigConstants.YARN_REALLOCATE_FAILED_CONTAINERS}')")
+        // there are still containers missing. Request them from YARN
+        if (reallocate) {
+          for (i <- 1 to toAllocateFromYarn) {
+            val containerRequest = getContainerRequest(memoryPerTaskManager)
+            rmClientOption match {
+              case Some(client) => client.addContainerRequest(containerRequest)
+              case None =>
+            }
+            numPendingRequests += 1
+            log.info("Requested additional container from YARN. Pending 
requests " +
+              s"$numPendingRequests.")
+          }
+        }
+      }
+    }
+  }
+
+  /** Starts min(numTMsToStart, allocatedContainersList.size) TaskManager in the available
+    * allocated containers. The number of successfully started TaskManagers is returned.
+    *
+    * @param numTMsToStart Number of TaskManagers to start if enough allocated containers are
+    *                      available. If not, then all allocated containers are used
+    * @return Number of successfully started TaskManagers
+    */
+  private def startTMsInAllocatedContainers(numTMsToStart: Int): Int = {
+    // not enough containers running
+    if (allocatedContainersList.nonEmpty) {
+      log.info(s"${allocatedContainersList.size} containers already allocated 
by YARN. " +
+        "Starting...")
+
+      nmClientOption match {
+        case Some(nmClient) =>
+          containerLaunchContext match {
+            case Some(ctx) =>
+              val (containersToBeStarted, remainingContainers) = allocatedContainersList
+                .splitAt(numTMsToStart)
+
+              val startedContainers = containersToBeStarted.flatMap {
+                container =>
+                  try {
+                    nmClient.startContainer(container, ctx)
+                    val message = s"Launching container (${container.getId} on 
host " +
+                      s"${container.getNodeId.getHost})."
+                    log.info(message)
+
+                    messageListener foreach {
+                      _ ! decorateMessage(YarnMessage(message))
+                    }
+
+                    Some(container)
+                  } catch {
+                    case e: YarnException =>
+                      log.error(s"Exception while starting YARN " +
+                        s"container ${container.getId} on " +
+                        s"host ${container.getNodeId.getHost}", e)
+                      None
+                  }
+              }
+
+              runningContainers += startedContainers.length
+              runningContainersList :::= startedContainers
+
+              allocatedContainersList = remainingContainers
+
+              val heartbeatInterval =
+                if (runningContainers < numTaskManagers) {
+                  FAST_YARN_HEARTBEAT_DELAY
+                } else {
+                  YARN_HEARTBEAT_DELAY
+                }
 
-    /*
-     * Asynchronous client to make requests to the RM.
-     * Must be set via setClient(..) before its service is started.
-     */
-    private var client : AMRMClientAsync[ContainerRequest] = null
+              rmClientOption match {
+                case Some(client) => client.setHeartbeatInterval(heartbeatInterval.toMillis.toInt)
+                case None =>
+              }
+
+              startedContainers.length
+            case None =>
+              log.error("The ContainerLaunchContext was not set.")
+              self ! decorateMessage(
+                StopYarnSession(
+                  FinalApplicationStatus.FAILED,
+                  "Fatal error in AM: The ContainerLaunchContext was not 
set."))
+              0
+          }
+        case None =>
+          log.error("The NMClient was not set.")
+          self ! decorateMessage(
+            StopYarnSession(
+              FinalApplicationStatus.FAILED,
+              "Fatal error in AM: The NMClient was not set."))
+          0
+      }
+    } else {
+      0
+    }
+  }
+
+  /**
+    * Heartbeats with the resource manager and informs of container updates.
+    */
+  object AMRMClientAsyncHandler extends AMRMClientAsync.CallbackHandler {
 
     override def onError(e: Throwable): Unit = {
       self ! decorateMessage(
@@ -563,213 +759,16 @@ class YarnJobManager(
     }
 
    override def onContainersCompleted(statuses: JavaList[ContainerStatus]): Unit = {
-
-      val completedContainerStatuses = statuses.asScala
-      val idStatusMap = completedContainerStatuses
-        .map(status => (status.getContainerId, status)).toMap
-
-      completedContainerStatuses.foreach {
-        status => log.info(s"Container ${status.getContainerId} is completed " 
+
-          s"with diagnostics: ${status.getDiagnostics}")
-      }
-
-      // get failed containers (returned containers are also completed, so we have to
-      // distinguish if it was running before).
-      val (completedContainers, remainingRunningContainers) = runningContainersList
-        .partition(idStatusMap contains _.getId)
-
-      completedContainers.foreach {
-        container =>
-          val status = idStatusMap(container.getId)
-          failedContainers += 1
-          runningContainers -= 1
-          log.info(s"Container ${status.getContainerId} was a running 
container. " +
-            s"Total failed containers $failedContainers.")
-          val detail = status.getExitStatus match {
-            case -103 => "Vmem limit exceeded";
-            case -104 => "Pmem limit exceeded";
-            case _ => ""
-          }
-          messageListener foreach {
-            _ ! decorateMessage(
-              YarnMessage(s"Diagnostics for 
containerID=${status.getContainerId} in " +
-                s"state=${status.getState}.\n${status.getDiagnostics} $detail")
-            )
-          }
-      }
-
-      runningContainersList = remainingRunningContainers
-
-      // maxFailedContainers == -1 is infinite number of retries.
-      if (maxFailedContainers != -1 && failedContainers >= maxFailedContainers) {
-        val msg = s"Stopping YARN session because the number of failed " +
-          s"containers ($failedContainers) exceeded the maximum failed container " +
-          s"count ($maxFailedContainers). This number is controlled by " +
-          s"the '${ConfigConstants.YARN_MAX_FAILED_CONTAINERS}' configuration " +
-          s"setting. By default its the number of requested containers"
-        log.error(msg)
-        self ! decorateMessage(StopYarnSession(FinalApplicationStatus.FAILED, msg))
-
-      }
-
-      allocateContainers()
-
+      self ! decorateMessage(
+        YarnContainersCompleted(statuses)
+      )
     }
 
    override def onContainersAllocated(containers: JavaList[Container]): Unit = {
-
-      val newlyAllocatedContainers = containers.asScala
-
-      newlyAllocatedContainers.foreach {
-        container => log.info(s"Got new container for allocation: 
${container.getId}")
-      }
-
-      allocatedContainersList ++= containers.asScala
-      numPendingRequests = math.max(0, numPendingRequests - newlyAllocatedContainers.length)
-
-      allocateContainers()
-
-      if (runningContainers >= numTaskManagers && allocatedContainersList.nonEmpty) {
-        log.info(s"Flink has ${allocatedContainersList.size} allocated containers which " +
-          s"are not needed right now. Returning them")
-        for (container <- allocatedContainersList) {
-          client.releaseAssignedContainer(container.getId)
-        }
-        allocatedContainersList = List()
-      }
-    }
-
-    /**
-      * Allocates new containers if necessary.
-      */
-    private def allocateContainers() : Unit = {
-
-      // check if we want to start some of our allocated containers.
-      if (runningContainers < numTaskManagers) {
-        val missingContainers = numTaskManagers - runningContainers
-        log.info(s"The user requested $numTaskManagers containers, 
$runningContainers " +
-          s"running. $missingContainers containers missing")
-
-        val numStartedContainers = startTMsInAllocatedContainers(missingContainers)
-
-        // if there are still containers missing, request them from YARN
-        val toAllocateFromYarn = Math.max(
-          missingContainers - numStartedContainers - numPendingRequests,
-          0)
-
-        if (toAllocateFromYarn > 0) {
-          val reallocate = flinkConfiguration
-            .getBoolean(ConfigConstants.YARN_REALLOCATE_FAILED_CONTAINERS, true)
-          log.info(s"There are $missingContainers containers missing." +
-            s" $numPendingRequests are already requested. " +
-            s"Requesting $toAllocateFromYarn additional container(s) from 
YARN. " +
-            s"Reallocation of failed containers is enabled=$reallocate " +
-            s"('${ConfigConstants.YARN_REALLOCATE_FAILED_CONTAINERS}')")
-          // there are still containers missing. Request them from YARN
-          if (reallocate) {
-            for (i <- 1 to toAllocateFromYarn) {
-              val containerRequest = getContainerRequest(memoryPerTaskManager)
-              client.addContainerRequest(containerRequest)
-              numPendingRequests += 1
-              log.info("Requested additional container from YARN. Pending 
requests " +
-                s"$numPendingRequests.")
-            }
-          }
-        }
-      }
-    }
-
-    /** Starts min(numTMsToStart, allocatedContainersList.size) TaskManager in the available
-      * allocated containers. The number of successfully started TaskManagers is returned.
-      *
-      * @param numTMsToStart Number of TaskManagers to start if enough allocated containers are
-      *                      available. If not, then all allocated containers are used
-      * @return Number of successfully started TaskManagers
-      */
-    private def startTMsInAllocatedContainers(numTMsToStart: Int): Int = {
-      // not enough containers running
-      if (allocatedContainersList.nonEmpty) {
-        log.info(s"${allocatedContainersList.size} containers already 
allocated by YARN. " +
-          "Starting...")
-
-        nmClientOption match {
-          case Some(nmClient) =>
-            containerLaunchContext match {
-              case Some(ctx) =>
-                val (containersToBeStarted, remainingContainers) = allocatedContainersList
-                  .splitAt(numTMsToStart)
-
-                val startedContainers = containersToBeStarted.flatMap {
-                  container =>
-                    try {
-                      nmClient.startContainer(container, ctx)
-                      val message = s"Launching container (${container.getId} 
on host " +
-                        s"${container.getNodeId.getHost})."
-                      log.info(message)
-
-                      messageListener foreach {
-                        _ ! decorateMessage(YarnMessage(message))
-                      }
-
-                      Some(container)
-                    } catch {
-                      case e: YarnException =>
-                        log.error(s"Exception while starting YARN " +
-                          s"container ${container.getId} on " +
-                          s"host ${container.getNodeId.getHost}", e)
-                        None
-                    }
-                }
-
-                runningContainers += startedContainers.length
-                runningContainersList :::= startedContainers
-
-                allocatedContainersList = remainingContainers
-
-                if (runningContainers < numTaskManagers) {
-                  setHeartbeatRate(FAST_YARN_HEARTBEAT_DELAY)
-                } else {
-                  setHeartbeatRate(YARN_HEARTBEAT_DELAY)
-                }
-
-                startedContainers.length
-              case None =>
-                log.error("The ContainerLaunchContext was not set.")
-                self ! decorateMessage(
-                  StopYarnSession(
-                    FinalApplicationStatus.FAILED,
-                    "Fatal error in AM: The ContainerLaunchContext was not 
set."))
-                0
-            }
-          case None =>
-            log.error("The NMClient was not set.")
-            self ! decorateMessage(
-              StopYarnSession(
-                FinalApplicationStatus.FAILED,
-                "Fatal error in AM: The NMClient was not set."))
-            0
-        }
-      } else {
-        0
-      }
-    }
-
-    /**
-      * Adjusts the heartbeat interval of the asynchronous client.
-      * @param interval The interval between the heartbeats.
-      */
-    private def setHeartbeatRate(interval : FiniteDuration): Unit = {
-      client.setHeartbeatInterval(interval.toMillis.toInt)
-    }
-
-    /**
-      * Register the client with the CallbackHandler. Must be called before the client is started.
-      * @param clientAsync The AMRM client to make requests with.
-      */
-    def setClient(clientAsync: AMRMClientAsync[ContainerRequest]) = {
-      client = clientAsync
+      self ! decorateMessage(
+        YarnContainersAllocated(containers)
+      )
     }
-
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2a49eaaf/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala
index 75b9b6f..7a35590 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala
@@ -18,12 +18,12 @@
 
 package org.apache.flink.yarn
 
-import java.util.{UUID, Date}
+import java.util.{List => JavaList, UUID, Date}
 
 import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.messages.RequiresLeaderSessionID
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
+import org.apache.hadoop.yarn.api.records.{ContainerStatus, Container, FinalApplicationStatus}
 
 import scala.concurrent.duration.{Deadline, FiniteDuration}
 
@@ -40,8 +40,25 @@ object YarnMessages {
 
   case object JobManagerStopped
 
+  /**
+    * Entry point to start a new YarnSession.
+    * @param config The configuration to start the YarnSession with.
+    * @param webServerPort The port of the web server to bind to.
+    */
   case class StartYarnSession(config: Configuration, webServerPort: Int)
 
+  /**
+    * Callback from the async resource manager client when containers were allocated.
+    * @param containers List of containers which were allocated.
+    */
+  case class YarnContainersAllocated(containers: JavaList[Container])
+
+  /**
+    * Callback from the async resource manager client when containers were completed.
+    * @param statuses List of the completed containers' status.
+    */
+  case class YarnContainersCompleted(statuses: JavaList[ContainerStatus])
+
   /** Triggers the registration of the ApplicationClient to the YarnJobManager
     *
     * @param jobManagerAkkaURL JobManager's Akka URL
