[GitHub] [spark] Ngone51 commented on a diff in pull request #37411: [SPARK-39984][CORE] Check workerLastHeartbeat with master before HeartbeatReceiver expires an executor
Ngone51 commented on code in PR #37411: URL: https://github.com/apache/spark/pull/37411#discussion_r960664710 ## core/src/main/scala/org/apache/spark/internal/config/package.scala: ## @@ -2422,4 +2422,24 @@ package object config { .version("3.4.0") .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("5s") + + private[spark] val HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT = +ConfigBuilder("spark.driver.heartbeat.checkWorkerLastHeartbeat") + .doc("If this config is set to true, heartbeat receiver will check worker's heartbeat when" + +"the receiver does not receive any heartbeat from an executor during a period. In" + +"addition, this config is only for standalone scheduler. See [SPARK-39984] for more" + +"details.") + .internal() + .version("3.4.0") + .booleanConf + .createWithDefault(false) + + private[spark] val HEARTBEAT_EXPIRY_CANDIDATES_TIMEOUT = +ConfigBuilder("spark.driver.heartbeat.expiryCandidatesTimeout") + .doc("This config is a timeout used for heartbeat receiver `executorExpiryCandidates`. Be" + +"effective only when HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT is enabled. See" + Review Comment: ```suggestion s"effective only when ${HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT.key} is enabled. See" + ``` ## core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala: ## @@ -77,17 +77,61 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) private[spark] var scheduler: TaskScheduler = null - // executor ID -> timestamp of when the last heartbeat from this executor was received + /** + * [SPARK-39984] + * Please make sure the intersection between `executorLastSeen` and `executorExpiryCandidates` is + * an empty set. If the intersection is not empty, it is possible to never kill the executor until + * the executor recovers. 
When an executor is in both `executorLastSeen` and + * `executorExpiryCandidates`, the value of `workerLastHeartbeat` in `executorExpiryCandidates` + * may update if the worker sends heartbeats to master normally. + * + * `executorLastSeen`: + * - key: executor ID + * - value: timestamp of when the last heartbeat from this executor was received + * + * `executorExpiryCandidates`: executor ID -> WorkerLastHeartbeat + * - key: executor ID + * - value: timestamp of when the last heartbeat from the worker was received + * + * when driver does not receive any heartbeat from an executor for `executorTimeoutMs` seconds, + * the driver will ask master for the last heartbeat from the worker which the executor is running + * on. + */ private val executorLastSeen = new HashMap[String, Long] + private val executorExpiryCandidates = new HashMap[String, Long] private val executorTimeoutMs = sc.conf.get( config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT - ).getOrElse(Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s")) + ).getOrElse( +sc.conf.get(Network.NETWORK_EXECUTOR_TIMEOUT) match { + case Some(executorTimeout) => executorTimeout + case None => Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s") +} + ) private val checkTimeoutIntervalMs = sc.conf.get(Network.NETWORK_TIMEOUT_INTERVAL) private val executorHeartbeatIntervalMs = sc.conf.get(config.EXECUTOR_HEARTBEAT_INTERVAL) + /** + * Currently, [SPARK-39984] is only for StandaloneSchedulerBackend. + * + * `checkWorkerLastHeartbeat`: A flag to enable two-phase executor timeout. + * `expiryCandidatesTimeout`: The timeout used for executorExpiryCandidates. + */ + private val checkWorkerLastHeartbeat = { Review Comment: > However, it cannot totally prevent this edge case Theoretically, I agree it doesn't, but given that `checkTimeoutIntervalMs` is 60s by default, it's almost impossible for the scheduler backend to be uninitialized when the first `expireDeadHosts` is raised.
> Hence, my thought is to define checkWorkerLastHeartbeat as a function, and we can totally prevent the edge case. Does it make sense? I don't think it makes a big difference with the already defined `isStandalone()` function. Maybe just leave it as is if we have no better idea. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
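For reference, the `executorTimeoutMs` fallback order in the hunk above can be sketched in isolation. This is a minimal sketch, not the PR's code: `TimeoutFallbackSketch` and `resolveExecutorTimeoutMs` are hypothetical names, the three parameters stand in for the values read from `STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT`, `NETWORK_EXECUTOR_TIMEOUT` and `NETWORK_TIMEOUT`, and it assumes `NETWORK_TIMEOUT` is expressed in seconds (as the `s"...s"` suffix in the diff suggests).

```scala
object TimeoutFallbackSketch {
  /**
   * Resolves the executor timeout in milliseconds using the same precedence
   * as the diff: block manager heartbeat timeout first, then the executor
   * timeout, and finally the general network timeout (given in seconds).
   */
  def resolveExecutorTimeoutMs(
      blockManagerHeartbeatTimeoutMs: Option[Long],
      networkExecutorTimeoutMs: Option[Long],
      networkTimeoutSeconds: Long): Long =
    blockManagerHeartbeatTimeoutMs
      .orElse(networkExecutorTimeoutMs)
      .getOrElse(networkTimeoutSeconds * 1000L)
}
```

Chaining `orElse` and `getOrElse` expresses the same precedence as the nested `getOrElse(... match { ... })` in the diff without the inner pattern match.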
[GitHub] [spark] Ngone51 commented on a diff in pull request #37411: [SPARK-39984][CORE] Check workerLastHeartbeat with master before HeartbeatReceiver expires an executor
Ngone51 commented on code in PR #37411: URL: https://github.com/apache/spark/pull/37411#discussion_r960646158 ## core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala: ## @@ -77,17 +77,61 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) private[spark] var scheduler: TaskScheduler = null - // executor ID -> timestamp of when the last heartbeat from this executor was received + /** + * [SPARK-39984] + * Please make sure the intersection between `executorLastSeen` and `executorExpiryCandidates` is + * an empty set. If the intersection is not empty, it is possible to never kill the executor until + * the executor recovers. When an executor is in both `executorLastSeen` and + * `executorExpiryCandidates`, the value of `workerLastHeartbeat` in `executorExpiryCandidates` + * may update if the worker sends heartbeats to master normally. + * + * `executorLastSeen`: + * - key: executor ID + * - value: timestamp of when the last heartbeat from this executor was received + * + * `executorExpiryCandidates`: executor ID -> WorkerLastHeartbeat + * - key: executor ID + * - value: timestamp of when the last heartbeat from the worker was received + * + * when driver does not receive any heartbeat from an executor for `executorTimeoutMs` seconds, + * the driver will ask master for the last heartbeat from the worker which the executor is running + * on. 
+ */ private val executorLastSeen = new HashMap[String, Long] + private val executorExpiryCandidates = new HashMap[String, Long] private val executorTimeoutMs = sc.conf.get( config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT - ).getOrElse(Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s")) + ).getOrElse( +sc.conf.get(Network.NETWORK_EXECUTOR_TIMEOUT) match { + case Some(executorTimeout) => executorTimeout + case None => Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s") +} + ) private val checkTimeoutIntervalMs = sc.conf.get(Network.NETWORK_TIMEOUT_INTERVAL) private val executorHeartbeatIntervalMs = sc.conf.get(config.EXECUTOR_HEARTBEAT_INTERVAL) + /** + * Currently, [SPARK-39984] is only for StandaloneSchedulerBackend. + * + * `checkWorkerLastHeartbeat`: A flag to enable two-phase executor timeout. + * `expiryCandidatesTimeout`: The timeout used for executorExpiryCandidates. + */ + private val checkWorkerLastHeartbeat = { Review Comment: > However, if the first expireDeadHosts trigger is prior to scheduler backend initialization, the value of checkWorkerLastHeartbeat will be false I thought there would be an initial delay before the first `expireDeadHosts`, so `checkWorkerLastHeartbeat` would be less likely to return false. But it seems there is no initial delay, which I don't think makes sense. I think we can set the initial delay to the check interval: `checkTimeoutIntervalMs`.
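To illustrate the initial-delay suggestion: a minimal sketch, assuming the periodic check is driven by a `java.util.concurrent.ScheduledExecutorService` (the scheduling call itself is not part of this diff hunk). `InitialDelaySketch` and `firstRunDelayMs` are hypothetical names used only for illustration. The second argument of `scheduleAtFixedRate` is the initial delay, so passing the check interval there postpones the first run by one full period.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object InitialDelaySketch {
  /** Returns how many milliseconds elapse before the first run of a periodic task. */
  def firstRunDelayMs(initialDelayMs: Long, periodMs: Long): Long = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val firstRun = new CountDownLatch(1)
    val start = System.nanoTime()
    try {
      // Stand-in for the periodic expireDeadHosts check.
      scheduler.scheduleAtFixedRate(
        new Runnable { override def run(): Unit = firstRun.countDown() },
        initialDelayMs, periodMs, TimeUnit.MILLISECONDS)
      // Wait (with a safety bound) until the task has run once.
      firstRun.await(10, TimeUnit.SECONDS)
      TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)
    } finally {
      scheduler.shutdownNow()
    }
  }
}
```

With an initial delay equal to the period, the first check cannot fire before one interval has passed, which gives the scheduler backend that much time to initialize.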
[GitHub] [spark] Ngone51 commented on a diff in pull request #37411: [SPARK-39984][CORE] Check workerLastHeartbeat with master before HeartbeatReceiver expires an executor
Ngone51 commented on code in PR #37411: URL: https://github.com/apache/spark/pull/37411#discussion_r953806281 ## core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala: ## @@ -77,17 +77,61 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) private[spark] var scheduler: TaskScheduler = null - // executor ID -> timestamp of when the last heartbeat from this executor was received + /** + * [SPARK-39984] + * Please make sure the intersection between `executorLastSeen` and `executorExpiryCandidates` is + * an empty set. If the intersection is not empty, it is possible to never kill the executor until + * the executor recovers. When an executor is in both `executorLastSeen` and + * `executorExpiryCandidates`, the value of `workerLastHeartbeat` in `executorExpiryCandidates` + * may update if the worker sends heartbeats to master normally. + * + * `executorLastSeen`: + * - key: executor ID + * - value: timestamp of when the last heartbeat from this executor was received + * + * `executorExpiryCandidates`: executor ID -> WorkerLastHeartbeat + * - key: executor ID + * - value: timestamp of when the last heartbeat from the worker was received + * + * when driver does not receive any heartbeat from an executor for `executorTimeoutMs` seconds, + * the driver will ask master for the last heartbeat from the worker which the executor is running + * on. 
+ */ private val executorLastSeen = new HashMap[String, Long] + private val executorExpiryCandidates = new HashMap[String, Long] private val executorTimeoutMs = sc.conf.get( config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT - ).getOrElse(Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s")) + ).getOrElse( +sc.conf.get(Network.NETWORK_EXECUTOR_TIMEOUT) match { + case Some(executorTimeout) => executorTimeout + case None => Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s") +} + ) private val checkTimeoutIntervalMs = sc.conf.get(Network.NETWORK_TIMEOUT_INTERVAL) private val executorHeartbeatIntervalMs = sc.conf.get(config.EXECUTOR_HEARTBEAT_INTERVAL) + /** + * Currently, [SPARK-39984] is only for StandaloneSchedulerBackend. + * + * `checkWorkerLastHeartbeat`: A flag to enable two-phase executor timeout. + * `expiryCandidatesTimeout`: The timeout used for executorExpiryCandidates. + */ + private val checkWorkerLastHeartbeat = { Review Comment: Shall we make this a lazy val? Then we can check `sc.schedulerBackend.isInstanceOf[StandaloneSchedulerBackend]` directly here instead of calling a function.
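The trade-off between `val`, `lazy val` and a function can be sketched without Spark. This is a minimal sketch under stated assumptions: `FakeContext`, `StandaloneBackend` and `Receiver` are hypothetical stand-ins for `SparkContext`, `StandaloneSchedulerBackend` and `HeartbeatReceiver`, and the only behavior modeled is that the backend is assigned after the receiver is constructed.

```scala
object LazyValSketch {
  // Hypothetical stand-in for SparkContext: the backend is assigned after construction.
  final class FakeContext { var schedulerBackend: AnyRef = null }

  // Hypothetical stand-in for StandaloneSchedulerBackend.
  final class StandaloneBackend

  final class Receiver(ctx: FakeContext) {
    // Evaluated once, at construction: sees whatever backend exists at that moment.
    val eagerIsStandalone: Boolean =
      ctx.schedulerBackend.isInstanceOf[StandaloneBackend]

    // Evaluated once, on first access, and cached from then on.
    lazy val lazyIsStandalone: Boolean =
      ctx.schedulerBackend.isInstanceOf[StandaloneBackend]

    // Re-evaluated on every call, so it always reflects the current backend.
    def isStandalone: Boolean =
      ctx.schedulerBackend.isInstanceOf[StandaloneBackend]
  }
}
```

A `lazy val` fixes the construction-time problem of a plain `val`, but it still caches `false` if it is first read before the backend is set. Only the `def` form re-checks on every call, which is the edge case raised elsewhere in this review thread.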
[GitHub] [spark] Ngone51 commented on a diff in pull request #37411: [SPARK-39984][CORE] Check workerLastHeartbeat with master before HeartbeatReceiver expires an executor
Ngone51 commented on code in PR #37411: URL: https://github.com/apache/spark/pull/37411#discussion_r953806461 ## core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala: ## @@ -77,17 +77,61 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) private[spark] var scheduler: TaskScheduler = null - // executor ID -> timestamp of when the last heartbeat from this executor was received + /** + * [SPARK-39984] + * Please make sure the intersection between `executorLastSeen` and `executorExpiryCandidates` is + * an empty set. If the intersection is not empty, it is possible to never kill the executor until + * the executor recovers. When an executor is in both `executorLastSeen` and + * `executorExpiryCandidates`, the value of `workerLastHeartbeat` in `executorExpiryCandidates` + * may update if the worker sends heartbeats to master normally. + * + * `executorLastSeen`: + * - key: executor ID + * - value: timestamp of when the last heartbeat from this executor was received + * + * `executorExpiryCandidates`: executor ID -> WorkerLastHeartbeat + * - key: executor ID + * - value: timestamp of when the last heartbeat from the worker was received + * + * when driver does not receive any heartbeat from an executor for `executorTimeoutMs` seconds, + * the driver will ask master for the last heartbeat from the worker which the executor is running + * on. 
+ */ private val executorLastSeen = new HashMap[String, Long] + private val executorExpiryCandidates = new HashMap[String, Long] private val executorTimeoutMs = sc.conf.get( config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT - ).getOrElse(Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s")) + ).getOrElse( +sc.conf.get(Network.NETWORK_EXECUTOR_TIMEOUT) match { + case Some(executorTimeout) => executorTimeout + case None => Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s") +} + ) private val checkTimeoutIntervalMs = sc.conf.get(Network.NETWORK_TIMEOUT_INTERVAL) private val executorHeartbeatIntervalMs = sc.conf.get(config.EXECUTOR_HEARTBEAT_INTERVAL) + /** + * Currently, [SPARK-39984] is only for StandaloneSchedulerBackend. + * + * `checkWorkerLastHeartbeat`: A flag to enable two-phase executor timeout. + * `expiryCandidatesTimeout`: The timeout used for executorExpiryCandidates. + */ + private val checkWorkerLastHeartbeat = { +val isEnabled = sc.conf.get(config.HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT) +if (isEnabled) logWarning(s"Keep `expiryCandidatesTimeout` larger than `HEARTBEAT_MILLIS` in" + + s"deploy/worker/Worker.scala to know whether master lost any heartbeat from the" + + s"worker or not.") +isEnabled + } + + private val expiryCandidatesTimeout = checkWorkerLastHeartbeat match { Review Comment: Make this a lazy val too.
[GitHub] [spark] Ngone51 commented on a diff in pull request #37411: [SPARK-39984][CORE] Check workerLastHeartbeat with master before HeartbeatReceiver expires an executor
Ngone51 commented on code in PR #37411: URL: https://github.com/apache/spark/pull/37411#discussion_r953803448 ## core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala: ## @@ -77,17 +77,61 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) private[spark] var scheduler: TaskScheduler = null - // executor ID -> timestamp of when the last heartbeat from this executor was received + /** + * [SPARK-39984] + * Please make sure the intersection between `executorLastSeen` and `executorExpiryCandidates` is + * an empty set. If the intersection is not empty, it is possible to never kill the executor until + * the executor recovers. When an executor is in both `executorLastSeen` and + * `executorExpiryCandidates`, the value of `workerLastHeartbeat` in `executorExpiryCandidates` + * may update if the worker sends heartbeats to master normally. + * + * `executorLastSeen`: + * - key: executor ID + * - value: timestamp of when the last heartbeat from this executor was received + * + * `executorExpiryCandidates`: executor ID -> WorkerLastHeartbeat + * - key: executor ID + * - value: timestamp of when the last heartbeat from the worker was received + * + * when driver does not receive any heartbeat from an executor for `executorTimeoutMs` seconds, + * the driver will ask master for the last heartbeat from the worker which the executor is running + * on. 
+ */ private val executorLastSeen = new HashMap[String, Long] + private val executorExpiryCandidates = new HashMap[String, Long] private val executorTimeoutMs = sc.conf.get( config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT - ).getOrElse(Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s")) + ).getOrElse( +sc.conf.get(Network.NETWORK_EXECUTOR_TIMEOUT) match { + case Some(executorTimeout) => executorTimeout + case None => Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s") +} + ) private val checkTimeoutIntervalMs = sc.conf.get(Network.NETWORK_TIMEOUT_INTERVAL) private val executorHeartbeatIntervalMs = sc.conf.get(config.EXECUTOR_HEARTBEAT_INTERVAL) + /** + * Currently, [SPARK-39984] is only for StandaloneSchedulerBackend. + * + * `checkWorkerLastHeartbeat`: A flag to enable two-phase executor timeout. + * `expiryCandidatesTimeout`: The timeout used for executorExpiryCandidates. + */ + private val checkWorkerLastHeartbeat = { +val isEnabled = sc.conf.get(config.HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT) +if (isEnabled) logWarning(s"Keep `expiryCandidatesTimeout` larger than `HEARTBEAT_MILLIS` in" + + s"deploy/worker/Worker.scala to know whether master lost any heartbeat from the" + + s"worker or not.") +isEnabled + } + + private val expiryCandidatesTimeout = checkWorkerLastHeartbeat match { +case true => sc.conf.get(config.HEARTBEAT_EXPIRY_CANDIDATES_TIMEOUT) Review Comment: Shall we move this warning here? And reword it like this: "Worker heartbeat check enabled. Note it only works normally if the configured ${config.HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT.key} is larger than worker's heartbeat interval."
[GitHub] [spark] Ngone51 commented on a diff in pull request #37411: [SPARK-39984][CORE] Check workerLastHeartbeat with master before HeartbeatReceiver expires an executor
Ngone51 commented on code in PR #37411: URL: https://github.com/apache/spark/pull/37411#discussion_r953765470 ## core/src/main/scala/org/apache/spark/internal/config/package.scala: ## @@ -2398,4 +2398,20 @@ package object config { .version("3.3.0") .intConf .createWithDefault(5) + + private[spark] val HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT = +ConfigBuilder("spark.driver.heartbeat.checkWorkerLastHeartbeat") + .internal() Review Comment: Add a doc for it and mention that it only works with the Standalone scheduler.
[GitHub] [spark] Ngone51 commented on a diff in pull request #37411: [SPARK-39984][CORE] Check workerLastHeartbeat with master before HeartbeatReceiver expires an executor
Ngone51 commented on code in PR #37411: URL: https://github.com/apache/spark/pull/37411#discussion_r945376133 ## core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala: ## @@ -199,41 +241,120 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) removeExecutor(executorRemoved.executorId) } + private def killExecutor(executorId: String, timeout: Long): Unit = { +logWarning(s"Removing executor $executorId with no recent heartbeats: " + + s"${timeout} ms exceeds timeout $executorTimeoutMs ms") +killExecutorThread.submit(new Runnable { + override def run(): Unit = Utils.tryLogNonFatalError { +// Note: we want to get an executor back after expiring this one, +// so do not simply call `sc.killExecutor` here (SPARK-8119) +sc.killAndReplaceExecutor(executorId) +// SPARK-27348: in case of the executors which are not gracefully shut down, +// we should remove lost executors from CoarseGrainedSchedulerBackend manually +// here to guarantee two things: +// 1) explicitly remove executor information from CoarseGrainedSchedulerBackend for +//a lost executor instead of waiting for disconnect message +// 2) call scheduler.executorLost() underlying to fail any tasks assigned to +//those executors to avoid app hang +sc.schedulerBackend match { + case backend: CoarseGrainedSchedulerBackend => +// TODO (SPARK-39984): Update causedByApp when we have a hanging task detector +backend.driverEndpoint.send(RemoveExecutor(executorId, + ExecutorProcessLost( +s"Executor heartbeat timed out after ${timeout} ms"))) + // LocalSchedulerBackend is used locally and only has one single executor + case _: LocalSchedulerBackend => + + case other => throw new UnsupportedOperationException( +s"Unknown scheduler backend: ${other.getClass}") +} + } +}) + } + + private def isStandalone(): Boolean = { +sc.schedulerBackend match { + case backend: StandaloneSchedulerBackend => true + case _ => false +} + } + + private def removeExecutorFromExpiryCandidates(executorId: String): Unit = { 
+if (checkWorkerLastHeartbeat && isStandalone()) { + executorExpiryCandidates.remove(executorId) +} + } + private def expireDeadHosts(): Unit = { + /** + * [SPARK-39984] + * The driver’s HeartbeatReceiver will expire an executor if it does not receive any heartbeat + * from the executor for `executorTimeoutMs` (default 120s) seconds. However, lowering from 120 + * seconds has other challenges. For example: when executor is performing full GC, it cannot + * send/reply any message for tens of seconds (based on your environment). Hence, + * HeartbeatReceiver cannot whether the heartbeat loss is caused by network issues or other + * reasons (e.g. full GC). To address this, we designed a new Heartbeat Receiver mechanism for + * standalone deployments. + * + * For standalone deployments: + * If driver does not receive any heartbeat from the executor for `executorTimeoutMs` seconds, + * HeartbeatReceiver will send a request to master to ask for the latest heartbeat from the + * worker which the executor runs on. HeartbeatReceiver can determine whether the heartbeat loss + * is caused by network issues or other issues (e.g. GC). If the heartbeat loss is not caused by + * network issues, the HeartbeatReceiver will put the executor into `executorExpiryCandidates` + * rather than expiring it immediately. 
+ */ logTrace("Checking for hosts with no recent heartbeats in HeartbeatReceiver.") +logWarning(s"Keep `expiryCandidatesTimeout` larger than `HEARTBEAT_MILLIS` in" + + s"deploy/worker/Worker.scala to know whether master lost any heartbeat from the" + + s"worker or not.") val now = clock.getTimeMillis() -for ((executorId, lastSeenMs) <- executorLastSeen) { - if (now - lastSeenMs > executorTimeoutMs) { -logWarning(s"Removing executor $executorId with no recent heartbeats: " + - s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms") -// Asynchronously kill the executor to avoid blocking the current thread -killExecutorThread.submit(new Runnable { - override def run(): Unit = Utils.tryLogNonFatalError { -// Note: we want to get an executor back after expiring this one, -// so do not simply call `sc.killExecutor` here (SPARK-8119) -sc.killAndReplaceExecutor(executorId) -// SPARK-27348: in case of the executors which are not gracefully shut down, -// we should remove lost executors from CoarseGrainedSchedulerBackend manually -// here to guarantee two things: -// 1) explicitly remove executor information from CoarseGrainedSchedulerBackend
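The two-phase expiry described in the doc comment above can be sketched as a pure decision function. This is a simplified reading of that comment, not the PR's implementation: `TwoPhaseExpirySketch`, `Decision` and `decide` are hypothetical names, and the sketch assumes a worker heartbeat counts as "recent" when it is no older than `expiryCandidatesTimeoutMs`, and that a missing worker heartbeat means the executor is expired right away.

```scala
object TwoPhaseExpirySketch {
  sealed trait Decision
  case object Healthy extends Decision          // executor heartbeat is recent enough
  case object ExpiryCandidate extends Decision  // executor silent, but its worker is alive
  case object Expire extends Decision           // executor silent and worker silent or unknown

  /**
   * Phase 1: compare the executor's last heartbeat against executorTimeoutMs.
   * Phase 2: if the executor timed out, look at the worker's last heartbeat
   * (as reported by the master) before deciding to expire the executor.
   */
  def decide(
      nowMs: Long,
      executorLastSeenMs: Long,
      workerLastHeartbeatMs: Option[Long],
      executorTimeoutMs: Long,
      expiryCandidatesTimeoutMs: Long): Decision = {
    if (nowMs - executorLastSeenMs <= executorTimeoutMs) {
      Healthy
    } else {
      workerLastHeartbeatMs match {
        // The worker still reaches the master, so the silence is probably
        // not a network issue (e.g. full GC): defer instead of killing.
        case Some(ts) if nowMs - ts <= expiryCandidatesTimeoutMs => ExpiryCandidate
        case _ => Expire
      }
    }
  }
}
```

For example, with `executorTimeoutMs = 120000` and `expiryCandidatesTimeoutMs = 30000`, an executor silent for 190 seconds whose worker reported 10 seconds ago becomes a candidate rather than being killed immediately.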
[GitHub] [spark] Ngone51 commented on a diff in pull request #37411: [SPARK-39984][CORE] Check workerLastHeartbeat with master before HeartbeatReceiver expires an executor
Ngone51 commented on code in PR #37411: URL: https://github.com/apache/spark/pull/37411#discussion_r945367796 ## core/src/main/scala/org/apache/spark/internal/config/Network.scala: ## @@ -51,6 +51,21 @@ private[spark] object Network { .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString(STORAGE_BLOCKMANAGER_TIMEOUTINTERVAL.defaultValueString) + private[spark] val NETWORK_EXECUTOR_TIMEOUT = +ConfigBuilder("spark.network.executorTimeout") + .version("3.4.0") + .timeConf(TimeUnit.MILLISECONDS) + .createOptional + + private[spark] val HEARTBEAT_EXPIRY_CANDIDATES_TIMEOUT = +ConfigBuilder("spark.network.expiryCandidatesTimeout") + .doc("This config is a timeout used for heartbeat receiver `executorExpiryCandidates`. Be" + +"effective only when HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT is enabled. See" + +"[SPARK-39984] for more details") + .version("3.4.0") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("30s") Review Comment: +1. We can add this config to the namespace `spark.driver.heartbeat`. cc @kevin85421
[GitHub] [spark] Ngone51 commented on a diff in pull request #37411: [SPARK-39984][CORE] Check workerLastHeartbeat with master before HeartbeatReceiver expires an executor
Ngone51 commented on code in PR #37411: URL: https://github.com/apache/spark/pull/37411#discussion_r945367796 ## core/src/main/scala/org/apache/spark/internal/config/Network.scala: ## @@ -51,6 +51,21 @@ private[spark] object Network { .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString(STORAGE_BLOCKMANAGER_TIMEOUTINTERVAL.defaultValueString) + private[spark] val NETWORK_EXECUTOR_TIMEOUT = +ConfigBuilder("spark.network.executorTimeout") + .version("3.4.0") + .timeConf(TimeUnit.MILLISECONDS) + .createOptional + + private[spark] val HEARTBEAT_EXPIRY_CANDIDATES_TIMEOUT = +ConfigBuilder("spark.network.expiryCandidatesTimeout") + .doc("This config is a timeout used for heartbeat receiver `executorExpiryCandidates`. Be" + +"effective only when HEARTBEAT_RECEIVER_CHECK_WORKER_LAST_HEARTBEAT is enabled. See" + +"[SPARK-39984] for more details") + .version("3.4.0") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("30s") Review Comment: +1. We can add this config to the namespace `spark.executor.heartbeat`. cc @kevin85421