GitHub user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2516#discussion_r18909952
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
    @@ -83,216 +79,163 @@ object SparkSubmit {
        *           (4) the main class for the child
        */
       private[spark] def createLaunchEnv(args: SparkSubmitArguments)
    -      : (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = {
    +      : (mutable.ArrayBuffer[String], mutable.ArrayBuffer[String], Map[String, String], String) = {
     
         // Values to return
    -    val childArgs = new ArrayBuffer[String]()
    -    val childClasspath = new ArrayBuffer[String]()
    -    val sysProps = new HashMap[String, String]()
    +    val childArgs = new mutable.ArrayBuffer[String]()
    +    val childClasspath = new mutable.ArrayBuffer[String]()
    +    val sysProps = new mutable.HashMap[String, String]()
         var childMainClass = ""
     
    -    // Set the cluster manager
    -    val clusterManager: Int = args.master match {
    -      case m if m.startsWith("yarn") => YARN
    -      case m if m.startsWith("spark") => STANDALONE
    -      case m if m.startsWith("mesos") => MESOS
    -      case m if m.startsWith("local") => LOCAL
    -      case _ => printErrorAndExit("Master must start with yarn, spark, 
mesos, or local"); -1
    -    }
    -
    -    // Set the deploy mode; default is client mode
    -    var deployMode: Int = args.deployMode match {
    -      case "client" | null => CLIENT
    -      case "cluster" => CLUSTER
    -      case _ => printErrorAndExit("Deploy mode must be either client or 
cluster"); -1
    -    }
    -
    -    // Because "yarn-cluster" and "yarn-client" encapsulate both the master
    -    // and deploy mode, we have some logic to infer the master and deploy mode
    -    // from each other if only one is specified, or exit early if they are at odds.
    -    if (clusterManager == YARN) {
    -      if (args.master == "yarn-standalone") {
    -        printWarning("\"yarn-standalone\" is deprecated. Use \"yarn-cluster\" instead.")
    -        args.master = "yarn-cluster"
    -      }
    -      (args.master, args.deployMode) match {
    -        case ("yarn-cluster", null) =>
    -          deployMode = CLUSTER
    -        case ("yarn-cluster", "client") =>
    -          printErrorAndExit("Client deploy mode is not compatible with 
master \"yarn-cluster\"")
    -        case ("yarn-client", "cluster") =>
    -          printErrorAndExit("Cluster deploy mode is not compatible with 
master \"yarn-client\"")
    -        case (_, mode) =>
    -          args.master = "yarn-" + Option(mode).getOrElse("client")
    -      }
    -
    +    if (args.clusterManagerFlag == CM_YARN) {
           // Make sure YARN is included in our build if we're trying to use it
           if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && !Utils.isTesting) {
             printErrorAndExit(
               "Could not load YARN classes. " +
               "This copy of Spark may not have been compiled with YARN 
support.")
           }
    -    }
    -
    -    // The following modes are not supported or applicable
    -    (clusterManager, deployMode) match {
    -      case (MESOS, CLUSTER) =>
    -        printErrorAndExit("Cluster deploy mode is currently not supported 
for Mesos clusters.")
    -      case (_, CLUSTER) if args.isPython =>
    -        printErrorAndExit("Cluster deploy mode is currently not supported 
for python applications.")
    -      case (_, CLUSTER) if isShell(args.primaryResource) =>
    -        printErrorAndExit("Cluster deploy mode is not applicable to Spark 
shells.")
    -      case _ =>
    +      val hasHadoopEnv = sys.env.contains("HADOOP_CONF_DIR") || sys.env.contains("YARN_CONF_DIR")
    +      if (!hasHadoopEnv && !Utils.isTesting) {
    +        throw new Exception("When running with master '" + args.master + "' " +
    +          "either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.")
    +      }
         }
     
         // If we're running a python app, set the main class to our specific python runner
         if (args.isPython) {
           if (args.primaryResource == PYSPARK_SHELL) {
    -        args.mainClass = "py4j.GatewayServer"
    -        args.childArgs = ArrayBuffer("--die-on-broken-pipe", "0")
    +        args.mainClass = PY4J_GATEWAYSERVER
    +        args.childArgs = mutable.ArrayBuffer("--die-on-broken-pipe", "0")
           } else {
             // If a python file is provided, add it to the child arguments and list of files to deploy.
             // Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
    -        args.mainClass = "org.apache.spark.deploy.PythonRunner"
    -        args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
    -        args.files = mergeFileLists(args.files, args.primaryResource)
    +        args.mainClass = PYTHON_RUNNER
    +        args.childArgs = mutable.ArrayBuffer(args.primaryResource,
    +          args.pyFiles.getOrElse("")) ++ args.childArgs
    +        args.files = mergeFileLists(args.files.orNull, args.primaryResource)
           }
    -      args.files = mergeFileLists(args.files, args.pyFiles)
    +      args.files = mergeFileLists(args.files.orNull, args.pyFiles.orNull)
           // Format python file paths properly before adding them to the PYTHONPATH
    -      sysProps("spark.submit.pyFiles") = PythonRunner.formatPaths(args.pyFiles).mkString(",")
    +      sysProps("spark.submit.pyFiles") = PythonRunner.formatPaths(
    +        args.pyFiles.getOrElse("")).mkString(",")
         }
     
    +    // As arguments get processed they are removed from this set.
    +    val unprocessedArgs = new mutable.HashSet[String] ++= args.conf.keySet
    +
         // Special flag to avoid deprecation warnings at the client
         sysProps("SPARK_SUBMIT") = "true"
     
    -    // A list of rules to map each argument to system properties or command-line options in
    -    // each deploy mode; we iterate through these below
    +    /* By default, Spark config properties will be passed to child processes as system properties
    +     * unless they are mentioned in the list of rules below, which map arguments to either
    +     * 1. a command line option (clOption=...)
    +     * 2. a differently named system property (sysProp=...)
    +     */
         val options = List[OptionAssigner](
    -
    -      // All cluster managers
    -      OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.master"),
    -      OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"),
    -      OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"),
    -      OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
    -        sysProp = "spark.driver.memory"),
    -      OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
    -        sysProp = "spark.driver.extraClassPath"),
    -      OptionAssigner(args.driverExtraJavaOptions, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
    -        sysProp = "spark.driver.extraJavaOptions"),
    -      OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
    -        sysProp = "spark.driver.extraLibraryPath"),
    -
           // Standalone cluster only
    -      OptionAssigner(args.jars, STANDALONE, CLUSTER, sysProp = "spark.jars"),
    -      OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"),
    -      OptionAssigner(args.driverCores, STANDALONE, CLUSTER, clOption = "--cores"),
    +      OptionAssigner(SPARK_DRIVER_MEMORY, CM_STANDALONE, DM_CLUSTER, clOption = "--memory"),
    +      OptionAssigner(SPARK_DRIVER_CORES, CM_STANDALONE, DM_CLUSTER, clOption = "--cores"),
     
    -      // Yarn client only
    -      OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"),
    -      OptionAssigner(args.numExecutors, YARN, CLIENT, sysProp = "spark.executor.instances"),
    -      OptionAssigner(args.executorCores, YARN, CLIENT, sysProp = "spark.executor.cores"),
    -      OptionAssigner(args.files, YARN, CLIENT, sysProp = "spark.yarn.dist.files"),
    -      OptionAssigner(args.archives, YARN, CLIENT, sysProp = "spark.yarn.dist.archives"),
    +      // Yarn client only
    +      OptionAssigner(SPARK_FILES, CM_YARN, DM_CLIENT, sysProp = SPARK_YARN_DIST_FILES,
    +        keepProperty = true),
     
           // Yarn cluster only
    -      OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name"),
    -      OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"),
    -      OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"),
    -      OptionAssigner(args.numExecutors, YARN, CLUSTER, clOption = "--num-executors"),
    -      OptionAssigner(args.executorMemory, YARN, CLUSTER, clOption = "--executor-memory"),
    -      OptionAssigner(args.executorCores, YARN, CLUSTER, clOption = "--executor-cores"),
    -      OptionAssigner(args.files, YARN, CLUSTER, clOption = "--files"),
    -      OptionAssigner(args.archives, YARN, CLUSTER, clOption = "--archives"),
    -      OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"),
    -
    -      // Other options
    -      OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
    -        sysProp = "spark.executor.memory"),
    -      OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
    -        sysProp = "spark.cores.max"),
    -      OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
    -        sysProp = "spark.files")
    +      OptionAssigner(SPARK_APP_NAME, CM_YARN, DM_CLUSTER, clOption = "--name", keepProperty = true),
    +      OptionAssigner(SPARK_DRIVER_MEMORY, CM_YARN, DM_CLUSTER, clOption = "--driver-memory"),
    +      OptionAssigner(SPARK_YARN_QUEUE, CM_YARN, DM_CLUSTER, clOption = "--queue"),
    +      OptionAssigner(SPARK_EXECUTOR_INSTANCES, CM_YARN, DM_CLUSTER, clOption = "--num-executors"),
    +      OptionAssigner(SPARK_EXECUTOR_MEMORY, CM_YARN, DM_CLUSTER, clOption = "--executor-memory",
    +        keepProperty = true),
    +      OptionAssigner(SPARK_EXECUTOR_CORES, CM_YARN, DM_CLUSTER, clOption = "--executor-cores"),
    +      OptionAssigner(SPARK_FILES, CM_YARN, DM_CLUSTER, clOption = "--files", keepProperty = true),
    +      OptionAssigner(SPARK_YARN_DIST_ARCHIVES, CM_YARN, DM_CLUSTER, clOption = "--archives",
    +        keepProperty = true),
    +      OptionAssigner(SPARK_JARS, CM_YARN, DM_CLUSTER, clOption = "--addJars")
         )
     
         // In client mode, launch the application main class directly
         // In addition, add the main application jar and any added jars (if any) to the classpath
    -    if (deployMode == CLIENT) {
    +    if (args.deployModeFlag == DM_CLIENT) {
           childMainClass = args.mainClass
           if (isUserJar(args.primaryResource)) {
             childClasspath += args.primaryResource
           }
    -      if (args.jars != null) { childClasspath ++= args.jars.split(",") }
    +      if (args.jars.isDefined) { childClasspath ++= args.jars.get.split(",") }
           if (args.childArgs != null) { childArgs ++= args.childArgs }
         }
     
    -
         // Map all arguments to command-line options or system properties for our chosen mode
         for (opt <- options) {
    -      if (opt.value != null &&
    -          (deployMode & opt.deployMode) != 0 &&
    -          (clusterManager & opt.clusterManager) != 0) {
    -        if (opt.clOption != null) { childArgs += (opt.clOption, opt.value) }
    -        if (opt.sysProp != null) { sysProps.put(opt.sysProp, opt.value) }
    +      if (unprocessedArgs.contains(opt.configKey) &&
    +          (args.deployModeFlag & opt.deployMode) != 0 &&
    +          (args.clusterManagerFlag & opt.clusterManager) != 0) {
    +        val optValue = args.conf(opt.configKey)
    +        if (opt.clOption != null) { childArgs += (opt.clOption, optValue) }
    +        if (opt.sysProp != null) { sysProps.put(opt.sysProp, optValue) }
    +        if (!opt.keepProperty) {
    +          unprocessedArgs -= opt.configKey
    +        }
           }
         }
     
         // Add the application jar automatically so the user doesn't have to call sc.addJar
         // For YARN cluster mode, the jar is already distributed on each node as "app.jar"
         // For python files, the primary resource is already distributed as a regular file
    -    val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
    -    if (!isYarnCluster && !args.isPython) {
    -      var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty)
    +    if (!args.isYarnCluster && !args.isPython) {
    +      var jars = args.conf.get(SPARK_JARS).map(x => x.split(",").toSeq).getOrElse(Seq.empty)
           if (isUserJar(args.primaryResource)) {
             jars = jars ++ Seq(args.primaryResource)
           }
    -      sysProps.put("spark.jars", jars.mkString(","))
    +      sysProps.put(SPARK_JARS, jars.mkString(","))
    +      unprocessedArgs -= SPARK_JARS
         }
     
         // In standalone-cluster mode, use Client as a wrapper around the user class
    -    if (clusterManager == STANDALONE && deployMode == CLUSTER) {
    +    if (args.clusterManagerFlag == CM_STANDALONE && args.deployModeFlag == DM_CLUSTER) {
           childMainClass = "org.apache.spark.deploy.Client"
           if (args.supervise) {
             childArgs += "--supervise"
           }
           childArgs += "launch"
           childArgs += (args.master, args.primaryResource, args.mainClass)
    +
    +      unprocessedArgs --= Seq(SPARK_APP_PRIMARY_RESOURCE, SPARK_APP_CLASS,
    +        SPARK_DRIVER_SUPERVISE)
    +
           if (args.childArgs != null) {
             childArgs ++= args.childArgs
           }
         }
     
         // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
    -    if (isYarnCluster) {
    +    if (args.isYarnCluster) {
           childMainClass = "org.apache.spark.deploy.yarn.Client"
           if (args.primaryResource != SPARK_INTERNAL) {
             childArgs += ("--jar", args.primaryResource)
    +        unprocessedArgs -= SPARK_APP_PRIMARY_RESOURCE
           }
           childArgs += ("--class", args.mainClass)
    +      unprocessedArgs -= SPARK_APP_CLASS
           if (args.childArgs != null) {
             args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
           }
         }
     
    -    // Properties given with --conf are superceded by other options, but take precedence over
    -    // properties in the defaults file.
    -    for ((k, v) <- args.sparkProperties) {
    -      sysProps.getOrElseUpdate(k, v)
    -    }
    -
    -    // Read from default spark properties, if any
    -    for ((k, v) <- args.defaultSparkProperties) {
    -      sysProps.getOrElseUpdate(k, v)
    +    // Those config items that haven't already been processed will get passed as system properties.
    +    for (k <- unprocessedArgs) {
    +      sysProps.getOrElseUpdate(k, args.conf(k))
         }
     
         (childArgs, childClasspath, sysProps, childMainClass)
       }
     
       private def launch(
    -      childArgs: ArrayBuffer[String],
    -      childClasspath: ArrayBuffer[String],
    -      sysProps: Map[String, String],
    -      childMainClass: String,
    -      verbose: Boolean = false) {
    +               childArgs: mutable.ArrayBuffer[String],
    --- End diff --
    
    Indentation is still broken.
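
    For reference, a minimal sketch of what the declaration would look like with
    the usual 4-space continuation indent for multi-line method declarations
    (parameter list taken from the diff above; method body elided):

        private def launch(
            childArgs: mutable.ArrayBuffer[String],
            childClasspath: mutable.ArrayBuffer[String],
            sysProps: Map[String, String],
            childMainClass: String,
            verbose: Boolean = false) {
          // ... body unchanged ...
        }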

