tedyu commented on PR #38651:
URL: https://github.com/apache/spark/pull/38651#issuecomment-1336401312

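   I think we can do more than just setting a default value for `KUBERNETES_EXECUTOR_SNAPSHOTS_SUBSCRIBERS_GRACE_PERIOD`: when `hadoop.service.shutdown.timeout` is configured, the grace period could also be capped so that it ends a few seconds before Hadoop's shutdown timeout expires.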
   
   What do you think of the following change?
   
   Thanks
   ```
   diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
   index 49ab1d3248..bac171336a 100644
   --- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
   +++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
   @@ -29,6 +29,7 @@ import scala.util.control.NonFatal
    import io.fabric8.kubernetes.api.model.Pod
   
    import org.apache.spark.{SparkConf, SparkContext}
   +import org.apache.spark.deploy.SparkHadoopUtil
    import 
org.apache.spark.deploy.k8s.Config.KUBERNETES_EXECUTOR_SNAPSHOTS_SUBSCRIBERS_GRACE_PERIOD
    import org.apache.spark.internal.Logging
    import org.apache.spark.util.Clock
   @@ -98,7 +99,15 @@ private[spark] class ExecutorPodsSnapshotsStoreImpl(
   
      override def stop(): Unit = {
        pollingTasks.asScala.foreach(_.cancel(false))
   -    val awaitSeconds = 
conf.get(KUBERNETES_EXECUTOR_SNAPSHOTS_SUBSCRIBERS_GRACE_PERIOD)
   +    var awaitSeconds = 
conf.get(KUBERNETES_EXECUTOR_SNAPSHOTS_SUBSCRIBERS_GRACE_PERIOD)
   +    val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
   +    if (hadoopConf.get("hadoop.service.shutdown.timeout") != null) {
   +        val hadoopTimeout = 
hadoopConf.get("hadoop.service.shutdown.timeout").toLong
   +        val GRACE = 8
   +        if (hadoopTimeout-GRACE <= awaitSeconds) {
   +            awaitSeconds = hadoopTimeout-GRACE
   +        }
   +    }
        ThreadUtils.shutdown(subscribersExecutor, FiniteDuration(awaitSeconds, 
TimeUnit.SECONDS))
      }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to