holdenk commented on a change in pull request #30675:
URL: https://github.com/apache/spark/pull/30675#discussion_r541150974



##########
File path: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
##########
@@ -109,33 +112,33 @@ private[spark] class ExecutorPodsLifecycleManager(
     // Reconcile the case where Spark claims to know about an executor but the 
corresponding pod
     // is missing from the cluster. This would occur if we miss a deletion 
event and the pod
     // transitions immediately from running to absent. We only need to check 
against the latest
-    // snapshot for this, and we don't do this for executors in the deleted 
executors cache or
-    // that we just removed in this round.
-    val lostExecutors = if (snapshots.nonEmpty) {
-      schedulerBackend.getExecutorIds().map(_.toLong).toSet --
+    // fresh full snapshot (coming from ExecutorPodsPollingSnapshotSource) for 
this, and we don't
+    // do this for executors in the deleted executors cache or that we just 
removed in this round.

Review comment:
       For "we just removed in this round" it seems like we do? Provided that 
the executor was registered a long time ago. Unless I'm misunderstanding  
`registrationTs`. Is `registrationTs` the time when the executor was initial 
registered or is it updated? (I'm assuming initial since it's a val but maybe 
we make a new object each snapshot)?

##########
File path: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
##########
@@ -433,6 +433,16 @@ private[spark] object Config extends Logging {
       .booleanConf
       .createWithDefault(false)
 
+  val KUBERNETES_EXECUTOR_MISSING_POD_DETECT_DELTA =
+    ConfigBuilder("spark.kubernetes.executor.missingPodDetectDelta")
+      .doc("When a registered executor's POD is missing from the Kubernetes 
API server's polled " +
+        "list of PODs then this delta time is taken as the accepted time 
difference between the " +
+        "registration time and the time of the polling. After this time the 
POD is considered " +
+        "missing from the cluster and the executor will be removed.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .checkValue(delay => delay > 0, s"delay must be a positive time value")
+      .createWithDefaultString("30s")

Review comment:
       I think you're right, having this be configurable makes sense. In my own 
work I've noticed different behavior between even different versions of 
minikube.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to