mridulm commented on code in PR #37411:
URL: https://github.com/apache/spark/pull/37411#discussion_r940866631
##########
core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala:
##########
@@ -199,41 +222,131 @@ private[spark] class HeartbeatReceiver(sc: SparkContext,
clock: Clock)
removeExecutor(executorRemoved.executorId)
}
+ private def killExecutor(executorId: String, timeout: Long): Unit = {
+ logWarning(s"Removing executor $executorId with no recent heartbeats: " +
+ s"${timeout} ms exceeds timeout $executorTimeoutMs ms")
+ killExecutorThread.submit(new Runnable {
+ override def run(): Unit = Utils.tryLogNonFatalError {
+ // Note: we want to get an executor back after expiring this one,
+ // so do not simply call `sc.killExecutor` here (SPARK-8119)
+ sc.killAndReplaceExecutor(executorId)
+ // SPARK-27348: in case of the executors which are not gracefully shut
down,
+ // we should remove lost executors from CoarseGrainedSchedulerBackend
manually
+ // here to guarantee two things:
+ // 1) explicitly remove executor information from
CoarseGrainedSchedulerBackend for
+ // a lost executor instead of waiting for disconnect message
+ // 2) call scheduler.executorLost() underlying to fail any tasks
assigned to
+ // those executors to avoid app hang
+ sc.schedulerBackend match {
+ case backend: CoarseGrainedSchedulerBackend =>
+ backend.driverEndpoint.send(RemoveExecutor(executorId,
+ ExecutorProcessLost(
+ s"Executor heartbeat timed out after ${timeout} ms",
+ causedByApp =
!sc.conf.get(HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT))))
+
+ // LocalSchedulerBackend is used locally and only has one single
executor
+ case _: LocalSchedulerBackend =>
+
+ case other => throw new UnsupportedOperationException(
+ s"Unknown scheduler backend: ${other.getClass}")
+ }
+ }
+ })
+ }
+
private def expireDeadHosts(): Unit = {
+ /**
+ * [SC-105641]
+ * Originally, the driver’s HeartbeatReceiver will expire an executor if it
does not receive any
+ * heartbeat from the executor for 120 seconds. However, 120 seconds is too
long, but we will face
+ * other challenges when we try to lower the timeout threshold. To
elaborate, when an executor is
+ * performing full GC, it cannot send/reply any message. Next paragraphs
describe the solution to
+ * detect network disconnection between driver and executor in a short time.
+ *
+ * An executor is running on a worker but in different JVMs, and a driver is
running on a master
+ * but in different JVMs. Hence, the network connection between
driver/executor and master/worker
+ * is the same. Because executor and worker are running on different JVMs,
worker can still send
+ * heartbeat to master when executor performs GC.
+ *
+ * For new Heartbeat Receiver, if driver does not receive any heartbeat from
the executor for
+ * `executorTimeoutMs` (default: 60s) seconds, HeartbeatReceiver will send a
request to master to
+ * ask for the latest heartbeat from the worker which the executor runs on
`workerLastHeartbeat`.
+ * HeartbeatReceiver can determine whether the heartbeat loss is caused by
network issues or other
+ * issues (e.g. GC). If the heartbeat loss is not caused by network issues,
the HeartbeatReceiver
+ * will put the executor into a waitingList rather than expiring it
immediately.
+ *
+ * [Note]: Definition of `network issues`
+ * Here, the definition `network issues` is the issues that related to
network directly. If the
+ * network is connected, the issues do not included in `network issues`. For
example, an
+ * executor's JVM is closed by a problematic task, so the JVM will notify
driver that the socket
+ * is closed. If the network is connected, driver will receive the
notification and trigger the
+ * function `onDisconnected`. This issue is not a `network issue` because
the network is
+ * connected.
+ *
+ * [Warning 1]
+ * Worker will send heartbeats to Master every (conf.get(WORKER_TIMEOUT) *
1000 / 4) milliseconds.
+ * Check deploy/worker/Worker.scala for more details. This new mechanism
design is based on the
+ * assumption: (executorTimeoutMs / 2) > (conf.get(WORKER_TIMEOUT) * 1000 /
4).
+ *
+ * [Warning 2]
+ * Not every deployment method schedules driver on master.
+ */
logTrace("Checking for hosts with no recent heartbeats in
HeartbeatReceiver.")
val now = clock.getTimeMillis()
- for ((executorId, lastSeenMs) <- executorLastSeen) {
- if (now - lastSeenMs > executorTimeoutMs) {
- logWarning(s"Removing executor $executorId with no recent heartbeats:
" +
- s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
- // Asynchronously kill the executor to avoid blocking the current
thread
- killExecutorThread.submit(new Runnable {
- override def run(): Unit = Utils.tryLogNonFatalError {
- // Note: we want to get an executor back after expiring this one,
- // so do not simply call `sc.killExecutor` here (SPARK-8119)
- sc.killAndReplaceExecutor(executorId)
- // SPARK-27348: in case of the executors which are not gracefully
shut down,
- // we should remove lost executors from
CoarseGrainedSchedulerBackend manually
- // here to guarantee two things:
- // 1) explicitly remove executor information from
CoarseGrainedSchedulerBackend for
- // a lost executor instead of waiting for disconnect message
- // 2) call scheduler.executorLost() underlying to fail any tasks
assigned to
- // those executors to avoid app hang
- sc.schedulerBackend match {
- case backend: CoarseGrainedSchedulerBackend =>
- backend.driverEndpoint.send(RemoveExecutor(executorId,
- ExecutorProcessLost(
- s"Executor heartbeat timed out after ${now - lastSeenMs}
ms")))
-
- // LocalSchedulerBackend is used locally and only has one single
executor
- case _: LocalSchedulerBackend =>
-
- case other => throw new UnsupportedOperationException(
- s"Unknown scheduler backend: ${other.getClass}")
- }
+ if (!sc.conf.get(HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT)) {
+ for ((executorId, lastSeenMs) <- executorLastSeen) {
+ if (now - lastSeenMs > executorTimeoutMs) {
+ killExecutor(executorId, now - lastSeenMs)
+ waitingList.remove(executorId)
+ executorLastSeen.remove(executorId)
+ }
+ }
+ } else {
+ for ((executorId, workerLastHeartbeat) <- waitingList) {
+ if (now - workerLastHeartbeat > executorTimeoutMs / 2) {
+ killExecutor(executorId, now - workerLastHeartbeat)
+ waitingList.remove(executorId)
+ executorLastSeen.remove(executorId)
+ }
+ }
+
+ val buf = new ArrayBuffer[String]()
+ for ((executorId, lastSeenMs) <- executorLastSeen) {
+ if (now - lastSeenMs > executorTimeoutMs) {
+ sc.schedulerBackend match {
+ case _: StandaloneSchedulerBackend =>
+ buf += executorId
+ case _ =>
+ killExecutor(executorId, now - lastSeenMs)
+ waitingList.remove(executorId)
+ executorLastSeen.remove(executorId)
+ }
+ }
+ }
+
+ sc.schedulerBackend match {
+ case backend: StandaloneSchedulerBackend =>
+ backend.client.workerLastHeartbeat(sc.applicationId, buf) match {
+ case Some(workerLastHeartbeats) =>
+ for ((executorId, workerLastHeartbeat) <- buf zip
workerLastHeartbeats) {
+ if (now - workerLastHeartbeat > executorTimeoutMs / 2) {
+ val lastSeenMs = executorLastSeen.get(executorId).get
+ killExecutor(executorId, now - lastSeenMs)
+ waitingList.remove(executorId)
+ } else {
+ waitingList(executorId) = workerLastHeartbeat
+ }
+ executorLastSeen.remove(executorId)
+ }
+ case None =>
+ for (executorId <- buf) {
+ val lastSeenMs = executorLastSeen.get(executorId).get
+ killExecutor(executorId, now - lastSeenMs)
+ executorLastSeen.remove(executorId)
+ waitingList.remove(executorId)
+ }
Review Comment:
This (and other cases in this class) would get impacted due to lack of
scheduler backend when the heatbeat receiver is created - please do revisit
them after fixing that issue.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]