kevin85421 commented on code in PR #37411:
URL: https://github.com/apache/spark/pull/37411#discussion_r940733267


##########
core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala:
##########
@@ -199,41 +222,131 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, 
clock: Clock)
     removeExecutor(executorRemoved.executorId)
   }
 
+  private def killExecutor(executorId: String, timeout: Long): Unit = {
+    logWarning(s"Removing executor $executorId with no recent heartbeats: " +
+      s"${timeout} ms exceeds timeout $executorTimeoutMs ms")
+    killExecutorThread.submit(new Runnable {
+      override def run(): Unit = Utils.tryLogNonFatalError {
+        // Note: we want to get an executor back after expiring this one,
+        // so do not simply call `sc.killExecutor` here (SPARK-8119)
+        sc.killAndReplaceExecutor(executorId)
+        // SPARK-27348: in case of the executors which are not gracefully shut 
down,
+        // we should remove lost executors from CoarseGrainedSchedulerBackend 
manually
+        // here to guarantee two things:
+        // 1) explicitly remove executor information from 
CoarseGrainedSchedulerBackend for
+        //    a lost executor instead of waiting for disconnect message
+        // 2) call scheduler.executorLost() underlying to fail any tasks 
assigned to
+        //    those executors to avoid app hang
+        sc.schedulerBackend match {
+          case backend: CoarseGrainedSchedulerBackend =>
+            backend.driverEndpoint.send(RemoveExecutor(executorId,
+              ExecutorProcessLost(
+                s"Executor heartbeat timed out after ${timeout} ms",
+                causedByApp = 
!sc.conf.get(HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT))))
+
+          // LocalSchedulerBackend is used locally and only has one single 
executor
+          case _: LocalSchedulerBackend =>
+
+          case other => throw new UnsupportedOperationException(
+            s"Unknown scheduler backend: ${other.getClass}")
+        }
+      }
+    })
+  }
+
   private def expireDeadHosts(): Unit = {
+  /**
+   * [SC-105641]
+   * Originally, the driver’s HeartbeatReceiver will expire an executor if it 
does not receive any
+   * heartbeat from the executor for 120 seconds. However, 120 seconds is too 
long, but we will face
+   * other challenges when we try to lower the timeout threshold. To 
elaborate, when an executor is
+   * performing full GC, it cannot send/reply any message. Next paragraphs 
describe the solution to
+   * detect network disconnection between driver and executor in a short time.
+   *
+   * An executor runs on the same node as a worker but in a different JVM, and a 
driver runs on the same node as a master
+   * but in a different JVM. Hence, the driver/executor network path is the 
same as the master/worker network path.
+   * Because the executor and the worker run in different JVMs, the 
worker can still send
+   * heartbeats to the master while the executor performs GC.
+   *
+   * For new Heartbeat Receiver, if driver does not receive any heartbeat from 
the executor for
+   * `executorTimeoutMs` (default: 60s), HeartbeatReceiver will send a 
request to master to
+   * ask for the latest heartbeat (`workerLastHeartbeat`) from the worker 
which the executor runs on.
+   * HeartbeatReceiver can determine whether the heartbeat loss is caused by 
network issues or other
+   * issues (e.g. GC). If the heartbeat loss is not caused by network issues, 
the HeartbeatReceiver
+   * will put the executor into a waitingList rather than expiring it 
immediately.
+   *
+   * [Note]: Definition of `network issues`
+   * Here, `network issues` means issues that are directly related to the 
network. If the
+   * network is connected, an issue is not counted as a `network issue`. For 
example, an
+   * executor's JVM is closed by a problematic task, so the JVM will notify 
driver that the socket
+   * is closed. If the network is connected, driver will receive the 
notification and trigger the
+   * function `onDisconnected`. This issue is not a `network issue` because 
the network is
+   * connected.
+   *
+   * [Warning 1]
+   * Worker will send heartbeats to Master every (conf.get(WORKER_TIMEOUT) * 
1000 / 4) milliseconds.
+   * Check deploy/worker/Worker.scala for more details. This new mechanism 
design is based on the
+   * assumption: (executorTimeoutMs / 2) > (conf.get(WORKER_TIMEOUT) * 1000 / 
4).
+   *
+   * [Warning 2]
+   * Not every deployment method schedules driver on master.
+   */
     logTrace("Checking for hosts with no recent heartbeats in 
HeartbeatReceiver.")
     val now = clock.getTimeMillis()
-    for ((executorId, lastSeenMs) <- executorLastSeen) {
-      if (now - lastSeenMs > executorTimeoutMs) {
-        logWarning(s"Removing executor $executorId with no recent heartbeats: 
" +
-          s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
-        // Asynchronously kill the executor to avoid blocking the current 
thread
-        killExecutorThread.submit(new Runnable {
-          override def run(): Unit = Utils.tryLogNonFatalError {
-            // Note: we want to get an executor back after expiring this one,
-            // so do not simply call `sc.killExecutor` here (SPARK-8119)
-            sc.killAndReplaceExecutor(executorId)
-            // SPARK-27348: in case of the executors which are not gracefully 
shut down,
-            // we should remove lost executors from 
CoarseGrainedSchedulerBackend manually
-            // here to guarantee two things:
-            // 1) explicitly remove executor information from 
CoarseGrainedSchedulerBackend for
-            //    a lost executor instead of waiting for disconnect message
-            // 2) call scheduler.executorLost() underlying to fail any tasks 
assigned to
-            //    those executors to avoid app hang
-            sc.schedulerBackend match {
-              case backend: CoarseGrainedSchedulerBackend =>
-                backend.driverEndpoint.send(RemoveExecutor(executorId,
-                  ExecutorProcessLost(
-                    s"Executor heartbeat timed out after ${now - lastSeenMs} 
ms")))
-
-              // LocalSchedulerBackend is used locally and only has one single 
executor
-              case _: LocalSchedulerBackend =>
-
-              case other => throw new UnsupportedOperationException(
-                s"Unknown scheduler backend: ${other.getClass}")
-            }
+    if (!sc.conf.get(HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT)) {

Review Comment:
   Resolved.



##########
core/src/main/scala/org/apache/spark/internal/config/Network.scala:
##########
@@ -49,7 +49,13 @@ private[spark] object Network {
     ConfigBuilder("spark.network.timeoutInterval")
       .version("1.3.2")
       .timeConf(TimeUnit.MILLISECONDS)
-      
.createWithDefaultString(STORAGE_BLOCKMANAGER_TIMEOUTINTERVAL.defaultValueString)
+      .createWithDefaultString("15s")
+
+  private[spark] val NETWORK_EXECUTOR_TIMEOUT =
+    ConfigBuilder("spark.network.executorTimeout")
+      .version("1.3.0")

Review Comment:
   Resolved.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to