This is an automated email from the ASF dual-hosted git repository. holden pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 505a8a04ad6 [SPARK-40428][CORE] Fix shutdown hook in the CoarseGrainedSchedulerBackend 505a8a04ad6 is described below commit 505a8a04ad64a6732bf9fec03c28bfbd514d109d Author: Holden Karau <hol...@pigscanfly.ca> AuthorDate: Tue Oct 4 17:26:34 2022 -0700 [SPARK-40428][CORE] Fix shutdown hook in the CoarseGrainedSchedulerBackend ### What changes were proposed in this pull request? Make sure the shutdown path actually reaches the scheduler backend (CoarseGrainedSchedulerBackend): each cleanup step in the SparkContext shutdown sequence, DAGScheduler.stop() and TaskSchedulerImpl.stop() is now wrapped in Utils.tryLogNonFatalError, so a non-fatal error in one step is logged instead of aborting the remaining steps. ### Why are the changes needed? Sometimes, if the driver shuts down abnormally, resources may be left dangling: the cleanup steps ran one after another without protection, so an exception thrown by an earlier step could skip later ones such as backend.stop(). ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests. Closes #37885 from holdenk/shutdownhook-for-k8s. Lead-authored-by: Holden Karau <hol...@pigscanfly.ca> Co-authored-by: Holden Karau <hka...@netflix.com> Signed-off-by: Holden Karau <hka...@netflix.com> --- core/src/main/scala/org/apache/spark/SparkContext.scala | 4 +++- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 16 ++++++++++++---- .../org/apache/spark/scheduler/TaskSchedulerImpl.scala | 16 ++++++++++++---- 3 files changed, 27 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index f101dc8e083..e2c6a912bc2 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -2119,7 +2119,9 @@ class SparkContext(config: SparkConf) extends Logging { Utils.tryLogNonFatalError { _plugins.foreach(_.shutdown()) } - FallbackStorage.cleanUp(_conf, _hadoopConfiguration) + Utils.tryLogNonFatalError { + FallbackStorage.cleanUp(_conf, _hadoopConfiguration) + } Utils.tryLogNonFatalError { _eventLogger.foreach(_.stop()) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index
ab05409aebb..c5529851382 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -2890,10 +2890,18 @@ private[spark] class DAGScheduler( } def stop(): Unit = { - messageScheduler.shutdownNow() - shuffleMergeFinalizeScheduler.shutdownNow() - eventProcessLoop.stop() - taskScheduler.stop() + Utils.tryLogNonFatalError { + messageScheduler.shutdownNow() + } + Utils.tryLogNonFatalError { + shuffleMergeFinalizeScheduler.shutdownNow() + } + Utils.tryLogNonFatalError { + eventProcessLoop.stop() + } + Utils.tryLogNonFatalError { + taskScheduler.stop() + } } eventProcessLoop.start() diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index a6735f380f1..5004262a71c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -972,15 +972,23 @@ private[spark] class TaskSchedulerImpl( } override def stop(): Unit = { - speculationScheduler.shutdown() + Utils.tryLogNonFatalError { + speculationScheduler.shutdown() + } if (backend != null) { - backend.stop() + Utils.tryLogNonFatalError { + backend.stop() + } } if (taskResultGetter != null) { - taskResultGetter.stop() + Utils.tryLogNonFatalError { + taskResultGetter.stop() + } } if (barrierCoordinator != null) { - barrierCoordinator.stop() + Utils.tryLogNonFatalError { + barrierCoordinator.stop() + } } starvationTimer.cancel() abortTimer.cancel() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org