Github user tigerquoll commented on a diff in the pull request:
https://github.com/apache/spark/pull/2516#discussion_r18128789
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -406,22 +412,173 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
}
}
-object SparkSubmitArguments {
- /** Load properties present in the given file. */
- def getPropertiesFromFile(file: File): Seq[(String, String)] = {
- require(file.exists(), s"Properties file $file does not exist")
- require(file.isFile(), s"Properties file $file is not a normal file")
- val inputStream = new FileInputStream(file)
+private[spark] object SparkSubmitArguments {
+  /**
+   * Resolves configuration sources in order of highest to lowest priority:
+   * 1. Each map passed in as additionalConfigs, from first to last
+   * 2. Environment variables (including legacy variable mappings)
+   * 3. System config variables (e.g. set via -Dspark.var.name)
+   * 4. SPARK_DEFAULT_CONF/spark-defaults.conf or SPARK_HOME/conf/spark-defaults.conf
+   * 5. Hard-coded defaults on the classpath at spark-submit-defaults.prop
+   *
+   * A property file specified by one of the means listed above gets read in, and its properties
+   * are considered to be at the priority of the method that specified the file.
+   * A property specified in a property file will not override an existing
+   * config value at that same level.
+   *
+   * @param additionalConfigs Seq of additional Map[ConfigName -> ConfigValue], in order of
+   *                          highest priority to lowest; these have priority over internal sources
+   * @return Map[propName -> propValue] containing values merged from all sources in order of priority
+   */
+  def mergeSparkProperties(additionalConfigs: Seq[Map[String, String]]): Map[String, String] = {
+    // Configuration read in from the spark-submit-defaults.prop file found on the classpath
+    var hardCodedDefaultConfig: Option[Map[String, String]] = None
+    var is: InputStream = null
+    var isr: Option[InputStreamReader] = None
try {
- val properties = new Properties()
- properties.load(inputStream)
-      properties.stringPropertyNames().toSeq.map(k => (k, properties(k).trim))
- } catch {
- case e: IOException =>
- val message = s"Failed when loading Spark properties file $file"
- throw new SparkException(message, e)
+      is = Thread.currentThread().getContextClassLoader.getResourceAsStream(ClassPathSparkSubmitDefaults)
+
+      // only open an InputStreamReader if the InputStream was successfully opened
+      isr = Option(is).map { stream: InputStream =>
+        new InputStreamReader(stream, CharEncoding.UTF_8)
+      }
+
+      hardCodedDefaultConfig = isr.map(defaultValueStream =>
+        SparkSubmitArguments.getPropertyValuesFromStream(defaultValueStream))
} finally {
- inputStream.close()
+      Option(is).foreach(_.close())
+      isr.foreach(_.close())
}
+
+    if (hardCodedDefaultConfig.isEmpty || hardCodedDefaultConfig.get.isEmpty) {
+      throw new IllegalStateException(s"Default values not found at classpath $ClassPathSparkSubmitDefaults")
+    }
+
+    // Configuration read in from the defaults file if it exists. If that file names a further
+    // property file via SparkPropertiesFile, those values are folded in below together with
+    // every other source, so no separate handling is needed here.
+    val sparkDefaultConfig = SparkSubmitArguments.getSparkDefaultFileConfig
+
+    // Configuration from Java system properties
+    val systemPropertyConfig = SparkSubmitArguments.getPropertyMap(System.getProperties)
+
+    // Configuration variables from the environment
+    // (legacy variables are mapped to Spark property names below)
+ val environmentConfig = System.getenv().asScala
+
+ val legacyEnvVars = Seq("MASTER"->SparkMaster,
"DEPLOY_MODE"->SparkDeployMode,
+ "SPARK_DRIVER_MEMORY"->SparkDriverMemory,
"SPARK_EXECUTOR_MEMORY"->SparkExecutorMemory)
+
+    // legacy variables act at the priority of a system property
+    val propsWithEnvVars: mutable.Map[String, String] = new mutable.HashMap() ++
+      systemPropertyConfig ++ legacyEnvVars
+      .map { case (varName, propName) => (environmentConfig.get(varName), propName) }
+      .filter { case (varValue, _) => varValue.exists(_.nonEmpty) }
+      .map { case (varValue, propName) => (propName, varValue.get) }
+
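+    // Sources are listed from highest to lowest priority; when the maps are merged
+    // below, an earlier source wins any key conflict (see scaladoc above).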
+    val configSources = additionalConfigs ++ Seq(
+      environmentConfig,
+      propsWithEnvVars,
+      sparkDefaultConfig,
+      hardCodedDefaultConfig.get
+    )
+
+    // Load the properties file at the priority level of the source that specified it.
+    // Loaded property file configs will not override existing configs at the priority
+    // level the property file was specified at.
+    val processedConfigSources = configSources
+      .map(configMap => getFileBasedPropertiesIfSpecified(configMap) ++ configMap)
+
+    Utils.mergePropertyMaps(processedConfigSources)
+  }
+
+  /**
+   * Returns a map of config values read from a property file, if
+   * the passed configMap has a SparkPropertiesFile entry pointing to a file
+   * @param configMap Map of config values to check for a file path
+   * @return Map of config values if the map holds a valid SparkPropertiesFile, Map.empty otherwise
+   */
+  def getFileBasedPropertiesIfSpecified(configMap: Map[String, String]): Map[String, String] = {
+    configMap.get(SparkPropertiesFile) match {
+      case Some(filePath) => SparkSubmitArguments.getPropertyValuesFromFile(filePath)
+      case None => Map.empty
+    }
+  }
+
+  /**
+   * Gets configuration by reading SPARK_CONF_DIR/spark-defaults.conf if it exists,
+   * otherwise reading SPARK_HOME/conf/spark-defaults.conf if it exists,
+   * otherwise returning an empty config structure
+   * @return Map[PropName -> PropValue]
+   */
+  def getSparkDefaultFileConfig: Map[String, String] = {
+    val baseConfDir: Option[String] = sys.env.get(EnvSparkHome).map(_ + File.separator + DirNameSparkConf)
+    val altConfDir: Option[String] = sys.env.get(EnvAltSparkConfPath)
+    val confDir: Option[String] = altConfDir.orElse(baseConfDir)
+
+    confDir.map(path => new File(path + File.separator + FileNameSparkDefaultsConf))
+      .filter(confFile => confFile.exists())
+      .map(confFile => loadPropFile(confFile))
+      .getOrElse(Map.empty)
+  }
+
+  /**
+   * Parses a property file using the Java properties file parser
+   * @param filePath Path to the property file
+   * @return Map of config values parsed from the file, empty map if the file does not exist
+   */
+  def getPropertyValuesFromFile(filePath: String): Map[String, String] = {
+    val propFile = new File(filePath)
+    loadPropFile(propFile)
+  }
+
+  /**
+   * Loads a property file into a map
+   * @param propFile File object pointing to the properties file
+   * @return Map of property names to values
+   */
+  def loadPropFile(propFile: File): Map[String, String] = {
--- End diff --
Thanks for the heads up; it looks like there is some duplication there, though his
efforts are more advanced than mine. When his PR gets merged I'll update and reuse
whatever code I can.
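
For readers following along, the merge step in the diff reduces to a first-wins
fold over the sources, ordered from highest to lowest priority. A minimal
standalone sketch of those semantics (the name mergeFirstWins is hypothetical;
the PR itself delegates to Utils.mergePropertyMaps, whose implementation may
differ):

    // Illustrative only: earlier maps in the Seq win on key conflicts.
    def mergeFirstWins(sources: Seq[Map[String, String]]): Map[String, String] =
      sources.reverse.foldLeft(Map.empty[String, String])(_ ++ _)

    // Example: a caller-supplied map outranks the hard-coded defaults.
    mergeFirstWins(Seq(
      Map("spark.master" -> "local[2]"),                        // highest priority
      Map("spark.master" -> "yarn", "spark.ui.port" -> "4040")  // lowest priority
    ))
    // => Map("spark.master" -> "local[2]", "spark.ui.port" -> "4040")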