[GitHub] [spark] Ngone51 commented on a diff in pull request #37411: [SPARK-39984][CORE] Check workerLastHeartbeat with master before HeartbeatReceiver expires an executor

2022-09-02 Thread GitBox


Ngone51 commented on code in PR #37411:
URL: https://github.com/apache/spark/pull/37411#discussion_r960664710


##
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##
@@ -2422,4 +2422,24 @@ package object config {
       .version("3.4.0")
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("5s")
+
+  private[spark] val HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT =
+    ConfigBuilder("spark.driver.heartbeat.checkWorkerLastHeartbeat")
+      .doc("If this config is set to true, the heartbeat receiver will check the worker's " +
+        "heartbeat when the receiver does not receive any heartbeat from an executor during " +
+        "a period. In addition, this config is only for the standalone scheduler. " +
+        "See [SPARK-39984] for more details.")
+      .internal()
+      .version("3.4.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val HEARTBEAT_EXPIRY_CANDIDATES_TIMEOUT =
+    ConfigBuilder("spark.driver.heartbeat.expiryCandidatesTimeout")
+      .doc("This config is a timeout used for heartbeat receiver `executorExpiryCandidates`. Be" +
+        "effective only when HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT is enabled. See" +

Review Comment:
   ```suggestion
   s"effective only when ${HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT.key} is enabled. See" +
   ```
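   
   For reference, a sketch of how the full definition might read once the suggestion is applied (the builder chain is taken from the diff quoted in the 2022-08-14 message below; joining the doc fragments with explicit trailing spaces is an assumption about the intended wording):
   
   ```scala
   // Sketch only: reference the other config's key via interpolation instead of
   // a hard-coded name, so a future rename cannot leave the doc stale.
   private[spark] val HEARTBEAT_EXPIRY_CANDIDATES_TIMEOUT =
     ConfigBuilder("spark.driver.heartbeat.expiryCandidatesTimeout")
       .doc("This config is a timeout used for heartbeat receiver `executorExpiryCandidates`. " +
         s"Be effective only when ${HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT.key} is " +
         "enabled. See [SPARK-39984] for more details.")
       .version("3.4.0")
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("30s")
   ```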



##
core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala:
##
@@ -77,17 +77,61 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
 
   private[spark] var scheduler: TaskScheduler = null
 
-  // executor ID -> timestamp of when the last heartbeat from this executor was received
+  /**
+   * [SPARK-39984]
+   * Please make sure the intersection between `executorLastSeen` and `executorExpiryCandidates`
+   * is an empty set. If the intersection is not empty, it is possible to never kill the executor
+   * until the executor recovers. When an executor is in both `executorLastSeen` and
+   * `executorExpiryCandidates`, the value of `workerLastHeartbeat` in `executorExpiryCandidates`
+   * may keep updating as long as the worker sends heartbeats to the master normally.
+   *
+   * `executorLastSeen`:
+   *  - key: executor ID
+   *  - value: timestamp of when the last heartbeat from this executor was received
+   *
+   * `executorExpiryCandidates`: executor ID -> WorkerLastHeartbeat
+   *  - key: executor ID
+   *  - value: timestamp of when the last heartbeat from the worker was received
+   *
+   * When the driver does not receive any heartbeat from an executor for `executorTimeoutMs`
+   * milliseconds, the driver will ask the master for the last heartbeat from the worker which
+   * the executor is running on.
+   */
   private val executorLastSeen = new HashMap[String, Long]
+  private val executorExpiryCandidates = new HashMap[String, Long]
 
   private val executorTimeoutMs = sc.conf.get(
     config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT
-  ).getOrElse(Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s"))
+  ).getOrElse(
+    sc.conf.get(Network.NETWORK_EXECUTOR_TIMEOUT) match {
+      case Some(executorTimeout) => executorTimeout
+      case None => Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s")
+    }
+  )
 
   private val checkTimeoutIntervalMs = sc.conf.get(Network.NETWORK_TIMEOUT_INTERVAL)
 
   private val executorHeartbeatIntervalMs = sc.conf.get(config.EXECUTOR_HEARTBEAT_INTERVAL)
 
+  /**
+   * Currently, [SPARK-39984] is only for StandaloneSchedulerBackend.
+   *
+   * `checkWorkerLastHeartbeat`: A flag to enable the two-phase executor timeout.
+   * `expiryCandidatesTimeout`: The timeout used for `executorExpiryCandidates`.
+   */
+  private val checkWorkerLastHeartbeat = {

Review Comment:
   > However, it cannot totally prevent this edge case
   
   Theoretically, I agree it doesn't, but given that `checkTimeoutIntervalMs` is 60s by default, it's almost impossible for the scheduler backend to still be uninitialized when the first `expireDeadHosts` is raised.
   
   > Hence, my thought is to define `checkWorkerLastHeartbeat` as a function, and we can totally prevent the edge case. Does it make sense?
   
   I don't think it makes a big difference with the already defined `isStandalone()` function. Maybe just leave it as is if we have no better idea.
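   
   To illustrate the timing difference being debated, a sketch (names follow the PR; the two variants are assumptions about the alternatives, not the PR's code):
   
   ```scala
   // A plain val is evaluated when HeartbeatReceiver is constructed, which can
   // happen before sc.schedulerBackend is assigned; isStandalone() would then
   // see a null backend and the flag would be stuck at false:
   private val checkWorkerLastHeartbeatEager =
     sc.conf.get(config.HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT) && isStandalone()
   
   // A def re-evaluates on every call, so by the time the first expireDeadHosts
   // fires the backend exists and the check is accurate:
   private def checkWorkerLastHeartbeatAsFunction: Boolean =
     sc.conf.get(config.HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT) && isStandalone()
   ```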






[GitHub] [spark] Ngone51 commented on a diff in pull request #37411: [SPARK-39984][CORE] Check workerLastHeartbeat with master before HeartbeatReceiver expires an executor

2022-09-01 Thread GitBox


Ngone51 commented on code in PR #37411:
URL: https://github.com/apache/spark/pull/37411#discussion_r960646158


##
core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala:
##
@@ -77,17 +77,61 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
 
   private[spark] var scheduler: TaskScheduler = null
 
-  // executor ID -> timestamp of when the last heartbeat from this executor was received
+  /**
+   * [SPARK-39984]
+   * Please make sure the intersection between `executorLastSeen` and `executorExpiryCandidates`
+   * is an empty set. If the intersection is not empty, it is possible to never kill the executor
+   * until the executor recovers. When an executor is in both `executorLastSeen` and
+   * `executorExpiryCandidates`, the value of `workerLastHeartbeat` in `executorExpiryCandidates`
+   * may keep updating as long as the worker sends heartbeats to the master normally.
+   *
+   * `executorLastSeen`:
+   *  - key: executor ID
+   *  - value: timestamp of when the last heartbeat from this executor was received
+   *
+   * `executorExpiryCandidates`: executor ID -> WorkerLastHeartbeat
+   *  - key: executor ID
+   *  - value: timestamp of when the last heartbeat from the worker was received
+   *
+   * When the driver does not receive any heartbeat from an executor for `executorTimeoutMs`
+   * milliseconds, the driver will ask the master for the last heartbeat from the worker which
+   * the executor is running on.
+   */
   private val executorLastSeen = new HashMap[String, Long]
+  private val executorExpiryCandidates = new HashMap[String, Long]
 
   private val executorTimeoutMs = sc.conf.get(
     config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT
-  ).getOrElse(Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s"))
+  ).getOrElse(
+    sc.conf.get(Network.NETWORK_EXECUTOR_TIMEOUT) match {
+      case Some(executorTimeout) => executorTimeout
+      case None => Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s")
+    }
+  )
 
   private val checkTimeoutIntervalMs = sc.conf.get(Network.NETWORK_TIMEOUT_INTERVAL)
 
   private val executorHeartbeatIntervalMs = sc.conf.get(config.EXECUTOR_HEARTBEAT_INTERVAL)
 
+  /**
+   * Currently, [SPARK-39984] is only for StandaloneSchedulerBackend.
+   *
+   * `checkWorkerLastHeartbeat`: A flag to enable the two-phase executor timeout.
+   * `expiryCandidatesTimeout`: The timeout used for `executorExpiryCandidates`.
+   */
+  private val checkWorkerLastHeartbeat = {

Review Comment:
   > However, if the first expireDeadHosts trigger is prior to scheduler backend initialization, the value of checkWorkerLastHeartbeat will be false
   
   I thought there would be an initial delay before the first `expireDeadHosts`, so `checkWorkerLastHeartbeat` would be less likely to return false. But it seems there is no initial delay, though I don't think that makes sense.
   
   I actually think we can set the initial delay to its check interval: `checkTimeoutIntervalMs`.
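   
   A sketch of that proposal, assuming the expiry check is scheduled on a ScheduledExecutorService the way HeartbeatReceiver.onStart does today (the initial delay is currently 0):
   
   ```scala
   // Sketch only: reuse the check interval as the initial delay so the scheduler
   // backend has time to initialize before the first expiry check fires.
   override def onStart(): Unit = {
     timeoutCheckingTask = eventLoopThread.scheduleAtFixedRate(
       () => Utils.tryLogNonFatalError {
         Option(self).foreach(_.ask[Boolean](ExpireDeadHosts))
       },
       checkTimeoutIntervalMs,  // initial delay (was 0)
       checkTimeoutIntervalMs,  // period between checks
       TimeUnit.MILLISECONDS)
   }
   ```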
   






[GitHub] [spark] Ngone51 commented on a diff in pull request #37411: [SPARK-39984][CORE] Check workerLastHeartbeat with master before HeartbeatReceiver expires an executor

2022-08-24 Thread GitBox


Ngone51 commented on code in PR #37411:
URL: https://github.com/apache/spark/pull/37411#discussion_r953806281


##
core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala:
##
@@ -77,17 +77,61 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
 
   private[spark] var scheduler: TaskScheduler = null
 
-  // executor ID -> timestamp of when the last heartbeat from this executor was received
+  /**
+   * [SPARK-39984]
+   * Please make sure the intersection between `executorLastSeen` and `executorExpiryCandidates`
+   * is an empty set. If the intersection is not empty, it is possible to never kill the executor
+   * until the executor recovers. When an executor is in both `executorLastSeen` and
+   * `executorExpiryCandidates`, the value of `workerLastHeartbeat` in `executorExpiryCandidates`
+   * may keep updating as long as the worker sends heartbeats to the master normally.
+   *
+   * `executorLastSeen`:
+   *  - key: executor ID
+   *  - value: timestamp of when the last heartbeat from this executor was received
+   *
+   * `executorExpiryCandidates`: executor ID -> WorkerLastHeartbeat
+   *  - key: executor ID
+   *  - value: timestamp of when the last heartbeat from the worker was received
+   *
+   * When the driver does not receive any heartbeat from an executor for `executorTimeoutMs`
+   * milliseconds, the driver will ask the master for the last heartbeat from the worker which
+   * the executor is running on.
+   */
   private val executorLastSeen = new HashMap[String, Long]
+  private val executorExpiryCandidates = new HashMap[String, Long]
 
   private val executorTimeoutMs = sc.conf.get(
     config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT
-  ).getOrElse(Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s"))
+  ).getOrElse(
+    sc.conf.get(Network.NETWORK_EXECUTOR_TIMEOUT) match {
+      case Some(executorTimeout) => executorTimeout
+      case None => Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s")
+    }
+  )
 
   private val checkTimeoutIntervalMs = sc.conf.get(Network.NETWORK_TIMEOUT_INTERVAL)
 
   private val executorHeartbeatIntervalMs = sc.conf.get(config.EXECUTOR_HEARTBEAT_INTERVAL)
 
+  /**
+   * Currently, [SPARK-39984] is only for StandaloneSchedulerBackend.
+   *
+   * `checkWorkerLastHeartbeat`: A flag to enable the two-phase executor timeout.
+   * `expiryCandidatesTimeout`: The timeout used for `executorExpiryCandidates`.
+   */
+  private val checkWorkerLastHeartbeat = {

Review Comment:
   Shall we make this a lazy val? Then we can check `sc.schedulerBackend.isInstanceOf[StandaloneSchedulerBackend]` directly here instead of calling a function.
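   
   A minimal sketch of that suggestion (a lazy val defers evaluation to first use, by which point the scheduler backend is set; the config check is the one from the PR):
   
   ```scala
   // Sketch only: lazy val is initialized on first access rather than at
   // construction, so the backend type can be tested inline.
   private lazy val checkWorkerLastHeartbeat =
     sc.conf.get(config.HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT) &&
       sc.schedulerBackend.isInstanceOf[StandaloneSchedulerBackend]
   ```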






[GitHub] [spark] Ngone51 commented on a diff in pull request #37411: [SPARK-39984][CORE] Check workerLastHeartbeat with master before HeartbeatReceiver expires an executor

2022-08-24 Thread GitBox


Ngone51 commented on code in PR #37411:
URL: https://github.com/apache/spark/pull/37411#discussion_r953806461


##
core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala:
##
@@ -77,17 +77,61 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
 
   private[spark] var scheduler: TaskScheduler = null
 
-  // executor ID -> timestamp of when the last heartbeat from this executor was received
+  /**
+   * [SPARK-39984]
+   * Please make sure the intersection between `executorLastSeen` and `executorExpiryCandidates`
+   * is an empty set. If the intersection is not empty, it is possible to never kill the executor
+   * until the executor recovers. When an executor is in both `executorLastSeen` and
+   * `executorExpiryCandidates`, the value of `workerLastHeartbeat` in `executorExpiryCandidates`
+   * may keep updating as long as the worker sends heartbeats to the master normally.
+   *
+   * `executorLastSeen`:
+   *  - key: executor ID
+   *  - value: timestamp of when the last heartbeat from this executor was received
+   *
+   * `executorExpiryCandidates`: executor ID -> WorkerLastHeartbeat
+   *  - key: executor ID
+   *  - value: timestamp of when the last heartbeat from the worker was received
+   *
+   * When the driver does not receive any heartbeat from an executor for `executorTimeoutMs`
+   * milliseconds, the driver will ask the master for the last heartbeat from the worker which
+   * the executor is running on.
+   */
   private val executorLastSeen = new HashMap[String, Long]
+  private val executorExpiryCandidates = new HashMap[String, Long]
 
   private val executorTimeoutMs = sc.conf.get(
     config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT
-  ).getOrElse(Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s"))
+  ).getOrElse(
+    sc.conf.get(Network.NETWORK_EXECUTOR_TIMEOUT) match {
+      case Some(executorTimeout) => executorTimeout
+      case None => Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s")
+    }
+  )
 
   private val checkTimeoutIntervalMs = sc.conf.get(Network.NETWORK_TIMEOUT_INTERVAL)
 
   private val executorHeartbeatIntervalMs = sc.conf.get(config.EXECUTOR_HEARTBEAT_INTERVAL)
 
+  /**
+   * Currently, [SPARK-39984] is only for StandaloneSchedulerBackend.
+   *
+   * `checkWorkerLastHeartbeat`: A flag to enable the two-phase executor timeout.
+   * `expiryCandidatesTimeout`: The timeout used for `executorExpiryCandidates`.
+   */
+  private val checkWorkerLastHeartbeat = {
+    val isEnabled = sc.conf.get(config.HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT)
+    if (isEnabled) logWarning(s"Keep `expiryCandidatesTimeout` larger than `HEARTBEAT_MILLIS` in " +
+      s"deploy/worker/Worker.scala to know whether the master lost any heartbeat from the " +
+      s"worker or not.")
+    isEnabled
+  }
+
+  private val expiryCandidatesTimeout = checkWorkerLastHeartbeat match {

Review Comment:
   lazy too
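   
   That is, the same change applied here (sketch; the `false` branch is truncated in the quoted diff, so the fallback below is a placeholder):
   
   ```scala
   // Sketch only: defer evaluation so checkWorkerLastHeartbeat is read only
   // after the scheduler backend is initialized.
   private lazy val expiryCandidatesTimeout = checkWorkerLastHeartbeat match {
     case true => sc.conf.get(config.HEARTBEAT_EXPIRY_CANDIDATES_TIMEOUT)
     case false => 0L  // placeholder: not shown in the quoted diff
   }
   ```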






[GitHub] [spark] Ngone51 commented on a diff in pull request #37411: [SPARK-39984][CORE] Check workerLastHeartbeat with master before HeartbeatReceiver expires an executor

2022-08-24 Thread GitBox


Ngone51 commented on code in PR #37411:
URL: https://github.com/apache/spark/pull/37411#discussion_r953803448


##
core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala:
##
@@ -77,17 +77,61 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
 
   private[spark] var scheduler: TaskScheduler = null
 
-  // executor ID -> timestamp of when the last heartbeat from this executor was received
+  /**
+   * [SPARK-39984]
+   * Please make sure the intersection between `executorLastSeen` and `executorExpiryCandidates`
+   * is an empty set. If the intersection is not empty, it is possible to never kill the executor
+   * until the executor recovers. When an executor is in both `executorLastSeen` and
+   * `executorExpiryCandidates`, the value of `workerLastHeartbeat` in `executorExpiryCandidates`
+   * may keep updating as long as the worker sends heartbeats to the master normally.
+   *
+   * `executorLastSeen`:
+   *  - key: executor ID
+   *  - value: timestamp of when the last heartbeat from this executor was received
+   *
+   * `executorExpiryCandidates`: executor ID -> WorkerLastHeartbeat
+   *  - key: executor ID
+   *  - value: timestamp of when the last heartbeat from the worker was received
+   *
+   * When the driver does not receive any heartbeat from an executor for `executorTimeoutMs`
+   * milliseconds, the driver will ask the master for the last heartbeat from the worker which
+   * the executor is running on.
+   */
   private val executorLastSeen = new HashMap[String, Long]
+  private val executorExpiryCandidates = new HashMap[String, Long]
 
   private val executorTimeoutMs = sc.conf.get(
     config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT
-  ).getOrElse(Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s"))
+  ).getOrElse(
+    sc.conf.get(Network.NETWORK_EXECUTOR_TIMEOUT) match {
+      case Some(executorTimeout) => executorTimeout
+      case None => Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s")
+    }
+  )
 
   private val checkTimeoutIntervalMs = sc.conf.get(Network.NETWORK_TIMEOUT_INTERVAL)
 
   private val executorHeartbeatIntervalMs = sc.conf.get(config.EXECUTOR_HEARTBEAT_INTERVAL)
 
+  /**
+   * Currently, [SPARK-39984] is only for StandaloneSchedulerBackend.
+   *
+   * `checkWorkerLastHeartbeat`: A flag to enable the two-phase executor timeout.
+   * `expiryCandidatesTimeout`: The timeout used for `executorExpiryCandidates`.
+   */
+  private val checkWorkerLastHeartbeat = {
+    val isEnabled = sc.conf.get(config.HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT)
+    if (isEnabled) logWarning(s"Keep `expiryCandidatesTimeout` larger than `HEARTBEAT_MILLIS` in " +
+      s"deploy/worker/Worker.scala to know whether the master lost any heartbeat from the " +
+      s"worker or not.")
+    isEnabled
+  }
+
+  private val expiryCandidatesTimeout = checkWorkerLastHeartbeat match {
+    case true => sc.conf.get(config.HEARTBEAT_EXPIRY_CANDIDATES_TIMEOUT)

Review Comment:
   Shall we move this warning here? And reword it like this:
   
   "Worker heartbeat check enabled. Note it only works normally if the configured ${config.HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT.key} is larger than the worker's heartbeat interval."
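   
   A sketch of that relocation. The interpolated key below is `HEARTBEAT_EXPIRY_CANDIDATES_TIMEOUT`, on the assumption that the timeout, not the boolean flag named above, is the value that must exceed the worker's heartbeat interval; the `false` branch is truncated in the quoted diff, so `0L` is a placeholder:
   
   ```scala
   // Sketch only: warn at the point where the timeout config is actually read,
   // instead of inside checkWorkerLastHeartbeat's initializer.
   private val expiryCandidatesTimeout = checkWorkerLastHeartbeat match {
     case true =>
       logWarning("Worker heartbeat check enabled. Note it only works normally if the " +
         s"configured ${config.HEARTBEAT_EXPIRY_CANDIDATES_TIMEOUT.key} is larger than " +
         "the worker's heartbeat interval.")
       sc.conf.get(config.HEARTBEAT_EXPIRY_CANDIDATES_TIMEOUT)
     case false => 0L  // placeholder: not shown in the quoted diff
   }
   ```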
   






[GitHub] [spark] Ngone51 commented on a diff in pull request #37411: [SPARK-39984][CORE] Check workerLastHeartbeat with master before HeartbeatReceiver expires an executor

2022-08-24 Thread GitBox


Ngone51 commented on code in PR #37411:
URL: https://github.com/apache/spark/pull/37411#discussion_r953765470


##
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##
@@ -2398,4 +2398,20 @@ package object config {
       .version("3.3.0")
       .intConf
       .createWithDefault(5)
+
+  private[spark] val HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT =
+    ConfigBuilder("spark.driver.heartbeat.checkWorkerLastHeartbeat")
+      .internal()

Review Comment:
   Add doc for it and mention that this only works for Standalone.






[GitHub] [spark] Ngone51 commented on a diff in pull request #37411: [SPARK-39984][CORE] Check workerLastHeartbeat with master before HeartbeatReceiver expires an executor

2022-08-14 Thread GitBox


Ngone51 commented on code in PR #37411:
URL: https://github.com/apache/spark/pull/37411#discussion_r945376133


##
core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala:
##
@@ -199,41 +241,120 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
     removeExecutor(executorRemoved.executorId)
   }
 
+  private def killExecutor(executorId: String, timeout: Long): Unit = {
+    logWarning(s"Removing executor $executorId with no recent heartbeats: " +
+      s"${timeout} ms exceeds timeout $executorTimeoutMs ms")
+    killExecutorThread.submit(new Runnable {
+      override def run(): Unit = Utils.tryLogNonFatalError {
+        // Note: we want to get an executor back after expiring this one,
+        // so do not simply call `sc.killExecutor` here (SPARK-8119)
+        sc.killAndReplaceExecutor(executorId)
+        // SPARK-27348: in case of the executors which are not gracefully shut down,
+        // we should remove lost executors from CoarseGrainedSchedulerBackend manually
+        // here to guarantee two things:
+        // 1) explicitly remove executor information from CoarseGrainedSchedulerBackend for
+        //    a lost executor instead of waiting for the disconnect message
+        // 2) call scheduler.executorLost() underneath to fail any tasks assigned to
+        //    those executors, to avoid an app hang
+        sc.schedulerBackend match {
+          case backend: CoarseGrainedSchedulerBackend =>
+            // TODO (SPARK-39984): Update causedByApp when we have a hanging task detector
+            backend.driverEndpoint.send(RemoveExecutor(executorId,
+              ExecutorProcessLost(
+                s"Executor heartbeat timed out after ${timeout} ms")))
+          // LocalSchedulerBackend is used locally and only has one single executor
+          case _: LocalSchedulerBackend =>
+
+          case other => throw new UnsupportedOperationException(
+            s"Unknown scheduler backend: ${other.getClass}")
+        }
+      }
+    })
+  }
+
+  private def isStandalone(): Boolean = {
+    sc.schedulerBackend match {
+      case backend: StandaloneSchedulerBackend => true
+      case _ => false
+    }
+  }
+
+  private def removeExecutorFromExpiryCandidates(executorId: String): Unit = {
+    if (checkWorkerLastHeartbeat && isStandalone()) {
+      executorExpiryCandidates.remove(executorId)
+    }
+  }
+
   private def expireDeadHosts(): Unit = {
+    /**
+     * [SPARK-39984]
+     * The driver's HeartbeatReceiver will expire an executor if it does not receive any
+     * heartbeat from the executor for `executorTimeoutMs` (default 120s). However, lowering
+     * this below 120 seconds has other challenges. For example, while an executor is performing
+     * a full GC, it cannot send or reply to any message for tens of seconds (depending on the
+     * environment). Hence, HeartbeatReceiver cannot tell whether a heartbeat loss is caused by
+     * network issues or by other reasons (e.g. full GC). To address this, we designed a new
+     * HeartbeatReceiver mechanism for standalone deployments.
+     *
+     * For standalone deployments:
+     * If the driver does not receive any heartbeat from the executor for `executorTimeoutMs`
+     * milliseconds, HeartbeatReceiver will ask the master for the latest heartbeat from the
+     * worker which the executor runs on. HeartbeatReceiver can then determine whether the
+     * heartbeat loss is caused by network issues or by other issues (e.g. GC). If the heartbeat
+     * loss is not caused by network issues, HeartbeatReceiver will put the executor into
+     * `executorExpiryCandidates` rather than expiring it immediately.
+     */
     logTrace("Checking for hosts with no recent heartbeats in HeartbeatReceiver.")
+    logWarning(s"Keep `expiryCandidatesTimeout` larger than `HEARTBEAT_MILLIS` in " +
+      s"deploy/worker/Worker.scala to know whether the master lost any heartbeat from the " +
+      s"worker or not.")
     val now = clock.getTimeMillis()
-    for ((executorId, lastSeenMs) <- executorLastSeen) {
-      if (now - lastSeenMs > executorTimeoutMs) {
-        logWarning(s"Removing executor $executorId with no recent heartbeats: " +
-          s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
-        // Asynchronously kill the executor to avoid blocking the current thread
-        killExecutorThread.submit(new Runnable {
-          override def run(): Unit = Utils.tryLogNonFatalError {
-            // Note: we want to get an executor back after expiring this one,
-            // so do not simply call `sc.killExecutor` here (SPARK-8119)
-            sc.killAndReplaceExecutor(executorId)
-            // SPARK-27348: in case of the executors which are not gracefully shut down,
-            // we should remove lost executors from CoarseGrainedSchedulerBackend manually
-            // here to guarantee two things:
-            // 1) explicitly remove executor information from CoarseGrainedSchedulerBackend
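
A minimal sketch of the two-phase flow the comment above describes, using the PR's names where they appear in this thread; `workerLastHeartbeat(executorId)` is a hypothetical stand-in for the master query, and the exact conditions in the PR may differ:

```scala
// Sketch only. Phase 1: an executor silent past executorTimeoutMs is parked in
// executorExpiryCandidates if its worker still heartbeats to the master (loss
// likely caused by GC), or killed right away otherwise (likely a network/node
// issue). Phase 2: candidates whose worker heartbeat has also gone stale are
// killed.
private def expireDeadHostsTwoPhase(): Unit = {
  val now = clock.getTimeMillis()
  for ((executorId, lastSeenMs) <- executorLastSeen.toSeq) {  // snapshot; we mutate below
    if (now - lastSeenMs > executorTimeoutMs) {
      if (checkWorkerLastHeartbeat && isStandalone()) {
        val workerLastSeenMs = workerLastHeartbeat(executorId)  // hypothetical master RPC
        if (now - workerLastSeenMs < expiryCandidatesTimeout) {
          executorExpiryCandidates(executorId) = workerLastSeenMs
        } else {
          killExecutor(executorId, now - lastSeenMs)
        }
      } else {
        killExecutor(executorId, now - lastSeenMs)
      }
      executorLastSeen.remove(executorId)  // keep the two maps disjoint
    }
  }
  for ((executorId, workerLastSeenMs) <- executorExpiryCandidates.toSeq) {
    if (now - workerLastSeenMs > expiryCandidatesTimeout) {
      killExecutor(executorId, now - workerLastSeenMs)
      executorExpiryCandidates.remove(executorId)
    }
  }
}
```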

[GitHub] [spark] Ngone51 commented on a diff in pull request #37411: [SPARK-39984][CORE] Check workerLastHeartbeat with master before HeartbeatReceiver expires an executor

2022-08-14 Thread GitBox


Ngone51 commented on code in PR #37411:
URL: https://github.com/apache/spark/pull/37411#discussion_r945367796


##
core/src/main/scala/org/apache/spark/internal/config/Network.scala:
##
@@ -51,6 +51,21 @@ private[spark] object Network {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString(STORAGE_BLOCKMANAGER_TIMEOUTINTERVAL.defaultValueString)
 
+  private[spark] val NETWORK_EXECUTOR_TIMEOUT =
+    ConfigBuilder("spark.network.executorTimeout")
+      .version("3.4.0")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createOptional
+
+  private[spark] val HEARTBEAT_EXPIRY_CANDIDATES_TIMEOUT =
+    ConfigBuilder("spark.network.expiryCandidatesTimeout")
+      .doc("This config is a timeout used for heartbeat receiver `executorExpiryCandidates`. Be" +
+        "effective only when HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT is enabled. See" +
+        "[SPARK-39984] for more details")
+      .version("3.4.0")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("30s")

Review Comment:
   +1. 
   
   We can add this config to the namespace `spark.driver.heartbeat`. cc @kevin85421
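   
   A sketch of the namespace move (same builder chain as the diff above; the name matches the revision quoted at the top of this thread):
   
   ```scala
   // Sketch only: the same timeout config, defined under spark.driver.heartbeat.*
   // instead of spark.network.*.
   private[spark] val HEARTBEAT_EXPIRY_CANDIDATES_TIMEOUT =
     ConfigBuilder("spark.driver.heartbeat.expiryCandidatesTimeout")
       .version("3.4.0")
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("30s")
   ```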






[GitHub] [spark] Ngone51 commented on a diff in pull request #37411: [SPARK-39984][CORE] Check workerLastHeartbeat with master before HeartbeatReceiver expires an executor

2022-08-14 Thread GitBox


Ngone51 commented on code in PR #37411:
URL: https://github.com/apache/spark/pull/37411#discussion_r945367796


##
core/src/main/scala/org/apache/spark/internal/config/Network.scala:
##
@@ -51,6 +51,21 @@ private[spark] object Network {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString(STORAGE_BLOCKMANAGER_TIMEOUTINTERVAL.defaultValueString)
 
+  private[spark] val NETWORK_EXECUTOR_TIMEOUT =
+    ConfigBuilder("spark.network.executorTimeout")
+      .version("3.4.0")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createOptional
+
+  private[spark] val HEARTBEAT_EXPIRY_CANDIDATES_TIMEOUT =
+    ConfigBuilder("spark.network.expiryCandidatesTimeout")
+      .doc("This config is a timeout used for heartbeat receiver `executorExpiryCandidates`. Be" +
+        "effective only when HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT is enabled. See" +
+        "[SPARK-39984] for more details")
+      .version("3.4.0")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("30s")

Review Comment:
   +1. 
   
   We can add this config to the namespace `spark.executor.heartbeat`. cc @kevin85421



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org