xkrogen commented on a change in pull request #29906:
URL: https://github.com/apache/spark/pull/29906#discussion_r499077732



##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -722,74 +722,83 @@ package object config {
       .booleanConf
       .createWithDefault(true)
 
-  // Blacklist confs
-  private[spark] val BLACKLIST_ENABLED =
-    ConfigBuilder("spark.blacklist.enabled")
+  private[spark] val EXCLUDE_ON_FAILURE_ENABLED =
+    ConfigBuilder("spark.excludeOnFailure.enabled")
       .version("2.1.0")

Review comment:
       Do we need to update the "from" version strings here?
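
      For illustration, roughly what I have in mind -- a sketch only, assuming the renamed key ships in 3.1.0 and keeps the old key via `withAlternative` as the later hunks in this PR already do:

```scala
  private[spark] val EXCLUDE_ON_FAILURE_ENABLED =
    ConfigBuilder("spark.excludeOnFailure.enabled")
      .version("3.1.0")  // hypothetical: the release that introduces the new key name
      .withAlternative("spark.blacklist.enabled")
      .booleanConf
      .createOptional
```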

##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -907,13 +908,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   protected def currentDelegationTokens: Array[Byte] = delegationTokens.get()
 
   /**
-   * Checks whether the executor is blacklisted. This is called when the executor tries to
-   * register with the scheduler, and will deny registration if this method returns true.
+   * Checks whether the executor is excluded due to failure(s). This is called when the executor
+   *  tries to register with the scheduler, and will deny registration if this method returns true.

Review comment:
       minor nit: extra space at the start of the line

##########
File path: core/src/main/scala/org/apache/spark/status/api/v1/api.scala
##########
@@ -82,10 +82,11 @@ class ExecutorStageSummary private[spark](
     val shuffleWriteRecords : Long,
     val memoryBytesSpilled : Long,
     val diskBytesSpilled : Long,
-    val isBlacklistedForStage: Boolean,
+    val isBlacklistedForStage: Boolean, // deprecated

Review comment:
       Can we add `@deprecated` for this and the others?
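
      A minimal sketch of what that could look like (not the real class; `isExcludedForStage` and the message/since values are just placeholders):

```scala
// Standalone sketch: mark the old field deprecated while the replacement lives alongside it.
class ExecutorStageSummarySketch(
    @deprecated("use isExcludedForStage instead", "3.1.0")  // placeholder message and version
    val isBlacklistedForStage: Boolean,
    val isExcludedForStage: Boolean)  // hypothetical name for the new field
```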

##########
File path: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
##########
@@ -284,80 +284,138 @@ private[spark] class AppStatusListener(
   }
 
   override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit = {
-    updateBlackListStatus(event.executorId, true)
+    updateExcludedStatus(event.executorId, true)
+  }
+
+  override def onExecutorExcluded(event: SparkListenerExecutorExcluded): Unit = {
+    updateExcludedStatus(event.executorId, true)
   }
 
   override def onExecutorBlacklistedForStage(
-      event: SparkListenerExecutorBlacklistedForStage): Unit = {
+    event: SparkListenerExecutorBlacklistedForStage): Unit = {
+    val now = System.nanoTime()
+
+    Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach { stage =>
+      setStageExcludedStatus(stage, now, event.executorId)
+    }
+    liveExecutors.get(event.executorId).foreach { exec =>
+      addExcludedStageTo(exec, event.stageId, now)
+    }
+  }
+
+  override def onExecutorExcludedForStage(
+      event: SparkListenerExecutorExcludedForStage): Unit = {
     val now = System.nanoTime()
 
     Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach { stage =>
-      setStageBlackListStatus(stage, now, event.executorId)
+      setStageExcludedStatus(stage, now, event.executorId)
     }
     liveExecutors.get(event.executorId).foreach { exec =>
-      addBlackListedStageTo(exec, event.stageId, now)
+      addExcludedStageTo(exec, event.stageId, now)
     }
   }
 
   override def onNodeBlacklistedForStage(event: SparkListenerNodeBlacklistedForStage): Unit = {
     val now = System.nanoTime()
 
-    // Implicitly blacklist every available executor for the stage associated with this node
+    // Implicitly exclude every available executor for the stage associated with this node
     Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach { stage =>
       val executorIds = liveExecutors.values.filter(_.host == event.hostId).map(_.executorId).toSeq
-      setStageBlackListStatus(stage, now, executorIds: _*)
+      setStageExcludedStatus(stage, now, executorIds: _*)
     }
     liveExecutors.values.filter(_.hostname == event.hostId).foreach { exec =>
-      addBlackListedStageTo(exec, event.stageId, now)
+      addExcludedStageTo(exec, event.stageId, now)
+    }
+  }
+
+  override def onNodeExcludedForStage(event: SparkListenerNodeExcludedForStage): Unit = {
+    val now = System.nanoTime()
+
+    // Implicitly exclude every available executor for the stage associated with this node
+    Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach { stage =>
+      val executorIds = liveExecutors.values.filter(_.host == event.hostId).map(_.executorId).toSeq
+      setStageExcludedStatus(stage, now, executorIds: _*)
+    }
+    liveExecutors.values.filter(_.hostname == event.hostId).foreach { exec =>
+      addExcludedStageTo(exec, event.stageId, now)
     }
   }
 
   private def addBlackListedStageTo(exec: LiveExecutor, stageId: Int, now: Long): Unit = {
-    exec.blacklistedInStages += stageId
+    exec.excludedInStages += stageId
+    liveUpdate(exec, now)
+  }
+
+  private def addExcludedStageTo(exec: LiveExecutor, stageId: Int, now: Long): Unit = {
+    exec.excludedInStages += stageId
     liveUpdate(exec, now)
   }
 
   private def setStageBlackListStatus(stage: LiveStage, now: Long, executorIds: String*): Unit = {
     executorIds.foreach { executorId =>
       val executorStageSummary = stage.executorSummary(executorId)
-      executorStageSummary.isBlacklisted = true
+      executorStageSummary.isExcluded = true
       maybeUpdate(executorStageSummary, now)
     }
-    stage.blackListedExecutors ++= executorIds
+    stage.excludedExecutors ++= executorIds
+    maybeUpdate(stage, now)
+  }
+
+  private def setStageExcludedStatus(stage: LiveStage, now: Long, executorIds: String*): Unit = {
+    executorIds.foreach { executorId =>
+      val executorStageSummary = stage.executorSummary(executorId)
+      executorStageSummary.isExcluded = true
+      maybeUpdate(executorStageSummary, now)
+    }
+    stage.excludedExecutors ++= executorIds
     maybeUpdate(stage, now)
   }
 
   override def onExecutorUnblacklisted(event: SparkListenerExecutorUnblacklisted): Unit = {
-    updateBlackListStatus(event.executorId, false)
+    updateExcludedStatus(event.executorId, false)
+  }
+
+  override def onExecutorUnexcluded(event: SparkListenerExecutorUnexcluded): Unit = {
+    updateExcludedStatus(event.executorId, false)
   }
 
   override def onNodeBlacklisted(event: SparkListenerNodeBlacklisted): Unit = {
-    updateNodeBlackList(event.hostId, true)
+    updateNodeExcluded(event.hostId, true)
+  }
+
+  override def onNodeExcluded(event: SparkListenerNodeExcluded): Unit = {
+    updateNodeExcluded(event.hostId, true)
   }
 
   override def onNodeUnblacklisted(event: SparkListenerNodeUnblacklisted): Unit = {
-    updateNodeBlackList(event.hostId, false)
+    updateNodeExcluded(event.hostId, false)
+  }
+
+  override def onNodeUnexcluded(event: SparkListenerNodeUnexcluded): Unit = {
+    updateNodeExcluded(event.hostId, false)
   }
 
-  private def updateBlackListStatus(execId: String, blacklisted: Boolean): Unit = {
+  private def updateExcludedStatus(execId: String, excluded: Boolean): Unit = {

Review comment:
       Is `updateExclusionStatus` a better name? It reads more smoothly to me, 
but I'm not sure if it's less clear.

##########
File path: docs/monitoring.md
##########
@@ -1125,12 +1125,14 @@ This is the component with the largest amount of instrumented metrics
   - stages.failedStages.count
   - stages.skippedStages.count
   - stages.completedStages.count
-  - tasks.blackListedExecutors.count
+  - tasks.blackListedExecutors.count // deprecated use excludedExecutors instead

Review comment:
       Is it even worth mentioning in the documentation?

##########
File path: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
##########
@@ -284,80 +284,138 @@ private[spark] class AppStatusListener(
   }
 
   override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit = {
-    updateBlackListStatus(event.executorId, true)
+    updateExcludedStatus(event.executorId, true)
+  }
+
+  override def onExecutorExcluded(event: SparkListenerExecutorExcluded): Unit = {
+    updateExcludedStatus(event.executorId, true)
   }
 
   override def onExecutorBlacklistedForStage(
-      event: SparkListenerExecutorBlacklistedForStage): Unit = {
+    event: SparkListenerExecutorBlacklistedForStage): Unit = {
+    val now = System.nanoTime()
+
+    Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach { stage =>
+      setStageExcludedStatus(stage, now, event.executorId)
+    }
+    liveExecutors.get(event.executorId).foreach { exec =>
+      addExcludedStageTo(exec, event.stageId, now)
+    }
+  }
+
+  override def onExecutorExcludedForStage(
+      event: SparkListenerExecutorExcludedForStage): Unit = {
     val now = System.nanoTime()
 
     Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach { stage =>
-      setStageBlackListStatus(stage, now, event.executorId)
+      setStageExcludedStatus(stage, now, event.executorId)
     }
     liveExecutors.get(event.executorId).foreach { exec =>
-      addBlackListedStageTo(exec, event.stageId, now)
+      addExcludedStageTo(exec, event.stageId, now)
     }
   }
 
   override def onNodeBlacklistedForStage(event: SparkListenerNodeBlacklistedForStage): Unit = {
     val now = System.nanoTime()
 
-    // Implicitly blacklist every available executor for the stage associated with this node
+    // Implicitly exclude every available executor for the stage associated with this node
     Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach { stage =>
       val executorIds = liveExecutors.values.filter(_.host == event.hostId).map(_.executorId).toSeq
-      setStageBlackListStatus(stage, now, executorIds: _*)
+      setStageExcludedStatus(stage, now, executorIds: _*)
     }
     liveExecutors.values.filter(_.hostname == event.hostId).foreach { exec =>
-      addBlackListedStageTo(exec, event.stageId, now)
+      addExcludedStageTo(exec, event.stageId, now)
+    }
+  }
+
+  override def onNodeExcludedForStage(event: SparkListenerNodeExcludedForStage): Unit = {
+    val now = System.nanoTime()
+
+    // Implicitly exclude every available executor for the stage associated with this node
+    Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach { stage =>
+      val executorIds = liveExecutors.values.filter(_.host == event.hostId).map(_.executorId).toSeq
+      setStageExcludedStatus(stage, now, executorIds: _*)
+    }
+    liveExecutors.values.filter(_.hostname == event.hostId).foreach { exec =>
+      addExcludedStageTo(exec, event.stageId, now)
     }
   }
 
   private def addBlackListedStageTo(exec: LiveExecutor, stageId: Int, now: Long): Unit = {
-    exec.blacklistedInStages += stageId
+    exec.excludedInStages += stageId
+    liveUpdate(exec, now)
+  }
+
+  private def addExcludedStageTo(exec: LiveExecutor, stageId: Int, now: Long): Unit = {
+    exec.excludedInStages += stageId
     liveUpdate(exec, now)
   }
 
   private def setStageBlackListStatus(stage: LiveStage, now: Long, executorIds: String*): Unit = {
     executorIds.foreach { executorId =>
       val executorStageSummary = stage.executorSummary(executorId)
-      executorStageSummary.isBlacklisted = true
+      executorStageSummary.isExcluded = true
       maybeUpdate(executorStageSummary, now)
     }
-    stage.blackListedExecutors ++= executorIds
+    stage.excludedExecutors ++= executorIds
+    maybeUpdate(stage, now)
+  }
+
+  private def setStageExcludedStatus(stage: LiveStage, now: Long, executorIds: String*): Unit = {

Review comment:
       Is `setStageExclusionStatus` a better name? It reads more smoothly to 
me, but I'm not sure if it's less clear.

##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -638,18 +638,19 @@ private[spark] class TaskSchedulerImpl(
                     }
                   } else {
                     // Abort Immediately
-                    logInfo("Cannot schedule any task because of complete blacklisting. No idle" +
-                      s" executors can be found to kill. Aborting stage ${taskSet.stageId}.")
-                    taskSet.abortSinceCompletelyBlacklisted(taskIndex)
+                    logInfo("Cannot schedule any task because all executor excluded from " +

Review comment:
       minor nit: `all executor excluded` -> `all executors excluded`

##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -722,74 +722,83 @@ package object config {
       .booleanConf
       .createWithDefault(true)
 
-  // Blacklist confs
-  private[spark] val BLACKLIST_ENABLED =
-    ConfigBuilder("spark.blacklist.enabled")
+  private[spark] val EXCLUDE_ON_FAILURE_ENABLED =
+    ConfigBuilder("spark.excludeOnFailure.enabled")
       .version("2.1.0")
+      .withAlternative("spark.blacklist.enabled")
       .booleanConf
       .createOptional
 
   private[spark] val MAX_TASK_ATTEMPTS_PER_EXECUTOR =
-    ConfigBuilder("spark.blacklist.task.maxTaskAttemptsPerExecutor")
+    ConfigBuilder("spark.excludeOnFailure.task.maxTaskAttemptsPerExecutor")
       .version("2.1.0")
+      .withAlternative("spark.blacklist.task.maxTaskAttemptsPerExecutor")
       .intConf
       .createWithDefault(1)
 
   private[spark] val MAX_TASK_ATTEMPTS_PER_NODE =
-    ConfigBuilder("spark.blacklist.task.maxTaskAttemptsPerNode")
+    ConfigBuilder("spark.excludeOnFailure.task.maxTaskAttemptsPerNode")
       .version("2.1.0")
+      .withAlternative("spark.blacklist.task.maxTaskAttemptsPerNode")
       .intConf
       .createWithDefault(2)
 
   private[spark] val MAX_FAILURES_PER_EXEC =
-    ConfigBuilder("spark.blacklist.application.maxFailedTasksPerExecutor")
+    ConfigBuilder("spark.excludeOnFailure.application.maxFailedTasksPerExecutor")
       .version("2.2.0")
+      .withAlternative("spark.blacklist.application.maxFailedTasksPerExecutor")
       .intConf
       .createWithDefault(2)
 
   private[spark] val MAX_FAILURES_PER_EXEC_STAGE =
-    ConfigBuilder("spark.blacklist.stage.maxFailedTasksPerExecutor")
+    ConfigBuilder("spark.excludeOnFailure.stage.maxFailedTasksPerExecutor")
       .version("2.1.0")
+      .withAlternative("spark.blacklist.stage.maxFailedTasksPerExecutor")
       .intConf
       .createWithDefault(2)
 
   private[spark] val MAX_FAILED_EXEC_PER_NODE =
-    ConfigBuilder("spark.blacklist.application.maxFailedExecutorsPerNode")
+    ConfigBuilder("spark.excludeOnFailure.application.maxFailedExecutorsPerNode")
       .version("2.2.0")
+      .withAlternative("spark.blacklist.application.maxFailedExecutorsPerNode")
       .intConf
       .createWithDefault(2)
 
   private[spark] val MAX_FAILED_EXEC_PER_NODE_STAGE =
-    ConfigBuilder("spark.blacklist.stage.maxFailedExecutorsPerNode")
+    ConfigBuilder("spark.excludeOnFailure.stage.maxFailedExecutorsPerNode")
       .version("2.1.0")
+      .withAlternative("spark.blacklist.stage.maxFailedExecutorsPerNode")
       .intConf
       .createWithDefault(2)
 
-  private[spark] val BLACKLIST_TIMEOUT_CONF =
-    ConfigBuilder("spark.blacklist.timeout")
+  private[spark] val EXCLUDE_ON_FAILURE_TIMEOUT_CONF =
+    ConfigBuilder("spark.excludeOnFailure.timeout")
       .version("2.1.0")
+      .withAlternative("spark.blacklist.timeout")
       .timeConf(TimeUnit.MILLISECONDS)
       .createOptional
 
-  private[spark] val BLACKLIST_KILL_ENABLED =
-    ConfigBuilder("spark.blacklist.killBlacklistedExecutors")
+  private[spark] val EXCLUDE_ON_FAILURE_KILL_ENABLED =
+    ConfigBuilder("spark.excludeOnFailure.killExcludedExecutors")
       .version("2.2.0")
+      .withAlternative("spark.excludeOnFailure.timeout")
       .booleanConf
       .createWithDefault(false)
 
-  private[spark] val BLACKLIST_LEGACY_TIMEOUT_CONF =
-    ConfigBuilder("spark.scheduler.executorTaskBlacklistTime")
+  private[spark] val EXCLUDE_ON_FAILURE_LEGACY_TIMEOUT_CONF =
+    ConfigBuilder("spark.scheduler.executorTaskExcludeOnFailureTime")
       .internal()
       .version("1.0.0")
+      .withAlternative("spark.scheduler.executorTaskBlacklistTime")
       .timeConf(TimeUnit.MILLISECONDS)

Review comment:
       Given that this is already a legacy config marked as deprecated, it 
doesn't seem worth adding a new version. What do you think?

##########
File path: core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala
##########
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+
+import org.apache.spark.{ExecutorAllocationClient, SparkConf, SparkContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config
+import org.apache.spark.util.{Clock, SystemClock, Utils}
+
+/**
+ * HealthTracker is designed to track problematic executors and nodes.  It supports excluding
+ * executors and nodes across an entire application (with a periodic expiry). TaskSetManagers add
+ * additional logic for exclusion of executors and nodes for individual tasks and stages which
+ * works in concert with the logic here.
+ *
+ * The tracker needs to deal with a variety of workloads, eg.:
+ *
+ *  * bad user code -- this may lead to many task failures, but that should not count against
+ *      individual executors
+ *  * many small stages -- this may prevent a bad executor for having many failures within one
+ *      stage, but still many failures over the entire application
+ *  * "flaky" executors -- they don't fail every task, but are still faulty enough to merit
+ *      excluding
+ *
+ * See the design doc on SPARK-8425 for a more in-depth discussion.

Review comment:
       People might become confused when they go to SPARK-8425 and find that the name is
       completely different. Should we also include a reference to SPARK-32037?
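
      e.g. the scaladoc could reference both (wording is just a suggestion):

```scala
/**
 * See the design doc on SPARK-8425 for a more in-depth discussion, and SPARK-32037 for the
 * rename from the original "blacklist" terminology.
 */
```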

##########
File path: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
##########
@@ -284,80 +284,138 @@ private[spark] class AppStatusListener(
   }
 
   override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit = {
-    updateBlackListStatus(event.executorId, true)
+    updateExcludedStatus(event.executorId, true)
+  }
+
+  override def onExecutorExcluded(event: SparkListenerExecutorExcluded): Unit = {
+    updateExcludedStatus(event.executorId, true)
   }
 
   override def onExecutorBlacklistedForStage(
-      event: SparkListenerExecutorBlacklistedForStage): Unit = {
+    event: SparkListenerExecutorBlacklistedForStage): Unit = {
+    val now = System.nanoTime()
+

Review comment:
       Should we consider delegating this implementation to 
`onExecutorExcludedForStage` to avoid code duplication? Same for others.
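
      Roughly what I am picturing -- a sketch only, where `handleExecutorExcludedForStage` is a made-up private helper holding the body that is currently duplicated:

```scala
  override def onExecutorBlacklistedForStage(
      event: SparkListenerExecutorBlacklistedForStage): Unit = {
    // Deprecated event: forward to the shared helper used by the new handler.
    handleExecutorExcludedForStage(event.stageId, event.stageAttemptId, event.executorId)
  }

  override def onExecutorExcludedForStage(
      event: SparkListenerExecutorExcludedForStage): Unit = {
    handleExecutorExcludedForStage(event.stageId, event.stageAttemptId, event.executorId)
  }

  // Hypothetical shared helper; same logic as in the diff above, written once.
  private def handleExecutorExcludedForStage(
      stageId: Int, stageAttemptId: Int, executorId: String): Unit = {
    val now = System.nanoTime()
    Option(liveStages.get((stageId, stageAttemptId))).foreach { stage =>
      setStageExcludedStatus(stage, now, executorId)
    }
    liveExecutors.get(executorId).foreach { exec =>
      addExcludedStageTo(exec, stageId, now)
    }
  }
```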

##########
File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorNodeHealthTracker.scala
##########
@@ -72,72 +73,76 @@ private[spark] class YarnAllocatorBlacklistTracker(
 
   def handleResourceAllocationFailure(hostOpt: Option[String]): Unit = {
     hostOpt match {
-      case Some(hostname) if launchBlacklistEnabled =>
-        // failures on an already blacklisted nodes are not even tracked.
+      case Some(hostname) if launchExcludeOnFailureEnabled =>
+        // failures on an already excluded node are not even tracked.
         // otherwise, such failures could shutdown the application
         // as resource requests are asynchronous
         // and a late failure response could exceed MAX_EXECUTOR_FAILURES
-        if (!schedulerBlacklist.contains(hostname) &&
-            !allocatorBlacklist.contains(hostname)) {
+        if (!schedulerExcludedNodeList.contains(hostname) &&
+            !allocatorExcludedNodeList.contains(hostname)) {
           failureTracker.registerFailureOnHost(hostname)
-          updateAllocationBlacklistedNodes(hostname)
+          updateAllocationExcludedNodes(hostname)
         }
       case _ =>
         failureTracker.registerExecutorFailure()
     }
   }
 
-  private def updateAllocationBlacklistedNodes(hostname: String): Unit = {
+  private def updateAllocationExcludedNodes(hostname: String): Unit = {
     val failuresOnHost = failureTracker.numFailuresOnHost(hostname)
     if (failuresOnHost > maxFailuresPerHost) {
-      logInfo(s"blacklisting $hostname as YARN allocation failed $failuresOnHost times")
-      allocatorBlacklist.put(
+      logInfo(s"excluding $hostname as YARN allocation failed $failuresOnHost times")
+      allocatorExcludedNodeList.put(
         hostname,
-        failureTracker.clock.getTimeMillis() + blacklistTimeoutMillis)
-      refreshBlacklistedNodes()
+        failureTracker.clock.getTimeMillis() + excludeOnFailureTimeoutMillis)
+      refreshExcludedNodes()
     }
   }
 
-  def setSchedulerBlacklistedNodes(schedulerBlacklistedNodesWithExpiry: Set[String]): Unit = {
-    this.schedulerBlacklist = schedulerBlacklistedNodesWithExpiry
-    refreshBlacklistedNodes()
+  def setSchedulerExcludedNodes(schedulerExcludedNodesWithExpiry: Set[String]): Unit = {
+    this.schedulerExcludedNodeList = schedulerExcludedNodesWithExpiry
+    refreshExcludedNodes()
   }
 
-  def isAllNodeBlacklisted: Boolean = {
+  def isAllNodeExcluded: Boolean = {
     if (numClusterNodes <= 0) {
       logWarning("No available nodes reported, please check Resource Manager.")
       false
     } else {
-      currentBlacklistedYarnNodes.size >= numClusterNodes
+      currentExcludededYarnNodes.size >= numClusterNodes
     }
   }
 
-  private def refreshBlacklistedNodes(): Unit = {
-    removeExpiredYarnBlacklistedNodes()
-    val allBlacklistedNodes = excludeNodes ++ schedulerBlacklist ++ allocatorBlacklist.keySet
-    synchronizeBlacklistedNodeWithYarn(allBlacklistedNodes)
+  private def refreshExcludedNodes(): Unit = {
+    removeExpiredYarnExcludedNodes()
+    val allExcludeddNodes =

Review comment:
       minor nit: `allExcludeddNodes` -> `allExcludedNodes`

##########
File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
##########
@@ -379,14 +379,15 @@ package object config extends Logging {
     .stringConf
     .createOptional
 
-  /* YARN allocator-level blacklisting related config entries. */
-  private[spark] val YARN_EXECUTOR_LAUNCH_BLACKLIST_ENABLED =
-    ConfigBuilder("spark.yarn.blacklist.executor.launch.blacklisting.enabled")
+  /* YARN allocator-level excludeOnFailure related config entries. */
+  private[spark] val YARN_EXECUTOR_LAUNCH_EXCLUDE_ON_FAILURE_ENABLED =
+    ConfigBuilder("spark.yarn.executor.launch.excludeOnFailure.enabled")

Review comment:
       I wonder if `spark.yarn.exclude.excludeOnFailure.enabled` or something 
might be better to bring it more in line with `spark.yarn.exclude.nodes`? I 
don't really see why "launch" appears in this config name...




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


