Ngone51 commented on code in PR #37411:
URL: https://github.com/apache/spark/pull/37411#discussion_r960664710


##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2422,4 +2422,24 @@ package object config {
       .version("3.4.0")
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("5s")
+
+  private[spark] val HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT =
+    ConfigBuilder("spark.driver.heartbeat.checkWorkerLastHeartbeat")
+      .doc("If this config is set to true, heartbeat receiver will check worker's heartbeat when" +
+        "the receiver does not receive any heartbeat from an executor during a period. In" +
+        "addition, this config is only for standalone scheduler. See [SPARK-39984] for more" +
+        "details.")
+      .internal()
+      .version("3.4.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val HEARTBEAT_EXPIRY_CANDIDATES_TIMEOUT =
+    ConfigBuilder("spark.driver.heartbeat.expiryCandidatesTimeout")
+      .doc("This config is a timeout used for heartbeat receiver `executorExpiryCandidates`. Be" +
+        "effective only when HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT is enabled. See" +

Review Comment:
   ```suggestion
           s"effective only when ${HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT.key} is enabled. See" +
   ```
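
To illustrate the suggestion: interpolating the entry's `.key` puts the user-facing config name into the doc string instead of the Scala constant name. The following is only a minimal sketch; `ConfigEntrySketch` and `ConfigDocSketch` are hypothetical stand-ins, not Spark's `ConfigBuilder` API, and the doc text is abbreviated.

```scala
// Hypothetical stand-in for a Spark config entry; only `key` and `doc` matter here.
final case class ConfigEntrySketch(key: String, doc: String)

object ConfigDocSketch {
  val CheckWorkerLastHeartbeat: ConfigEntrySketch = ConfigEntrySketch(
    key = "spark.driver.heartbeat.checkWorkerLastHeartbeat",
    doc = "Enables the worker heartbeat check.")

  // Interpolating `.key` makes the doc show the name a user would actually set,
  // and keeps the doc in sync if the key string is ever renamed.
  val ExpiryCandidatesTimeout: ConfigEntrySketch = ConfigEntrySketch(
    key = "spark.driver.heartbeat.expiryCandidatesTimeout",
    doc = "Timeout for executorExpiryCandidates. " +
      s"Effective only when ${CheckWorkerLastHeartbeat.key} is enabled.")
}
```

With this shape, the rendered doc mentions `spark.driver.heartbeat.checkWorkerLastHeartbeat` rather than an internal identifier that users never see.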



##########
core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala:
##########
@@ -77,17 +77,61 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
 
   private[spark] var scheduler: TaskScheduler = null
 
-  // executor ID -> timestamp of when the last heartbeat from this executor was received
+  /**
+   * [SPARK-39984]
+   * Please make sure the intersection between `executorLastSeen` and `executorExpiryCandidates` is
+   * an empty set. If the intersection is not empty, it is possible to never kill the executor until
+   * the executor recovers. When an executor is in both `executorLastSeen` and
+   * `executorExpiryCandidates`, the value of `workerLastHeartbeat` in `executorExpiryCandidates`
+   * may update if the worker sends heartbeats to master normally.
+   *
+   * `executorLastSeen`:
+   *  - key: executor ID
+   *  - value: timestamp of when the last heartbeat from this executor was received
+   *
+   * `executorExpiryCandidates`: executor ID -> WorkerLastHeartbeat
+   *  - key: executor ID
+   *  - value: timestamp of when the last heartbeat from the worker was received
+   *
+   * when driver does not receive any heartbeat from an executor for `executorTimeoutMs` seconds,
+   * the driver will ask master for the last heartbeat from the worker which the executor is running
+   * on.
+   */
   private val executorLastSeen = new HashMap[String, Long]
+  private val executorExpiryCandidates = new HashMap[String, Long]
 
   private val executorTimeoutMs = sc.conf.get(
     config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT
-  ).getOrElse(Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s"))
+  ).getOrElse(
+    sc.conf.get(Network.NETWORK_EXECUTOR_TIMEOUT) match {
+      case Some(executorTimeout) => executorTimeout
+      case None => Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s")
+    }
+  )
 
   private val checkTimeoutIntervalMs = sc.conf.get(Network.NETWORK_TIMEOUT_INTERVAL)
 
   private val executorHeartbeatIntervalMs = sc.conf.get(config.EXECUTOR_HEARTBEAT_INTERVAL)
 
+  /**
+   * Currently, [SPARK-39984] is only for StandaloneSchedulerBackend.
+   *
+   * `checkWorkerLastHeartbeat`: A flag to enable two-phase executor timeout.
+   * `expiryCandidatesTimeout`: The timeout used for executorExpiryCandidates.
+   */
+  private val checkWorkerLastHeartbeat = {

Review Comment:
   > However, it cannot totally prevent this edge case
   
   Theoretically, I agree it doesn't, but given that `checkTimeoutIntervalMs` is 60s by default, it's almost impossible for the scheduler backend to still be uninitialized when the first `expireDeadHosts` is triggered.
   
   
   > Hence, my thought is to define checkWorkerLastHeartbeat as a function, and we can totally prevent the edge case. Does it make sense?
   
   I don't think that would differ much from the already-defined `isStandalone()` function. Maybe just leave it as is unless we have a better idea.
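
For readers following the `val` versus function question, here is a minimal sketch of the difference being discussed. It assumes the flag depends on a backend that is assigned after the receiver is constructed; `ContextSketch`, `ReceiverSketch`, `backendName`, and `ValVsDefSketch` are hypothetical names and do not reflect the actual `HeartbeatReceiver` code.

```scala
// Hypothetical stand-in for a context whose backend is assigned after construction.
final class ContextSketch {
  @volatile var backendName: Option[String] = None // None until the backend is initialized
}

final class ReceiverSketch(ctx: ContextSketch, flagEnabled: Boolean) {
  private def isStandalone: Boolean = ctx.backendName.contains("standalone")

  // Evaluated once, at construction: captures the backend state at that moment.
  val checkAsVal: Boolean = flagEnabled && isStandalone

  // Evaluated on every call: reflects the backend state at call time.
  def checkAsDef: Boolean = flagEnabled && isStandalone
}

object ValVsDefSketch {
  // Returns (value seen by the val, value seen by the def).
  def run(): (Boolean, Boolean) = {
    val ctx = new ContextSketch
    val receiver = new ReceiverSketch(ctx, flagEnabled = true) // backend not set yet
    ctx.backendName = Some("standalone")                       // backend initialized later
    (receiver.checkAsVal, receiver.checkAsDef)
  }
}
```

In this sketch the `val` stays `false` because it was computed before the backend existed, while the `def` sees the initialized backend. Whether that gap matters in practice is the point under discussion: if the first check only fires after the 60s default interval, the backend is very likely initialized by then.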



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

