jaceklaskowski commented on a change in pull request #30675:
URL: https://github.com/apache/spark/pull/30675#discussion_r553988213
##########
File path:
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
##########
@@ -86,7 +86,7 @@ private[spark] class ExecutorPodsAllocator(
private val hasPendingPods = new AtomicBoolean()
- private var lastSnapshot = ExecutorPodsSnapshot(Nil)
+ private var lastSnapshot = ExecutorPodsSnapshot(Nil, 0)
Review comment:
I didn't like this cryptic `(Nil)` before, and `(Nil, 0)` made it even
more cryptic. Could you use named arguments or simply add a no-arg constructor
(perhaps with default values)?
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -629,6 +629,10 @@ class CoarseGrainedSchedulerBackend(scheduler:
TaskSchedulerImpl, val rpcEnv: Rp
executorDataMap.keySet.toSeq
}
+ def getExecutorsWithRegistrationTs(): Map[String, Long] = synchronized {
+ executorDataMap.map(kv => (kv._1, kv._2.registrationTs)).toMap
Review comment:
nit: `executorDataMap.mapValues(_.registrationTs).toMap`?
##########
File path:
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshot.scala
##########
@@ -29,25 +29,27 @@ import org.apache.spark.internal.Logging
/**
* An immutable view of the current executor pods that are running in the
cluster.
*/
-private[spark] case class ExecutorPodsSnapshot(executorPods: Map[Long,
ExecutorPodState]) {
+private[spark] case class ExecutorPodsSnapshot(
+ executorPods: Map[Long, ExecutorPodState],
+ fullSnapshotTs: Long) {
import ExecutorPodsSnapshot._
def withUpdate(updatedPod: Pod): ExecutorPodsSnapshot = {
val newExecutorPods = executorPods ++ toStatesByExecutorId(Seq(updatedPod))
- new ExecutorPodsSnapshot(newExecutorPods)
+ new ExecutorPodsSnapshot(newExecutorPods, fullSnapshotTs)
}
}
object ExecutorPodsSnapshot extends Logging {
private var shouldCheckAllContainers: Boolean = _
private var sparkContainerName: String = _
- def apply(executorPods: Seq[Pod]): ExecutorPodsSnapshot = {
- ExecutorPodsSnapshot(toStatesByExecutorId(executorPods))
+ def apply(executorPods: Seq[Pod], fullSnapshotTs: Long):
ExecutorPodsSnapshot = {
+ ExecutorPodsSnapshot(toStatesByExecutorId(executorPods), fullSnapshotTs)
}
- def apply(): ExecutorPodsSnapshot = ExecutorPodsSnapshot(Map.empty[Long,
ExecutorPodState])
+ def apply(): ExecutorPodsSnapshot = ExecutorPodsSnapshot(Map.empty[Long,
ExecutorPodState], 0)
Review comment:
Ah, so there is a no-argument constructor already (!) 👍
##########
File path:
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
##########
@@ -109,33 +112,33 @@ private[spark] class ExecutorPodsLifecycleManager(
// Reconcile the case where Spark claims to know about an executor but the
corresponding pod
// is missing from the cluster. This would occur if we miss a deletion
event and the pod
// transitions immediately from running to absent. We only need to check
against the latest
- // snapshot for this, and we don't do this for executors in the deleted
executors cache or
- // that we just removed in this round.
- val lostExecutors = if (snapshots.nonEmpty) {
- schedulerBackend.getExecutorIds().map(_.toLong).toSet --
+ // fresh full snapshot (coming from ExecutorPodsPollingSnapshotSource) for
this, and we don't
+ // do this for executors in the deleted executors cache or that we just
removed in this round.
+ if (snapshots.nonEmpty && lastFullSnapshotTs !=
snapshots.last.fullSnapshotTs) {
+ lastFullSnapshotTs = snapshots.last.fullSnapshotTs
+ val lostExecutorsWithRegistrationTs =
+ schedulerBackend.getExecutorsWithRegistrationTs().map(t =>
(t._1.toLong, t._2)) --
Review comment:
Why don't we convert executor IDs to longs in
`getExecutorsWithRegistrationTs` then?
##########
File path:
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
##########
@@ -469,6 +469,17 @@ private[spark] object Config extends Logging {
.booleanConf
.createWithDefault(false)
+ val KUBERNETES_EXECUTOR_MISSING_POD_DETECT_DELTA =
+ ConfigBuilder("spark.kubernetes.executor.missingPodDetectDelta")
+ .doc("When a registered executor's POD is missing from the Kubernetes
API server's polled " +
+ "list of PODs then this delta time is taken as the accepted time
difference between the " +
+ "registration time and the time of the polling. After this time the
POD is considered " +
+ "missing from the cluster and the executor will be removed.")
+ .version("3.1.0")
Review comment:
nit: At this point we know there will be no 3.1.0 release, so should this
be changed to 3.1.1?
##########
File path:
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
##########
@@ -469,6 +469,17 @@ private[spark] object Config extends Logging {
.booleanConf
.createWithDefault(false)
+ val KUBERNETES_EXECUTOR_MISSING_POD_DETECT_DELTA =
+ ConfigBuilder("spark.kubernetes.executor.missingPodDetectDelta")
+ .doc("When a registered executor's POD is missing from the Kubernetes
API server's polled " +
+ "list of PODs then this delta time is taken as the accepted time
difference between the " +
+ "registration time and the time of the polling. After this time the
POD is considered " +
+ "missing from the cluster and the executor will be removed.")
+ .version("3.1.0")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .checkValue(delay => delay > 0, s"delay must be a positive time value")
Review comment:
nit: remove string interpolation (`s"..."`)
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -239,7 +239,7 @@ class CoarseGrainedSchedulerBackend(scheduler:
TaskSchedulerImpl, val rpcEnv: Rp
}
val data = new ExecutorData(executorRef, executorAddress, hostname,
0, cores, logUrlHandler.applyPattern(logUrls, attributes),
attributes,
- resourcesInfo, resourceProfileId)
+ resourcesInfo, resourceProfileId, System.currentTimeMillis())
Review comment:
nit: Could you add `registrationTs =` to make this `currentTimeMillis`
clearer?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org