This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c8628c9  Revert "[SPARK-34104][SPARK-34105][CORE][K8S] Maximum 
decommissioning time & allow decommissioning for excludes"
c8628c9 is described below

commit c8628c943cd12bbad7561bdc297cea9ff23becc7
Author: HyukjinKwon <gurwls...@apache.org>
AuthorDate: Wed Feb 10 08:00:03 2021 +0900

    Revert "[SPARK-34104][SPARK-34105][CORE][K8S] Maximum decommissioning time 
& allow decommissioning for excludes"
    
    This reverts commit 50641d2e3d659f51432aa2c0e6b9af76d71a5796.
---
 .../apache/spark/ExecutorAllocationClient.scala    |  6 ---
 .../org/apache/spark/internal/config/package.scala | 19 +-------
 .../org/apache/spark/scheduler/HealthTracker.scala | 35 +++-----------
 .../cluster/CoarseGrainedClusterMessage.scala      |  3 --
 .../cluster/CoarseGrainedSchedulerBackend.scala    | 56 ++--------------------
 .../spark/scheduler/HealthTrackerSuite.scala       | 45 -----------------
 .../k8s/integrationtest/DecommissionSuite.scala    | 32 -------------
 .../k8s/integrationtest/KubernetesSuite.scala      |  5 +-
 8 files changed, 14 insertions(+), 187 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala 
b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
index 5b587d7..cdba1c4 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -129,12 +129,6 @@ private[spark] trait ExecutorAllocationClient {
     decommissionedExecutors.nonEmpty && 
decommissionedExecutors(0).equals(executorId)
   }
 
