holdenk commented on a change in pull request #30675:
URL: https://github.com/apache/spark/pull/30675#discussion_r541150974
##########
File path: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
##########
@@ -109,33 +112,33 @@ private[spark] class ExecutorPodsLifecycleManager(
// Reconcile the case where Spark claims to know about an executor but the corresponding pod
// is missing from the cluster. This would occur if we miss a deletion event and the pod
// transitions immediately from running to absent. We only need to check against the latest
- // snapshot for this, and we don't do this for executors in the deleted executors cache or
- // that we just removed in this round.
- val lostExecutors = if (snapshots.nonEmpty) {
-   schedulerBackend.getExecutorIds().map(_.toLong).toSet --
+ // fresh full snapshot (coming from ExecutorPodsPollingSnapshotSource) for this, and we don't
+ // do this for executors in the deleted executors cache or that we just removed in this round.
Review comment:
For "we just removed in this round", it seems like we do run the check on them, provided the executor was registered long enough ago. Unless I'm misunderstanding `registrationTs`: is it the time when the executor was initially registered, or is it updated? (I'm assuming initial since it's a `val`, but maybe we make a new object for each snapshot.)
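To make the question concrete, here is a minimal sketch of the reconciliation as the updated code comment describes it. It assumes `registrationTs` is fixed at initial registration and that the comparison against the delta is strict (`>`); `KnownExecutor`, `MissingPodCheck`, `lostExecutors` and its parameters are hypothetical names, not the PR's code.

```scala
// Hypothetical view of an executor the scheduler backend knows about.
// ASSUMPTION: registrationTs is set once, at initial registration (ms).
final case class KnownExecutor(id: Long, registrationTs: Long)

object MissingPodCheck {
  /** Ids of executors to treat as lost because their pod is missing.
    *
    * @param known                   executors the scheduler believes are alive
    * @param podsInSnapshot          executor ids present in the fresh full snapshot
    * @param deletedCache            executor ids already in the deleted-executors cache
    * @param removedThisRound        executor ids removed earlier in this round
    * @param snapshotTs              time the full snapshot was polled (ms)
    * @param missingPodDetectDeltaMs grace period after registration (ms)
    */
  def lostExecutors(
      known: Seq[KnownExecutor],
      podsInSnapshot: Set[Long],
      deletedCache: Set[Long],
      removedThisRound: Set[Long],
      snapshotTs: Long,
      missingPodDetectDeltaMs: Long): Set[Long] =
    known.iterator
      // Only executors whose pod is absent from the polled list are candidates.
      .filterNot(e => podsInSnapshot.contains(e.id))
      // Explicitly skip executors that are already being cleaned up.
      .filterNot(e => deletedCache.contains(e.id) || removedThisRound.contains(e.id))
      // Flag only executors registered long enough before the poll that their
      // pod should already appear in the API server's list.
      .filter(e => snapshotTs - e.registrationTs > missingPodDetectDeltaMs)
      .map(_.id)
      .toSet
}
```

For example, with executors 1 and 4 registered at 0 ms, executor 2 at 95000 ms, none of them in the snapshot, `removedThisRound = Set(4L)`, a snapshot polled at 100000 ms and a 30000 ms delta, only executor 1 is reported: executor 2 registered just 5 s before the poll, and executor 4 is excluded by the explicit removal filter. Without that filter, the time delta alone would report executor 4 as well, which is the case the question is about.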
##########
File path: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
##########
@@ -433,6 +433,16 @@ private[spark] object Config extends Logging {
.booleanConf
.createWithDefault(false)
+ val KUBERNETES_EXECUTOR_MISSING_POD_DETECT_DELTA =
+ ConfigBuilder("spark.kubernetes.executor.missingPodDetectDelta")
+ .doc("When a registered executor's POD is missing from the Kubernetes API server's polled " +
+ "list of PODs then this delta time is taken as the accepted time difference between the " +
+ "registration time and the time of the polling. After this time the POD is considered " +
+ "missing from the cluster and the executor will be removed.")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .checkValue(delay => delay > 0, s"delay must be a positive time value")
+ .createWithDefaultString("30s")
Review comment:
I think you're right: having this be configurable makes sense. In my own work I've noticed different behavior even between different versions of minikube.
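Since the value is meant to be tuned per environment, here is a small sketch of what the setting accepts: a duration string, defaulting to `30s`, that must be positive. It uses Scala's `scala.concurrent.duration.Duration` parser as a stand-in for Spark's `timeConf` parsing, which accepts a somewhat different set of formats (for example, Spark reads a bare number in the config's default unit, which Scala's parser rejects). `MissingPodDetectDelta` and `parseMillis` are hypothetical names.

```scala
import scala.concurrent.duration.{Duration, FiniteDuration}

object MissingPodDetectDelta {
  // Default taken from the diff: createWithDefaultString("30s").
  val DefaultValue: String = "30s"

  /** Parses a value such as "30s" or "2min" into milliseconds and rejects
    * non-positive or infinite values, mirroring `.checkValue(delay => delay > 0, ...)`.
    * STAND-IN: Scala's Duration parser, not Spark's own time-string parsing.
    * Unparseable input makes Duration throw NumberFormatException,
    * which is a subclass of IllegalArgumentException.
    */
  def parseMillis(value: String): Long = Duration(value) match {
    case d: FiniteDuration if d.toMillis > 0 => d.toMillis
    case _ =>
      throw new IllegalArgumentException(s"delay must be a positive time value: $value")
  }
}
```

Under this sketch the default parses to 30000 ms, `"2min"` to 120000 ms, and `"0s"` is rejected, so a slower cluster can be given a larger grace period without a code change.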
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.