Repository: spark
Updated Branches:
  refs/heads/master 18ab6bd70 -> 293a0b5db


[SPARK-2098] All Spark processes should support spark-defaults.conf, config file

This is an alternative implementation of #1256
cc andrewor14 vanzin

Author: GuoQiang Li <wi...@qq.com>

Closes #2379 from witgo/SPARK-2098-new and squashes the following commits:

4ef1cbd [GuoQiang Li] review commit
49ef70e [GuoQiang Li] Refactor getDefaultPropertiesFile
c45d20c [GuoQiang Li] All Spark processes should support spark-defaults.conf, config file


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/293a0b5d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/293a0b5d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/293a0b5d

Branch: refs/heads/master
Commit: 293a0b5dbba0474832dc7e9d387f3b10f6c452ea
Parents: 18ab6bd
Author: GuoQiang Li <wi...@qq.com>
Authored: Tue Oct 14 22:16:38 2014 -0700
Committer: Andrew Or <andrewo...@gmail.com>
Committed: Tue Oct 14 22:16:38 2014 -0700

----------------------------------------------------------------------
 .../spark/deploy/SparkSubmitArguments.scala     | 42 ++---------------
 .../deploy/SparkSubmitDriverBootstrapper.scala  |  2 +-
 .../deploy/history/HistoryServerArguments.scala | 16 ++++++-
 .../spark/deploy/master/MasterArguments.scala   | 19 ++++++--
 .../spark/deploy/worker/WorkerArguments.scala   | 21 +++++++--
 .../scala/org/apache/spark/util/Utils.scala     | 48 ++++++++++++++++++++
 .../org/apache/spark/util/UtilsSuite.scala      | 19 ++++++++
 docs/monitoring.md                              |  7 +++
 8 files changed, 124 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/293a0b5d/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 57b251f..72a452e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -17,14 +17,11 @@
 
 package org.apache.spark.deploy
 
-import java.io.{File, FileInputStream, IOException}
-import java.util.Properties
 import java.util.jar.JarFile
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 
-import org.apache.spark.SparkException
 import org.apache.spark.util.Utils
 
 /**
@@ -63,9 +60,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
     val defaultProperties = new HashMap[String, String]()
     if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
     Option(propertiesFile).foreach { filename =>
-      val file = new File(filename)
-      SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, v) =>
-        if (k.startsWith("spark")) {
+      Utils.getPropertiesFromFile(filename).foreach { case (k, v) =>
+        if (k.startsWith("spark.")) {
           defaultProperties(k) = v
           if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
         } else {
@@ -90,19 +86,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
    */
   private def mergeSparkProperties(): Unit = {
     // Use common defaults file, if not specified by user
-    if (propertiesFile == null) {
-      val sep = File.separator
-      val sparkHomeConfig = env.get("SPARK_HOME").map(sparkHome => s"${sparkHome}${sep}conf")
-      val confDir = env.get("SPARK_CONF_DIR").orElse(sparkHomeConfig)
-
-      confDir.foreach { sparkConfDir =>
-        val defaultPath = s"${sparkConfDir}${sep}spark-defaults.conf"
-        val file = new File(defaultPath)
-        if (file.exists()) {
-          propertiesFile = file.getAbsolutePath
-        }
-      }
-    }
+    propertiesFile = Option(propertiesFile).getOrElse(Utils.getDefaultPropertiesFile(env))
 
     val properties = HashMap[String, String]()
     properties.putAll(defaultSparkProperties)
@@ -397,23 +381,3 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
     SparkSubmit.exitFn()
   }
 }
-
-object SparkSubmitArguments {
-  /** Load properties present in the given file. */
-  def getPropertiesFromFile(file: File): Seq[(String, String)] = {
-    require(file.exists(), s"Properties file $file does not exist")
-    require(file.isFile(), s"Properties file $file is not a normal file")
-    val inputStream = new FileInputStream(file)
-    try {
-      val properties = new Properties()
-      properties.load(inputStream)
-      properties.stringPropertyNames().toSeq.map(k => (k, properties(k).trim))
-    } catch {
-      case e: IOException =>
-        val message = s"Failed when loading Spark properties file $file"
-        throw new SparkException(message, e)
-    } finally {
-      inputStream.close()
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/293a0b5d/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
index a64170a..0125330 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
@@ -68,7 +68,7 @@ private[spark] object SparkSubmitDriverBootstrapper {
     assume(bootstrapDriver != null, "SPARK_SUBMIT_BOOTSTRAP_DRIVER must be set")
 
     // Parse the properties file for the equivalent spark.driver.* configs
-    val properties = SparkSubmitArguments.getPropertiesFromFile(new File(propertiesFile)).toMap
+    val properties = Utils.getPropertiesFromFile(propertiesFile)
     val confDriverMemory = properties.get("spark.driver.memory")
     val confLibraryPath = properties.get("spark.driver.extraLibraryPath")
     val confClasspath = properties.get("spark.driver.extraClassPath")
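
[Editor's note, not part of the patch] Because Utils.getPropertiesFromFile now returns a plain Map[String, String], the bootstrapper's spark.driver.* lookups above become ordinary map accesses. A minimal hedged sketch; the file path and the "512m" fallback below are invented for the example:

    import org.apache.spark.util.Utils

    // Sketch: load a properties file and pull out driver settings the way the
    // bootstrapper does. The path and default value here are made up.
    val props: Map[String, String] = Utils.getPropertiesFromFile("/tmp/spark-defaults.conf")
    val driverMemory = props.getOrElse("spark.driver.memory", "512m")
    val extraClassPath = props.get("spark.driver.extraClassPath") // Option[String]
    println(s"driver memory = $driverMemory, extra classpath = $extraClassPath")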

http://git-wip-us.apache.org/repos/asf/spark/blob/293a0b5d/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
index 25fc76c..5bce32a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
@@ -18,12 +18,14 @@
 package org.apache.spark.deploy.history
 
 import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
 
 /**
  * Command-line parser for the master.
  */
 private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]) {
   private var logDir: String = null
+  private var propertiesFile: String = null
 
   parse(args.toList)
 
@@ -32,11 +34,16 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]
       case ("--dir" | "-d") :: value :: tail =>
         logDir = value
         conf.set("spark.history.fs.logDirectory", value)
+        System.setProperty("spark.history.fs.logDirectory", value)
         parse(tail)
 
       case ("--help" | "-h") :: tail =>
         printUsageAndExit(0)
 
+      case ("--properties-file") :: value :: tail =>
+        propertiesFile = value
+        parse(tail)
+
       case Nil =>
 
       case _ =>
@@ -44,10 +51,17 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]
     }
   }
 
+   // This mutates the SparkConf, so all accesses to it must be made after this line
+   Utils.loadDefaultSparkProperties(conf, propertiesFile)
+
   private def printUsageAndExit(exitCode: Int) {
     System.err.println(
       """
-      |Usage: HistoryServer
+      |Usage: HistoryServer [options]
+      |
+      |Options:
+      |  --properties-file FILE      Path to a custom Spark properties file.
+      |                              Default is conf/spark-defaults.conf.
       |
       |Configuration options can be set by setting the corresponding JVM system property.
       |History Server options are always available; additional options depend on the provider.

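[Editor's note, not part of the patch] The hunk above threads the new --properties-file flag through the same tail-recursive, List-pattern-match argument parser the other daemons use. A free-standing hedged sketch of that parsing style; the class and its fields are invented for the example:

    // Minimal sketch of the List-based option parser style shown above.
    class ExampleArgs(args: Array[String]) {
      var logDir: String = null
      var propertiesFile: String = null

      parse(args.toList)

      private def parse(args: List[String]): Unit = args match {
        case ("--dir" | "-d") :: value :: tail =>
          logDir = value
          parse(tail)
        case "--properties-file" :: value :: tail =>
          propertiesFile = value
          parse(tail)
        case Nil => // nothing left to parse
        case _ =>
          sys.error(s"Unrecognized arguments: ${args.mkString(" ")}")
      }
    }
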
http://git-wip-us.apache.org/repos/asf/spark/blob/293a0b5d/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
index 4b0dbbe..e34bee7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
@@ -27,6 +27,7 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
   var host = Utils.localHostName()
   var port = 7077
   var webUiPort = 8080
+  var propertiesFile: String = null
 
   // Check for settings in environment variables
   if (System.getenv("SPARK_MASTER_HOST") != null) {
@@ -38,12 +39,16 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
   if (System.getenv("SPARK_MASTER_WEBUI_PORT") != null) {
     webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT").toInt
   }
+
+  parse(args.toList)
+
+  // This mutates the SparkConf, so all accesses to it must be made after this line
+  propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)
+
   if (conf.contains("spark.master.ui.port")) {
     webUiPort = conf.get("spark.master.ui.port").toInt
   }
 
-  parse(args.toList)
-
   def parse(args: List[String]): Unit = args match {
     case ("--ip" | "-i") :: value :: tail =>
       Utils.checkHost(value, "ip no longer supported, please use hostname " + value)
@@ -63,7 +68,11 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
       webUiPort = value
       parse(tail)
 
-    case ("--help" | "-h") :: tail =>
+    case ("--properties-file") :: value :: tail =>
+      propertiesFile = value
+      parse(tail)
+
+    case ("--help") :: tail =>
       printUsageAndExit(0)
 
     case Nil => {}
@@ -83,7 +92,9 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
       "  -i HOST, --ip HOST     Hostname to listen on (deprecated, please use --host or -h) \n" +
       "  -h HOST, --host HOST   Hostname to listen on\n" +
       "  -p PORT, --port PORT   Port to listen on (default: 7077)\n" +
-      "  --webui-port PORT      Port for web UI (default: 8080)")
+      "  --webui-port PORT      Port for web UI (default: 8080)\n" +
+      "  --properties-file FILE Path to a custom Spark properties file.\n" +
+      "                         Default is conf/spark-defaults.conf.")
     System.exit(exitCode)
   }
 }
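
[Editor's note, not part of the patch] The "This mutates the SparkConf" comment is the behavioural point of this hunk: parse() may set propertiesFile, loadDefaultSparkProperties then folds that file into the conf, and only afterwards are defaults-backed settings such as spark.master.ui.port safe to read. A hedged, free-standing sketch of that ordering; the values and the key read at the end are only examples:

    import org.apache.spark.SparkConf
    import org.apache.spark.util.Utils

    val conf = new SparkConf()
    var propertiesFile: String = null   // would be set by a --properties-file flag

    // Folds spark-defaults.conf (or the explicit file) into conf and sys.props,
    // returning the path actually used, or null when no defaults file was found.
    propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)

    // Only after that call do file-backed settings become visible through conf.
    val webUiPort = conf.getOption("spark.master.ui.port").map(_.toInt).getOrElse(8080)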

http://git-wip-us.apache.org/repos/asf/spark/blob/293a0b5d/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
index 54e3937..019cd70 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
@@ -33,6 +33,7 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
   var memory = inferDefaultMemory()
   var masters: Array[String] = null
   var workDir: String = null
+  var propertiesFile: String = null
 
   // Check for settings in environment variables
   if (System.getenv("SPARK_WORKER_PORT") != null) {
@@ -47,15 +48,19 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
   if (System.getenv("SPARK_WORKER_WEBUI_PORT") != null) {
     webUiPort = System.getenv("SPARK_WORKER_WEBUI_PORT").toInt
   }
-  if (conf.contains("spark.worker.ui.port")) {
-    webUiPort = conf.get("spark.worker.ui.port").toInt
-  }
   if (System.getenv("SPARK_WORKER_DIR") != null) {
     workDir = System.getenv("SPARK_WORKER_DIR")
   }
 
   parse(args.toList)
 
+  // This mutates the SparkConf, so all accesses to it must be made after this line
+  propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)
+
+  if (conf.contains("spark.worker.ui.port")) {
+    webUiPort = conf.get("spark.worker.ui.port").toInt
+  }
+
   checkWorkerMemory()
 
   def parse(args: List[String]): Unit = args match {
@@ -89,7 +94,11 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
       webUiPort = value
       parse(tail)
 
-    case ("--help" | "-h") :: tail =>
+    case ("--properties-file") :: value :: tail =>
+      propertiesFile = value
+      parse(tail)
+
+    case ("--help") :: tail =>
       printUsageAndExit(0)
 
     case value :: tail =>
@@ -124,7 +133,9 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
       "  -i HOST, --ip IP         Hostname to listen on (deprecated, please use --host or -h)\n" +
       "  -h HOST, --host HOST     Hostname to listen on\n" +
       "  -p PORT, --port PORT     Port to listen on (default: random)\n" +
-      "  --webui-port PORT        Port for web UI (default: 8081)")
+      "  --webui-port PORT        Port for web UI (default: 8081)\n" +
+      "  --properties-file FILE   Path to a custom Spark properties file.\n" +
+      "                           Default is conf/spark-defaults.conf.")
     System.exit(exitCode)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/293a0b5d/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index aad9016..cbc4095 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1410,6 +1410,54 @@ private[spark] object Utils extends Logging {
     }
   }
 
+  /**
+   * Load default Spark properties from the given file. If no file is provided,
+   * use the common defaults file. This mutates state in the given SparkConf and
+   * in this JVM's system properties if the config specified in the file is not
+   * already set. Return the path of the properties file used.
+   */
+  def loadDefaultSparkProperties(conf: SparkConf, filePath: String = null): String = {
+    val path = Option(filePath).getOrElse(getDefaultPropertiesFile())
+    Option(path).foreach { confFile =>
+      getPropertiesFromFile(confFile).filter { case (k, v) =>
+        k.startsWith("spark.")
+      }.foreach { case (k, v) =>
+        conf.setIfMissing(k, v)
+        sys.props.getOrElseUpdate(k, v)
+      }
+    }
+    path
+  }
+
+  /** Load properties present in the given file. */
+  def getPropertiesFromFile(filename: String): Map[String, String] = {
+    val file = new File(filename)
+    require(file.exists(), s"Properties file $file does not exist")
+    require(file.isFile(), s"Properties file $file is not a normal file")
+
+    val inReader = new InputStreamReader(new FileInputStream(file), "UTF-8")
+    try {
+      val properties = new Properties()
+      properties.load(inReader)
+      properties.stringPropertyNames().map(k => (k, properties(k).trim)).toMap
+    } catch {
+      case e: IOException =>
+        throw new SparkException(s"Failed when loading Spark properties from $filename", e)
+    } finally {
+      inReader.close()
+    }
+  }
+
+  /** Return the path of the default Spark properties file. */
+  def getDefaultPropertiesFile(env: Map[String, String] = sys.env): String = {
+    env.get("SPARK_CONF_DIR")
+      .orElse(env.get("SPARK_HOME").map { t => s"$t${File.separator}conf" })
+      .map { t => new File(s"$t${File.separator}spark-defaults.conf")}
+      .filter(_.isFile)
+      .map(_.getAbsolutePath)
+      .orNull
+  }
+
   /** Return a nice string representation of the exception, including the stack trace. */
   def exceptionString(e: Exception): String = {
     if (e == null) "" else exceptionString(getFormattedClassName(e), e.getMessage, e.getStackTrace)

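[Editor's note, not part of the patch] getDefaultPropertiesFile resolves the defaults location as SPARK_CONF_DIR/spark-defaults.conf when SPARK_CONF_DIR is set, otherwise SPARK_HOME/conf/spark-defaults.conf, and returns null when no such regular file exists. A hedged sketch of probing that resolution with explicit environment maps; the directories are invented, and each call only returns a non-null path if the file actually exists:

    import org.apache.spark.util.Utils

    // SPARK_CONF_DIR takes precedence over SPARK_HOME/conf.
    val both = Map("SPARK_CONF_DIR" -> "/etc/spark", "SPARK_HOME" -> "/opt/spark")
    println(Utils.getDefaultPropertiesFile(both))   // /etc/spark/spark-defaults.conf, if present

    // With only SPARK_HOME set, its conf/ subdirectory is probed instead.
    println(Utils.getDefaultPropertiesFile(Map("SPARK_HOME" -> "/opt/spark")))

    // Neither variable set: null, which callers treat as "no defaults file".
    println(Utils.getDefaultPropertiesFile(Map.empty[String, String]))
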
http://git-wip-us.apache.org/repos/asf/spark/blob/293a0b5d/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala 
b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 0344da6..ea7ef05 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -27,6 +27,8 @@ import com.google.common.base.Charsets
 import com.google.common.io.Files
 import org.scalatest.FunSuite
 
+import org.apache.spark.SparkConf
+
 class UtilsSuite extends FunSuite {
 
   test("bytesToString") {
@@ -332,4 +334,21 @@ class UtilsSuite extends FunSuite {
     assert(!tempFile2.exists())
   }
 
+  test("loading properties from file") {
+    val outFile = File.createTempFile("test-load-spark-properties", "test")
+    try {
+      System.setProperty("spark.test.fileNameLoadB", "2")
+      Files.write("spark.test.fileNameLoadA true\n" +
+        "spark.test.fileNameLoadB 1\n", outFile, Charsets.UTF_8)
+      val properties = Utils.getPropertiesFromFile(outFile.getAbsolutePath)
+      properties
+        .filter { case (k, v) => k.startsWith("spark.")}
+        .foreach { case (k, v) => sys.props.getOrElseUpdate(k, v)}
+      val sparkConf = new SparkConf
+      assert(sparkConf.getBoolean("spark.test.fileNameLoadA", false) === true)
+      assert(sparkConf.getInt("spark.test.fileNameLoadB", 1) === 2)
+    } finally {
+      outFile.delete()
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/293a0b5d/docs/monitoring.md
----------------------------------------------------------------------
diff --git a/docs/monitoring.md b/docs/monitoring.md
index d07ec4a..e3f81a7 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -78,6 +78,13 @@ follows:
     file system.</td>
   </tr>
   <tr>
+    <td>spark.history.fs.logDirectory</td>
+    <td>(none)</td>
+    <td>
+     Directory that contains application event logs to be loaded by the history server
+    </td>
+  </tr>
+  <tr>
     <td>spark.history.fs.updateInterval</td>
     <td>10</td>
     <td>