-  /**
-   * Request that the cluster manager decommission every executor on the 
specified host.
-   *
-   * @return whether the request is acknowledged by the cluster manager.
-   */
-  def decommissionExecutorsOnHost(host: String): Boolean
 
   /**
    * Request that the cluster manager kill every executor on the specified 
host.
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 3101bb6..7aeb51d 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -827,13 +827,6 @@ package object config {
       .booleanConf
       .createWithDefault(false)
 
-  private[spark] val EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED =
-    ConfigBuilder("spark.excludeOnFailure.killExcludedExecutors.decommission")
-      .doc("Attempt decommission of excluded nodes instead of going directly 
to kill")
-      .version("3.2.0")
-      .booleanConf
-      .createWithDefault(false)
-
   private[spark] val EXCLUDE_ON_FAILURE_LEGACY_TIMEOUT_CONF =
     ConfigBuilder("spark.scheduler.executorTaskExcludeOnFailureTime")
       .internal()
@@ -1965,8 +1958,7 @@ package object config {
 
   private[spark] val EXECUTOR_DECOMMISSION_KILL_INTERVAL =
     ConfigBuilder("spark.executor.decommission.killInterval")
-      .doc("Duration after which a decommissioned executor will be killed 
forcefully " +
-        "*by an outside* (e.g. non-spark) service. " +
+      .doc("Duration after which a decommissioned executor will be killed 
forcefully." +
         "This config is useful for cloud environments where we know in advance 
when " +
         "an executor is going to go down after decommissioning signal i.e. 
around 2 mins " +
         "in aws spot nodes, 1/2 hrs in spot block nodes etc. This config is 
currently " +
@@ -1975,15 +1967,6 @@ package object config {
       .timeConf(TimeUnit.SECONDS)
       .createOptional
 
-  private[spark] val EXECUTOR_DECOMMISSION_FORCE_KILL_TIMEOUT =
-    ConfigBuilder("spark.executor.decommission.forceKillTimeout")
-      .doc("Duration after which a Spark will force a decommissioning executor 
to exit." +
-        " this should be set to a high value in most situations as low values 
will prevent " +
-        " block migrations from having enough time to complete.")
-      .version("3.2.0")
-      .timeConf(TimeUnit.SECONDS)
-      .createOptional
-
   private[spark] val EXECUTOR_DECOMMISSION_SIGNAL =
     ConfigBuilder("spark.executor.decommission.signal")
       .doc("The signal that used to trigger the executor to start 
decommission.")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala 
b/core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala
index 6bd5668..c6b8dca 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala
@@ -40,7 +40,6 @@ import org.apache.spark.util.{Clock, SystemClock, Utils}
  *      stage, but still many failures over the entire application
  *  * "flaky" executors -- they don't fail every task, but are still faulty 
enough to merit
  *      excluding
- *  * missing shuffle files -- may trigger fetch failures on healthy executors.
  *
  * See the design doc on SPARK-8425 for a more in-depth discussion. Note 
SPARK-32037 renamed
  * the feature.
@@ -65,8 +64,6 @@ private[scheduler] class HealthTracker (
   val EXCLUDE_ON_FAILURE_TIMEOUT_MILLIS = 
HealthTracker.getExludeOnFailureTimeout(conf)
   private val EXCLUDE_FETCH_FAILURE_ENABLED =
     conf.get(config.EXCLUDE_ON_FAILURE_FETCH_FAILURE_ENABLED)
-  private val EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED =
-    conf.get(config.EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED)
 
   /**
    * A map from executorId to information on task failures. Tracks the time of 
each task failure,
@@ -157,21 +154,11 @@ private[scheduler] class HealthTracker (
   }
 
   private def killExecutor(exec: String, msg: String): Unit = {
-    val fullMsg = if (EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED) {
-      s"${msg} (actually decommissioning)"
-    } else {
-      msg
-    }
     allocationClient match {
       case Some(a) =>
-        logInfo(fullMsg)
-        if (EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED) {
-          a.decommissionExecutor(exec, ExecutorDecommissionInfo(fullMsg),
-            adjustTargetNumExecutors = false)
-        } else {
-          a.killExecutors(Seq(exec), adjustTargetNumExecutors = false, 
countFailures = false,
-            force = true)
-        }
+        logInfo(msg)
+        a.killExecutors(Seq(exec), adjustTargetNumExecutors = false, 
countFailures = false,
+          force = true)
       case None =>
         logInfo(s"Not attempting to kill excluded executor id $exec " +
           s"since allocation client is not defined.")
@@ -195,18 +182,10 @@ private[scheduler] class HealthTracker (
     if (conf.get(config.EXCLUDE_ON_FAILURE_KILL_ENABLED)) {
       allocationClient match {
         case Some(a) =>
-          if (EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED) {
-            logInfo(s"Decommissioning all executors on excluded host $node " +
-              s"since ${config.EXCLUDE_ON_FAILURE_KILL_ENABLED.key} is set.")
-            if (!a.decommissionExecutorsOnHost(node)) {
-              logError(s"Decommissioning executors on $node failed.")
-            }
-          } else {
-            logInfo(s"Killing all executors on excluded host $node " +
-              s"since ${config.EXCLUDE_ON_FAILURE_KILL_ENABLED.key} is set.")
-            if (!a.killExecutorsOnHost(node)) {
-              logError(s"Killing executors on node $node failed.")
-            }
+          logInfo(s"Killing all executors on excluded host $node " +
+            s"since ${config.EXCLUDE_ON_FAILURE_KILL_ENABLED.key} is set.")
+          if (a.killExecutorsOnHost(node) == false) {
+            logError(s"Killing executors on node $node failed.")
           }
         case None =>
           logWarning(s"Not attempting to kill executors on excluded host $node 
" +
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index a6f52f9..2f17143 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -49,9 +49,6 @@ private[spark] object CoarseGrainedClusterMessages {
   case class KillExecutorsOnHost(host: String)
     extends CoarseGrainedClusterMessage
 
-  case class DecommissionExecutorsOnHost(host: String)
-    extends CoarseGrainedClusterMessage
-
   case class UpdateDelegationTokens(tokens: Array[Byte])
     extends CoarseGrainedClusterMessage
 
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index b44f677..ccb5eb1 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler.cluster
 
-import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
 import javax.annotation.concurrent.GuardedBy
 
@@ -115,11 +115,6 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   private val reviveThread =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
 
-  private val cleanupService: Option[ScheduledExecutorService] =
-    conf.get(EXECUTOR_DECOMMISSION_FORCE_KILL_TIMEOUT).map { _ =>
-      
ThreadUtils.newDaemonSingleThreadScheduledExecutor("cleanup-decommission-execs")
-    }
-
   class DriverEndpoint extends IsolatedRpcEndpoint with Logging {
 
     override val rpcEnv: RpcEnv = CoarseGrainedSchedulerBackend.this.rpcEnv
@@ -181,20 +176,11 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
         }
 
       case KillExecutorsOnHost(host) =>
-        scheduler.getExecutorsAliveOnHost(host).foreach { execs =>
-          killExecutors(execs.toSeq, adjustTargetNumExecutors = false, 
countFailures = false,
+        scheduler.getExecutorsAliveOnHost(host).foreach { exec =>
+          killExecutors(exec.toSeq, adjustTargetNumExecutors = false, 
countFailures = false,
             force = true)
         }
 
-      case DecommissionExecutorsOnHost(host) =>
-        val reason = ExecutorDecommissionInfo(s"Decommissioning all executors 
on $host.")
-        scheduler.getExecutorsAliveOnHost(host).foreach { execs =>
-          val execsWithReasons = execs.map(exec => (exec, reason)).toArray
-
-          decommissionExecutors(execsWithReasons, adjustTargetNumExecutors = 
false,
-            triggeredByExecutor = false)
-        }
-
       case UpdateDelegationTokens(newDelegationTokens) =>
         updateDelegationTokens(newDelegationTokens)
 
@@ -520,21 +506,6 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
       }
     }
 
-    conf.get(EXECUTOR_DECOMMISSION_FORCE_KILL_TIMEOUT).map { cleanupInterval =>
-      val cleanupTask = new Runnable() {
-        override def run(): Unit = Utils.tryLogNonFatalError {
-          val stragglers = CoarseGrainedSchedulerBackend.this.synchronized {
-            
executorsToDecommission.filter(executorsPendingDecommission.contains)
-          }
-          if (stragglers.nonEmpty) {
-            logInfo(s"${stragglers.toList} failed to decommission in 
${cleanupInterval}, killing.")
-            killExecutors(stragglers, false, false, true)
-          }
-        }
-      }
-      cleanupService.map(_.schedule(cleanupTask, cleanupInterval, 
TimeUnit.SECONDS))
-    }
-
     executorsToDecommission
   }
 
@@ -577,7 +548,6 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 
   override def stop(): Unit = {
     reviveThread.shutdownNow()
-    cleanupService.foreach(_.shutdownNow())
     stopExecutors()
     delegationTokenManager.foreach(_.stop())
     try {
@@ -881,29 +851,13 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
     Future.successful(false)
 
   /**
-   * Request that the cluster manager decommissions all executors on a given 
host.
-   * @return whether the decommission request is acknowledged.
-   */
-  final override def decommissionExecutorsOnHost(host: String): Boolean = {
-    logInfo(s"Requesting to kill any and all executors on host $host")
-    // A potential race exists if a new executor attempts to register on a host
-    // that is on the exclude list and is no longer valid. To avoid this race,
-    // all executor registration and decommissioning happens in the event 
loop. This way, either
-    // an executor will fail to register, or will be decommed when all 
executors on a host
-    // are decommed.
-    // Decommission all the executors on this host in an event loop to ensure 
serialization.
-    driverEndpoint.send(DecommissionExecutorsOnHost(host))
-    true
-  }
-
-  /**
    * Request that the cluster manager kill all executors on a given host.
    * @return whether the kill request is acknowledged.
    */
   final override def killExecutorsOnHost(host: String): Boolean = {
-    logInfo(s"Requesting to kill any and all executors on host $host")
+    logInfo(s"Requesting to kill any and all executors on host ${host}")
     // A potential race exists if a new executor attempts to register on a host
-    // that is on the exclude list and is no longer valid. To avoid this race,
+    // that is on the exclude list and is no no longer valid. To avoid this 
race,
     // all executor registration and killing happens in the event loop. This 
way, either
     // an executor will fail to register, or will be killed when all executors 
on a host
     // are killed.
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/HealthTrackerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/HealthTrackerSuite.scala
index 5710be1..7ecc1f5 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/HealthTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/HealthTrackerSuite.scala
@@ -554,51 +554,6 @@ class HealthTrackerSuite extends SparkFunSuite with 
BeforeAndAfterEach with Mock
     verify(allocationClientMock).killExecutorsOnHost("hostA")
   }
 
