httfighter edited a comment on issue #23437: [SPARK-26524] If the application directory fails to be created on the SPARK_WORKER_…
URL: https://github.com/apache/spark/pull/23437#issuecomment-452184077

@srowen The main process leading to this problem is analyzed below.

1. The work directory is created when the worker starts. If the directory cannot be created, the worker fails to start immediately. The code is as follows:

    private def createWorkDir() {
      workDir = Option(workDirPath).map(new File(_)).getOrElse(new File(sparkHome, "work"))
      try {
        // This sporadically fails - not sure why ... !workDir.exists() && !workDir.mkdirs()
        // So attempting to create and then check if directory was created or not.
        workDir.mkdirs()
        if ( !workDir.exists() || !workDir.isDirectory) {
          logError("Failed to create work directory " + workDir)
          System.exit(1)
        }
        assert (workDir.isDirectory)
      } catch {
        case e: Exception =>
          logError("Failed to create work directory " + workDir, e)
          System.exit(1)
      }
    }

2. When the worker launches an executor for an application, it creates the application directory under the worker's work directory. If that directory cannot be created, the executor launch fails and the worker reports the failure to the Master. Up to this point, the handling is correct. The code is as follows:

    case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
      if (masterUrl != activeMasterUrl) {
        logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
      } else {
        try {
          logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))

          // Create the executor's working directory
          val executorDir = new File(workDir, appId + "/" + execId)
          if (!executorDir.mkdirs()) {
            throw new IOException("Failed to create directory " + executorDir)
          }

          // Create local dirs for the executor. These are passed to the executor via the
          // SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
          // application finishes.
          val appLocalDirs = appDirectories.getOrElse(appId, {
            val localRootDirs = Utils.getOrCreateLocalRootDirs(conf)
            val dirs = localRootDirs.flatMap { dir =>
              try {
                val appDir = Utils.createDirectory(dir, namePrefix = "executor")
                Utils.chmod700(appDir)
                Some(appDir.getAbsolutePath())
              } catch {
                case e: IOException =>
                  logWarning(s"${e.getMessage}. Ignoring this directory.")
                  None
              }
            }.toSeq
            if (dirs.isEmpty) {
              throw new IOException("No subfolder can be created in " +
                s"${localRootDirs.mkString(",")}.")
            }
            dirs
          })
          appDirectories(appId) = appLocalDirs
          val manager = new ExecutorRunner(
            appId,
            execId,
            appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
            cores_,
            memory_,
            self,
            workerId,
            host,
            webUi.boundPort,
            publicAddress,
            sparkHome,
            executorDir,
            workerUri,
            conf,
            appLocalDirs,
            ExecutorState.RUNNING)
          executors(appId + "/" + execId) = manager
          manager.start()
          coresUsed += cores_
          memoryUsed += memory_
          sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
        } catch {
          case e: Exception =>
            logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
            if (executors.contains(appId + "/" + execId)) {
              executors(appId + "/" + execId).kill()
              executors -= appId + "/" + execId
            }
            sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
              Some(e.toString), None))
        }
      }

3. The Master then processes the message. If the application has no running executors and the number of executor failures has reached the maximum retry count, the application is forcibly removed. A corrupted work directory on one worker does not meet this exit condition, however, because executors are still launched successfully on worker nodes whose work directory is healthy. So the Master takes no action and continues to assign executors to the application.
The code is as follows:

    case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
      val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
      execOption match {
        case Some(exec) =>
          val appInfo = idToApp(appId)
          val oldState = exec.state
          exec.state = state

          if (state == ExecutorState.RUNNING) {
            assert(oldState == ExecutorState.LAUNCHING,
              s"executor $execId state transfer from $oldState to RUNNING is illegal")
            appInfo.resetRetryCount()
          }

          exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false))

          if (ExecutorState.isFinished(state)) {
            // Remove this executor from the worker and app
            logInfo(s"Removing executor ${exec.fullId} because it is $state")
            // If an application has already finished, preserve its
            // state to display its information properly on the UI
            if (!appInfo.isFinished) {
              appInfo.removeExecutor(exec)
            }
            exec.worker.removeExecutor(exec)

            val normalExit = exitStatus == Some(0)
            // Only retry certain number of times so we don't go into an infinite loop.
            // Important note: this code path is not exercised by tests, so be very careful when
            // changing this `if` condition.
            if (!normalExit
                && appInfo.incrementRetryCount() >= MAX_EXECUTOR_RETRIES
                && MAX_EXECUTOR_RETRIES >= 0) { // < 0 disables this application-killing path
              val execs = appInfo.executors.values
              if (!execs.exists(_.state == ExecutorState.RUNNING)) {
                logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
                  s"${appInfo.retryCount} times; removing it")
                removeApplication(appInfo, ApplicationState.FAILED)
              }
            }
          }
          schedule()
        case None =>
          logWarning(s"Got status update for unknown executor $appId/$execId")
      }

4. When the Master allocates an executor, it decides whether a Worker node is usable based only on its free resources.
The Master therefore keeps allocating executors on the Worker node whose work directory is corrupted, and each allocation fails again. This loop repeats, and the number of executors reaches tens of thousands in a short time. If the application is long-running (for example, the ThriftServer2), the Master and Worker services stay busy with RPC communication, which affects service health. The code is as follows:

    // Filter out workers that don't have enough resources to launch an executor
    val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
      .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
        worker.coresFree >= coresPerExecutor)
      .sortBy(_.coresFree).reverse
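
To make the loop in steps 3 and 4 easier to see, here is a minimal, self-contained sketch of it. It does not use any Spark classes: `WorkerSketch`, `Outcome`, `simulate`, and the constants are hypothetical names invented for illustration, and the retry-count reset that happens when an executor reaches RUNNING is left out for brevity. It assumes one worker with a broken work directory that still has free resources, and it models the resource-only filter and the "no RUNNING executor" removal condition described above.

```scala
object SchedulingLoopSketch {
  // Hypothetical stand-in for the Master's view of a worker (not a Spark class).
  final case class WorkerSketch(
      id: String,
      coresFree: Int,
      memoryFreeMB: Int,
      workDirUsable: Boolean)

  final case class Outcome(failedLaunches: Int, appRemoved: Boolean)

  // Illustrative values only; they play the role of MAX_EXECUTOR_RETRIES,
  // coresPerExecutor and memoryPerExecutorMB in the quoted code.
  val MaxExecutorRetries = 10
  val CoresPerExecutor = 1
  val MemoryPerExecutorMB = 1024

  /** Runs up to `rounds` scheduling passes and reports what happened.
    * `runningElsewhere` says whether the app has a RUNNING executor on a healthy worker.
    */
  def simulate(rounds: Int, runningElsewhere: Boolean): Outcome = {
    val workers = Seq(
      // Healthy worker, already fully used, so it is filtered out below.
      WorkerSketch("healthy", coresFree = 0, memoryFreeMB = 0, workDirUsable = true),
      // Worker with a corrupted work directory but plenty of free resources.
      WorkerSketch("broken", coresFree = 4, memoryFreeMB = 4096, workDirUsable = false))

    var retryCount = 0
    var failedLaunches = 0
    var appRemoved = false
    var round = 0

    while (round < rounds && !appRemoved) {
      // Step 4: usability is judged by free resources only.
      val usable = workers.filter { w =>
        w.memoryFreeMB >= MemoryPerExecutorMB && w.coresFree >= CoresPerExecutor
      }
      usable.foreach { w =>
        if (!w.workDirUsable) {
          // Step 2: the launch fails and the worker reports FAILED.
          failedLaunches += 1
          retryCount += 1
          // Step 3: the app is removed only if nothing is RUNNING anywhere.
          if (retryCount >= MaxExecutorRetries && !runningElsewhere) {
            appRemoved = true
          }
        }
      }
      round += 1
    }
    Outcome(failedLaunches, appRemoved)
  }

  def main(args: Array[String]): Unit = {
    println(simulate(rounds = 10000, runningElsewhere = true))
    println(simulate(rounds = 10000, runningElsewhere = false))
  }
}
```

With a RUNNING executor on the healthy worker, every pass produces another failed launch and the application is never removed. Without one, the sketch stops after `MaxExecutorRetries` failures.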