mridulm commented on code in PR #37411:
URL: https://github.com/apache/spark/pull/37411#discussion_r942841928
##########
core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala:
##########
@@ -77,17 +77,55 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
   private[spark] var scheduler: TaskScheduler = null
-  // executor ID -> timestamp of when the last heartbeat from this executor was received
+  /**
+   * [SPARK-39984]
+   * Please make sure the intersection between `executorLastSeen` and `waitingList` is an empty
+   * set. If the intersection is not empty, it is possible to never kill the executor until the
+   * executor recovers. When an executor is in both `executorLastSeen` and `waitingList`, the
+   * value of `workerLastHeartbeat` in `waitingList` may update if the worker sends heartbeats
+   * to the master normally.
+   *
+   * `executorLastSeen`:
+   *  - key: executor ID
+   *  - value: timestamp of when the last heartbeat from this executor was received
+   *
+   * `waitingList`: executor ID -> WorkerLastHeartbeat
+   *  - key: executor ID
+   *  - value: timestamp of when the last heartbeat from the worker was received
+   *
+   * When the driver does not receive any heartbeat from an executor for `executorTimeoutMs`
+   * milliseconds, the driver will ask the master for the last heartbeat from the worker which
+   * the executor is running on.
+   */
   private val executorLastSeen = new HashMap[String, Long]
+  private val waitingList = new HashMap[String, Long]
Review Comment:
   Rename from `waitingList` (and related variables/methods in this class)? Something like `executorExpiryCandidates` or better.
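The disjointness invariant documented in the diff (and the rename the reviewer suggests) can be sketched outside Spark as follows. This is illustrative code, not the PR's: the object and method names are made up, and `executorExpiryCandidates` is the reviewer's proposed name for `waitingList`.

```scala
import scala.collection.mutable

// Illustrative sketch of the two-phase bookkeeping with disjoint maps.
object HeartbeatBookkeepingSketch {
  val executorLastSeen = new mutable.HashMap[String, Long]
  val executorExpiryCandidates = new mutable.HashMap[String, Long]  // reviewer's suggested name

  // Moving an executor into the expiry-candidate phase removes it from
  // `executorLastSeen`, preserving executorLastSeen ∩ executorExpiryCandidates = ∅,
  // so a recovering executor cannot linger in both maps.
  def markExpiryCandidate(executorId: String, workerLastHeartbeat: Long): Unit = {
    executorLastSeen.remove(executorId)
    executorExpiryCandidates(executorId) = workerLastHeartbeat
  }

  def disjoint: Boolean =
    executorLastSeen.keySet.intersect(executorExpiryCandidates.keySet).isEmpty
}
```

Funneling every transition through one helper like `markExpiryCandidate` is one way to make the "please make sure the intersection is empty" comment enforceable rather than advisory.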
##########
core/src/main/scala/org/apache/spark/internal/config/Network.scala:
##########
@@ -51,6 +51,21 @@ private[spark] object Network {
     .timeConf(TimeUnit.MILLISECONDS)
     .createWithDefaultString(STORAGE_BLOCKMANAGER_TIMEOUTINTERVAL.defaultValueString)
+  private[spark] val NETWORK_EXECUTOR_TIMEOUT =
+    ConfigBuilder("spark.network.executorTimeout")
+      .version("3.4.0")
+      .timeConf(TimeUnit.SECONDS)
+      .createOptional
Review Comment:
   QQ: Any particular reason for this to be `s` here and `ms` for `HEARTBEAT_WAITINGLIST_TIMEOUT`? Keep both consistent as `ms`?
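The `ms`-consistent variant the reviewer is asking about would look roughly like this. It is a sketch in the same internal `ConfigBuilder` style as the diff above, not the PR's actual change; whether the PR adopts it is up to the author.

```scala
// Hypothetical sketch: same config, but in milliseconds to match
// HEARTBEAT_WAITINGLIST_TIMEOUT (not the code in this PR).
private[spark] val NETWORK_EXECUTOR_TIMEOUT =
  ConfigBuilder("spark.network.executorTimeout")
    .version("3.4.0")
    .timeConf(TimeUnit.MILLISECONDS)  // ms, consistent with the other new timeout
    .createOptional
```

With `timeConf(TimeUnit.MILLISECONDS)`, callers would also no longer need the `Utils.timeStringAsMs(s"${executorTimeout}s")` re-parsing seen in `HeartbeatReceiver.scala` below.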
##########
core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala:
##########
@@ -199,41 +242,137 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
     removeExecutor(executorRemoved.executorId)
   }
+  private def killExecutor(executorId: String, timeout: Long): Unit = {
+    logWarning(s"Removing executor $executorId with no recent heartbeats: " +
+      s"${timeout} ms exceeds timeout $executorTimeoutMs ms")
+    killExecutorThread.submit(new Runnable {
+      override def run(): Unit = Utils.tryLogNonFatalError {
+        // Note: we want to get an executor back after expiring this one,
+        // so do not simply call `sc.killExecutor` here (SPARK-8119)
+        sc.killAndReplaceExecutor(executorId)
+        // SPARK-27348: in case of the executors which are not gracefully shut down,
+        // we should remove lost executors from CoarseGrainedSchedulerBackend manually
+        // here to guarantee two things:
+        // 1) explicitly remove executor information from CoarseGrainedSchedulerBackend for
+        //    a lost executor instead of waiting for disconnect message
+        // 2) call scheduler.executorLost() underlying to fail any tasks assigned to
+        //    those executors to avoid app hang
+        sc.schedulerBackend match {
+          case backend: CoarseGrainedSchedulerBackend =>
+            val isStandalone = backend.isInstanceOf[StandaloneSchedulerBackend]
+            backend.driverEndpoint.send(RemoveExecutor(executorId,
+              ExecutorProcessLost(
+                s"Executor heartbeat timed out after ${timeout} ms",
+                causedByApp = !checkWorkerLastHeartbeat || !isStandalone)))
Review Comment:
   QQ: If the executor/JVM hangs (due to interaction with the application code), we won't be able to detect that anymore, right? Or is that handled in any other way?
##########
core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala:
##########
@@ -130,8 +170,11 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
     case heartbeat @ Heartbeat(executorId, accumUpdates, blockManagerId, executorUpdates) =>
       var reregisterBlockManager = !sc.isStopped
       if (scheduler != null) {
-        if (executorLastSeen.contains(executorId)) {
+          val isStandalone = sc.schedulerBackend.isInstanceOf[StandaloneSchedulerBackend]
Review Comment:
   nit: pull this out as a method or populate as a field when `TaskSchedulerIsSet` is fired?
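The shape of the nit, sketched outside Spark: compute the flag once when the set-up event arrives (analogous to `TaskSchedulerIsSet`) and reuse it on every heartbeat, instead of an `isInstanceOf` check per message. All names below are illustrative, not Spark's.

```scala
// Illustrative sketch: cache a per-backend flag once, instead of recomputing per message.
object CachedFlagSketch {
  sealed trait Backend
  case object StandaloneBackend extends Backend
  case object OtherBackend extends Backend

  class Receiver {
    // Populated once when the "scheduler is set" event fires
    // (analogous to HeartbeatReceiver handling TaskSchedulerIsSet).
    private var isStandalone: Boolean = false

    def onSchedulerSet(backend: Backend): Unit = {
      isStandalone = backend == StandaloneBackend
    }

    // Heartbeat handling reads the cached flag; no type check per message.
    def onHeartbeat(): Boolean = isStandalone
  }
}
```

This works because the backend type cannot change after the scheduler is set, so the flag never goes stale.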
##########
core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala:
##########
@@ -77,17 +77,55 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
   private[spark] var scheduler: TaskScheduler = null
-  // executor ID -> timestamp of when the last heartbeat from this executor was received
+  /**
+   * [SPARK-39984]
+   * Please make sure the intersection between `executorLastSeen` and `waitingList` is an empty
+   * set. If the intersection is not empty, it is possible to never kill the executor until the
+   * executor recovers. When an executor is in both `executorLastSeen` and `waitingList`, the
+   * value of `workerLastHeartbeat` in `waitingList` may update if the worker sends heartbeats
+   * to the master normally.
+   *
+   * `executorLastSeen`:
+   *  - key: executor ID
+   *  - value: timestamp of when the last heartbeat from this executor was received
+   *
+   * `waitingList`: executor ID -> WorkerLastHeartbeat
+   *  - key: executor ID
+   *  - value: timestamp of when the last heartbeat from the worker was received
+   *
+   * When the driver does not receive any heartbeat from an executor for `executorTimeoutMs`
+   * milliseconds, the driver will ask the master for the last heartbeat from the worker which
+   * the executor is running on.
+   */
   private val executorLastSeen = new HashMap[String, Long]
+  private val waitingList = new HashMap[String, Long]
   private val executorTimeoutMs = sc.conf.get(
     config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT
-  ).getOrElse(Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s"))
+  ).getOrElse(
+    sc.conf.get(Network.NETWORK_EXECUTOR_TIMEOUT) match {
+      case Some(executorTimeout) => Utils.timeStringAsMs(s"${executorTimeout}s")
+      case None => Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s")
+    }
+  )
   private val checkTimeoutIntervalMs = sc.conf.get(Network.NETWORK_TIMEOUT_INTERVAL)
   private val executorHeartbeatIntervalMs = sc.conf.get(config.EXECUTOR_HEARTBEAT_INTERVAL)
+  /**
+   * Currently, [SPARK-39984] is only for StandaloneSchedulerBackend.
+   *
+   * `checkWorkerLastHeartbeat`: A flag to enable two-phase executor timeout.
+   * `waitingListTimeout`: The timeout used for waitingList.
+   */
+  private val checkWorkerLastHeartbeat = sc.conf.get(HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT)
+  private val waitingListTimeout = checkWorkerLastHeartbeat match {
+    case true =>
+      sc.conf.get(Network.HEARTBEAT_WAITINGLIST_TIMEOUT).getOrElse(Utils.timeStringAsMs("30s"))
+    case false => Utils.timeStringAsMs("0s")
+  }
+
Review Comment:
   Make the 30s default value in the `HEARTBEAT_WAITINGLIST_TIMEOUT` config (use `createWithDefaultString("30s")` there).
   ```suggestion
     private val waitingListTimeout = if (checkWorkerLastHeartbeat) sc.conf.get(Network.HEARTBEAT_WAITINGLIST_TIMEOUT) else 0
   ```
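Concretely, the config-side half of this suggestion might look like the sketch below, in the same `ConfigBuilder` style as the `Network.scala` diff earlier in this review. The config key and version here are assumptions for illustration, not the PR's actual definition.

```scala
// Hypothetical sketch: move the 30s default into the config definition itself,
// so HeartbeatReceiver needs no getOrElse fallback.
private[spark] val HEARTBEAT_WAITINGLIST_TIMEOUT =
  ConfigBuilder("spark.network.heartbeatWaitingListTimeout")  // key name assumed
    .version("3.4.0")                                         // version assumed
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString("30s")
```

With the default owned by the config, `sc.conf.get(Network.HEARTBEAT_WAITINGLIST_TIMEOUT)` returns a plain `Long` and the `match` on `checkWorkerLastHeartbeat` collapses to the one-line `if` in the suggestion block above.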
##########
core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala:
##########
@@ -199,41 +242,137 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
     removeExecutor(executorRemoved.executorId)
   }
+  private def killExecutor(executorId: String, timeout: Long): Unit = {
+    logWarning(s"Removing executor $executorId with no recent heartbeats: " +
+      s"${timeout} ms exceeds timeout $executorTimeoutMs ms")
+    killExecutorThread.submit(new Runnable {
+      override def run(): Unit = Utils.tryLogNonFatalError {
+        // Note: we want to get an executor back after expiring this one,
+        // so do not simply call `sc.killExecutor` here (SPARK-8119)
+        sc.killAndReplaceExecutor(executorId)
+        // SPARK-27348: in case of the executors which are not gracefully shut down,
+        // we should remove lost executors from CoarseGrainedSchedulerBackend manually
+        // here to guarantee two things:
+        // 1) explicitly remove executor information from CoarseGrainedSchedulerBackend for
+        //    a lost executor instead of waiting for disconnect message
+        // 2) call scheduler.executorLost() underlying to fail any tasks assigned to
+        //    those executors to avoid app hang
+        sc.schedulerBackend match {
+          case backend: CoarseGrainedSchedulerBackend =>
+            val isStandalone = backend.isInstanceOf[StandaloneSchedulerBackend]
+            backend.driverEndpoint.send(RemoveExecutor(executorId,
+              ExecutorProcessLost(
+                s"Executor heartbeat timed out after ${timeout} ms",
+                causedByApp = !checkWorkerLastHeartbeat || !isStandalone)))
+
+          // LocalSchedulerBackend is used locally and only has one single executor
+          case _: LocalSchedulerBackend =>
+
+          case other => throw new UnsupportedOperationException(
+            s"Unknown scheduler backend: ${other.getClass}")
+        }
+      }
+    })
+  }
+
+  private def removeExecutorFromWaitingList(executorId: String): Unit = {
+    val isStandalone = sc.schedulerBackend.isInstanceOf[StandaloneSchedulerBackend]
+    if (checkWorkerLastHeartbeat && isStandalone) {
+      waitingList.remove(executorId)
+    }
+  }
+
   private def expireDeadHosts(): Unit = {
+    /**
+     * [SPARK-39984]
+     * Originally, the driver's HeartbeatReceiver will expire an executor if it does not receive
+     * any heartbeat from the executor for 120 seconds. 120 seconds is too long, but we face
+     * other challenges when we try to lower the timeout threshold. To elaborate, when an
+     * executor is performing full GC, it cannot send or reply to any message. The next
+     * paragraphs describe the solution to detect network disconnection between driver and
+     * executor in a short time.
+     *
+     * An executor runs on a worker but in a different JVM, and a driver runs on a master but in
+     * a different JVM. Hence, the network connection between driver/executor and master/worker
+     * is the same. Because executor and worker run in different JVMs, the worker can still send
+     * heartbeats to the master while the executor performs GC.
+     *
+     * For the new HeartbeatReceiver, if the driver does not receive any heartbeat from the
+     * executor for `executorTimeoutMs` (default: 60s), HeartbeatReceiver will send a request to
+     * the master to ask for the latest heartbeat `workerLastHeartbeat` from the worker which
+     * the executor runs on. HeartbeatReceiver can determine whether the heartbeat loss is
+     * caused by network issues or other issues (e.g. GC). If the heartbeat loss is not caused
+     * by network issues, the HeartbeatReceiver will put the executor into a waitingList rather
+     * than expiring it immediately.
+     *
+     * [Note]: Definition of `network issues`
+     * Here, `network issues` means issues related to the network directly. If the network is
+     * connected, the issue is not included in `network issues`. For example, an executor's JVM
+     * is closed by a problematic task, so the JVM will notify the driver that the socket is
+     * closed. If the network is connected, the driver will receive the notification and trigger
+     * the function `onDisconnected`. This issue is not a `network issue` because the network is
+     * connected.
+     *
+     * [Warning 1]
+     * Worker will send heartbeats to Master every (conf.get(WORKER_TIMEOUT) * 1000 / 4)
+     * milliseconds. Check deploy/worker/Worker.scala for more details. This new mechanism is
+     * designed based on the assumption: `waitingListTimeout` > (conf.get(WORKER_TIMEOUT) *
+     * 1000 / 4).
+     *
+     * [Warning 2]
+     * Not every deployment method schedules the driver on the master.
+     */
Review Comment:
   Let us make this comment more concise, and describe what the state would be after this PR is merged. Something like:
   ```suggestion
     /**
      * [SPARK-39984]
      * The driver's HeartbeatReceiver will expire an executor if it does not receive any
      * heartbeat from the executor for 120 seconds. However, lowering from 120 seconds has
      * other challenges. For example: when the executor is performing full GC it cannot
      * send/reply to any message.
      *
      * For standalone deployments:
      * If the driver does not receive any heartbeat from the executor for `executorTimeoutMs`
      * (default: 60s), HeartbeatReceiver will send a request to the master to ask for the
      * latest heartbeat from the worker which the executor runs on. HeartbeatReceiver can
      * determine whether the heartbeat loss is caused by network issues or other issues
      * (e.g. GC). If the heartbeat loss is not caused by network issues, the HeartbeatReceiver
      * will put the executor into a waitingList rather than expiring it immediately.
      */
   ```
   Also, let us move this to the configuration of `HEARTBEAT_WAITINGLIST_TIMEOUT`:
   ```
    * assumption: `waitingListTimeout` > (conf.get(WORKER_TIMEOUT) * 1000 / 4).
   ```
   If this is a strong requirement, add a warning when fetching the config above?
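If `waitingListTimeout > conf.get(WORKER_TIMEOUT) * 1000 / 4` is indeed a hard requirement, the warning the reviewer asks for could be sketched like this. This is plain illustrative Scala, with explicit parameters standing in for Spark's `conf.get` calls and a `warn` callback standing in for `logWarning`.

```scala
// Illustrative sketch: warn when the waiting-list timeout cannot cover one
// worker->master heartbeat interval (workers heartbeat every WORKER_TIMEOUT * 1000 / 4 ms).
object WaitingListTimeoutCheck {
  def validate(
      waitingListTimeoutMs: Long,
      workerTimeoutSec: Long,
      warn: String => Unit): Boolean = {
    val workerHeartbeatIntervalMs = workerTimeoutSec * 1000 / 4
    val ok = waitingListTimeoutMs > workerHeartbeatIntervalMs
    if (!ok) {
      warn(s"waitingListTimeout (${waitingListTimeoutMs} ms) should be greater than the " +
        s"worker heartbeat interval (${workerHeartbeatIntervalMs} ms); otherwise an executor " +
        "may be expired before a fresh worker heartbeat can arrive")
    }
    ok
  }
}
```

Running the check once, where `HEARTBEAT_WAITINGLIST_TIMEOUT` is fetched, keeps the assumption visible in the logs for misconfigured clusters without failing startup.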
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]