mridulm commented on code in PR #37411:
URL: https://github.com/apache/spark/pull/37411#discussion_r938566024
##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2398,4 +2398,11 @@ package object config {
.version("3.3.0")
.intConf
.createWithDefault(5)
+
+ private[spark] val HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT =
+ ConfigBuilder("spark.driver.heartbeat.checkWorkerLastHeartbeat")
+ .internal()
+ .version("3.4.0")
+ .booleanConf
+ .createWithDefault(true)
Review Comment:
Default this to `false`
##########
core/src/main/scala/org/apache/spark/internal/config/Network.scala:
##########
@@ -49,7 +49,13 @@ private[spark] object Network {
ConfigBuilder("spark.network.timeoutInterval")
.version("1.3.2")
.timeConf(TimeUnit.MILLISECONDS)
- .createWithDefaultString(STORAGE_BLOCKMANAGER_TIMEOUTINTERVAL.defaultValueString)
+ .createWithDefaultString("15s")
+
+ private[spark] val NETWORK_EXECUTOR_TIMEOUT =
+ ConfigBuilder("spark.network.executorTimeout")
+ .version("1.3.0")
Review Comment:
`1.3.0` -> `3.4.0`
##########
core/src/main/scala/org/apache/spark/internal/config/Network.scala:
##########
@@ -49,7 +49,13 @@ private[spark] object Network {
ConfigBuilder("spark.network.timeoutInterval")
.version("1.3.2")
.timeConf(TimeUnit.MILLISECONDS)
- .createWithDefaultString(STORAGE_BLOCKMANAGER_TIMEOUTINTERVAL.defaultValueString)
+ .createWithDefaultString("15s")
+
+ private[spark] val NETWORK_EXECUTOR_TIMEOUT =
+ ConfigBuilder("spark.network.executorTimeout")
+ .version("1.3.0")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefaultString("60s")
Review Comment:
Fall back to `NETWORK_TIMEOUT` to preserve existing behavior.
##########
core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala:
##########
@@ -199,41 +222,131 @@ private[spark] class HeartbeatReceiver(sc: SparkContext,
clock: Clock)
removeExecutor(executorRemoved.executorId)
}
+ private def killExecutor(executorId: String, timeout: Long): Unit = {
+ logWarning(s"Removing executor $executorId with no recent heartbeats: " +
+ s"${timeout} ms exceeds timeout $executorTimeoutMs ms")
+ killExecutorThread.submit(new Runnable {
+ override def run(): Unit = Utils.tryLogNonFatalError {
+ // Note: we want to get an executor back after expiring this one,
+ // so do not simply call `sc.killExecutor` here (SPARK-8119)
+ sc.killAndReplaceExecutor(executorId)
+ // SPARK-27348: in case of the executors which are not gracefully shut
down,
+ // we should remove lost executors from CoarseGrainedSchedulerBackend
manually
+ // here to guarantee two things:
+ // 1) explicitly remove executor information from
CoarseGrainedSchedulerBackend for
+ // a lost executor instead of waiting for disconnect message
+ // 2) call scheduler.executorLost() underlying to fail any tasks
assigned to
+ // those executors to avoid app hang
+ sc.schedulerBackend match {
+ case backend: CoarseGrainedSchedulerBackend =>
+ backend.driverEndpoint.send(RemoveExecutor(executorId,
+ ExecutorProcessLost(
+ s"Executor heartbeat timed out after ${timeout} ms",
+ causedByApp =
!sc.conf.get(HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT))))
+
+ // LocalSchedulerBackend is used locally and only has one single
executor
+ case _: LocalSchedulerBackend =>
+
+ case other => throw new UnsupportedOperationException(
+ s"Unknown scheduler backend: ${other.getClass}")
+ }
+ }
+ })
+ }
+
private def expireDeadHosts(): Unit = {
+ /**
+ * [SC-105641]
+ * Originally, the driver’s HeartbeatReceiver will expire an executor if it
does not receive any
+ * heartbeat from the executor for 120 seconds. However, 120 seconds is too
long, but we will face
+ * other challenges when we try to lower the timeout threshold. To
elaborate, when an executor is
+ * performing full GC, it cannot send/reply any message. Next paragraphs
describe the solution to
+ * detect network disconnection between driver and executor in a short time.
+ *
+ * An executor is running on a worker but in different JVMs, and a driver is
running on a master
+ * but in different JVMs. Hence, the network connection between
driver/executor and master/worker
+ * is the same. Because executor and worker are running on different JVMs,
worker can still send
+ * heartbeat to master when executor performs GC.
+ *
+ * For new Heartbeat Receiver, if driver does not receive any heartbeat from
the executor for
+ * `executorTimeoutMs` (default: 60s) seconds, HeartbeatReceiver will send a
request to master to
+ * ask for the latest heartbeat from the worker which the executor runs on
`workerLastHeartbeat`.
+ * HeartbeatReceiver can determine whether the heartbeat loss is caused by
network issues or other
+ * issues (e.g. GC). If the heartbeat loss is not caused by network issues,
the HeartbeatReceiver
+ * will put the executor into a waitingList rather than expiring it
immediately.
+ *
+ * [Note]: Definition of `network issues`
+ * Here, the definition `network issues` is the issues that related to
network directly. If the
+ * network is connected, the issues do not included in `network issues`. For
example, an
+ * executor's JVM is closed by a problematic task, so the JVM will notify
driver that the socket
+ * is closed. If the network is connected, driver will receive the
notification and trigger the
+ * function `onDisconnected`. This issue is not a `network issue` because
the network is
+ * connected.
+ *
+ * [Warning 1]
+ * Worker will send heartbeats to Master every (conf.get(WORKER_TIMEOUT) *
1000 / 4) milliseconds.
+ * Check deploy/worker/Worker.scala for more details. This new mechanism
design is based on the
+ * assumption: (executorTimeoutMs / 2) > (conf.get(WORKER_TIMEOUT) * 1000 /
4).
+ *
+ * [Warning 2]
+ * Not every deployment method schedules driver on master.
+ */
logTrace("Checking for hosts with no recent heartbeats in
HeartbeatReceiver.")
val now = clock.getTimeMillis()
- for ((executorId, lastSeenMs) <- executorLastSeen) {
- if (now - lastSeenMs > executorTimeoutMs) {
- logWarning(s"Removing executor $executorId with no recent heartbeats:
" +
- s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
- // Asynchronously kill the executor to avoid blocking the current
thread
- killExecutorThread.submit(new Runnable {
- override def run(): Unit = Utils.tryLogNonFatalError {
- // Note: we want to get an executor back after expiring this one,
- // so do not simply call `sc.killExecutor` here (SPARK-8119)
- sc.killAndReplaceExecutor(executorId)
- // SPARK-27348: in case of the executors which are not gracefully
shut down,
- // we should remove lost executors from
CoarseGrainedSchedulerBackend manually
- // here to guarantee two things:
- // 1) explicitly remove executor information from
CoarseGrainedSchedulerBackend for
- // a lost executor instead of waiting for disconnect message
- // 2) call scheduler.executorLost() underlying to fail any tasks
assigned to
- // those executors to avoid app hang
- sc.schedulerBackend match {
- case backend: CoarseGrainedSchedulerBackend =>
- backend.driverEndpoint.send(RemoveExecutor(executorId,
- ExecutorProcessLost(
- s"Executor heartbeat timed out after ${now - lastSeenMs}
ms")))
-
- // LocalSchedulerBackend is used locally and only has one single
executor
- case _: LocalSchedulerBackend =>
-
- case other => throw new UnsupportedOperationException(
- s"Unknown scheduler backend: ${other.getClass}")
- }
+ if (!sc.conf.get(HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT)) {
+ for ((executorId, lastSeenMs) <- executorLastSeen) {
+ if (now - lastSeenMs > executorTimeoutMs) {
+ killExecutor(executorId, now - lastSeenMs)
+ waitingList.remove(executorId)
+ executorLastSeen.remove(executorId)
+ }
+ }
+ } else {
+ for ((executorId, workerLastHeartbeat) <- waitingList) {
+ if (now - workerLastHeartbeat > executorTimeoutMs / 2) {
+ killExecutor(executorId, now - workerLastHeartbeat)
+ waitingList.remove(executorId)
+ executorLastSeen.remove(executorId)
+ }
+ }
+
+ val buf = new ArrayBuffer[String]()
+ for ((executorId, lastSeenMs) <- executorLastSeen) {
+ if (now - lastSeenMs > executorTimeoutMs) {
+ sc.schedulerBackend match {
+ case _: StandaloneSchedulerBackend =>
+ buf += executorId
+ case _ =>
+ killExecutor(executorId, now - lastSeenMs)
+ waitingList.remove(executorId)
+ executorLastSeen.remove(executorId)
+ }
+ }
+ }
+
+ sc.schedulerBackend match {
+ case backend: StandaloneSchedulerBackend =>
+ backend.client.workerLastHeartbeat(sc.applicationId, buf) match {
+ case Some(workerLastHeartbeats) =>
+ for ((executorId, workerLastHeartbeat) <- buf zip
workerLastHeartbeats) {
+ if (now - workerLastHeartbeat > executorTimeoutMs / 2) {
+ val lastSeenMs = executorLastSeen.get(executorId).get
+ killExecutor(executorId, now - lastSeenMs)
+ waitingList.remove(executorId)
+ } else {
+ waitingList(executorId) = workerLastHeartbeat
+ }
+ executorLastSeen.remove(executorId)
+ }
+ case None =>
+ for (executorId <- buf) {
+ val lastSeenMs = executorLastSeen.get(executorId).get
+ killExecutor(executorId, now - lastSeenMs)
+ executorLastSeen.remove(executorId)
+ waitingList.remove(executorId)
+ }
Review Comment:
Looks like the `if` condition should be
`!sc.conf.get(HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT) &&
sc.schedulerBackend.isInstanceOf[StandaloneSchedulerBackend]`?
##########
core/src/main/scala/org/apache/spark/internal/config/Network.scala:
##########
@@ -49,7 +49,13 @@ private[spark] object Network {
ConfigBuilder("spark.network.timeoutInterval")
.version("1.3.2")
.timeConf(TimeUnit.MILLISECONDS)
- .createWithDefaultString(STORAGE_BLOCKMANAGER_TIMEOUTINTERVAL.defaultValueString)
+ .createWithDefaultString("15s")
Review Comment:
Revert this change?
##########
core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala:
##########
@@ -77,12 +77,32 @@ private[spark] class HeartbeatReceiver(sc: SparkContext,
clock: Clock)
private[spark] var scheduler: TaskScheduler = null
- // executor ID -> timestamp of when the last heartbeat from this executor
was received
+ /**
+ * [SC-105641]
+ * Please make sure the intersection between `executorLastSeen` and
`waitingList` is an empty set.
+ * If the intersection is not empty, it is possible to never kill the
executor until the executor
+ * recovers. When an executor is in both `executorLastSeen` and
`waitingList`, the value of
+ * `workerLastHeartbeat` in waitingList may update if the worker sends
heartbeats to master
+ * normally.
+ *
+ * `executorLastSeen`:
+ * - key: executor ID
+ * - value: timestamp of when the last heartbeat from this executor was
received
+ *
+ * `waitingList`: executor ID -> WorkerLastHeartbeat
+ * - key: executor ID
+ * - value: timestamp of when the last heartbeat from the worker was
received
+ *
+ * when driver does not receive any heartbeat from an executor for
`executorTimeoutMs` seconds,
+ * the driver will ask master for the last heartbeat from the worker which
the executor is running
+ * on.
+ */
private val executorLastSeen = new HashMap[String, Long]
+ private val waitingList = new HashMap[String, Long]
Review Comment:
We need `waitingList` to be updated/maintained only when
`HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT` is enabled.
##########
core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala:
##########
@@ -199,41 +222,131 @@ private[spark] class HeartbeatReceiver(sc: SparkContext,
clock: Clock)
removeExecutor(executorRemoved.executorId)
}
+ private def killExecutor(executorId: String, timeout: Long): Unit = {
+ logWarning(s"Removing executor $executorId with no recent heartbeats: " +
+ s"${timeout} ms exceeds timeout $executorTimeoutMs ms")
+ killExecutorThread.submit(new Runnable {
+ override def run(): Unit = Utils.tryLogNonFatalError {
+ // Note: we want to get an executor back after expiring this one,
+ // so do not simply call `sc.killExecutor` here (SPARK-8119)
+ sc.killAndReplaceExecutor(executorId)
+ // SPARK-27348: in case of the executors which are not gracefully shut
down,
+ // we should remove lost executors from CoarseGrainedSchedulerBackend
manually
+ // here to guarantee two things:
+ // 1) explicitly remove executor information from
CoarseGrainedSchedulerBackend for
+ // a lost executor instead of waiting for disconnect message
+ // 2) call scheduler.executorLost() underlying to fail any tasks
assigned to
+ // those executors to avoid app hang
+ sc.schedulerBackend match {
+ case backend: CoarseGrainedSchedulerBackend =>
+ backend.driverEndpoint.send(RemoveExecutor(executorId,
+ ExecutorProcessLost(
+ s"Executor heartbeat timed out after ${timeout} ms",
+ causedByApp =
!sc.conf.get(HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT))))
+
+ // LocalSchedulerBackend is used locally and only has one single
executor
+ case _: LocalSchedulerBackend =>
+
+ case other => throw new UnsupportedOperationException(
+ s"Unknown scheduler backend: ${other.getClass}")
+ }
+ }
+ })
+ }
+
private def expireDeadHosts(): Unit = {
+ /**
+ * [SC-105641]
+ * Originally, the driver’s HeartbeatReceiver will expire an executor if it
does not receive any
+ * heartbeat from the executor for 120 seconds. However, 120 seconds is too
long, but we will face
+ * other challenges when we try to lower the timeout threshold. To
elaborate, when an executor is
+ * performing full GC, it cannot send/reply any message. Next paragraphs
describe the solution to
+ * detect network disconnection between driver and executor in a short time.
+ *
+ * An executor is running on a worker but in different JVMs, and a driver is
running on a master
+ * but in different JVMs. Hence, the network connection between
driver/executor and master/worker
+ * is the same. Because executor and worker are running on different JVMs,
worker can still send
+ * heartbeat to master when executor performs GC.
+ *
+ * For new Heartbeat Receiver, if driver does not receive any heartbeat from
the executor for
+ * `executorTimeoutMs` (default: 60s) seconds, HeartbeatReceiver will send a
request to master to
+ * ask for the latest heartbeat from the worker which the executor runs on
`workerLastHeartbeat`.
+ * HeartbeatReceiver can determine whether the heartbeat loss is caused by
network issues or other
+ * issues (e.g. GC). If the heartbeat loss is not caused by network issues,
the HeartbeatReceiver
+ * will put the executor into a waitingList rather than expiring it
immediately.
+ *
+ * [Note]: Definition of `network issues`
+ * Here, the definition `network issues` is the issues that related to
network directly. If the
+ * network is connected, the issues do not included in `network issues`. For
example, an
+ * executor's JVM is closed by a problematic task, so the JVM will notify
driver that the socket
+ * is closed. If the network is connected, driver will receive the
notification and trigger the
+ * function `onDisconnected`. This issue is not a `network issue` because
the network is
+ * connected.
+ *
+ * [Warning 1]
+ * Worker will send heartbeats to Master every (conf.get(WORKER_TIMEOUT) *
1000 / 4) milliseconds.
+ * Check deploy/worker/Worker.scala for more details. This new mechanism
design is based on the
+ * assumption: (executorTimeoutMs / 2) > (conf.get(WORKER_TIMEOUT) * 1000 /
4).
+ *
+ * [Warning 2]
+ * Not every deployment method schedules driver on master.
+ */
logTrace("Checking for hosts with no recent heartbeats in
HeartbeatReceiver.")
val now = clock.getTimeMillis()
- for ((executorId, lastSeenMs) <- executorLastSeen) {
- if (now - lastSeenMs > executorTimeoutMs) {
- logWarning(s"Removing executor $executorId with no recent heartbeats:
" +
- s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
- // Asynchronously kill the executor to avoid blocking the current
thread
- killExecutorThread.submit(new Runnable {
- override def run(): Unit = Utils.tryLogNonFatalError {
- // Note: we want to get an executor back after expiring this one,
- // so do not simply call `sc.killExecutor` here (SPARK-8119)
- sc.killAndReplaceExecutor(executorId)
- // SPARK-27348: in case of the executors which are not gracefully
shut down,
- // we should remove lost executors from
CoarseGrainedSchedulerBackend manually
- // here to guarantee two things:
- // 1) explicitly remove executor information from
CoarseGrainedSchedulerBackend for
- // a lost executor instead of waiting for disconnect message
- // 2) call scheduler.executorLost() underlying to fail any tasks
assigned to
- // those executors to avoid app hang
- sc.schedulerBackend match {
- case backend: CoarseGrainedSchedulerBackend =>
- backend.driverEndpoint.send(RemoveExecutor(executorId,
- ExecutorProcessLost(
- s"Executor heartbeat timed out after ${now - lastSeenMs}
ms")))
-
- // LocalSchedulerBackend is used locally and only has one single
executor
- case _: LocalSchedulerBackend =>
-
- case other => throw new UnsupportedOperationException(
- s"Unknown scheduler backend: ${other.getClass}")
- }
+ if (!sc.conf.get(HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT)) {
Review Comment:
Pull this out as an instance variable
##########
core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala:
##########
@@ -199,41 +222,131 @@ private[spark] class HeartbeatReceiver(sc: SparkContext,
clock: Clock)
removeExecutor(executorRemoved.executorId)
}
+ private def killExecutor(executorId: String, timeout: Long): Unit = {
+ logWarning(s"Removing executor $executorId with no recent heartbeats: " +
+ s"${timeout} ms exceeds timeout $executorTimeoutMs ms")
+ killExecutorThread.submit(new Runnable {
+ override def run(): Unit = Utils.tryLogNonFatalError {
+ // Note: we want to get an executor back after expiring this one,
+ // so do not simply call `sc.killExecutor` here (SPARK-8119)
+ sc.killAndReplaceExecutor(executorId)
+ // SPARK-27348: in case of the executors which are not gracefully shut
down,
+ // we should remove lost executors from CoarseGrainedSchedulerBackend
manually
+ // here to guarantee two things:
+ // 1) explicitly remove executor information from
CoarseGrainedSchedulerBackend for
+ // a lost executor instead of waiting for disconnect message
+ // 2) call scheduler.executorLost() underlying to fail any tasks
assigned to
+ // those executors to avoid app hang
+ sc.schedulerBackend match {
+ case backend: CoarseGrainedSchedulerBackend =>
+ backend.driverEndpoint.send(RemoveExecutor(executorId,
+ ExecutorProcessLost(
+ s"Executor heartbeat timed out after ${timeout} ms",
+ causedByApp =
!sc.conf.get(HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT))))
+
+ // LocalSchedulerBackend is used locally and only has one single
executor
+ case _: LocalSchedulerBackend =>
+
+ case other => throw new UnsupportedOperationException(
+ s"Unknown scheduler backend: ${other.getClass}")
+ }
+ }
+ })
+ }
+
private def expireDeadHosts(): Unit = {
+ /**
+ * [SC-105641]
+ * Originally, the driver’s HeartbeatReceiver will expire an executor if it
does not receive any
+ * heartbeat from the executor for 120 seconds. However, 120 seconds is too
long, but we will face
+ * other challenges when we try to lower the timeout threshold. To
elaborate, when an executor is
+ * performing full GC, it cannot send/reply any message. Next paragraphs
describe the solution to
+ * detect network disconnection between driver and executor in a short time.
+ *
+ * An executor is running on a worker but in different JVMs, and a driver is
running on a master
+ * but in different JVMs. Hence, the network connection between
driver/executor and master/worker
+ * is the same. Because executor and worker are running on different JVMs,
worker can still send
+ * heartbeat to master when executor performs GC.
+ *
+ * For new Heartbeat Receiver, if driver does not receive any heartbeat from
the executor for
+ * `executorTimeoutMs` (default: 60s) seconds, HeartbeatReceiver will send a
request to master to
+ * ask for the latest heartbeat from the worker which the executor runs on
`workerLastHeartbeat`.
+ * HeartbeatReceiver can determine whether the heartbeat loss is caused by
network issues or other
+ * issues (e.g. GC). If the heartbeat loss is not caused by network issues,
the HeartbeatReceiver
+ * will put the executor into a waitingList rather than expiring it
immediately.
+ *
+ * [Note]: Definition of `network issues`
+ * Here, the definition `network issues` is the issues that related to
network directly. If the
+ * network is connected, the issues do not included in `network issues`. For
example, an
+ * executor's JVM is closed by a problematic task, so the JVM will notify
driver that the socket
+ * is closed. If the network is connected, driver will receive the
notification and trigger the
+ * function `onDisconnected`. This issue is not a `network issue` because
the network is
+ * connected.
+ *
+ * [Warning 1]
+ * Worker will send heartbeats to Master every (conf.get(WORKER_TIMEOUT) *
1000 / 4) milliseconds.
+ * Check deploy/worker/Worker.scala for more details. This new mechanism
design is based on the
+ * assumption: (executorTimeoutMs / 2) > (conf.get(WORKER_TIMEOUT) * 1000 /
4).
+ *
+ * [Warning 2]
+ * Not every deployment method schedules driver on master.
+ */
logTrace("Checking for hosts with no recent heartbeats in
HeartbeatReceiver.")
val now = clock.getTimeMillis()
- for ((executorId, lastSeenMs) <- executorLastSeen) {
- if (now - lastSeenMs > executorTimeoutMs) {
- logWarning(s"Removing executor $executorId with no recent heartbeats:
" +
- s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
- // Asynchronously kill the executor to avoid blocking the current
thread
- killExecutorThread.submit(new Runnable {
- override def run(): Unit = Utils.tryLogNonFatalError {
- // Note: we want to get an executor back after expiring this one,
- // so do not simply call `sc.killExecutor` here (SPARK-8119)
- sc.killAndReplaceExecutor(executorId)
- // SPARK-27348: in case of the executors which are not gracefully
shut down,
- // we should remove lost executors from
CoarseGrainedSchedulerBackend manually
- // here to guarantee two things:
- // 1) explicitly remove executor information from
CoarseGrainedSchedulerBackend for
- // a lost executor instead of waiting for disconnect message
- // 2) call scheduler.executorLost() underlying to fail any tasks
assigned to
- // those executors to avoid app hang
- sc.schedulerBackend match {
- case backend: CoarseGrainedSchedulerBackend =>
- backend.driverEndpoint.send(RemoveExecutor(executorId,
- ExecutorProcessLost(
- s"Executor heartbeat timed out after ${now - lastSeenMs}
ms")))
-
- // LocalSchedulerBackend is used locally and only has one single
executor
- case _: LocalSchedulerBackend =>
-
- case other => throw new UnsupportedOperationException(
- s"Unknown scheduler backend: ${other.getClass}")
- }
+ if (!sc.conf.get(HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT)) {
+ for ((executorId, lastSeenMs) <- executorLastSeen) {
+ if (now - lastSeenMs > executorTimeoutMs) {
+ killExecutor(executorId, now - lastSeenMs)
+ waitingList.remove(executorId)
+ executorLastSeen.remove(executorId)
+ }
+ }
+ } else {
+ for ((executorId, workerLastHeartbeat) <- waitingList) {
+ if (now - workerLastHeartbeat > executorTimeoutMs / 2) {
Review Comment:
Why `executorTimeoutMs / 2`?
##########
core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala:
##########
@@ -199,41 +222,131 @@ private[spark] class HeartbeatReceiver(sc: SparkContext,
clock: Clock)
removeExecutor(executorRemoved.executorId)
}
+ private def killExecutor(executorId: String, timeout: Long): Unit = {
+ logWarning(s"Removing executor $executorId with no recent heartbeats: " +
+ s"${timeout} ms exceeds timeout $executorTimeoutMs ms")
+ killExecutorThread.submit(new Runnable {
+ override def run(): Unit = Utils.tryLogNonFatalError {
+ // Note: we want to get an executor back after expiring this one,
+ // so do not simply call `sc.killExecutor` here (SPARK-8119)
+ sc.killAndReplaceExecutor(executorId)
+ // SPARK-27348: in case of the executors which are not gracefully shut
down,
+ // we should remove lost executors from CoarseGrainedSchedulerBackend
manually
+ // here to guarantee two things:
+ // 1) explicitly remove executor information from
CoarseGrainedSchedulerBackend for
+ // a lost executor instead of waiting for disconnect message
+ // 2) call scheduler.executorLost() underlying to fail any tasks
assigned to
+ // those executors to avoid app hang
+ sc.schedulerBackend match {
+ case backend: CoarseGrainedSchedulerBackend =>
+ backend.driverEndpoint.send(RemoveExecutor(executorId,
+ ExecutorProcessLost(
+ s"Executor heartbeat timed out after ${timeout} ms",
+ causedByApp =
!sc.conf.get(HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT))))
+
+ // LocalSchedulerBackend is used locally and only has one single
executor
+ case _: LocalSchedulerBackend =>
+
+ case other => throw new UnsupportedOperationException(
+ s"Unknown scheduler backend: ${other.getClass}")
+ }
+ }
+ })
+ }
+
private def expireDeadHosts(): Unit = {
+ /**
+ * [SC-105641]
+ * Originally, the driver’s HeartbeatReceiver will expire an executor if it
does not receive any
+ * heartbeat from the executor for 120 seconds. However, 120 seconds is too
long, but we will face
+ * other challenges when we try to lower the timeout threshold. To
elaborate, when an executor is
+ * performing full GC, it cannot send/reply any message. Next paragraphs
describe the solution to
+ * detect network disconnection between driver and executor in a short time.
+ *
+ * An executor is running on a worker but in different JVMs, and a driver is
running on a master
+ * but in different JVMs. Hence, the network connection between
driver/executor and master/worker
+ * is the same. Because executor and worker are running on different JVMs,
worker can still send
+ * heartbeat to master when executor performs GC.
+ *
+ * For new Heartbeat Receiver, if driver does not receive any heartbeat from
the executor for
+ * `executorTimeoutMs` (default: 60s) seconds, HeartbeatReceiver will send a
request to master to
+ * ask for the latest heartbeat from the worker which the executor runs on
`workerLastHeartbeat`.
+ * HeartbeatReceiver can determine whether the heartbeat loss is caused by
network issues or other
+ * issues (e.g. GC). If the heartbeat loss is not caused by network issues,
the HeartbeatReceiver
+ * will put the executor into a waitingList rather than expiring it
immediately.
+ *
+ * [Note]: Definition of `network issues`
+ * Here, the definition `network issues` is the issues that related to
network directly. If the
+ * network is connected, the issues do not included in `network issues`. For
example, an
+ * executor's JVM is closed by a problematic task, so the JVM will notify
driver that the socket
+ * is closed. If the network is connected, driver will receive the
notification and trigger the
+ * function `onDisconnected`. This issue is not a `network issue` because
the network is
+ * connected.
+ *
+ * [Warning 1]
+ * Worker will send heartbeats to Master every (conf.get(WORKER_TIMEOUT) *
1000 / 4) milliseconds.
+ * Check deploy/worker/Worker.scala for more details. This new mechanism
design is based on the
+ * assumption: (executorTimeoutMs / 2) > (conf.get(WORKER_TIMEOUT) * 1000 /
4).
+ *
+ * [Warning 2]
+ * Not every deployment method schedules driver on master.
+ */
logTrace("Checking for hosts with no recent heartbeats in
HeartbeatReceiver.")
val now = clock.getTimeMillis()
- for ((executorId, lastSeenMs) <- executorLastSeen) {
- if (now - lastSeenMs > executorTimeoutMs) {
- logWarning(s"Removing executor $executorId with no recent heartbeats:
" +
- s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
- // Asynchronously kill the executor to avoid blocking the current
thread
- killExecutorThread.submit(new Runnable {
- override def run(): Unit = Utils.tryLogNonFatalError {
- // Note: we want to get an executor back after expiring this one,
- // so do not simply call `sc.killExecutor` here (SPARK-8119)
- sc.killAndReplaceExecutor(executorId)
- // SPARK-27348: in case of the executors which are not gracefully
shut down,
- // we should remove lost executors from
CoarseGrainedSchedulerBackend manually
- // here to guarantee two things:
- // 1) explicitly remove executor information from
CoarseGrainedSchedulerBackend for
- // a lost executor instead of waiting for disconnect message
- // 2) call scheduler.executorLost() underlying to fail any tasks
assigned to
- // those executors to avoid app hang
- sc.schedulerBackend match {
- case backend: CoarseGrainedSchedulerBackend =>
- backend.driverEndpoint.send(RemoveExecutor(executorId,
- ExecutorProcessLost(
- s"Executor heartbeat timed out after ${now - lastSeenMs}
ms")))
-
- // LocalSchedulerBackend is used locally and only has one single
executor
- case _: LocalSchedulerBackend =>
-
- case other => throw new UnsupportedOperationException(
- s"Unknown scheduler backend: ${other.getClass}")
- }
+ if (!sc.conf.get(HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT)) {
Review Comment:
Pull this out as an instance variable
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]