Repository: spark Updated Branches: refs/heads/master 18ab6bd70 -> 293a0b5db
[SPARK-2098] All Spark processes should support spark-defaults.conf, config file This is another implementation about #1256 cc andrewor14 vanzin Author: GuoQiang Li <wi...@qq.com> Closes #2379 from witgo/SPARK-2098-new and squashes the following commits: 4ef1cbd [GuoQiang Li] review commit 49ef70e [GuoQiang Li] Refactor getDefaultPropertiesFile c45d20c [GuoQiang Li] All Spark processes should support spark-defaults.conf, config file Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/293a0b5d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/293a0b5d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/293a0b5d Branch: refs/heads/master Commit: 293a0b5dbba0474832dc7e9d387f3b10f6c452ea Parents: 18ab6bd Author: GuoQiang Li <wi...@qq.com> Authored: Tue Oct 14 22:16:38 2014 -0700 Committer: Andrew Or <andrewo...@gmail.com> Committed: Tue Oct 14 22:16:38 2014 -0700 ---------------------------------------------------------------------- .../spark/deploy/SparkSubmitArguments.scala | 42 ++--------------- .../deploy/SparkSubmitDriverBootstrapper.scala | 2 +- .../deploy/history/HistoryServerArguments.scala | 16 ++++++- .../spark/deploy/master/MasterArguments.scala | 19 ++++++-- .../spark/deploy/worker/WorkerArguments.scala | 21 +++++++-- .../scala/org/apache/spark/util/Utils.scala | 48 ++++++++++++++++++++ .../org/apache/spark/util/UtilsSuite.scala | 19 ++++++++ docs/monitoring.md | 7 +++ 8 files changed, 124 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/293a0b5d/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index 57b251f..72a452e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -17,14 +17,11 @@ package org.apache.spark.deploy -import java.io.{File, FileInputStream, IOException} -import java.util.Properties import java.util.jar.JarFile import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, HashMap} -import org.apache.spark.SparkException import org.apache.spark.util.Utils /** @@ -63,9 +60,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St val defaultProperties = new HashMap[String, String]() if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile") Option(propertiesFile).foreach { filename => - val file = new File(filename) - SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, v) => - if (k.startsWith("spark")) { + Utils.getPropertiesFromFile(filename).foreach { case (k, v) => + if (k.startsWith("spark.")) { defaultProperties(k) = v if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v") } else { @@ -90,19 +86,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St */ private def mergeSparkProperties(): Unit = { // Use common defaults file, if not specified by user - if (propertiesFile == null) { - val sep = File.separator - val sparkHomeConfig = env.get("SPARK_HOME").map(sparkHome => s"${sparkHome}${sep}conf") - val confDir = env.get("SPARK_CONF_DIR").orElse(sparkHomeConfig) - - confDir.foreach { sparkConfDir => - val defaultPath = s"${sparkConfDir}${sep}spark-defaults.conf" - val file = new File(defaultPath) - if (file.exists()) { - propertiesFile = file.getAbsolutePath - } - } - } + propertiesFile = Option(propertiesFile).getOrElse(Utils.getDefaultPropertiesFile(env)) val properties = HashMap[String, String]() properties.putAll(defaultSparkProperties) @@ -397,23 +381,3 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St SparkSubmit.exitFn() } } - -object SparkSubmitArguments { - /** Load properties present in the given file. */ - def getPropertiesFromFile(file: File): Seq[(String, String)] = { - require(file.exists(), s"Properties file $file does not exist") - require(file.isFile(), s"Properties file $file is not a normal file") - val inputStream = new FileInputStream(file) - try { - val properties = new Properties() - properties.load(inputStream) - properties.stringPropertyNames().toSeq.map(k => (k, properties(k).trim)) - } catch { - case e: IOException => - val message = s"Failed when loading Spark properties file $file" - throw new SparkException(message, e) - } finally { - inputStream.close() - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/293a0b5d/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala index a64170a..0125330 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala @@ -68,7 +68,7 @@ private[spark] object SparkSubmitDriverBootstrapper { assume(bootstrapDriver != null, "SPARK_SUBMIT_BOOTSTRAP_DRIVER must be set") // Parse the properties file for the equivalent spark.driver.* configs - val properties = SparkSubmitArguments.getPropertiesFromFile(new File(propertiesFile)).toMap + val properties = Utils.getPropertiesFromFile(propertiesFile) val confDriverMemory = properties.get("spark.driver.memory") val confLibraryPath = properties.get("spark.driver.extraLibraryPath") val confClasspath = properties.get("spark.driver.extraClassPath") http://git-wip-us.apache.org/repos/asf/spark/blob/293a0b5d/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala index 25fc76c..5bce32a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala @@ -18,12 +18,14 @@ package org.apache.spark.deploy.history import org.apache.spark.SparkConf +import org.apache.spark.util.Utils /** * Command-line parser for the master. */ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]) { private var logDir: String = null + private var propertiesFile: String = null parse(args.toList) @@ -32,11 +34,16 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String] case ("--dir" | "-d") :: value :: tail => logDir = value conf.set("spark.history.fs.logDirectory", value) + System.setProperty("spark.history.fs.logDirectory", value) parse(tail) case ("--help" | "-h") :: tail => printUsageAndExit(0) + case ("--properties-file") :: value :: tail => + propertiesFile = value + parse(tail) + case Nil => case _ => @@ -44,10 +51,17 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String] } } + // This mutates the SparkConf, so all accesses to it must be made after this line + Utils.loadDefaultSparkProperties(conf, propertiesFile) + private def printUsageAndExit(exitCode: Int) { System.err.println( """ - |Usage: HistoryServer + |Usage: HistoryServer [options] + | + |Options: + | --properties-file FILE Path to a custom Spark properties file. + | Default is conf/spark-defaults.conf. | |Configuration options can be set by setting the corresponding JVM system property. |History Server options are always available; additional options depend on the provider. http://git-wip-us.apache.org/repos/asf/spark/blob/293a0b5d/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala index 4b0dbbe..e34bee7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala @@ -27,6 +27,7 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) { var host = Utils.localHostName() var port = 7077 var webUiPort = 8080 + var propertiesFile: String = null // Check for settings in environment variables if (System.getenv("SPARK_MASTER_HOST") != null) { @@ -38,12 +39,16 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) { if (System.getenv("SPARK_MASTER_WEBUI_PORT") != null) { webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT").toInt } + + parse(args.toList) + + // This mutates the SparkConf, so all accesses to it must be made after this line + propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile) + if (conf.contains("spark.master.ui.port")) { webUiPort = conf.get("spark.master.ui.port").toInt } - parse(args.toList) - def parse(args: List[String]): Unit = args match { case ("--ip" | "-i") :: value :: tail => Utils.checkHost(value, "ip no longer supported, please use hostname " + value) @@ -63,7 +68,11 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) { webUiPort = value parse(tail) - case ("--help" | "-h") :: tail => + case ("--properties-file") :: value :: tail => + propertiesFile = value + parse(tail) + + case ("--help") :: tail => printUsageAndExit(0) case Nil => {} @@ -83,7 +92,9 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) { " -i HOST, --ip HOST Hostname to listen on (deprecated, please use --host or -h) \n" + " -h HOST, --host HOST Hostname to listen on\n" + " -p PORT, --port PORT Port to listen on (default: 7077)\n" + - " --webui-port PORT Port for web UI (default: 8080)") + " --webui-port PORT Port for web UI (default: 8080)\n" + + " --properties-file FILE Path to a custom Spark properties file.\n" + + " Default is conf/spark-defaults.conf.") System.exit(exitCode) } } http://git-wip-us.apache.org/repos/asf/spark/blob/293a0b5d/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala index 54e3937..019cd70 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala @@ -33,6 +33,7 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) { var memory = inferDefaultMemory() var masters: Array[String] = null var workDir: String = null + var propertiesFile: String = null // Check for settings in environment variables if (System.getenv("SPARK_WORKER_PORT") != null) { @@ -47,15 +48,19 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) { if (System.getenv("SPARK_WORKER_WEBUI_PORT") != null) { webUiPort = System.getenv("SPARK_WORKER_WEBUI_PORT").toInt } - if (conf.contains("spark.worker.ui.port")) { - webUiPort = conf.get("spark.worker.ui.port").toInt - } if (System.getenv("SPARK_WORKER_DIR") != null) { workDir = System.getenv("SPARK_WORKER_DIR") } parse(args.toList) + // This mutates the SparkConf, so all accesses to it must be made after this line + propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile) + + if (conf.contains("spark.worker.ui.port")) { + webUiPort = conf.get("spark.worker.ui.port").toInt + } + checkWorkerMemory() def parse(args: List[String]): Unit = args match { @@ -89,7 +94,11 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) { webUiPort = value parse(tail) - case ("--help" | "-h") :: tail => + case ("--properties-file") :: value :: tail => + propertiesFile = value + parse(tail) + + case ("--help") :: tail => printUsageAndExit(0) case value :: tail => @@ -124,7 +133,9 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) { " -i HOST, --ip IP Hostname to listen on (deprecated, please use --host or -h)\n" + " -h HOST, --host HOST Hostname to listen on\n" + " -p PORT, --port PORT Port to listen on (default: random)\n" + - " --webui-port PORT Port for web UI (default: 8081)") + " --webui-port PORT Port for web UI (default: 8081)\n" + + " --properties-file FILE Path to a custom Spark properties file.\n" + + " Default is conf/spark-defaults.conf.") System.exit(exitCode) } http://git-wip-us.apache.org/repos/asf/spark/blob/293a0b5d/core/src/main/scala/org/apache/spark/util/Utils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index aad9016..cbc4095 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1410,6 +1410,54 @@ private[spark] object Utils extends Logging { } } + /** + * Load default Spark properties from the given file. If no file is provided, + * use the common defaults file. This mutates state in the given SparkConf and + * in this JVM's system properties if the config specified in the file is not + * already set. Return the path of the properties file used. + */ + def loadDefaultSparkProperties(conf: SparkConf, filePath: String = null): String = { + val path = Option(filePath).getOrElse(getDefaultPropertiesFile()) + Option(path).foreach { confFile => + getPropertiesFromFile(confFile).filter { case (k, v) => + k.startsWith("spark.") + }.foreach { case (k, v) => + conf.setIfMissing(k, v) + sys.props.getOrElseUpdate(k, v) + } + } + path + } + + /** Load properties present in the given file. */ + def getPropertiesFromFile(filename: String): Map[String, String] = { + val file = new File(filename) + require(file.exists(), s"Properties file $file does not exist") + require(file.isFile(), s"Properties file $file is not a normal file") + + val inReader = new InputStreamReader(new FileInputStream(file), "UTF-8") + try { + val properties = new Properties() + properties.load(inReader) + properties.stringPropertyNames().map(k => (k, properties(k).trim)).toMap + } catch { + case e: IOException => + throw new SparkException(s"Failed when loading Spark properties from $filename", e) + } finally { + inReader.close() + } + } + + /** Return the path of the default Spark properties file. */ + def getDefaultPropertiesFile(env: Map[String, String] = sys.env): String = { + env.get("SPARK_CONF_DIR") + .orElse(env.get("SPARK_HOME").map { t => s"$t${File.separator}conf" }) + .map { t => new File(s"$t${File.separator}spark-defaults.conf")} + .filter(_.isFile) + .map(_.getAbsolutePath) + .orNull + } + /** Return a nice string representation of the exception, including the stack trace. */ def exceptionString(e: Exception): String = { if (e == null) "" else exceptionString(getFormattedClassName(e), e.getMessage, e.getStackTrace) http://git-wip-us.apache.org/repos/asf/spark/blob/293a0b5d/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index 0344da6..ea7ef05 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -27,6 +27,8 @@ import com.google.common.base.Charsets import com.google.common.io.Files import org.scalatest.FunSuite +import org.apache.spark.SparkConf + class UtilsSuite extends FunSuite { test("bytesToString") { @@ -332,4 +334,21 @@ class UtilsSuite extends FunSuite { assert(!tempFile2.exists()) } + test("loading properties from file") { + val outFile = File.createTempFile("test-load-spark-properties", "test") + try { + System.setProperty("spark.test.fileNameLoadB", "2") + Files.write("spark.test.fileNameLoadA true\n" + + "spark.test.fileNameLoadB 1\n", outFile, Charsets.UTF_8) + val properties = Utils.getPropertiesFromFile(outFile.getAbsolutePath) + properties + .filter { case (k, v) => k.startsWith("spark.")} + .foreach { case (k, v) => sys.props.getOrElseUpdate(k, v)} + val sparkConf = new SparkConf + assert(sparkConf.getBoolean("spark.test.fileNameLoadA", false) === true) + assert(sparkConf.getInt("spark.test.fileNameLoadB", 1) === 2) + } finally { + outFile.delete() + } + } } http://git-wip-us.apache.org/repos/asf/spark/blob/293a0b5d/docs/monitoring.md ---------------------------------------------------------------------- diff --git a/docs/monitoring.md b/docs/monitoring.md index d07ec4a..e3f81a7 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -78,6 +78,13 @@ follows: file system.</td> </tr> <tr> + <td>spark.history.fs.logDirectory</td> + <td>(none)</td> + <td> + Directory that contains application event logs to be loaded by the history server + </td> + </tr> + <tr> <td>spark.history.fs.updateInterval</td> <td>10</td> <td> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org