This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new c8628c9  Revert "[SPARK-34104][SPARK-34105][CORE][K8S] Maximum decommissioning time & allow decommissioning for excludes"
c8628c9 is described below

commit c8628c943cd12bbad7561bdc297cea9ff23becc7
Author: HyukjinKwon <gurwls...@apache.org>
AuthorDate: Wed Feb 10 08:00:03 2021 +0900

    Revert "[SPARK-34104][SPARK-34105][CORE][K8S] Maximum decommissioning time & allow decommissioning for excludes"

    This reverts commit 50641d2e3d659f51432aa2c0e6b9af76d71a5796.
---
 .../apache/spark/ExecutorAllocationClient.scala    |  6 ---
 .../org/apache/spark/internal/config/package.scala | 19 +-------
 .../org/apache/spark/scheduler/HealthTracker.scala | 35 +++-----------
 .../cluster/CoarseGrainedClusterMessage.scala      |  3 --
 .../cluster/CoarseGrainedSchedulerBackend.scala    | 56 ++--------------------
 .../spark/scheduler/HealthTrackerSuite.scala       | 45 -----------------
 .../k8s/integrationtest/DecommissionSuite.scala    | 32 -------------
 .../k8s/integrationtest/KubernetesSuite.scala      |  5 +-
 8 files changed, 14 insertions(+), 187 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
index 5b587d7..cdba1c4 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -129,12 +129,6 @@ private[spark] trait ExecutorAllocationClient {
     decommissionedExecutors.nonEmpty && decommissionedExecutors(0).equals(executorId)
   }
 
