httfighter edited a comment on issue #23437: [SPARK-26524] If the application directory fails to be created on the SPARK_WORKER_…
URL: https://github.com/apache/spark/pull/23437#issuecomment-452184077
 
 
   @srowen The sequence of events leading to this problem is analyzed below.
   
   1. The work dir is created when the worker starts. If the directory creation fails, the worker fails to start immediately. The code is as follows (a small sketch of the mkdirs() behavior follows it):
   
      private def createWorkDir() {
        workDir = Option(workDirPath).map(new File(_)).getOrElse(new File(sparkHome, "work"))
        try {
          // This sporadically fails - not sure why ... !workDir.exists() && !workDir.mkdirs()
          // So attempting to create and then check if directory was created or not.
          workDir.mkdirs()
          if ( !workDir.exists() || !workDir.isDirectory) {
            logError("Failed to create work directory " + workDir)
            System.exit(1)
          }
          assert (workDir.isDirectory)
        } catch {
          case e: Exception =>
            logError("Failed to create work directory " + workDir, e)
            System.exit(1)
        }
      }
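
   A side note on why the explicit check above is needed: java.io.File.mkdirs() does not throw when it fails, it simply returns false. A minimal sketch (the path below is purely hypothetical, not from the PR):

      // Hypothetical sketch: mkdirs() fails silently, so only the explicit
      // exists()/isDirectory check turns a bad SPARK_WORKER_DIR into a startup error.
      import java.io.File

      object WorkDirSketch extends App {
        val workDir = new File("/proc/spark-work")   // hypothetical location where mkdirs() cannot succeed
        workDir.mkdirs()                             // returns false, no exception is thrown
        if (!workDir.exists() || !workDir.isDirectory) {
          // createWorkDir() logs an error here and calls System.exit(1)
          println("Failed to create work directory " + workDir)
        }
      }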
   
   
   2. When the worker launches an executor for an application, the executor's working directory is created under the work dir. If that directory cannot be created, the launch fails and the worker reports the failure back to the Master. Up to this point the handling is correct.
   The code is as follows (a small sketch of the repeated failure follows it):
      case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
        if (masterUrl != activeMasterUrl) {
          logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
        } else {
          try {
            logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))

            // Create the executor's working directory
            val executorDir = new File(workDir, appId + "/" + execId)
            if (!executorDir.mkdirs()) {
              throw new IOException("Failed to create directory " + executorDir)
            }

            // Create local dirs for the executor. These are passed to the executor via the
            // SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
            // application finishes.
            val appLocalDirs = appDirectories.getOrElse(appId, {
              val localRootDirs = Utils.getOrCreateLocalRootDirs(conf)
              val dirs = localRootDirs.flatMap { dir =>
                try {
                  val appDir = Utils.createDirectory(dir, namePrefix = "executor")
                  Utils.chmod700(appDir)
                  Some(appDir.getAbsolutePath())
                } catch {
                  case e: IOException =>
                    logWarning(s"${e.getMessage}. Ignoring this directory.")
                    None
                }
              }.toSeq
              if (dirs.isEmpty) {
                throw new IOException("No subfolder can be created in " +
                  s"${localRootDirs.mkString(",")}.")
              }
              dirs
            })
            appDirectories(appId) = appLocalDirs
            val manager = new ExecutorRunner(
              appId,
              execId,
              appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
              cores_,
              memory_,
              self,
              workerId,
              host,
              webUi.boundPort,
              publicAddress,
              sparkHome,
              executorDir,
              workerUri,
              conf,
              appLocalDirs, ExecutorState.RUNNING)
            executors(appId + "/" + execId) = manager
            manager.start()
            coresUsed += cores_
            memoryUsed += memory_
            sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
          } catch {
            case e: Exception =>
              logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
              if (executors.contains(appId + "/" + execId)) {
                executors(appId + "/" + execId).kill()
                executors -= appId + "/" + execId
              }
              sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
                Some(e.toString), None))
          }
        }
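
   The key point for the rest of the analysis is that the executor directory is always created under the same workDir (workDir/appId/execId), so once workDir is corrupted, every launch attempt on that worker throws and reports ExecutorState.FAILED. A small sketch of that repetition (paths and IDs are illustrative only):

      // Illustrative sketch: each new execId hits the same unwritable workDir,
      // so executorDir.mkdirs() returns false every time and the worker keeps
      // sending ExecutorStateChanged(..., FAILED, ...) back to the Master.
      import java.io.{File, IOException}

      object ExecutorDirSketch extends App {
        val workDir = new File("/proc/spark-work")   // hypothetical corrupted work dir
        for (execId <- 0 until 3) {
          val executorDir = new File(workDir, "app-xxxx/" + execId)
          try {
            if (!executorDir.mkdirs()) {
              throw new IOException("Failed to create directory " + executorDir)
            }
          } catch {
            case e: IOException =>
              println(s"launch $execId failed: ${e.getMessage} -> report FAILED to the Master")
          }
        }
      }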
   
   3. After the Master receives the message, it processes it: if the application has no successfully running executors and the number of failed executors has reached the maximum retry count, the application is removed.
   However, a corrupted work dir on one worker does not satisfy that exit condition, because executors are still allocated successfully on the workers whose work dir is healthy. So the Master does nothing and simply continues to assign executors to the app. The code is as follows (a small sketch of the retry accounting follows it):
   
      case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
        val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
        execOption match {
          case Some(exec) =>
            val appInfo = idToApp(appId)
            val oldState = exec.state
            exec.state = state

            if (state == ExecutorState.RUNNING) {
              assert(oldState == ExecutorState.LAUNCHING,
                s"executor $execId state transfer from $oldState to RUNNING is illegal")
              appInfo.resetRetryCount()
            }

            exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false))

            if (ExecutorState.isFinished(state)) {
              // Remove this executor from the worker and app
              logInfo(s"Removing executor ${exec.fullId} because it is $state")
              // If an application has already finished, preserve its
              // state to display its information properly on the UI
              if (!appInfo.isFinished) {
                appInfo.removeExecutor(exec)
              }
              exec.worker.removeExecutor(exec)

              val normalExit = exitStatus == Some(0)
              // Only retry certain number of times so we don't go into an infinite loop.
              // Important note: this code path is not exercised by tests, so be very careful when
              // changing this `if` condition.
              if (!normalExit
                  && appInfo.incrementRetryCount() >= MAX_EXECUTOR_RETRIES
                  && MAX_EXECUTOR_RETRIES >= 0) { // < 0 disables this application-killing path
                val execs = appInfo.executors.values
                if (!execs.exists(_.state == ExecutorState.RUNNING)) {
                  logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
                    s"${appInfo.retryCount} times; removing it")
                  removeApplication(appInfo, ApplicationState.FAILED)
                }
              }
            }
            schedule()
          case None =>
            logWarning(s"Got status update for unknown executor $appId/$execId")
        }
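
   To make the retry accounting concrete, here is a hedged sketch of the condition above (all names are simplified stand-ins, not the real Master classes): as long as one executor is RUNNING on a healthy worker, the removeApplication branch can never fire, no matter how many launches fail on the broken worker.

      // Simplified model of the retry check in the ExecutorStateChanged handler.
      object RetryAccountingSketch extends App {
        val MAX_EXECUTOR_RETRIES = 10                  // default of spark.deploy.maxExecutorRetries
        var retryCount = 0
        val hasRunningExecutorOnHealthyWorker = true   // executors launched on other workers succeed

        for (_ <- 1 to 100) {                          // 100 failed launches on the broken worker
          retryCount += 1
          // In the real code retryCount is also reset whenever any executor reaches RUNNING.
          val removeApp = retryCount >= MAX_EXECUTOR_RETRIES && !hasRunningExecutorOnHealthyWorker
          if (removeApp) println("removeApplication(FAILED)")   // never reached in this scenario
        }
        println(s"retryCount = $retryCount, the application is never removed")
      }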
   
   4. When the Master allocates an executor, it only judges whether a Worker is usable by its state and free resources. So it keeps allocating executors on the Worker whose work dir is corrupted, every allocation fails again, and the loop repeats; the executor number can reach tens of thousands in a short time. If the application is long-lived (for example, the ThriftServer2), the Master and Worker services stay busy with this RPC traffic, which degrades their service status.
   The code is as follows (a small sketch of the filter follows it):
      // Filter out workers that don't have enough resources to launch an executor
      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
        .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
          worker.coresFree >= coresPerExecutor)
        .sortBy(_.coresFree).reverse
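
   A hedged sketch of why that filter keeps selecting the broken worker (field and value names here are illustrative, not the real Master data structures): nothing in the filter reflects past launch failures, only liveness and free resources.

      // Illustrative model of the usableWorkers filter: a worker with a corrupted
      // work dir still reports itself ALIVE with free resources, so it is selected
      // again on every schedule() pass.
      object SchedulingFilterSketch extends App {
        case class W(id: String, alive: Boolean, memoryFree: Int, coresFree: Int, workDirOk: Boolean)

        val workers = Seq(
          W("worker-bad",  alive = true, memoryFree = 8192, coresFree = 8, workDirOk = false),
          W("worker-good", alive = true, memoryFree = 8192, coresFree = 8, workDirOk = true))

        val memoryPerExecutorMB = 1024
        val coresPerExecutor = 1

        val usableWorkers = workers
          .filter(_.alive)
          .filter(w => w.memoryFree >= memoryPerExecutorMB && w.coresFree >= coresPerExecutor)
          .sortBy(_.coresFree).reverse

        // The broken worker passes the filter exactly like the healthy one.
        println(usableWorkers.map(_.id).mkString(", "))
      }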
