Repository: spark
Updated Branches:
  refs/heads/master c0cad596b -> b56e9c613


[SPARK-16630][YARN] Blacklist a node if executors won't launch on it

## What changes were proposed in this pull request?

This change extends YARN resource allocation handling with blacklisting 
functionality.
This handles cases when node is messed up or misconfigured such that a 
container won't launch on it. Before this change backlisting only focused on 
task execution but this change introduces YarnAllocatorBlacklistTracker which 
tracks allocation failures per host (when enabled via 
"spark.yarn.blacklist.executor.launch.blacklisting.enabled").

## How was this patch tested?

### With unit tests

Including a new suite: YarnAllocatorBlacklistTrackerSuite.

#### Manually

It was tested on a cluster by deleting the Spark jars on one of the node.

#### Behaviour before these changes

Starting Spark as:
```
spark2-shell --master yarn --deploy-mode client --num-executors 4  --conf 
spark.executor.memory=4g --conf "spark.yarn.max.executor.failures=6"
```

Log is:
```
18/04/12 06:49:36 INFO yarn.ApplicationMaster: Final app status: FAILED, 
exitCode: 11, (reason: Max number of executor failures (6) reached)
18/04/12 06:49:39 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster 
with FAILED (diag message: Max number of executor failures (6) reached)
18/04/12 06:49:39 INFO impl.AMRMClientImpl: Waiting for application to be 
successfully unregistered.
18/04/12 06:49:39 INFO yarn.ApplicationMaster: Deleting staging directory 
hdfs://apiros-1.gce.test.com:8020/user/systest/.sparkStaging/application_1523459048274_0016
18/04/12 06:49:39 INFO util.ShutdownHookManager: Shutdown hook called
```

#### Behaviour after these changes

Starting Spark as:
```
spark2-shell --master yarn --deploy-mode client --num-executors 4  --conf 
spark.executor.memory=4g --conf "spark.yarn.max.executor.failures=6" --conf 
"spark.yarn.blacklist.executor.launch.blacklisting.enabled=true"
```

And the log is:
```
18/04/13 05:37:43 INFO yarn.YarnAllocator: Will request 1 executor 
container(s), each with 1 core(s) and 4505 MB memory (including 409 MB of 
overhead)
18/04/13 05:37:43 INFO yarn.YarnAllocator: Submitted 1 unlocalized container 
requests.
18/04/13 05:37:43 INFO yarn.YarnAllocator: Launching container 
container_1523459048274_0025_01_000008 on host apiros-4.gce.test.com for 
executor with ID 6
18/04/13 05:37:43 INFO yarn.YarnAllocator: Received 1 containers from YARN, 
launching executors on 1 of them.
18/04/13 05:37:43 INFO yarn.YarnAllocator: Completed container 
container_1523459048274_0025_01_000007 on host: apiros-4.gce.test.com (state: 
COMPLETE, exit status: 1)
18/04/13 05:37:43 INFO yarn.YarnAllocatorBlacklistTracker: blacklisting host as 
YARN allocation failed: apiros-4.gce.test.com
18/04/13 05:37:43 INFO yarn.YarnAllocatorBlacklistTracker: adding nodes to YARN 
application master's blacklist: List(apiros-4.gce.test.com)
18/04/13 05:37:43 WARN yarn.YarnAllocator: Container marked as failed: 
container_1523459048274_0025_01_000007 on host: apiros-4.gce.test.com. Exit 
status: 1. Diagnostics: Exception from container-launch.
Container id: container_1523459048274_0025_01_000007
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
        at org.apache.hadoop.util.Shell.runCommand(Shell.java:604)
        at org.apache.hadoop.util.Shell.run(Shell.java:507)
        at 
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:789)
        at 
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:213)
        at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
        at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
```

Where the most important part is:

```
18/04/13 05:37:43 INFO yarn.YarnAllocatorBlacklistTracker: blacklisting host as 
YARN allocation failed: apiros-4.gce.test.com
18/04/13 05:37:43 INFO yarn.YarnAllocatorBlacklistTracker: adding nodes to YARN 
application master's blacklist: List(apiros-4.gce.test.com)
```

And execution was continued (no shutdown called).

### Testing the backlisting of the whole cluster

Starting Spark with YARN blacklisting enabled then removing a the Spark core 
jar one by one from all the cluster nodes. Then executing a simple spark job 
which fails checking the yarn log the expected exit status is contained:

```
18/06/15 01:07:10 INFO yarn.ApplicationMaster: Final app status: FAILED, 
exitCode: 11, (reason: Due to executor failures all available nodes are 
blacklisted)
18/06/15 01:07:13 INFO util.ShutdownHookManager: Shutdown hook called
```