-  /**
-   * Request that the cluster manager decommission every executor on the specified host.
-   *
-   * @return whether the request is acknowledged by the cluster manager.
-   */
-  def decommissionExecutorsOnHost(host: String): Boolean
 
   /**
    * Request that the cluster manager kill every executor on the specified host.
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 3101bb6..7aeb51d 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -827,13 +827,6 @@ package object config {
       .booleanConf
       .createWithDefault(false)
 
-  private[spark] val EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED =
-    ConfigBuilder("spark.excludeOnFailure.killExcludedExecutors.decommission")
-      .doc("Attempt decommission of excluded nodes instead of going directly to kill")
-      .version("3.2.0")
-      .booleanConf
-      .createWithDefault(false)
-
   private[spark] val EXCLUDE_ON_FAILURE_LEGACY_TIMEOUT_CONF =
     ConfigBuilder("spark.scheduler.executorTaskExcludeOnFailureTime")
       .internal()
@@ -1965,8 +1958,7 @@ package object config {
 
   private[spark] val EXECUTOR_DECOMMISSION_KILL_INTERVAL =
     ConfigBuilder("spark.executor.decommission.killInterval")
-      .doc("Duration after which a decommissioned executor will be killed forcefully " +
-        "*by an outside* (e.g. non-spark) service. " +
+      .doc("Duration after which a decommissioned executor will be killed forcefully." +
         "This config is useful for cloud environments where we know in advance when " +
         "an executor is going to go down after decommissioning signal i.e. around 2 mins " +
         "in aws spot nodes, 1/2 hrs in spot block nodes etc. This config is currently " +
@@ -1975,15 +1967,6 @@ package object config {
       .timeConf(TimeUnit.SECONDS)
       .createOptional
 
-  private[spark] val EXECUTOR_DECOMMISSION_FORCE_KILL_TIMEOUT =
-    ConfigBuilder("spark.executor.decommission.forceKillTimeout")
-      .doc("Duration after which a Spark will force a decommissioning executor to exit." +
-        " this should be set to a high value in most situations as low values will prevent " +
-        " block migrations from having enough time to complete.")
-      .version("3.2.0")
-      .timeConf(TimeUnit.SECONDS)
-      .createOptional
-
   private[spark] val EXECUTOR_DECOMMISSION_SIGNAL =
     ConfigBuilder("spark.executor.decommission.signal")
       .doc("The signal that used to trigger the executor to start decommission.")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala
index 6bd5668..c6b8dca 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala
@@ -40,7 +40,6 @@ import org.apache.spark.util.{Clock, SystemClock, Utils}
  *  stage, but still many failures over the entire application
  *  * "flaky" executors -- they don't fail every task, but are still faulty enough to merit
  *  excluding
- *  * missing shuffle files -- may trigger fetch failures on healthy executors.
  *
  * See the design doc on SPARK-8425 for a more in-depth discussion. Note SPARK-32037 renamed
  * the feature.
@@ -65,8 +64,6 @@ private[scheduler] class HealthTracker (
   val EXCLUDE_ON_FAILURE_TIMEOUT_MILLIS = HealthTracker.getExludeOnFailureTimeout(conf)
   private val EXCLUDE_FETCH_FAILURE_ENABLED =
     conf.get(config.EXCLUDE_ON_FAILURE_FETCH_FAILURE_ENABLED)
-  private val EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED =
-    conf.get(config.EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED)
 
   /**
    * A map from executorId to information on task failures. Tracks the time of each task failure,
@@ -157,21 +154,11 @@ private[scheduler] class HealthTracker (
   }
 
   private def killExecutor(exec: String, msg: String): Unit = {
-    val fullMsg = if (EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED) {
-      s"${msg} (actually decommissioning)"
-    } else {
-      msg
-    }
     allocationClient match {
       case Some(a) =>
-        logInfo(fullMsg)
-        if (EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED) {
-          a.decommissionExecutor(exec, ExecutorDecommissionInfo(fullMsg),
-            adjustTargetNumExecutors = false)
-        } else {
-          a.killExecutors(Seq(exec), adjustTargetNumExecutors = false, countFailures = false,
-            force = true)
-        }
+        logInfo(msg)
+        a.killExecutors(Seq(exec), adjustTargetNumExecutors = false, countFailures = false,
+          force = true)
       case None =>
         logInfo(s"Not attempting to kill excluded executor id $exec " +
           s"since allocation client is not defined.")
@@ -195,18 +182,10 @@ private[scheduler] class HealthTracker (
       if (conf.get(config.EXCLUDE_ON_FAILURE_KILL_ENABLED)) {
         allocationClient match {
           case Some(a) =>
-            if (EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED) {
-              logInfo(s"Decommissioning all executors on excluded host $node " +
-                s"since ${config.EXCLUDE_ON_FAILURE_KILL_ENABLED.key} is set.")
-              if (!a.decommissionExecutorsOnHost(node)) {
-                logError(s"Decommissioning executors on $node failed.")
-              }
-            } else {
-              logInfo(s"Killing all executors on excluded host $node " +
-                s"since ${config.EXCLUDE_ON_FAILURE_KILL_ENABLED.key} is set.")
-              if (!a.killExecutorsOnHost(node)) {
-                logError(s"Killing executors on node $node failed.")
-              }
+            logInfo(s"Killing all executors on excluded host $node " +
+              s"since ${config.EXCLUDE_ON_FAILURE_KILL_ENABLED.key} is set.")
+            if (a.killExecutorsOnHost(node) == false) {
+              logError(s"Killing executors on node $node failed.")
             }
           case None =>
             logWarning(s"Not attempting to kill executors on excluded host $node " +
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index a6f52f9..2f17143 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -49,9 +49,6 @@ private[spark] object CoarseGrainedClusterMessages {
   case class KillExecutorsOnHost(host: String)
     extends CoarseGrainedClusterMessage
 
-  case class DecommissionExecutorsOnHost(host: String)
-    extends CoarseGrainedClusterMessage
-
   case class UpdateDelegationTokens(tokens: Array[Byte])
     extends CoarseGrainedClusterMessage
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index b44f677..ccb5eb1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler.cluster
 
-import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
 import javax.annotation.concurrent.GuardedBy
 
@@ -115,11 +115,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   private val reviveThread =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
 
-  private val cleanupService: Option[ScheduledExecutorService] =
-    conf.get(EXECUTOR_DECOMMISSION_FORCE_KILL_TIMEOUT).map { _ =>
-      ThreadUtils.newDaemonSingleThreadScheduledExecutor("cleanup-decommission-execs")
-    }
-
   class DriverEndpoint extends IsolatedRpcEndpoint with Logging {
 
     override val rpcEnv: RpcEnv = CoarseGrainedSchedulerBackend.this.rpcEnv
@@ -181,20 +176,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         }
 
       case KillExecutorsOnHost(host) =>
-        scheduler.getExecutorsAliveOnHost(host).foreach { execs =>
-          killExecutors(execs.toSeq, adjustTargetNumExecutors = false, countFailures = false,
+        scheduler.getExecutorsAliveOnHost(host).foreach { exec =>
+          killExecutors(exec.toSeq, adjustTargetNumExecutors = false, countFailures = false,
             force = true)
         }
 
-      case DecommissionExecutorsOnHost(host) =>
-        val reason = ExecutorDecommissionInfo(s"Decommissioning all executors on $host.")
-        scheduler.getExecutorsAliveOnHost(host).foreach { execs =>
-          val execsWithReasons = execs.map(exec => (exec, reason)).toArray
-
-          decommissionExecutors(execsWithReasons, adjustTargetNumExecutors = false,
-            triggeredByExecutor = false)
-        }
-
       case UpdateDelegationTokens(newDelegationTokens) =>
         updateDelegationTokens(newDelegationTokens)
 
@@ -520,21 +506,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       }
     }
 
-    conf.get(EXECUTOR_DECOMMISSION_FORCE_KILL_TIMEOUT).map { cleanupInterval =>
-      val cleanupTask = new Runnable() {
-        override def run(): Unit = Utils.tryLogNonFatalError {
-          val stragglers = CoarseGrainedSchedulerBackend.this.synchronized {
-            executorsToDecommission.filter(executorsPendingDecommission.contains)
-          }
-          if (stragglers.nonEmpty) {
-            logInfo(s"${stragglers.toList} failed to decommission in ${cleanupInterval}, killing.")
-            killExecutors(stragglers, false, false, true)
-          }
-        }
-      }
-      cleanupService.map(_.schedule(cleanupTask, cleanupInterval, TimeUnit.SECONDS))
-    }
-
     executorsToDecommission
   }
 
@@ -577,7 +548,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
   override def stop(): Unit = {
     reviveThread.shutdownNow()
-    cleanupService.foreach(_.shutdownNow())
     stopExecutors()
     delegationTokenManager.foreach(_.stop())
     try {
@@ -881,29 +851,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       Future.successful(false)
 
   /**
-   * Request that the cluster manager decommissions all executors on a given host.
-   * @return whether the decommission request is acknowledged.
-   */
-  final override def decommissionExecutorsOnHost(host: String): Boolean = {
-    logInfo(s"Requesting to kill any and all executors on host $host")
-    // A potential race exists if a new executor attempts to register on a host
-    // that is on the exclude list and is no longer valid. To avoid this race,
-    // all executor registration and decommissioning happens in the event loop. This way, either
-    // an executor will fail to register, or will be decommed when all executors on a host
-    // are decommed.
-    // Decommission all the executors on this host in an event loop to ensure serialization.
-    driverEndpoint.send(DecommissionExecutorsOnHost(host))
-    true
-  }
-
-  /**
    * Request that the cluster manager kill all executors on a given host.
    * @return whether the kill request is acknowledged.
   */
   final override def killExecutorsOnHost(host: String): Boolean = {
-    logInfo(s"Requesting to kill any and all executors on host $host")
+    logInfo(s"Requesting to kill any and all executors on host ${host}")
     // A potential race exists if a new executor attempts to register on a host
-    // that is on the exclude list and is no longer valid. To avoid this race,
+    // that is on the exclude list and is no no longer valid. To avoid this race,
     // all executor registration and killing happens in the event loop. This way, either
     // an executor will fail to register, or will be killed when all executors on a host
     // are killed.
diff --git a/core/src/test/scala/org/apache/spark/scheduler/HealthTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/HealthTrackerSuite.scala
index 5710be1..7ecc1f5 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/HealthTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/HealthTrackerSuite.scala
@@ -554,51 +554,6 @@ class HealthTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with Mock
     verify(allocationClientMock).killExecutorsOnHost("hostA")
   }
 