-  test("excluding decommission and kills executors when enabled") {
-    val allocationClientMock = mock[ExecutorAllocationClient]
-
-    // verify we decommission when configured
-    conf.set(config.EXCLUDE_ON_FAILURE_KILL_ENABLED, true)
-    conf.set(config.DECOMMISSION_ENABLED.key, "true")
-    conf.set(config.EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED.key, "true")
-    conf.set(config.MAX_FAILURES_PER_EXEC.key, "1")
-    conf.set(config.MAX_FAILED_EXEC_PER_NODE.key, "2")
-    healthTracker = new HealthTracker(listenerBusMock, conf, 
Some(allocationClientMock), clock)
-
-    // Fail 4 tasks in one task set on executor 1, so that executor gets 
excluded for the whole
-    // application.
-    val taskSetExclude2 = createTaskSetExcludelist(stageId = 0)
-    (0 until 4).foreach { partition =>
-      taskSetExclude2.updateExcludedForFailedTask(
-        "hostA", exec = "1", index = partition, failureReason = "testing")
-    }
-    healthTracker.updateExcludedForSuccessfulTaskSet(0, 0, 
taskSetExclude2.execToFailures)
-
-    val msg1 =
-      "Killing excluded executor id 1 since 
spark.excludeOnFailure.killExcludedExecutors is set." +
-      " (actually decommissioning)"
-
-    verify(allocationClientMock).decommissionExecutor(
-      "1", ExecutorDecommissionInfo(msg1), false)
-
-    val taskSetExclude3 = createTaskSetExcludelist(stageId = 1)
-    // Fail 4 tasks in one task set on executor 2, so that executor gets 
excluded for the whole
-    // application.  Since that's the second executor that is excluded on the 
same node, we also
-    // exclude that node.
-    (0 until 4).foreach { partition =>
-      taskSetExclude3.updateExcludedForFailedTask(
-        "hostA", exec = "2", index = partition, failureReason = "testing")
-    }
-    healthTracker.updateExcludedForSuccessfulTaskSet(0, 0, 
taskSetExclude3.execToFailures)
-
-    val msg2 =
-      "Killing excluded executor id 2 since 
spark.excludeOnFailure.killExcludedExecutors is set." +
-      " (actually decommissioning)"
-    verify(allocationClientMock).decommissionExecutor(
-      "2", ExecutorDecommissionInfo(msg2), false, false)
-    verify(allocationClientMock).decommissionExecutorsOnHost("hostA")
-  }
-
   test("fetch failure excluding kills executors, configured by 
EXCLUDE_ON_FAILURE_KILL_ENABLED") {
     val allocationClientMock = mock[ExecutorAllocationClient]
     when(allocationClientMock.killExecutors(any(), any(), any(), 
any())).thenReturn(Seq("called"))
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
index 56a23ab..92f6a32 100644
--- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
+++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
@@ -116,38 +116,6 @@ private[spark] trait DecommissionSuite { k8sSuite: 
KubernetesSuite =>
       executorPatience = None,
       decommissioningTest = false)
   }
