kevin85421 opened a new pull request, #37385:
URL: https://github.com/apache/spark/pull/37385
### What changes were proposed in this pull request?
### Q1. How many ExecutorExitCode do we have?
Based on ExecutorExitCode.scala, there are four ExecutorExitCode values.
* `DISK_STORE_FAILED_TO_CREATE_DIR`: Executor has a BlockManager
(`env.blockManager`), and BlockManager creates a DiskBlockManager when it
initializes. When a DiskBlockManager is created, it tries to create its local
directories. As shown in the following code snippet, if it fails to create any
local directory, DiskBlockManager shuts down the JVM with the exit code
`DISK_STORE_FAILED_TO_CREATE_DIR`.
```scala
private[spark] val localDirs: Array[File] = createLocalDirs(conf)
if (localDirs.isEmpty) {
  logError("Failed to create any local dir.")
  System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
}
```
* `EXTERNAL_BLOCK_STORE_FAILED_TO_INITIALIZE`: Unused
* `EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR`: Unused
* `HEARTBEAT_FAILURE`: When an Executor is created, it starts a
Heartbeater thread that calls the function `reportHeartBeat()` periodically.
The function `reportHeartBeat()` sends a heartbeat to the driver. If the
heartbeater does not receive a response in time, it increments
`heartbeatFailures` by one. When `heartbeatFailures` is greater than or
equal to `HEARTBEAT_MAX_FAILURES`, the heartbeater shuts down the JVM with the
exit code `ExecutorExitCode.HEARTBEAT_FAILURE`.
```scala
if (heartbeatFailures >= HEARTBEAT_MAX_FAILURES) {
  logError(s"Exit as unable to send heartbeats to driver " +
    s"more than $HEARTBEAT_MAX_FAILURES times")
  System.exit(ExecutorExitCode.HEARTBEAT_FAILURE)
}
```
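The heartbeat rule above can be sketched in isolation as a small counter. This is only an illustration: `HeartbeatFailureTracker` and `record` are hypothetical names that do not exist in Spark, and resetting the counter after a successful heartbeat is an assumption that the description above does not state.

```scala
// Hypothetical helper, not part of Spark: counts heartbeat failures and
// reports when the executor would exit with HEARTBEAT_FAILURE.
final class HeartbeatFailureTracker(maxFailures: Int) {
  private var failures = 0

  /** Records one heartbeat attempt; returns true when the failure limit is reached. */
  def record(succeeded: Boolean): Boolean = {
    if (succeeded) {
      failures = 0 // assumption: a successful heartbeat resets the counter
    } else {
      failures += 1
    }
    // Same comparison as `heartbeatFailures >= HEARTBEAT_MAX_FAILURES`.
    failures >= maxFailures
  }
}
```

In the real executor, the point where `record` returns `true` corresponds to the `System.exit(ExecutorExitCode.HEARTBEAT_FAILURE)` call shown above.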
### Q2. How does ExecutorExitCode pass to driver?
ExecutorRunner runs each Executor as a new process, so the Executor
runs in a different JVM. When the Executor calls the function
`System.exit(...)` to shut down its JVM, the exit code is returned to
ExecutorRunner. Then, ExecutorRunner sends the exit code to Worker.
* A high-level overview: **ExecutorRunner -> Worker -> Master -> Driver**
The following steps describe in more detail how the Executor exit code passes
from ExecutorRunner to Driver.
1. ExecutorRunner sends an `ExecutorStateChanged` message to Worker.
```scala
val exitCode = process.waitFor()
state = ExecutorState.EXITED
val message = "Command exited with code " + exitCode
worker.send(ExecutorStateChanged(appId, execId, state, Some(message),
  Some(exitCode)))
```
2. When Worker receives the `ExecutorStateChanged` message, Worker will
forward the message to Master.
```scala
syncExecutorStateWithMaster(executorStateChanged)
```
3. When Master receives the `ExecutorStateChanged` message, it will use the
exit code to create an `ExecutorUpdated` message and send the message to
StandaloneAppClient (Driver).
```scala
exec.application.driver.send(ExecutorUpdated(execId, state, message,
  exitStatus, None))
```
4. When StandaloneAppClient receives the `ExecutorUpdated` message, it
calls the listener's `executorRemoved` function (implemented by
`StandaloneSchedulerBackend`), which assumes that every executor exit is
caused by a task failure (`exitCausedByApp = true`).
```scala
listener.executorRemoved(fullId, message.getOrElse(""), exitStatus,
  workerHost)
```
```scala
override def executorRemoved(
    fullId: String,
    message: String,
    exitStatus: Option[Int],
    workerHost: Option[String]): Unit = {
  val reason: ExecutorLossReason = exitStatus match {
    case Some(code) => ExecutorExited(code, exitCausedByApp = true, message)
    case None => ExecutorProcessLost(message, workerHost)
  }
  logInfo("Executor %s removed: %s".format(fullId, message))
  removeExecutor(fullId.split("/")(1), reason)
}
```
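The four hops above can be sketched as plain function calls. This is a simplified model, not Spark's RPC code: the case classes below reuse the message names but carry only a trimmed, assumed subset of fields, and `ExitCodeFlow` with its methods is a hypothetical name introduced for illustration.

```scala
// Simplified stand-ins for the real messages; the field lists are trimmed
// and assumed for illustration only.
final case class ExecutorStateChanged(
    execId: Int, message: Option[String], exitStatus: Option[Int])
final case class ExecutorUpdated(
    execId: Int, message: Option[String], exitStatus: Option[Int])

// Hypothetical object modelling each hop as a function.
object ExitCodeFlow {
  // ExecutorRunner: wraps the process exit code in a message for Worker.
  def runnerToWorker(execId: Int, exitCode: Int): ExecutorStateChanged =
    ExecutorStateChanged(
      execId, Some("Command exited with code " + exitCode), Some(exitCode))

  // Worker: forwards the message to Master unchanged.
  def workerToMaster(msg: ExecutorStateChanged): ExecutorStateChanged = msg

  // Master: builds the ExecutorUpdated message that is sent to the driver.
  def masterToDriver(msg: ExecutorStateChanged): ExecutorUpdated =
    ExecutorUpdated(msg.execId, msg.message, msg.exitStatus)

  // End to end: the exit code reaches the driver inside ExecutorUpdated.
  def deliver(execId: Int, exitCode: Int): ExecutorUpdated =
    masterToDriver(workerToMaster(runnerToWorker(execId, exitCode)))
}
```

The point of the sketch is that the exit code survives every hop, so the driver already has the information it needs to classify the exit.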
### Q3. How to determine `exitCausedByApp` based on ExecutorExitCode?
* `DISK_STORE_FAILED_TO_CREATE_DIR` => hardware failure => `exitCausedByApp = false`
* `HEARTBEAT_FAILURE` => network failure => `exitCausedByApp = false`
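A minimal sketch of this mapping, assuming it is written as a standalone helper, could look like the following. `ExitCauseSketch` and its `exitCausedByApp` function are hypothetical and are not the code in this patch. The numeric constants are placeholders to be checked against ExecutorExitCode.scala, and treating every other exit code as application-caused is an assumption that simply preserves the current behavior.

```scala
// Hypothetical helper, not the actual patch.
object ExitCauseSketch {
  // Placeholder values for illustration; verify against ExecutorExitCode.scala.
  val DISK_STORE_FAILED_TO_CREATE_DIR = 53
  val HEARTBEAT_FAILURE = 56

  /** Returns false for exit codes that point to hardware or network failures. */
  def exitCausedByApp(code: Int): Boolean = code match {
    case DISK_STORE_FAILED_TO_CREATE_DIR => false // hardware failure
    case HEARTBEAT_FAILURE => false // network failure
    case _ => true // assumption: keep the existing default for other codes
  }
}
```

With a helper like this, `executorRemoved` could pass `ExitCauseSketch.exitCausedByApp(code)` instead of the hard-coded `true` when it builds `ExecutorExited`.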
### Why are the changes needed?
An executor can exit for many reasons. However, the driver assumes that
every executor exit is caused by a task failure. As shown in the following
code snippet, `StandaloneSchedulerBackend` treats the executor exit as caused
by a task failure (`exitCausedByApp = true`) regardless of the value of
ExecutorExitCode (`code`).
```scala
override def executorRemoved(
    fullId: String,
    message: String,
    exitStatus: Option[Int],
    workerHost: Option[String]): Unit = {
  val reason: ExecutorLossReason = exitStatus match {
    case Some(code) => ExecutorExited(code, exitCausedByApp = true, message)
    case None => ExecutorProcessLost(message, workerHost)
  }
  logInfo("Executor %s removed: %s".format(fullId, message))
  removeExecutor(fullId.split("/")(1), reason)
}
```
This assumption is wrong. For example, when DiskBlockManager on an Executor
fails to create a local directory, it shuts down the executor's JVM with the
exit code `DISK_STORE_FAILED_TO_CREATE_DIR`. When the driver receives the exit
code `DISK_STORE_FAILED_TO_CREATE_DIR`, the executor exit was caused by a
hardware failure rather than a task failure. Hence, this pull request aims to
determine whether an executor exit was caused by task failures based on
ExecutorExitCode.
To address this issue, we need to answer three questions:
* Q1. How many ExecutorExitCode do we have?
* Q2. How does ExecutorExitCode pass to driver?
* Q3. How to determine `exitCausedByApp` based on ExecutorExitCode?
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
[TODO] This PR needs to be tested with the support of another PR.