Author: “attilapiros” <piros.attila.zs...@gmail.com>

Closes #21068 from attilapiros/SPARK-16630.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b56e9c61
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b56e9c61
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b56e9c61

Branch: refs/heads/master
Commit: b56e9c613fb345472da3db1a567ee129621f6bf3
Parents: c0cad59
Author: “attilapiros” <piros.attila.zs...@gmail.com>
Authored: Thu Jun 21 09:17:18 2018 -0500
Committer: Imran Rashid <iras...@cloudera.com>
Committed: Thu Jun 21 09:17:18 2018 -0500

----------------------------------------------------------------------
 .../spark/scheduler/BlacklistTracker.scala      |   2 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala |   3 +-
 .../apache/spark/HeartbeatReceiverSuite.scala   |   6 +-
 docs/running-on-yarn.md                         |  10 +
 ...esosCoarseGrainedSchedulerBackendSuite.scala |   1 +
 .../spark/deploy/yarn/ApplicationMaster.scala   |   4 +
 .../spark/deploy/yarn/YarnAllocator.scala       |  60 ++----
 .../yarn/YarnAllocatorBlacklistTracker.scala    | 187 +++++++++++++++++++
 .../org/apache/spark/deploy/yarn/config.scala   |   6 +
 .../spark/deploy/yarn/FailureTrackerSuite.scala | 100 ++++++++++
 .../YarnAllocatorBlacklistTrackerSuite.scala    | 140 ++++++++++++++
 .../spark/deploy/yarn/YarnAllocatorSuite.scala  |  16 +-
 12 files changed, 479 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b56e9c61/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala 
b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
index 30cf75d..980fbbe 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
@@ -371,7 +371,7 @@ private[scheduler] class BlacklistTracker (
 
 }
 
-private[scheduler] object BlacklistTracker extends Logging {
+private[spark] object BlacklistTracker extends Logging {
 
   private val DEFAULT_TIMEOUT = "1h"
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b56e9c61/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index d8794e8..9b90e30 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -170,8 +170,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
         if (executorDataMap.contains(executorId)) {
           executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + 
executorId))
           context.reply(true)
-        } else if (scheduler.nodeBlacklist != null &&
-          scheduler.nodeBlacklist.contains(hostname)) {
+        } else if (scheduler.nodeBlacklist.contains(hostname)) {
           // If the cluster manager gives us an executor on a blacklisted node 
(because it
           // already started allocating those resources before we informed it 
of our blacklist,
           // or if it ignored our blacklist), then we reject that executor 
immediately.

http://git-wip-us.apache.org/repos/asf/spark/blob/b56e9c61/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala 
b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 8891648..b705556 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark
 
 import java.util.concurrent.{ExecutorService, TimeUnit}
 
-import scala.collection.Map
 import scala.collection.mutable
 import scala.concurrent.Future
 import scala.concurrent.duration._
@@ -73,6 +72,7 @@ class HeartbeatReceiverSuite
     sc = spy(new SparkContext(conf))
     scheduler = mock(classOf[TaskSchedulerImpl])
     when(sc.taskScheduler).thenReturn(scheduler)
+    when(scheduler.nodeBlacklist).thenReturn(Predef.Set[String]())
     when(scheduler.sc).thenReturn(sc)
     heartbeatReceiverClock = new ManualClock
     heartbeatReceiver = new HeartbeatReceiver(sc, heartbeatReceiverClock)
@@ -241,7 +241,7 @@ class HeartbeatReceiverSuite
       } === Some(true))
   }
 
