Github user andrewor14 commented on a diff in the pull request:
https://github.com/apache/spark/pull/2350#discussion_r17398299
--- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala ---
@@ -36,113 +36,114 @@ private[spark] class YarnClientSchedulerBackend(
   var client: Client = null
   var appId: ApplicationId = null
-  var checkerThread: Thread = null
   var stopping: Boolean = false
   var totalExpectedExecutors = 0
-  private[spark] def addArg(optionName: String, envVar: String, sysProp: String,
-      arrayBuf: ArrayBuffer[String]) {
-    if (System.getenv(envVar) != null) {
-      arrayBuf += (optionName, System.getenv(envVar))
-    } else if (sc.getConf.contains(sysProp)) {
-      arrayBuf += (optionName, sc.getConf.get(sysProp))
-    }
-  }
-
+  /**
+   * Create a Yarn client to submit an application to the ResourceManager.
+   * This waits until the application is running.
+   */
   override def start() {
     super.start()
-
     val driverHost = conf.get("spark.driver.host")
     val driverPort = conf.get("spark.driver.port")
     val hostport = driverHost + ":" + driverPort
     conf.set("spark.driver.appUIAddress", sc.ui.appUIHostPort)
     val argsArrayBuf = new ArrayBuffer[String]()
-    argsArrayBuf += (
-      "--args", hostport
-    )
-
-    // process any optional arguments, given either as environment variables
-    // or system properties. use the defaults already defined in ClientArguments
-    // if things aren't specified. system properties override environment
-    // variables.
-    List(("--driver-memory", "SPARK_MASTER_MEMORY", "spark.master.memory"),
-      ("--driver-memory", "SPARK_DRIVER_MEMORY", "spark.driver.memory"),
-      ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.executor.instances"),
-      ("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"),
-      ("--executor-memory", "SPARK_WORKER_MEMORY", "spark.executor.memory"),
-      ("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"),
-      ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"),
-      ("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"),
-      ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"),
-      ("--name", "SPARK_YARN_APP_NAME", "spark.app.name"))
-    .foreach { case (optName, envVar, sysProp) => addArg(optName, envVar, sysProp, argsArrayBuf) }
-
-    logDebug("ClientArguments called with: " + argsArrayBuf)
+    argsArrayBuf += ("--arg", hostport)
+    argsArrayBuf ++= getExtraClientArguments
+
+    logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" "))
     val args = new ClientArguments(argsArrayBuf.toArray, conf)
     totalExpectedExecutors = args.numExecutors
     client = new Client(args, conf)
-    appId = client.runApp()
-    waitForApp()
-    checkerThread = yarnApplicationStateCheckerThread()
+    appId = client.submitApplication()
+    waitForApplication()
+    asyncMonitorApplication()
   }
-  def waitForApp() {
-
-    // TODO : need a better way to find out whether the executors are ready or not
-    // maybe by resource usage report?
-    while(true) {
-      val report = client.getApplicationReport(appId)
-
-      logInfo("Application report from ASM: \n" +
-        "\t appMasterRpcPort: " + report.getRpcPort() + "\n" +
-        "\t appStartTime: " + report.getStartTime() + "\n" +
-        "\t yarnAppState: " + report.getYarnApplicationState() + "\n"
+  /**
+   * Return any extra command line arguments to be passed to Client provided in the form of
+   * environment variables or Spark properties.
+   */
+  private def getExtraClientArguments: Seq[String] = {
+    val extraArgs = new ArrayBuffer[String]
+    val optionTuples = // List of (target Client argument, environment variable, Spark property)
+      List(
+        ("--driver-memory", "SPARK_MASTER_MEMORY", "spark.master.memory"),
+        ("--driver-memory", "SPARK_DRIVER_MEMORY", "spark.driver.memory"),
+        ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.executor.instances"),
+        ("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"),
+        ("--executor-memory", "SPARK_WORKER_MEMORY", "spark.executor.memory"),
+        ("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"),
+        ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"),
+        ("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"),
+        ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"),
+        ("--name", "SPARK_YARN_APP_NAME", "spark.app.name")
      )
-
-      // Ready to go, or already gone.
-      val state = report.getYarnApplicationState()
-      if (state == YarnApplicationState.RUNNING) {
-        return
-      } else if (state == YarnApplicationState.FINISHED ||
-        state == YarnApplicationState.FAILED ||
-        state == YarnApplicationState.KILLED) {
-        throw new SparkException("Yarn application already ended," +
-          "might be killed or not able to launch application master.")
+    optionTuples.foreach { case (optionName, envVar, sparkProp) =>
+      if (System.getenv(envVar) != null) {
+        extraArgs += (optionName, System.getenv(envVar))
+      } else if (sc.getConf.contains(sparkProp)) {
+        extraArgs += (optionName, sc.getConf.get(sparkProp))
--- End diff --
@vanzin I just noticed this gives preference to environment variables over
Spark configs. Elsewhere in Spark we try to honor the system property before
the env var if both are set, and this is consistent with @sryza's original
comment "system properties override environment variables" in L67 of the old
code.
I realize I gave the green light for this change in #539, but it is now slightly inconsistent. On the other hand, without the changes in that PR the Yarn app name won't be picked up. Should we special-case `SPARK_YARN_APP_NAME` (and maybe `SPARK_YARN_QUEUE`) and use your fix from #539 only for these variables? Any thoughts?
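
To make the suggested precedence concrete, here is a minimal sketch (not code from this PR) of a property-first lookup that keeps the env-var-first behavior from #539 only for a whitelisted set of variables. The object and method names and the `envVarFirst` default are illustrative assumptions; the tuples are the same (option, env var, Spark property) triples as in the diff above:

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.SparkConf

object ClientArgPrecedence {
  /**
   * Sketch only: resolve each Client option by checking the Spark property first and
   * falling back to the environment variable, except for env vars listed in `envVarFirst`,
   * which keep the env-var-over-property behavior from #539.
   */
  def resolveClientArgs(
      conf: SparkConf,
      optionTuples: Seq[(String, String, String)], // (option name, env var, Spark property)
      envVarFirst: Set[String] = Set("SPARK_YARN_APP_NAME")): Seq[String] = {
    val extraArgs = new ArrayBuffer[String]
    optionTuples.foreach { case (optionName, envVar, sparkProp) =>
      val fromEnv = Option(System.getenv(envVar))
      val fromProp = if (conf.contains(sparkProp)) Some(conf.get(sparkProp)) else None
      // Spark property wins by default; whitelisted env vars take precedence instead.
      val resolved =
        if (envVarFirst.contains(envVar)) fromEnv.orElse(fromProp)
        else fromProp.orElse(fromEnv)
      resolved.foreach { value => extraArgs ++= Seq(optionName, value) }
    }
    extraArgs.toSeq
  }
}
```

With this shape, `spark.app.name` set in the SparkConf would still lose to `SPARK_YARN_APP_NAME` (preserving the fix from #539), while every other option follows the property-over-env-var rule used elsewhere in Spark.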