kevin85421 opened a new pull request, #37385:
URL: https://github.com/apache/spark/pull/37385

   ### What changes were proposed in this pull request?
   ### Q1. How many ExecutorExitCode do we have?
   `ExecutorExitCode.scala` defines four exit codes:
   * `DISK_STORE_FAILED_TO_CREATE_DIR`: Each Executor holds a BlockManager
     (`env.blockManager`), which creates a DiskBlockManager during
     initialization. The DiskBlockManager tries to create its local directories
     when it is constructed. As the following snippet shows, if no local
     directory can be created, DiskBlockManager terminates the JVM with the exit
     code `DISK_STORE_FAILED_TO_CREATE_DIR`.
   ```scala
   private[spark] val localDirs: Array[File] = createLocalDirs(conf)
   if (localDirs.isEmpty) {
     logError("Failed to create any local dir.")
     System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
   }
   ```
   
   * `EXTERNAL_BLOCK_STORE_FAILED_TO_INITIALIZE`: Unused
   * `EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR`: Unused
   * `HEARTBEAT_FAILURE`: When an Executor is created, it starts a Heartbeater
     thread that calls `reportHeartBeat()` periodically to send a heartbeat to
     the driver. If the heartbeater does not receive a response in time, it
     increments `heartbeatFailures` by one. Once `heartbeatFailures` is greater
     than or equal to `HEARTBEAT_MAX_FAILURES`, the heartbeater terminates the
     JVM with the exit code `ExecutorExitCode.HEARTBEAT_FAILURE`.
   
   ```scala
   if (heartbeatFailures >= HEARTBEAT_MAX_FAILURES) {
     logError(s"Exit as unable to send heartbeats to driver " +
       s"more than $HEARTBEAT_MAX_FAILURES times")
     System.exit(ExecutorExitCode.HEARTBEAT_FAILURE)
   }
   ```
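
   As a rough illustration of the counting logic described above, the following
   sketch models the failure counter in isolation. `HeartbeatTracker` and
   `ExitCodes` are hypothetical names, the numeric exit code is assumed from
   `ExecutorExitCode.scala` and should be checked against the source, and
   resetting the counter after a successful heartbeat is an assumption that the
   description above does not state.

   ```scala
   // Hypothetical, simplified model of the heartbeat failure counter.
   object ExitCodes {
     // Assumed value; verify against ExecutorExitCode.scala.
     val HEARTBEAT_FAILURE = 56
   }

   final class HeartbeatTracker(maxFailures: Int) {
     private var failures = 0

     /** Records one heartbeat attempt.
      *  Returns Some(exitCode) once the failure threshold is reached, else None.
      */
     def record(succeeded: Boolean): Option[Int] = {
       if (succeeded) {
         failures = 0 // assumption: a successful heartbeat resets the counter
         None
       } else {
         failures += 1
         if (failures >= maxFailures) Some(ExitCodes.HEARTBEAT_FAILURE) else None
       }
     }
   }
   ```

   A caller would invoke `record` after every heartbeat attempt and pass a
   returned code to `System.exit`.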
   
   ### Q2. How is ExecutorExitCode passed to the driver?
   ExecutorRunner launches each Executor as a new process, so the Executor runs
   in its own JVM. When the Executor calls `System.exit(...)` to terminate that
   JVM, the exit code is returned to ExecutorRunner, which then sends it to
   Worker.
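
   The parent/child relationship can be reproduced outside Spark with plain
   `java.lang.ProcessBuilder`. This is only a sketch: it assumes a POSIX `sh` is
   on the path, `ExitCodeDemo` and `runAndWait` are hypothetical names, and
   ExecutorRunner's real launch code is more involved.

   ```scala
   object ExitCodeDemo {
     /** Starts `command` as a child process, blocks until it ends,
      *  and returns the exit code the child passed to its exit call.
      */
     def runAndWait(command: String*): Int = {
       val process = new ProcessBuilder(command: _*).start()
       process.waitFor()
     }

     def main(args: Array[String]): Unit = {
       // The child stands in for an executor JVM calling System.exit(53).
       val exitCode = runAndWait("sh", "-c", "exit 53")
       println("Command exited with code " + exitCode)
     }
   }
   ```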
   
   * A high-level overview: **ExecutorRunner -> Worker -> Master -> Driver**
   
   The steps below describe in more detail how the Executor exit code travels
   from ExecutorRunner to Driver.
   
   1. ExecutorRunner sends an `ExecutorStateChanged` message to Worker.
   ```scala
   val exitCode = process.waitFor()
   state = ExecutorState.EXITED
   val message = "Command exited with code " + exitCode
   worker.send(
     ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
   ```
   
   2. When Worker receives the `ExecutorStateChanged` message, Worker will 
forward the message to Master.
   ```scala
   syncExecutorStateWithMaster(executorStateChanged)
   ```
   
   3. When Master receives the `ExecutorStateChanged` message, it will use the 
exit code to create an `ExecutorUpdated` message and send the message to 
StandaloneAppClient (Driver).
   ```scala
   exec.application.driver.send(
     ExecutorUpdated(execId, state, message, exitStatus, None))
   ```
   
   4. When StandaloneAppClient receives the `ExecutorUpdated` message, it calls
      `executorRemoved` on its listener (implemented by
      `StandaloneSchedulerBackend`), which assumes that every executor exit is
      caused by task failure (`exitCausedByApp = true`).
   ```scala
   listener.executorRemoved(
     fullId, message.getOrElse(""), exitStatus, workerHost)
   ```
   
   ```scala
   override def executorRemoved(
       fullId: String,
       message: String,
       exitStatus: Option[Int],
       workerHost: Option[String]): Unit = {
     val reason: ExecutorLossReason = exitStatus match {
       case Some(code) => ExecutorExited(code, exitCausedByApp = true, message)
       case None => ExecutorProcessLost(message, workerHost)
     }
     logInfo("Executor %s removed: %s".format(fullId, message))
     removeExecutor(fullId.split("/")(1), reason)
   }
   ```
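
   The four hops above can be condensed into a small, self-contained sketch.
   The case classes here are stripped-down stand-ins that only borrow the names
   of the real messages (the real ones carry more fields), and `ExitCodeRelay`
   with its methods is hypothetical. The point is only that the exit code
   travels unchanged as an `Option[Int]` until it reaches the driver.

   ```scala
   // Simplified stand-ins for the real Spark messages (assumption: fields reduced).
   final case class ExecutorStateChanged(
       execId: Int, message: Option[String], exitStatus: Option[Int])
   final case class ExecutorUpdated(
       execId: Int, message: Option[String], exitStatus: Option[Int])

   object ExitCodeRelay {
     // ExecutorRunner: wrap the child's exit code in a state-change message.
     def fromRunner(execId: Int, exitCode: Int): ExecutorStateChanged =
       ExecutorStateChanged(
         execId, Some("Command exited with code " + exitCode), Some(exitCode))

     // Worker: forward the message to Master unchanged.
     def throughWorker(msg: ExecutorStateChanged): ExecutorStateChanged = msg

     // Master: translate the message into the one sent to the driver.
     def throughMaster(msg: ExecutorStateChanged): ExecutorUpdated =
       ExecutorUpdated(msg.execId, msg.message, msg.exitStatus)

     // Full path: ExecutorRunner -> Worker -> Master -> Driver.
     def relay(execId: Int, exitCode: Int): ExecutorUpdated =
       throughMaster(throughWorker(fromRunner(execId, exitCode)))
   }
   ```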
   
   ### Q3. How to determine `exitCausedByApp` based on ExecutorExitCode?
   * `DISK_STORE_FAILED_TO_CREATE_DIR` => hardware failure => `exitCausedByApp = false`
   * `HEARTBEAT_FAILURE` => network failure => `exitCausedByApp = false`
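
   One way this mapping could be expressed is sketched below. It is not the
   actual diff of this pull request: `ExitCauseSketch` and its method are
   hypothetical, the numeric values are assumed from `ExecutorExitCode.scala`,
   and every other exit code keeps today's behavior (`exitCausedByApp = true`).

   ```scala
   object ExitCauseSketch {
     // Assumed values; verify against ExecutorExitCode.scala.
     val DISK_STORE_FAILED_TO_CREATE_DIR = 53
     val HEARTBEAT_FAILURE = 56

     /** Returns false for exit codes that point at the environment
      *  (disk or network) rather than at the application.
      */
     def exitCausedByApp(code: Int): Boolean = code match {
       case DISK_STORE_FAILED_TO_CREATE_DIR | HEARTBEAT_FAILURE => false
       case _ => true // unchanged default for all other exit codes
     }
   }
   ```

   In `executorRemoved`, the hard-coded `exitCausedByApp = true` could then be
   replaced with a call such as `ExitCauseSketch.exitCausedByApp(code)`.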
   
   ### Why are the changes needed?
   An executor can exit for many reasons, but the driver assumes that every
   executor exit is caused by task failure. As the following snippet shows,
   `StandaloneSchedulerBackend` marks the exit as caused by task failure
   (`exitCausedByApp = true`) regardless of the value of ExecutorExitCode
   (`code`).
   
   ```scala
   override def executorRemoved(
       fullId: String,
       message: String,
       exitStatus: Option[Int],
       workerHost: Option[String]): Unit = {
     val reason: ExecutorLossReason = exitStatus match {
       case Some(code) => ExecutorExited(code, exitCausedByApp = true, message)
       case None => ExecutorProcessLost(message, workerHost)
     }
     logInfo("Executor %s removed: %s".format(fullId, message))
     removeExecutor(fullId.split("/")(1), reason)
   }
   ```
   
   This assumption is wrong. For example, when DiskBlockManager on an Executor
   fails to create a directory, it terminates the executor's JVM with the exit
   code `DISK_STORE_FAILED_TO_CREATE_DIR`. When the driver receives that exit
   code, the executor exit was caused by a hardware failure rather than a task
   failure. Hence, this pull request aims to determine from the
   ExecutorExitCode whether an executor exit was caused by task failure.
   
   To address this issue, we need to answer three questions:
   
   * Q1. How many ExecutorExitCode do we have?
   * Q2. How is ExecutorExitCode passed to the driver?
   * Q3. How to determine `exitCausedByApp` based on ExecutorExitCode?
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   [TODO] We need to test this PR with the support of another PR.
   
   

