kevin85421 commented on code in PR #37411:
URL: https://github.com/apache/spark/pull/37411#discussion_r963377849
##########
core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala:
##########
@@ -77,17 +77,61 @@ private[spark] class HeartbeatReceiver(sc: SparkContext,
clock: Clock)
private[spark] var scheduler: TaskScheduler = null
- // executor ID -> timestamp of when the last heartbeat from this executor
was received
+ /**
+ * [SPARK-39984]
+ * Please make sure the intersection between `executorLastSeen` and
`executorExpiryCandidates` is
+ * an empty set. If the intersection is not empty, the executor may never be
killed until
+ * the executor recovers. When an executor is in both `executorLastSeen` and
+ * `executorExpiryCandidates`, the value of `workerLastHeartbeat` in
`executorExpiryCandidates`
+ * may update if the worker sends heartbeats to master normally.
+ *
+ * `executorLastSeen`:
+ * - key: executor ID
+ * - value: timestamp of when the last heartbeat from this executor was
received
+ *
+ * `executorExpiryCandidates`: executor ID -> WorkerLastHeartbeat
+ * - key: executor ID
+ * - value: timestamp of when the last heartbeat from the worker was
received
+ *
+ * When the driver does not receive any heartbeat from an executor for
`executorTimeoutMs` milliseconds,
+ * the driver will ask the master for the last heartbeat from the worker that
the executor is running
+ * on.
+ */
private val executorLastSeen = new HashMap[String, Long]
+ private val executorExpiryCandidates = new HashMap[String, Long]
private val executorTimeoutMs = sc.conf.get(
config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT
-
).getOrElse(Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s"))
+ ).getOrElse(
+ sc.conf.get(Network.NETWORK_EXECUTOR_TIMEOUT) match {
+ case Some(executorTimeout) => executorTimeout
+ case None =>
Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s")
+ }
+ )
private val checkTimeoutIntervalMs =
sc.conf.get(Network.NETWORK_TIMEOUT_INTERVAL)
private val executorHeartbeatIntervalMs =
sc.conf.get(config.EXECUTOR_HEARTBEAT_INTERVAL)
+ /**
+ * Currently, [SPARK-39984] is only for StandaloneSchedulerBackend.
+ *
+ * `checkWorkerLastHeartbeat`: A flag to enable two-phase executor timeout.
+ * `expiryCandidatesTimeout`: The timeout used for executorExpiryCandidates.
+ */
+ private val checkWorkerLastHeartbeat = {
Review Comment:
Updated
https://github.com/apache/spark/pull/37411/commits/92629e30410d7ae9741457240c3f1a789f6b042b.
(1) Declare the variable `checkWorkerLastHeartbeat` as a lazy variable
(2) Set an initial delay for `ExpireDeadHosts`
(3) If the scheduler backend has not been initialized when
`checkWorkerLastHeartbeat` is evaluated, an `UnsupportedOperationException`
will be thrown.
```scala
private lazy val checkWorkerLastHeartbeat = sc.schedulerBackend match {
case _: CoarseGrainedSchedulerBackend =>
sc.conf.get(config.HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT) &&
sc.schedulerBackend.isInstanceOf[StandaloneSchedulerBackend]
case _: LocalSchedulerBackend => false
case other => throw new UnsupportedOperationException(
s"Unknown scheduler backend: ${other.getClass}")
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]