-  test("excluding decommission and kills executors when enabled") {
-    val allocationClientMock = mock[ExecutorAllocationClient]
-
-    // verify we decommission when configured
-    conf.set(config.EXCLUDE_ON_FAILURE_KILL_ENABLED, true)
-    conf.set(config.DECOMMISSION_ENABLED.key, "true")
-    conf.set(config.EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED.key, "true")
-    conf.set(config.MAX_FAILURES_PER_EXEC.key, "1")
-    conf.set(config.MAX_FAILED_EXEC_PER_NODE.key, "2")
-    healthTracker = new HealthTracker(listenerBusMock, conf, Some(allocationClientMock), clock)
-
-    // Fail 4 tasks in one task set on executor 1, so that executor gets excluded for the whole
-    // application.
-    val taskSetExclude2 = createTaskSetExcludelist(stageId = 0)
-    (0 until 4).foreach { partition =>
-      taskSetExclude2.updateExcludedForFailedTask(
-        "hostA", exec = "1", index = partition, failureReason = "testing")
-    }
-    healthTracker.updateExcludedForSuccessfulTaskSet(0, 0, taskSetExclude2.execToFailures)
-
-    val msg1 =
-      "Killing excluded executor id 1 since spark.excludeOnFailure.killExcludedExecutors is set." +
-        " (actually decommissioning)"
-
-    verify(allocationClientMock).decommissionExecutor(
-      "1", ExecutorDecommissionInfo(msg1), false)
-
-    val taskSetExclude3 = createTaskSetExcludelist(stageId = 1)
-    // Fail 4 tasks in one task set on executor 2, so that executor gets excluded for the whole
-    // application. Since that's the second executor that is excluded on the same node, we also
-    // exclude that node.
-    (0 until 4).foreach { partition =>
-      taskSetExclude3.updateExcludedForFailedTask(
-        "hostA", exec = "2", index = partition, failureReason = "testing")
-    }
-    healthTracker.updateExcludedForSuccessfulTaskSet(0, 0, taskSetExclude3.execToFailures)
-
-    val msg2 =
-      "Killing excluded executor id 2 since spark.excludeOnFailure.killExcludedExecutors is set." +
-        " (actually decommissioning)"
-    verify(allocationClientMock).decommissionExecutor(
-      "2", ExecutorDecommissionInfo(msg2), false, false)
-    verify(allocationClientMock).decommissionExecutorsOnHost("hostA")
-  }
-
   test("fetch failure excluding kills executors, configured by EXCLUDE_ON_FAILURE_KILL_ENABLED") {
     val allocationClientMock = mock[ExecutorAllocationClient]
     when(allocationClientMock.killExecutors(any(), any(), any(), any())).thenReturn(Seq("called"))
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
index 56a23ab..92f6a32 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
@@ -116,38 +116,6 @@ private[spark] trait DecommissionSuite { k8sSuite: KubernetesSuite =>
       executorPatience = None,
       decommissioningTest = false)
   }
