kevin85421 commented on code in PR #37411:
URL: https://github.com/apache/spark/pull/37411#discussion_r942063379


##########
core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala:
##########
@@ -77,17 +77,44 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
 
   private[spark] var scheduler: TaskScheduler = null
 
-  // executor ID -> timestamp of when the last heartbeat from this executor was received
+  /**
+   * [SPARK-39984]
+   * Please make sure the intersection between `executorLastSeen` and `waitingList` is an empty set.
+   * If the intersection is not empty, it is possible to never kill the executor until the executor
+   * recovers. When an executor is in both `executorLastSeen` and `waitingList`, the value of
+   * `workerLastHeartbeat` in waitingList may update if the worker sends heartbeats to master
+   * normally.
+   *
+   * `executorLastSeen`:
+   *  - key: executor ID
+   *  - value: timestamp of when the last heartbeat from this executor was received
+   *
+   *  `waitingList`: executor ID -> WorkerLastHeartbeat
+   *  - key: executor ID
+   *  - value: timestamp of when the last heartbeat from the worker was received
+   *
+   * when driver does not receive any heartbeat from an executor for `executorTimeoutMs` seconds,
+   * the driver will ask master for the last heartbeat from the worker which the executor is running
+   * on.
+   */
   private val executorLastSeen = new HashMap[String, Long]
+  private val waitingList = new HashMap[String, Long]
 
   private val executorTimeoutMs = sc.conf.get(
     config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT
-  ).getOrElse(Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s"))
+  ).getOrElse(Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_EXECUTOR_TIMEOUT)}s"))
 
   private val checkTimeoutIntervalMs = sc.conf.get(Network.NETWORK_TIMEOUT_INTERVAL)
 
   private val executorHeartbeatIntervalMs = sc.conf.get(config.EXECUTOR_HEARTBEAT_INTERVAL)
 
+  /**
+   * Currently, [SPARK-39984] is only for StandaloneSchedulerBackend.
+   */
+  private val checkWorkerLastHeartbeat =
+    sc.conf.get(HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT) &&
+      sc.schedulerBackend.isInstanceOf[StandaloneSchedulerBackend]

Review Comment:
   I have revisited every use of `sc.schedulerBackend` to make sure none of them can throw an exception because the scheduler backend has not been initialized yet.

   1. `ExecutorRegistered` / `ExecutorRemoved` / `Heartbeat` => As mentioned above, the heartbeat receiver only receives these messages after the SchedulerBackend has been initialized.

   2. `killExecutor` => The pattern matching is exhaustive.

   3. `expireDeadHosts` => Handled by the new commit https://github.com/apache/spark/pull/37411/commits/77192300cb6990dfc60acc3227c6ae95e0223726
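
To illustrate the concern behind the three points above, here is a minimal sketch of one way to keep a backend check safe when the backend is assigned only after the receiver is constructed. It is not taken from the linked commit. `Context`, `Receiver`, `StandaloneBackend`, `OtherBackend` and `flagEnabled` are hypothetical stand-ins, not Spark classes or configuration entries.

```scala
// Hypothetical stand-ins for the scheduler backend hierarchy (not Spark's real classes).
trait SchedulerBackend
final class StandaloneBackend extends SchedulerBackend
final class OtherBackend extends SchedulerBackend

// Hypothetical context: the backend is assigned some time after construction.
final class Context {
  @volatile var schedulerBackend: SchedulerBackend = null
}

final class Receiver(ctx: Context, flagEnabled: Boolean) {
  // A `def` is re-evaluated on every call, so constructing the receiver
  // never reads the backend. A type pattern does not match `null`, so a
  // missing backend falls through to the default case instead of throwing.
  def checkWorkerLastHeartbeat: Boolean =
    flagEnabled && (ctx.schedulerBackend match {
      case _: StandaloneBackend => true
      case _ => false
    })
}
```

With this shape, a receiver created before the backend exists simply reports `false` until a standalone backend is assigned. The trade-off is that the check is recomputed on each call instead of being cached in a `val`.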



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

