Github user tigerquoll commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2516#discussion_r19795115
  
    --- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
    @@ -17,201 +17,286 @@
     
     package org.apache.spark.deploy
     
    -import java.io.{File, FileInputStream, IOException}
    -import java.util.Properties
    +import java.io._
     import java.util.jar.JarFile
     
    -import scala.collection.JavaConversions._
    -import scala.collection.mutable.{ArrayBuffer, HashMap}
    +import scala.collection._
     
     import org.apache.spark.SparkException
    +import org.apache.spark.deploy.ConfigConstants._
     import org.apache.spark.util.Utils
    +import org.apache.spark.deploy.SparkSubmitArguments._
     
     /**
    - * Parses and encapsulates arguments from the spark-submit script.
    - * The env argument is used for testing.
    - */
    -private[spark] class SparkSubmitArguments(args: Seq[String], env: 
Map[String, String] = sys.env) {
    -  var master: String = null
    -  var deployMode: String = null
    -  var executorMemory: String = null
    -  var executorCores: String = null
    -  var totalExecutorCores: String = null
    -  var propertiesFile: String = null
    -  var driverMemory: String = null
    -  var driverExtraClassPath: String = null
    -  var driverExtraLibraryPath: String = null
    -  var driverExtraJavaOptions: String = null
    -  var driverCores: String = null
    -  var supervise: Boolean = false
    -  var queue: String = null
    -  var numExecutors: String = null
    -  var files: String = null
    -  var archives: String = null
    -  var mainClass: String = null
    -  var primaryResource: String = null
    -  var name: String = null
    -  var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
    -  var jars: String = null
    -  var verbose: Boolean = false
    -  var isPython: Boolean = false
    -  var pyFiles: String = null
    -  val sparkProperties: HashMap[String, String] = new HashMap[String, 
String]()
    -
    -  /** Default properties present in the currently defined defaults file. */
    -  lazy val defaultSparkProperties: HashMap[String, String] = {
    -    val defaultProperties = new HashMap[String, String]()
    -    if (verbose) SparkSubmit.printStream.println(s"Using properties file: 
$propertiesFile")
    -    Option(propertiesFile).foreach { filename =>
    -      val file = new File(filename)
    -      SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, 
v) =>
    -        if (k.startsWith("spark")) {
    -          defaultProperties(k) = v
    -          if (verbose) SparkSubmit.printStream.println(s"Adding default 
property: $k=$v")
    -        } else {
    -          SparkSubmit.printWarning(s"Ignoring non-spark config property: 
$k=$v")
    -        }
    -      }
    -    }
    -    defaultProperties
    + * Pulls and validates configuration information together in order of 
priority
    + *
    + * Entries in the conf Map will be filled in the following priority order
    + * 1. Entries specified on the command line (except for --conf entries)
    + * 2. Entries specified on the command line with --conf
    + * 3. Legacy environment variables
    + * 4. SPARK_DEFAULT_CONF/spark-defaults.conf or 
SPARK_HOME/conf/spark-defaults.conf, if either exists
    + * 5. hard coded defaults
    + *
    +*/
    +private[spark] class SparkSubmitArguments(args: Seq[String]) {
    +  /**
    +   * Stores all configuration items except for child arguments,
    +   * referenced by the constants defined in ConfigConstants.scala.
    +   */
    +  val conf = new mutable.HashMap[String, String]()
    +
    +  def master  = conf(SPARK_MASTER)
    +  def master_= (value: String):Unit = conf.put(SPARK_MASTER, value)
    +
    +  def executorMemory = conf(SPARK_EXECUTOR_MEMORY)
    +  def executorMemory_= (value: String):Unit = 
conf.put(SPARK_EXECUTOR_MEMORY, value)
    +
    +  def executorCores = conf(SPARK_EXECUTOR_CORES)
    +  def executorCores_= (value: String):Unit = 
conf.put(SPARK_EXECUTOR_CORES, value)
    +
    +  def totalExecutorCores = conf.get(SPARK_CORES_MAX)
    +  def totalExecutorCores_= (value: String):Unit = 
conf.put(SPARK_CORES_MAX, value)
    +
    +  def driverMemory = conf(SPARK_DRIVER_MEMORY)
    +  def driverMemory_= (value: String):Unit = conf.put(SPARK_DRIVER_MEMORY, 
value)
    +
    +  def driverExtraClassPath = conf.get(SPARK_DRIVER_EXTRA_CLASSPATH)
    +  def driverExtraClassPath_= (value: String):Unit = 
conf.put(SPARK_DRIVER_EXTRA_CLASSPATH, value)
    +
    +  def driverExtraLibraryPath = conf.get(SPARK_DRIVER_EXTRA_LIBRARY_PATH)
    +  def driverExtraLibraryPath_= (value: String):Unit = 
conf.put(SPARK_DRIVER_EXTRA_LIBRARY_PATH, value)
    +
    +  def driverExtraJavaOptions = conf.get(SPARK_DRIVER_EXTRA_JAVA_OPTIONS)
    +  def driverExtraJavaOptions_= (value: String):Unit = 
conf.put(SPARK_DRIVER_EXTRA_JAVA_OPTIONS, value)
    +
    +  def driverCores = conf(SPARK_DRIVER_CORES)
    +  def driverCores_= (value: String):Unit = conf.put(SPARK_DRIVER_CORES, 
value)
    +
    +  def supervise = conf(SPARK_DRIVER_SUPERVISE).toBoolean
    +  def supervise_= (value: String):Unit = conf.put(SPARK_DRIVER_SUPERVISE, 
value)
    +
    +  def queue = conf(SPARK_YARN_QUEUE)
    +  def queue_= (value: String):Unit = conf.put(SPARK_YARN_QUEUE, value)
    +
    +  def numExecutors = conf(SPARK_EXECUTOR_INSTANCES)
    +  def numExecutors_= (value: String):Unit = 
conf.put(SPARK_EXECUTOR_INSTANCES, value)
    +
    +  def files = conf.get(SPARK_FILES)
    +  def files_= (value: String):Unit = conf.put(SPARK_FILES, value)
    +
    +  def archives = conf.get(SPARK_YARN_DIST_ARCHIVES)
    +  def archives_= (value: String):Unit = conf.put(SPARK_YARN_DIST_ARCHIVES, 
value)
    +
    +  def mainClass = conf(SPARK_APP_CLASS)
    +  def mainClass_= (value: String):Unit = conf.put(SPARK_APP_CLASS, value)
    +
    +  def primaryResource = conf(SPARK_APP_PRIMARY_RESOURCE)
    +  def primaryResource_= (value: String):Unit = 
conf.put(SPARK_APP_PRIMARY_RESOURCE, value)
    +
    +  def name = conf(SPARK_APP_NAME)
    +  def name_= (value: String):Unit = conf.put(SPARK_APP_NAME, value)
    +
    +  def jars = conf.get(SPARK_JARS)
    +  def jars_= (value: String):Unit = conf.put(SPARK_JARS, value)
    +
    +  def pyFiles = conf.get(SPARK_SUBMIT_PYFILES)
    +  def pyFiles_= (value: String):Unit = conf.put(SPARK_SUBMIT_PYFILES, 
value)
    +
    +  def deployMode = conf.get(SPARK_DEPLOY_MODE)
    +  def deployMode_= (value: String):Unit = conf.put(SPARK_DEPLOY_MODE, 
value)
    +
    +  var childArgs = new mutable.ArrayBuffer[String]
    +
    +  lazy val verbose = conf(SPARK_VERBOSE).toBoolean
    +  lazy val isPython = conf.contains(SPARK_APP_PRIMARY_RESOURCE) &&
    +    SparkSubmit.isPython(primaryResource)
    +  lazy val isYarnCluster = clusterManagerFlag == CM_YARN && deployModeFlag 
== DM_CLUSTER
    +
    +  /**
    +   * Deploy mode - flags are defined in ConfigConstants module (DM_*)
    +   */
    +  lazy val deployModeFlag = deployMode match {
    +    case Some("client")  => DM_CLIENT
    +    case Some("cluster") => DM_CLUSTER
    +    case _ => throw new SparkException("Deploy mode must be either client 
or cluster")
       }
     
    -  // Respect SPARK_*_MEMORY for cluster mode
    -  driverMemory = sys.env.get("SPARK_DRIVER_MEMORY").orNull
    -  executorMemory = sys.env.get("SPARK_EXECUTOR_MEMORY").orNull
    +  /**
    +   * Cluster manager - flags are defined in ConfigConstants module (CM_*)
    +   */
    +  lazy val clusterManagerFlag = master match {
    +    case m if m.startsWith("yarn") => CM_YARN
    +    case m if m.startsWith("spark") => CM_STANDALONE
    +    case m if m.startsWith("mesos") => CM_MESOS
    +    case m if m.startsWith("local") => CM_LOCAL
    +    case _ => throw new SparkException("Master must start with yarn, 
spark, mesos, or local")
    +  }
     
    -  parseOpts(args.toList)
    -  mergeSparkProperties()
    -  checkRequiredArguments()
    +  /**
    +   * Used to store parameters parsed from command line (except for --conf 
and child arguments)
    +   */
    +  private val cmdLineConfig = new mutable.HashMap[String, String]
     
       /**
    -   * Fill in any undefined values based on the default properties file or 
options passed in through
    -   * the '--conf' flag.
    +   * arguments passed via --conf command line options
    +    */
    +  private val cmdLineConfConfig = new mutable.HashMap[String, String]
    +
    +  /**
    +   * Values from a property file specified with --properties
        */
    -  private def mergeSparkProperties(): Unit = {
    -    // Use common defaults file, if not specified by user
    -    if (propertiesFile == null) {
    -      val sep = File.separator
    -      val sparkHomeConfig = env.get("SPARK_HOME").map(sparkHome => 
s"${sparkHome}${sep}conf")
    -      val confDir = env.get("SPARK_CONF_DIR").orElse(sparkHomeConfig)
    -
    -      confDir.foreach { sparkConfDir =>
    -        val defaultPath = s"${sparkConfDir}${sep}spark-defaults.conf"
    -        val file = new File(defaultPath)
    -        if (file.exists()) {
    -          propertiesFile = file.getAbsolutePath
    -        }
    -      }
    +  private val cmdLinePropertyFileValues = new 
mutable.HashMap[String,String]
    +
    +  try {
    +    parseOpts(args.toList)
    +
    +    // if property file exists then update the command line arguments, but 
don't override
    +    // existing arguments
    +    cmdLinePropertyFileValues.foreach{ case(k,v) =>
    +      cmdLineConfig.getOrElseUpdate(k,v)
         }
     
    -    val properties = HashMap[String, String]()
    -    properties.putAll(defaultSparkProperties)
    -    properties.putAll(sparkProperties)
    -
    -    // Use properties file as fallback for values which have a direct 
analog to
    -    // arguments in this script.
    -    master = Option(master).orElse(properties.get("spark.master")).orNull
    -    executorMemory = 
Option(executorMemory).orElse(properties.get("spark.executor.memory")).orNull
    -    executorCores = 
Option(executorCores).orElse(properties.get("spark.executor.cores")).orNull
    -    totalExecutorCores = Option(totalExecutorCores)
    -      .orElse(properties.get("spark.cores.max"))
    -      .orNull
    -    name = Option(name).orElse(properties.get("spark.app.name")).orNull
    -    jars = Option(jars).orElse(properties.get("spark.jars")).orNull
    +    // See comments at start of class definition for the location and 
priority of configuration sources.
    +    conf ++= mergeSparkProperties(Seq(cmdLineConfig, cmdLineConfConfig), 
CMD_LINE_ONLY_KEYS)
    +
    +    // Some configuration items can be derived if they are not yet present.
    +    deriveConfigsPreDefaults()
    +    DEFAULTS.foreach{ case (k,v) =>
    +      conf.getOrElseUpdate(k,v)
    +    }
    +    deriveConfigsPostDefaults()
    +
    +    checkRequiredArguments()
    +  } catch {
    +    // IOException are possible when we are attempting to read property 
files from the file system.
    +    case e @ (_: SparkException | _: IOException) => 
SparkSubmit.printErrorAndExit(e.getLocalizedMessage)
    +  }
    +
    +  /**
    +   * Some derivations are only valid before we apply default values
    +   */
    +  private def deriveConfigsPreDefaults() = {
    +    // Because "yarn-cluster" and "yarn-client" encapsulate both the master
    +    // and deploy mode, we have some logic to infer the master and deploy 
mode
    +    // from each other if only one is specified
    +    if (conf.contains(SPARK_MASTER) && (clusterManagerFlag == CM_YARN)) {
    +      if (master == "yarn-standalone") {
    +        SparkSubmit.printWarning("\"yarn-standalone\" is deprecated. Use 
\"yarn-cluster\" instead.")
    +        master = "yarn-cluster"
    +      }
     
    -    // This supports env vars in older versions of Spark
    -    master = Option(master).orElse(env.get("MASTER")).orNull
    -    deployMode = Option(deployMode).orElse(env.get("DEPLOY_MODE")).orNull
    +      (master, deployMode) match {
    +        case ("yarn-cluster", None) => deployMode = "cluster"
    +        case ("yarn-cluster", Some("client")) => // invalid and caught in 
checkRequirements
    +        case ("yarn-client",  Some("cluster")) => // invalid and caught in 
checkRequirements
    +        case (_, Some(mode))  =>
    +          val newMaster = "yarn-" + mode
    +          if (master != newMaster) {
    +            SparkSubmit.printWarning(s"master is being changed to 
$newMaster")
    +            master = newMaster
    +          }
    +        case _ => // we cannot figure out deploymode - 
checkRequiredArguments will pick this up
    +      }
    +    }
    +  }
     
    -    // Try to set main class from JAR if no --class argument is given
    -    if (mainClass == null && !isPython && primaryResource != null) {
    +  private def deriveConfigsPostDefaults() = {
    +    // These config items point to file paths, but may need to be 
converted to absolute file uris.
    +    val configFileUris = Seq(
    +      SPARK_FILES,
    +      SPARK_SUBMIT_PYFILES,
    +      SPARK_YARN_DIST_ARCHIVES,
    +      SPARK_JARS,
    +      SPARK_APP_PRIMARY_RESOURCE)
    +
    +    // Process configFileUris with resolvedURIs function if they are 
present.
    +    def resolvableFileUri(key: String) = conf.contains(key) &&
    +      ((key != SPARK_APP_PRIMARY_RESOURCE) || 
(!SparkSubmit.isInternalOrShell(conf(key))))
    +
    +    configFileUris
    +      .filter( resolvableFileUri )
    +      .foreach( key => conf.put(key, Utils.resolveURIs(conf(key), 
testWindows=false)) )
    +
    +    // Try to set main class from JAR if no --class argument is given.
    +    if (!conf.contains(SPARK_APP_CLASS) && !isPython && primaryResource != 
null) {
           try {
             val jar = new JarFile(primaryResource)
    -        // Note that this might still return null if no main-class is set; 
we catch that later
    -        mainClass = 
jar.getManifest.getMainAttributes.getValue("Main-Class")
    +        // Note that this might still return null if no main-class is set; 
we catch that later.
    +        val manifestMainClass = 
jar.getManifest.getMainAttributes.getValue("Main-Class")
    +        if (manifestMainClass != null && !manifestMainClass.isEmpty) {
    +          mainClass = manifestMainClass
    +        }
           } catch {
             case e: Exception =>
               SparkSubmit.printErrorAndExit("Cannot load main class from JAR: 
" + primaryResource)
    -          return
           }
         }
     
    -    // Global defaults. These should be keep to minimum to avoid confusing 
behavior.
    -    master = Option(master).getOrElse("local[*]")
    -
    -    // Set name from main class if not given
    -    name = Option(name).orElse(Option(mainClass)).orNull
    -    if (name == null && primaryResource != null) {
    -      name = Utils.stripDirectory(primaryResource)
    -    }
    +    // Set name from main class if not given.
    +    name = conf.get(SPARK_APP_NAME)
    +      .orElse( conf.get(SPARK_APP_CLASS) )
    +      .orElse( conf.get(SPARK_APP_PRIMARY_RESOURCE).map(x => 
Utils.stripDirectory(x)) )
    +      .orNull
       }
     
       /** Ensure that required fields exists. Call this only once all defaults 
are loaded. */
       private def checkRequiredArguments() = {
    -    if (args.length == 0) {
    -      printUsageAndExit(-1)
    -    }
    -    if (primaryResource == null) {
    +    // accessing the lazy deployModeFlag val invokes validation on 
deployMode
    +    deployModeFlag
    +
    +    if (!conf.contains(SPARK_APP_PRIMARY_RESOURCE)) {
           SparkSubmit.printErrorAndExit("Must specify a primary resource (JAR 
or Python file)")
         }
    -    if (mainClass == null && !isPython) {
    -      SparkSubmit.printErrorAndExit("No main class set in JAR; please 
specify one with --class")
    -    }
    -    if (pyFiles != null && !isPython) {
    -      SparkSubmit.printErrorAndExit("--py-files given but primary resource 
is not a Python script")
    -    }
     
    -    // Require all python files to be local, so we can add them to the 
PYTHONPATH
    +    // Require all python files to be local, so we can add them to the 
PYTHONPATH.
         if (isPython) {
           if (Utils.nonLocalPaths(primaryResource).nonEmpty) {
             SparkSubmit.printErrorAndExit(s"Only local python files are 
supported: $primaryResource")
           }
    -      val nonLocalPyFiles = Utils.nonLocalPaths(pyFiles).mkString(",")
    +
    +      val nonLocalPyFiles = 
Utils.nonLocalPaths(pyFiles.getOrElse("")).mkString(",")
           if (nonLocalPyFiles.nonEmpty) {
             SparkSubmit.printErrorAndExit(
               s"Only local additional python files are supported: 
$nonLocalPyFiles")
           }
    -    }
    +    } else {
    +      // Java primary resource
    +      if (!conf.contains(SPARK_APP_CLASS)) {
    +        SparkSubmit.printErrorAndExit("No main class set in JAR; please 
specify one with --class")
    +      }
     
    -    if (master.startsWith("yarn")) {
    -      val hasHadoopEnv = env.contains("HADOOP_CONF_DIR") || 
env.contains("YARN_CONF_DIR")
    -      if (!hasHadoopEnv && !Utils.isTesting) {
    -        throw new Exception(s"When running with master '$master' " +
    -          "either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the 
environment.")
    +      if (conf.contains(SPARK_SUBMIT_PYFILES)) {
    +        SparkSubmit.printErrorAndExit("--py-files given but primary 
resource is not a Python script")
           }
         }
    +
    +    (master, deployModeFlag) match {
    +      case ("yarn-cluster", DM_CLIENT) =>
    +        SparkSubmit.printErrorAndExit("Client deploy mode is not 
compatible with master \"yarn-cluster\"")
    +      case ("yarn-client", DM_CLUSTER) =>
    +        SparkSubmit.printErrorAndExit("Cluster deploy mode is not 
compatible with master \"yarn-client\"")
    +      case _ =>
    +    }
    +
    +    // The following modes are not supported or applicable
    +    (clusterManagerFlag, deployModeFlag) match {
    +      case (CM_MESOS, DM_CLUSTER) =>
    +        SparkSubmit.printErrorAndExit("Cluster deploy mode is currently 
not supported for Mesos clusters.")
    +      case (_, DM_CLUSTER) if isPython =>
    +        SparkSubmit.printErrorAndExit("Cluster deploy mode is currently 
not supported for python applications.")
    +      case (_, DM_CLUSTER) if SparkSubmit.isShell(primaryResource) =>
    +        SparkSubmit.printErrorAndExit("Cluster deploy mode is not 
applicable to Spark shells.")
    +      case _ =>
    +    }
       }
     
       override def toString =  {
    -    s"""Parsed arguments:
    -    |  master                  $master
    -    |  deployMode              $deployMode
    -    |  executorMemory          $executorMemory
    -    |  executorCores           $executorCores
    -    |  totalExecutorCores      $totalExecutorCores
    -    |  propertiesFile          $propertiesFile
    -    |  extraSparkProperties    $sparkProperties
    -    |  driverMemory            $driverMemory
    -    |  driverCores             $driverCores
    -    |  driverExtraClassPath    $driverExtraClassPath
    -    |  driverExtraLibraryPath  $driverExtraLibraryPath
    -    |  driverExtraJavaOptions  $driverExtraJavaOptions
    -    |  supervise               $supervise
    -    |  queue                   $queue
    -    |  numExecutors            $numExecutors
    -    |  files                   $files
    -    |  pyFiles                 $pyFiles
    -    |  archives                $archives
    -    |  mainClass               $mainClass
    -    |  primaryResource         $primaryResource
    -    |  name                    $name
    -    |  childArgs               [${childArgs.mkString(" ")}]
    -    |  jars                    $jars
    -    |  verbose                 $verbose
    -    |
    -    |Default properties from $propertiesFile:
    -    |${defaultSparkProperties.mkString("  ", "\n  ", "\n")}
    -    """.stripMargin
    +    val sb = new StringBuilder
    +    sb.append("Spark Configuration:\n")
    +    conf.foreach{ case (k,v) => sb.append(s"$k: $v\n") }
    --- End diff --
    
    Ok, append string updated, and I'll sort the conf map by key before 
printing out.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to