-
-  test("Test decommissioning timeouts", k8sTestTag) {
-    sparkAppConf
-      .set(config.DECOMMISSION_ENABLED.key, "true")
-      .set("spark.kubernetes.container.image", pyImage)
-      .set(config.STORAGE_DECOMMISSION_ENABLED.key, "true")
-      .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED.key, "true")
-      .set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED.key, "true")
-      // Ensure we have somewhere to migrate our data too
-      .set("spark.executor.instances", "3")
-      // Set super high so the timeout is triggered
-      .set("spark.storage.decommission.replicationReattemptInterval", "8640000")
-      // Set super low so the timeout is triggered
-      .set(config.EXECUTOR_DECOMMISSION_CLEANUP_INTERVAL.key, "10")
-
-    runSparkApplicationAndVerifyCompletion(
-      appResource = PYSPARK_DECOMISSIONING,
-      mainClass = "",
-      expectedDriverLogOnCompletion = Seq(
-        "Finished waiting, stopping Spark",
-        "Decommission executors",
-        "failed to decommission in 10, killing",
-        "killed by driver."),
-      appArgs = Array.empty[String],
-      driverPodChecker = doBasicDriverPyPodCheck,
-      executorPodChecker = doBasicExecutorPyPodCheck,
-      appLocator = appLocator,
-      isJVM = false,
-      pyFiles = None,
-      executorPatience = None,
-      decommissioningTest = true)
-  }
 }
 
 private[spark] object DecommissionSuite {
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index 9f1bcf7..494c825 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -345,11 +345,8 @@ class KubernetesSuite extends SparkFunSuite
             }
             // Delete the pod to simulate cluster scale down/migration.
            // This will allow the pod to remain up for the grace period
-            // We set an intentionally long grace period to test that Spark
-            // exits once the blocks are done migrating and doesn't wait for the
-            // entire grace period if it does not need to.
            kubernetesTestComponents.kubernetesClient.pods()
-              .withName(name).withGracePeriod(Int.MaxValue).delete()
+              .withName(name).delete()
            logDebug(s"Triggered pod decom/delete: $name deleted")
            // Make sure this pod is deleted
            Eventually.eventually(TIMEOUT, INTERVAL) {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org