-  private def getTrackedExecutors: Map[String, Long] = {
+  private def getTrackedExecutors: collection.Map[String, Long] = {
     // We may receive undesired SparkListenerExecutorAdded from 
LocalSchedulerBackend,
     // so exclude it from the map. See SPARK-10800.
     heartbeatReceiver.invokePrivate(_executorLastSeen()).
@@ -272,7 +272,7 @@ private class FakeSchedulerBackend(
 
   protected override def doRequestTotalExecutors(requestedTotal: Int): 
Future[Boolean] = {
     clusterManagerEndpoint.ask[Boolean](
-      RequestExecutors(requestedTotal, localityAwareTasks, 
hostToLocalTaskCount, Set.empty[String]))
+      RequestExecutors(requestedTotal, localityAwareTasks, 
hostToLocalTaskCount, Set.empty))
   }
 
   protected override def doKillExecutors(executorIds: Seq[String]): 
Future[Boolean] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/b56e9c61/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 4dbcbea..575da72 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -411,6 +411,16 @@ To use a custom metrics.properties for the application 
master and executors, upd
   name matches both the include and the exclude pattern, this file will be 
excluded eventually.
   </td>
 </tr>
+<tr>
+  
<td><code>spark.yarn.blacklist.executor.launch.blacklisting.enabled</code></td>
+  <td>false</td>
+  <td>
+  Flag to enable blacklisting of nodes having YARN resource allocation 
problems.
+  The error limit for blacklisting can be configured by
+  <code>spark.blacklist.application.maxFailedExecutorsPerNode</code>.
+  </td>
+</tr>
+
 </table>
 
 # Important notes

http://git-wip-us.apache.org/repos/asf/spark/blob/b56e9c61/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
 
b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index f4bd1ee..b790c7c 100644
--- 
a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ 
b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -789,6 +789,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends 
SparkFunSuite
     when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
 
     taskScheduler = mock[TaskSchedulerImpl]
+    when(taskScheduler.nodeBlacklist).thenReturn(Set[String]())
     when(taskScheduler.sc).thenReturn(sc)
 
     externalShuffleClient = mock[MesosExternalShuffleClient]

http://git-wip-us.apache.org/repos/asf/spark/blob/b56e9c61/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 3d6ee50..ecc5769 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -515,6 +515,10 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
               finish(FinalApplicationStatus.FAILED,
                 ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
                 s"Max number of executor failures ($maxNumExecutorFailures) 
reached")
+            } else if (allocator.isAllNodeBlacklisted) {
+              finish(FinalApplicationStatus.FAILED,
+                ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
+                "Due to executor failures all available nodes are blacklisted")
             } else {
               logDebug("Sending progress")
               allocator.allocateResources()

http://git-wip-us.apache.org/repos/asf/spark/blob/b56e9c61/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index ebee3d4..fae054e 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -24,7 +24,7 @@ import java.util.regex.Pattern
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.yarn.api.records._
@@ -66,7 +66,8 @@ private[yarn] class YarnAllocator(
     appAttemptId: ApplicationAttemptId,
     securityMgr: SecurityManager,
     localResources: Map[String, LocalResource],
-    resolver: SparkRackResolver)
+    resolver: SparkRackResolver,
+    clock: Clock = new SystemClock)
   extends Logging {
 
   import YarnAllocator._
@@ -102,18 +103,14 @@ private[yarn] class YarnAllocator(
   private var executorIdCounter: Int =
     driverRef.askSync[Int](RetrieveLastAllocatedExecutorId)
 
-  // Queue to store the timestamp of failed executors
-  private val failedExecutorsTimeStamps = new Queue[Long]()
+  private[spark] val failureTracker = new FailureTracker(sparkConf, clock)
 
-  private var clock: Clock = new SystemClock
-
-  private val executorFailuresValidityInterval =
-    sparkConf.get(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).getOrElse(-1L)
+  private val allocatorBlacklistTracker =
+    new YarnAllocatorBlacklistTracker(sparkConf, amClient, failureTracker)
 
   @volatile private var targetNumExecutors =
     SchedulerBackendUtils.getInitialTargetExecutorNumber(sparkConf)
 
-  private var currentNodeBlacklist = Set.empty[String]
 
   // Executor loss reason requests that are pending - maps from executor ID 
for inquiry to a
   // list of requesters that should be responded to once we find out why the 
given executor
@@ -149,7 +146,6 @@ private[yarn] class YarnAllocator(
 
   private val labelExpression = sparkConf.get(EXECUTOR_NODE_LABEL_EXPRESSION)
 
-
   // A map to store preferred hostname and possible task numbers running on it.
   private var hostToLocalTaskCounts: Map[String, Int] = Map.empty
 
@@ -160,26 +156,11 @@ private[yarn] class YarnAllocator(
   private[yarn] val containerPlacementStrategy =
     new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource, 
resolver)
 
-  /**
-   * Use a different clock for YarnAllocator. This is mainly used for testing.
-   */
-  def setClock(newClock: Clock): Unit = {
-    clock = newClock
-  }
-
   def getNumExecutorsRunning: Int = runningExecutors.size()
 
-  def getNumExecutorsFailed: Int = synchronized {
-    val endTime = clock.getTimeMillis()
+  def getNumExecutorsFailed: Int = failureTracker.numFailedExecutors
 
-    while (executorFailuresValidityInterval > 0
-      && failedExecutorsTimeStamps.nonEmpty
-      && failedExecutorsTimeStamps.head < endTime - 
executorFailuresValidityInterval) {
-      failedExecutorsTimeStamps.dequeue()
-    }
-
-    failedExecutorsTimeStamps.size
-  }
+  def isAllNodeBlacklisted: Boolean = 
allocatorBlacklistTracker.isAllNodeBlacklisted
 
   /**
    * A sequence of pending container requests that have not yet been fulfilled.
@@ -204,9 +185,8 @@ private[yarn] class YarnAllocator(
    * @param localityAwareTasks number of locality aware tasks to be used as 
container placement hint
    * @param hostToLocalTaskCount a map of preferred hostname to possible task 
counts to be used as
    *                             container placement hint.
-   * @param nodeBlacklist a set of blacklisted nodes, which is passed in to 
avoid allocating new
-    *                      containers on them. It will be used to update the 
application master's
-    *                      blacklist.
+   * @param nodeBlacklist blacklisted nodes, which is passed in to avoid 
allocating new containers
+   *                      on them. It will be used to update the application 
master's blacklist.
    * @return Whether the new requested total is different than the old value.
    */
   def requestTotalExecutorsWithPreferredLocalities(
@@ -220,19 +200,7 @@ private[yarn] class YarnAllocator(
     if (requestedTotal != targetNumExecutors) {
       logInfo(s"Driver requested a total number of $requestedTotal 
executor(s).")
       targetNumExecutors = requestedTotal
-
-      // Update blacklist infomation to YARN ResouceManager for this 
application,
-      // in order to avoid allocating new Containers on the problematic nodes.
-      val blacklistAdditions = nodeBlacklist -- currentNodeBlacklist
-      val blacklistRemovals = currentNodeBlacklist -- nodeBlacklist
-      if (blacklistAdditions.nonEmpty) {
-        logInfo(s"adding nodes to YARN application master's blacklist: 
$blacklistAdditions")
-      }
-      if (blacklistRemovals.nonEmpty) {
-        logInfo(s"removing nodes from YARN application master's blacklist: 
$blacklistRemovals")
-      }
-      amClient.updateBlacklist(blacklistAdditions.toList.asJava, 
blacklistRemovals.toList.asJava)
-      currentNodeBlacklist = nodeBlacklist
+      allocatorBlacklistTracker.setSchedulerBlacklistedNodes(nodeBlacklist)
       true
     } else {
       false
@@ -268,6 +236,7 @@ private[yarn] class YarnAllocator(
     val allocateResponse = amClient.allocate(progressIndicator)
 
     val allocatedContainers = allocateResponse.getAllocatedContainers()
+    
allocatorBlacklistTracker.setNumClusterNodes(allocateResponse.getNumClusterNodes)
 
     if (allocatedContainers.size > 0) {
       logDebug(("Allocated containers: %d. Current executor count: %d. " +
@@ -602,8 +571,9 @@ private[yarn] class YarnAllocator(
               completedContainer.getDiagnostics,
               PMEM_EXCEEDED_PATTERN))
           case _ =>
-            // Enqueue the timestamp of failed executor
-            failedExecutorsTimeStamps.enqueue(clock.getTimeMillis())
+            // all the failures which not covered above, like:
+            // disk failure, kill by app master or resource manager, ...
+            allocatorBlacklistTracker.handleResourceAllocationFailure(hostOpt)
             (true, "Container marked as failed: " + containerId + onHostStr +
               ". Exit status: " + completedContainer.getExitStatus +
               ". Diagnostics: " + completedContainer.getDiagnostics)

http://git-wip-us.apache.org/repos/asf/spark/blob/b56e9c61/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
new file mode 100644
index 0000000..1b48a0e
--- /dev/null
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.yarn
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.collection.mutable.HashMap
+
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.scheduler.BlacklistTracker
+import org.apache.spark.util.{Clock, SystemClock, Utils}
+
+/**
+ * YarnAllocatorBlacklistTracker is responsible for tracking the blacklisted 
nodes
+ * and synchronizing the node list to YARN.
+ *
+ * Blacklisted nodes are coming from two different sources:
+ *
+ * <ul>
+ *   <li> from the scheduler as task level blacklisted nodes
+ *   <li> from this class (tracked here) as YARN resource allocation problems
+ * </ul>
+ *
+ * The reason to realize this logic here (and not in the driver) is to avoid 
possible delays
+ * between synchronizing the blacklisted nodes with YARN and resource 
allocations.
+ */
+private[spark] class YarnAllocatorBlacklistTracker(
+    sparkConf: SparkConf,
+    amClient: AMRMClient[ContainerRequest],
+    failureTracker: FailureTracker)
+  extends Logging {
+
+  private val blacklistTimeoutMillis = 
BlacklistTracker.getBlacklistTimeout(sparkConf)
+
+  private val launchBlacklistEnabled = 
sparkConf.get(YARN_EXECUTOR_LAUNCH_BLACKLIST_ENABLED)
+
+  private val maxFailuresPerHost = sparkConf.get(MAX_FAILED_EXEC_PER_NODE)
+
+  private val allocatorBlacklist = new HashMap[String, Long]()
+
+  private var currentBlacklistedYarnNodes = Set.empty[String]
+
+  private var schedulerBlacklist = Set.empty[String]
+
+  private var numClusterNodes = Int.MaxValue
+
+  def setNumClusterNodes(numClusterNodes: Int): Unit = {
+    this.numClusterNodes = numClusterNodes
+  }
+
+  def handleResourceAllocationFailure(hostOpt: Option[String]): Unit = {
+    hostOpt match {
+      case Some(hostname) if launchBlacklistEnabled =>
+        // failures on an already blacklisted nodes are not even tracked.
+        // otherwise, such failures could shutdown the application
+        // as resource requests are asynchronous
+        // and a late failure response could exceed MAX_EXECUTOR_FAILURES
+        if (!schedulerBlacklist.contains(hostname) &&
+            !allocatorBlacklist.contains(hostname)) {
+          failureTracker.registerFailureOnHost(hostname)
+          updateAllocationBlacklistedNodes(hostname)
+        }
+      case _ =>
+        failureTracker.registerExecutorFailure()
+    }
+  }
+
+  private def updateAllocationBlacklistedNodes(hostname: String): Unit = {
+    val failuresOnHost = failureTracker.numFailuresOnHost(hostname)
+    if (failuresOnHost > maxFailuresPerHost) {
+      logInfo(s"blacklisting $hostname as YARN allocation failed 
$failuresOnHost times")
+      allocatorBlacklist.put(
+        hostname,
+        failureTracker.clock.getTimeMillis() + blacklistTimeoutMillis)
+      refreshBlacklistedNodes()
+    }
+  }
+
+  def setSchedulerBlacklistedNodes(schedulerBlacklistedNodesWithExpiry: 
Set[String]): Unit = {
+    this.schedulerBlacklist = schedulerBlacklistedNodesWithExpiry
+    refreshBlacklistedNodes()
+  }
+
+  def isAllNodeBlacklisted: Boolean = currentBlacklistedYarnNodes.size >= 
numClusterNodes
+
+  private def refreshBlacklistedNodes(): Unit = {
+    removeExpiredYarnBlacklistedNodes()
+    val allBlacklistedNodes = schedulerBlacklist ++ allocatorBlacklist.keySet
+    synchronizeBlacklistedNodeWithYarn(allBlacklistedNodes)
+  }
+
+  private def synchronizeBlacklistedNodeWithYarn(nodesToBlacklist: 
Set[String]): Unit = {
+    // Update blacklist information to YARN ResourceManager for this 
application,
+    // in order to avoid allocating new Containers on the problematic nodes.
+    val additions = (nodesToBlacklist -- 
currentBlacklistedYarnNodes).toList.sorted
+    val removals = (currentBlacklistedYarnNodes -- 
nodesToBlacklist).toList.sorted
+    if (additions.nonEmpty) {
+      logInfo(s"adding nodes to YARN application master's blacklist: 
$additions")
+    }
+    if (removals.nonEmpty) {
+      logInfo(s"removing nodes from YARN application master's blacklist: 
$removals")
+    }
+    amClient.updateBlacklist(additions.asJava, removals.asJava)
+    currentBlacklistedYarnNodes = nodesToBlacklist
+  }
+
+  private def removeExpiredYarnBlacklistedNodes(): Unit = {
+    val now = failureTracker.clock.getTimeMillis()
+    allocatorBlacklist.retain { (_, expiryTime) => expiryTime > now }
+  }
+}
+
+/**
+ * FailureTracker is responsible for tracking executor failures both for each 
host separately
+ * and for all hosts altogether.
+ */
+private[spark] class FailureTracker(
+    sparkConf: SparkConf,
+    val clock: Clock = new SystemClock) extends Logging {
+
+  private val executorFailuresValidityInterval =
+    
sparkConf.get(config.EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).getOrElse(-1L)
+
+  // Queue to store the timestamp of failed executors for each host
+  private val failedExecutorsTimeStampsPerHost = mutable.Map[String, 
mutable.Queue[Long]]()
+
+  private val failedExecutorsTimeStamps = new mutable.Queue[Long]()
+
+  private def updateAndCountFailures(failedExecutorsWithTimeStamps: 
mutable.Queue[Long]): Int = {
+    val endTime = clock.getTimeMillis()
+    while (executorFailuresValidityInterval > 0 &&
+        failedExecutorsWithTimeStamps.nonEmpty &&
+        failedExecutorsWithTimeStamps.head < endTime - 
executorFailuresValidityInterval) {
+      failedExecutorsWithTimeStamps.dequeue()
+    }
+    failedExecutorsWithTimeStamps.size
+  }
+
+  def numFailedExecutors: Int = synchronized {
+    updateAndCountFailures(failedExecutorsTimeStamps)
+  }
+
+  def registerFailureOnHost(hostname: String): Unit = synchronized {
+    val timeMillis = clock.getTimeMillis()
+    failedExecutorsTimeStamps.enqueue(timeMillis)
+    val failedExecutorsOnHost =
+      failedExecutorsTimeStampsPerHost.getOrElse(hostname, {
+        val failureOnHost = mutable.Queue[Long]()
+        failedExecutorsTimeStampsPerHost.put(hostname, failureOnHost)
+        failureOnHost
+      })
+    failedExecutorsOnHost.enqueue(timeMillis)
+  }
+
+  def registerExecutorFailure(): Unit = synchronized {
+    val timeMillis = clock.getTimeMillis()
+    failedExecutorsTimeStamps.enqueue(timeMillis)
+  }
+
+  def numFailuresOnHost(hostname: String): Int = {
+    failedExecutorsTimeStampsPerHost.get(hostname).map { failedExecutorsOnHost 
=>
+      updateAndCountFailures(failedExecutorsOnHost)
+    }.getOrElse(0)
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/b56e9c61/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index 1a99b3b..129084a 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -328,4 +328,10 @@ package object config {
     CACHED_FILES_TYPES,
     CACHED_CONF_ARCHIVE)
 
+  /* YARN allocator-level blacklisting related config entries. */
+  private[spark] val YARN_EXECUTOR_LAUNCH_BLACKLIST_ENABLED =
+    ConfigBuilder("spark.yarn.blacklist.executor.launch.blacklisting.enabled")
+      .booleanConf
+      .createWithDefault(false)
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b56e9c61/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/FailureTrackerSuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/FailureTrackerSuite.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/FailureTrackerSuite.scala
new file mode 100644
index 0000000..4f77b9c
--- /dev/null
+++ 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/FailureTrackerSuite.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.yarn
+
+import org.scalatest.Matchers
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.util.ManualClock
+
+class FailureTrackerSuite extends SparkFunSuite with Matchers {
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+  }
+
+  test("failures expire if validity interval is set") {
+    val sparkConf = new SparkConf()
+    sparkConf.set(config.EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS, 100L)
+
+    val clock = new ManualClock()
+    val failureTracker = new FailureTracker(sparkConf, clock)
+
+    clock.setTime(0)
+    failureTracker.registerFailureOnHost("host1")
+    failureTracker.numFailuresOnHost("host1") should be (1)
+    failureTracker.numFailedExecutors should be (1)
+
+    clock.setTime(10)
+    failureTracker.registerFailureOnHost("host2")
+    failureTracker.numFailuresOnHost("host2") should be (1)
+    failureTracker.numFailedExecutors should be (2)
+
+    clock.setTime(20)
+    failureTracker.registerFailureOnHost("host1")
+    failureTracker.numFailuresOnHost("host1") should be (2)
+    failureTracker.numFailedExecutors should be (3)
+
+    clock.setTime(30)
+    failureTracker.registerFailureOnHost("host2")
+    failureTracker.numFailuresOnHost("host2") should be (2)
+    failureTracker.numFailedExecutors should be (4)
+
+    clock.setTime(101)
+    failureTracker.numFailuresOnHost("host1") should be (1)
+    failureTracker.numFailedExecutors should be (3)
+
+    clock.setTime(231)
+    failureTracker.numFailuresOnHost("host1") should be (0)
+    failureTracker.numFailuresOnHost("host2") should be (0)
+    failureTracker.numFailedExecutors should be (0)
+  }
+
+
+  test("failures never expire if validity interval is not set (-1)") {
+    val sparkConf = new SparkConf()
+
+    val clock = new ManualClock()
+    val failureTracker = new FailureTracker(sparkConf, clock)
+
+    clock.setTime(0)
+    failureTracker.registerFailureOnHost("host1")
+    failureTracker.numFailuresOnHost("host1") should be (1)
+    failureTracker.numFailedExecutors should be (1)
+
+    clock.setTime(10)
+    failureTracker.registerFailureOnHost("host2")
+    failureTracker.numFailuresOnHost("host2") should be (1)
+    failureTracker.numFailedExecutors should be (2)
+
+    clock.setTime(20)
+    failureTracker.registerFailureOnHost("host1")
+    failureTracker.numFailuresOnHost("host1") should be (2)
+    failureTracker.numFailedExecutors should be (3)
+
+    clock.setTime(30)
+    failureTracker.registerFailureOnHost("host2")
+    failureTracker.numFailuresOnHost("host2") should be (2)
+    failureTracker.numFailedExecutors should be (4)
+
+    clock.setTime(1000)
+    failureTracker.numFailuresOnHost("host1") should be (2)
+    failureTracker.numFailuresOnHost("host2") should be (2)
+    failureTracker.numFailedExecutors should be (4)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/b56e9c61/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala
new file mode 100644
index 0000000..aeac68e
--- /dev/null
+++ 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.yarn
+
+import java.util.Arrays
+import java.util.Collections
+
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.mockito.Mockito._
+import org.scalatest.{BeforeAndAfterEach, Matchers}
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import 
org.apache.spark.deploy.yarn.config.YARN_EXECUTOR_LAUNCH_BLACKLIST_ENABLED
+import org.apache.spark.internal.config.{BLACKLIST_TIMEOUT_CONF, 
MAX_FAILED_EXEC_PER_NODE}
+import org.apache.spark.util.ManualClock
+
+class YarnAllocatorBlacklistTrackerSuite extends SparkFunSuite with Matchers
+  with BeforeAndAfterEach {
+
+  val BLACKLIST_TIMEOUT = 100L
+  val MAX_FAILED_EXEC_PER_NODE_VALUE = 2
+
+  var amClientMock: AMRMClient[ContainerRequest] = _
+  var yarnBlacklistTracker: YarnAllocatorBlacklistTracker = _
+  var failureTracker: FailureTracker = _
+  var clock: ManualClock = _
+
+  override def beforeEach(): Unit = {
+    val sparkConf = new SparkConf()
+    sparkConf.set(BLACKLIST_TIMEOUT_CONF, BLACKLIST_TIMEOUT)
+    sparkConf.set(YARN_EXECUTOR_LAUNCH_BLACKLIST_ENABLED, true)
+    sparkConf.set(MAX_FAILED_EXEC_PER_NODE, MAX_FAILED_EXEC_PER_NODE_VALUE)
+    clock = new ManualClock()
+
+    amClientMock = mock(classOf[AMRMClient[ContainerRequest]])
+    failureTracker = new FailureTracker(sparkConf, clock)
+    yarnBlacklistTracker =
+      new YarnAllocatorBlacklistTracker(sparkConf, amClientMock, 
failureTracker)
+    yarnBlacklistTracker.setNumClusterNodes(4)
+    super.beforeEach()
+  }
+
+  test("expiring its own blacklisted nodes") {
+    (1 to MAX_FAILED_EXEC_PER_NODE_VALUE).foreach {
+      _ => {
+        yarnBlacklistTracker.handleResourceAllocationFailure(Some("host"))
+        // host should not be blacklisted at these failures as 
MAX_FAILED_EXEC_PER_NODE is 2
+        verify(amClientMock, never())
+          .updateBlacklist(Arrays.asList("host"), Collections.emptyList())
+      }
+    }
+
+    yarnBlacklistTracker.handleResourceAllocationFailure(Some("host"))
+    // the third failure on the host triggers the blacklisting
+    verify(amClientMock).updateBlacklist(Arrays.asList("host"), 
Collections.emptyList())
+
+    clock.advance(BLACKLIST_TIMEOUT)
+
+    // trigger synchronisation of blacklisted nodes with YARN
+    yarnBlacklistTracker.setSchedulerBlacklistedNodes(Set())
+    verify(amClientMock).updateBlacklist(Collections.emptyList(), 
Arrays.asList("host"))
+  }
+
+  test("not handling the expiry of scheduler blacklisted nodes") {
+    yarnBlacklistTracker.setSchedulerBlacklistedNodes(Set("host1", "host2"))
+    verify(amClientMock)
+      .updateBlacklist(Arrays.asList("host1", "host2"), 
Collections.emptyList())
+
+    // advance timer more then host1, host2 expiry time
+    clock.advance(200L)
+
+    // expired blacklisted nodes (simulating a resource request)
+    yarnBlacklistTracker.setSchedulerBlacklistedNodes(Set("host1", "host2"))
+    // no change is communicated to YARN regarding the blacklisting
+    verify(amClientMock).updateBlacklist(Collections.emptyList(), 
Collections.emptyList())
+  }
+
+  test("combining scheduler and allocation blacklist") {
+    (1 to MAX_FAILED_EXEC_PER_NODE_VALUE).foreach {
+      _ => {
+        yarnBlacklistTracker.handleResourceAllocationFailure(Some("host1"))
+        // host1 should not be blacklisted at these failures as 
MAX_FAILED_EXEC_PER_NODE is 2
+        verify(amClientMock, never())
+          .updateBlacklist(Arrays.asList("host1"), Collections.emptyList())
+      }
+    }
+
+    // as this is the third failure on host1 the node will be blacklisted
+    yarnBlacklistTracker.handleResourceAllocationFailure(Some("host1"))
+    verify(amClientMock)
+      .updateBlacklist(Arrays.asList("host1"), Collections.emptyList())
+
+    yarnBlacklistTracker.setSchedulerBlacklistedNodes(Set("host2", "host3"))
+    verify(amClientMock)
+      .updateBlacklist(Arrays.asList("host2", "host3"), 
Collections.emptyList())
+
+    clock.advance(10L)
+
+    yarnBlacklistTracker.setSchedulerBlacklistedNodes(Set("host3", "host4"))
+    verify(amClientMock)
+      .updateBlacklist(Arrays.asList("host4"), Arrays.asList("host2"))
+  }
+
+  test("blacklist all available nodes") {
+    yarnBlacklistTracker.setSchedulerBlacklistedNodes(Set("host1", "host2", 
"host3"))
+    verify(amClientMock)
+      .updateBlacklist(Arrays.asList("host1", "host2", "host3"), 
Collections.emptyList())
+
+    clock.advance(60L)
+    (1 to MAX_FAILED_EXEC_PER_NODE_VALUE).foreach {
+      _ => {
+        yarnBlacklistTracker.handleResourceAllocationFailure(Some("host4"))
+        // host4 should not be blacklisted at these failures as 
MAX_FAILED_EXEC_PER_NODE is 2
+        verify(amClientMock, never())
+          .updateBlacklist(Arrays.asList("host4"), Collections.emptyList())
+      }
+    }
+
+    // the third failure on the host triggers the blacklisting
+    yarnBlacklistTracker.handleResourceAllocationFailure(Some("host4"))
+
+    verify(amClientMock).updateBlacklist(Arrays.asList("host4"), 
Collections.emptyList())
+    assert(yarnBlacklistTracker.isAllNodeBlacklisted === true)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/b56e9c61/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 525abb6..3f783ba 100644
--- 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -59,6 +59,8 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers 
with BeforeAndAfter
 
   var rmClient: AMRMClient[ContainerRequest] = _
 
+  var clock: ManualClock = _
+
   var containerNum = 0
 
   override def beforeEach() {
@@ -66,6 +68,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers 
with BeforeAndAfter
     rmClient = AMRMClient.createAMRMClient()
     rmClient.init(conf)
     rmClient.start()
+    clock = new ManualClock()
   }
 
   override def afterEach() {
@@ -101,7 +104,8 @@ class YarnAllocatorSuite extends SparkFunSuite with 
Matchers with BeforeAndAfter
       appAttemptId,
       new SecurityManager(sparkConf),
       Map(),
-      new MockResolver())
+      new MockResolver(),
+      clock)
   }
 
   def createContainer(host: String): Container = {
@@ -332,10 +336,14 @@ class YarnAllocatorSuite extends SparkFunSuite with 
Matchers with BeforeAndAfter
     handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map(), 
Set("hostA"))
     verify(mockAmClient).updateBlacklist(Seq("hostA").asJava, 
Seq[String]().asJava)
 
-    handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map(), 
Set("hostA", "hostB"))
+    val blacklistedNodes = Set(
+      "hostA",
+      "hostB"
+    )
+    handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map(), 
blacklistedNodes)
     verify(mockAmClient).updateBlacklist(Seq("hostB").asJava, 
Seq[String]().asJava)
 
-    handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map(), Set())
+    handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map(), 
Set.empty)
     verify(mockAmClient).updateBlacklist(Seq[String]().asJava, Seq("hostA", 
"hostB").asJava)
   }
 
@@ -353,8 +361,6 @@ class YarnAllocatorSuite extends SparkFunSuite with 
Matchers with BeforeAndAfter
   test("window based failure executor counting") {
     sparkConf.set("spark.yarn.executor.failuresValidityInterval", "100s")
     val handler = createAllocator(4)
-    val clock = new ManualClock(0L)
-    handler.setClock(clock)
 
     handler.updateResourceRequests()
     handler.getNumExecutorsRunning should be (0)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to