-
-  test("Test decommissioning timeouts", k8sTestTag) {
-    sparkAppConf
-      .set(config.DECOMMISSION_ENABLED.key, "true")
-      .set("spark.kubernetes.container.image", pyImage)
-      .set(config.STORAGE_DECOMMISSION_ENABLED.key, "true")
-      .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED.key, "true")
-      .set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED.key, "true")
-      // Ensure we have somewhere to migrate our data too
-      .set("spark.executor.instances", "3")
-      // Set super high so the timeout is triggered
-      .set("spark.storage.decommission.replicationReattemptInterval", 
"8640000")
-      // Set super low so the timeout is triggered
-      .set(config.EXECUTOR_DECOMMISSION_CLEANUP_INTERVAL.key, "10")
-
-    runSparkApplicationAndVerifyCompletion(
-      appResource = PYSPARK_DECOMISSIONING,
-      mainClass = "",
-      expectedDriverLogOnCompletion = Seq(
-        "Finished waiting, stopping Spark",
-        "Decommission executors",
-        "failed to decommission in 10, killing",
-        "killed by driver."),
-      appArgs = Array.empty[String],
-      driverPodChecker = doBasicDriverPyPodCheck,
-      executorPodChecker = doBasicExecutorPyPodCheck,
-      appLocator = appLocator,
-      isJVM = false,
-      pyFiles = None,
-      executorPatience = None,
-      decommissioningTest = true)
-  }
 }
 
 private[spark] object DecommissionSuite {
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index 9f1bcf7..494c825 100644
--- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -345,11 +345,8 @@ class KubernetesSuite extends SparkFunSuite
                 }
                 // Delete the pod to simulate cluster scale down/migration.
                 // This will allow the pod to remain up for the grace period
-                // We set an intentionally long grace period to test that Spark
-                // exits once the blocks are done migrating and doesn't wait 
for the
-                // entire grace period if it does not need to.
                 kubernetesTestComponents.kubernetesClient.pods()
-                  .withName(name).withGracePeriod(Int.MaxValue).delete()
+                  .withName(name).delete()
                 logDebug(s"Triggered pod decom/delete: $name deleted")
                 // Make sure this pod is deleted
                 Eventually.eventually(TIMEOUT, INTERVAL) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to