Repository: spark
Updated Branches:
  refs/heads/master ccf010f27 -> 658814c89


[SPARK-8129] [CORE] [Sec] Pass auth secrets to executors via env variables

Environment variables are not visible to non-Spark users, so the auth secret stays off the executor command line; this approach is based on a suggestion from vanzin.
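
A minimal sketch of the rationale (names and values below are placeholders, not
part of this commit): a "-Dspark.authenticate.secret=..." java option lands on
the child process's command line, which any local user can read via `ps`, while
an environment variable is readable only by the process owner.

    object SecretChannelSketch {
      def main(args: Array[String]): Unit = {
        val secret = "placeholder-secret"

        // Leaky: the secret would appear host-wide in `ps -ef` output.
        val leakyCommand = Seq("java", s"-Dspark.authenticate.secret=$secret", "Main")
        println(leakyCommand.mkString(" "))

        // Safer: the secret lives only in the child's environment.
        val pb = new ProcessBuilder("java", "Main")
        pb.environment().put("_SPARK_AUTH_SECRET", secret)
        // pb.start() would launch "Main" with the secret hidden from other users.
      }
    }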

Author: Kan Zhang <kzh...@apache.org>

Closes #6774 from kanzhang/env and squashes the following commits:

5dd84c6 [Kan Zhang] remove auth secret conf from initial set up for executors
90cb7d2 [Kan Zhang] always filter out auth secret
af4d89d [Kan Zhang] minor refactoring
e88993e [Kan Zhang] pass auth secret to executors via env variable


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/658814c8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/658814c8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/658814c8

Branch: refs/heads/master
Commit: 658814c898bec04c31a8e57f8da0103497aac6ec
Parents: ccf010f
Author: Kan Zhang <kzh...@apache.org>
Authored: Tue Jun 16 08:18:26 2015 +0200
Committer: Sean Owen <so...@cloudera.com>
Committed: Tue Jun 16 08:18:26 2015 +0200

----------------------------------------------------------------------
 .../org/apache/spark/SecurityManager.scala      | 17 +++++++--
 .../main/scala/org/apache/spark/SparkConf.scala |  2 +-
 .../spark/deploy/worker/CommandUtils.scala      | 16 ++++++--
 .../spark/deploy/worker/DriverRunner.scala      |  4 +-
 .../spark/deploy/worker/ExecutorRunner.scala    |  6 +--
 .../spark/deploy/worker/CommandUtilsSuite.scala | 39 ++++++++++++++++++--
 .../deploy/worker/ExecutorRunnerTest.scala      |  7 ++--
 7 files changed, 72 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/658814c8/core/src/main/scala/org/apache/spark/SecurityManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index 8aed1e2..673ef49 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -192,7 +192,7 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
   // key used to store the spark secret in the Hadoop UGI
   private val sparkSecretLookupKey = "sparkCookie"
 
-  private val authOn = sparkConf.getBoolean("spark.authenticate", false)
+  private val authOn = sparkConf.getBoolean(SecurityManager.SPARK_AUTH_CONF, false)
   // keep spark.ui.acls.enable for backwards compatibility with 1.0
   private var aclsOn =
     sparkConf.getBoolean("spark.acls.enable", 
sparkConf.getBoolean("spark.ui.acls.enable", false))
@@ -365,10 +365,12 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
       cookie
     } else {
       // user must have set spark.authenticate.secret config
-      sparkConf.getOption("spark.authenticate.secret") match {
+      // For Master/Worker, auth secret is in conf; for Executors, it is in env variable
+      sys.env.get(SecurityManager.ENV_AUTH_SECRET)
+        .orElse(sparkConf.getOption(SecurityManager.SPARK_AUTH_SECRET_CONF)) match {
         case Some(value) => value
         case None => throw new Exception("Error: a secret key must be specified via the " +
-          "spark.authenticate.secret config")
+          SecurityManager.SPARK_AUTH_SECRET_CONF + " config")
       }
     }
     sCookie
@@ -449,3 +451,12 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
   override def getSaslUser(appId: String): String = getSaslUser()
   override def getSecretKey(appId: String): String = getSecretKey()
 }
+
+private[spark] object SecurityManager {
+
+  val SPARK_AUTH_CONF: String = "spark.authenticate"
+  val SPARK_AUTH_SECRET_CONF: String = "spark.authenticate.secret"
+  // This is used to set auth secret to an executor's env variable. It should have the same
+  // value as SPARK_AUTH_SECRET_CONF set in SparkConf
+  val ENV_AUTH_SECRET = "_SPARK_AUTH_SECRET"
+}
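
A minimal usage sketch of the constants above (illustrative only: SecurityManager
and its companion object are private[spark], and the secret is a placeholder):

    import org.apache.spark.{SecurityManager, SparkConf}

    val conf = new SparkConf()
      .set(SecurityManager.SPARK_AUTH_CONF, "true")                      // "spark.authenticate"
      .set(SecurityManager.SPARK_AUTH_SECRET_CONF, "placeholder-secret") // "spark.authenticate.secret"
    val securityMgr = new SecurityManager(conf)
    assert(securityMgr.isAuthenticationEnabled)
    // On an executor, getSecretKey() now consults sys.env("_SPARK_AUTH_SECRET")
    // before falling back to the conf entry, per the change above.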

http://git-wip-us.apache.org/repos/asf/spark/blob/658814c8/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 46d7284..6cf36fb 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -557,7 +557,7 @@ private[spark] object SparkConf extends Logging {
   def isExecutorStartupConf(name: String): Boolean = {
     isAkkaConf(name) ||
     name.startsWith("spark.akka") ||
-    name.startsWith("spark.auth") ||
+    (name.startsWith("spark.auth") && name != 
SecurityManager.SPARK_AUTH_SECRET_CONF) ||
     name.startsWith("spark.ssl") ||
     isSparkPortConf(name)
   }
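
Sketched behavior of the predicate after this change (isExecutorStartupConf is
private[spark], so this is illustrative rather than user-facing API):

    SparkConf.isExecutorStartupConf("spark.authenticate")        // true: still forwarded at startup
    SparkConf.isExecutorStartupConf("spark.authenticate.secret") // false: kept off the command line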

http://git-wip-us.apache.org/repos/asf/spark/blob/658814c8/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
index 0a1d60f..45a3f43 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConversions._
 import scala.collection.Map
 
 import org.apache.spark.Logging
+import org.apache.spark.SecurityManager
 import org.apache.spark.deploy.Command
 import org.apache.spark.launcher.WorkerCommandBuilder
 import org.apache.spark.util.Utils
@@ -40,12 +41,14 @@ object CommandUtils extends Logging {
    */
   def buildProcessBuilder(
       command: Command,
+      securityMgr: SecurityManager,
       memory: Int,
       sparkHome: String,
       substituteArguments: String => String,
       classPaths: Seq[String] = Seq[String](),
       env: Map[String, String] = sys.env): ProcessBuilder = {
-    val localCommand = buildLocalCommand(command, substituteArguments, classPaths, env)
+    val localCommand = buildLocalCommand(
+      command, securityMgr, substituteArguments, classPaths, env)
     val commandSeq = buildCommandSeq(localCommand, memory, sparkHome)
     val builder = new ProcessBuilder(commandSeq: _*)
     val environment = builder.environment()
@@ -69,6 +72,7 @@ object CommandUtils extends Logging {
    */
   private def buildLocalCommand(
       command: Command,
+      securityMgr: SecurityManager,
       substituteArguments: String => String,
       classPath: Seq[String] = Seq[String](),
       env: Map[String, String]): Command = {
@@ -76,20 +80,26 @@ object CommandUtils extends Logging {
     val libraryPathEntries = command.libraryPathEntries
     val cmdLibraryPath = command.environment.get(libraryPathName)
 
-    val newEnvironment = if (libraryPathEntries.nonEmpty && libraryPathName.nonEmpty) {
+    var newEnvironment = if (libraryPathEntries.nonEmpty && libraryPathName.nonEmpty) {
       val libraryPaths = libraryPathEntries ++ cmdLibraryPath ++ env.get(libraryPathName)
       command.environment + ((libraryPathName, libraryPaths.mkString(File.pathSeparator)))
     } else {
       command.environment
     }
 
+    // set auth secret to env variable if needed
+    if (securityMgr.isAuthenticationEnabled) {
+      newEnvironment += (SecurityManager.ENV_AUTH_SECRET -> securityMgr.getSecretKey)
+    }
+
     Command(
       command.mainClass,
       command.arguments.map(substituteArguments),
       newEnvironment,
       command.classPathEntries ++ classPath,
       Seq[String](), // library path already captured in environment variable
-      command.javaOpts)
+      // filter out auth secret from java options
+      command.javaOpts.filterNot(_.startsWith("-D" + SecurityManager.SPARK_AUTH_SECRET_CONF)))
   }
 
   /** Spawn a thread that will redirect a given stream to a file */
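
A hypothetical caller of the new signature, mirroring the DriverRunner and
ExecutorRunner changes below (command, conf, and sparkHome are assumed in scope):

    import scala.collection.JavaConversions._

    val builder = CommandUtils.buildProcessBuilder(
      command, new SecurityManager(conf), 512, sparkHome, t => t)
    // With spark.authenticate=true, the secret rides in the environment...
    assert(builder.environment().containsKey(SecurityManager.ENV_AUTH_SECRET))
    // ...and is filtered out of the java options on the command line.
    assert(!builder.command().exists(_.startsWith("-Dspark.authenticate.secret")))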

http://git-wip-us.apache.org/repos/asf/spark/blob/658814c8/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index ef7a703..1386055 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -85,8 +85,8 @@ private[deploy] class DriverRunner(
           }
 
           // TODO: If we add ability to submit multiple jars they should also be added here
-          val builder = CommandUtils.buildProcessBuilder(driverDesc.command, driverDesc.mem,
-            sparkHome.getAbsolutePath, substituteVariables)
+          val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
+            driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)
           launchDriver(builder, driverDir, driverDesc.supervise)
         }
         catch {

http://git-wip-us.apache.org/repos/asf/spark/blob/658814c8/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 7aa85b7..fff17e1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -25,7 +25,7 @@ import akka.actor.ActorRef
 import com.google.common.base.Charsets.UTF_8
 import com.google.common.io.Files
 
-import org.apache.spark.{SparkConf, Logging}
+import org.apache.spark.{SecurityManager, SparkConf, Logging}
 import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
 import org.apache.spark.util.Utils
@@ -125,8 +125,8 @@ private[deploy] class ExecutorRunner(
   private def fetchAndRunExecutor() {
     try {
       // Launch the process
-      val builder = CommandUtils.buildProcessBuilder(appDesc.command, memory,
-        sparkHome.getAbsolutePath, substituteVariables)
+      val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
+        memory, sparkHome.getAbsolutePath, substituteVariables)
       val command = builder.command()
       logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/658814c8/core/src/test/scala/org/apache/spark/deploy/worker/CommandUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/CommandUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/CommandUtilsSuite.scala
index 5b3930c..7101cb9 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/CommandUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/CommandUtilsSuite.scala
@@ -17,21 +17,52 @@
 
 package org.apache.spark.deploy.worker
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.Command
 import org.apache.spark.util.Utils
-import org.scalatest.Matchers
+import org.scalatest.{Matchers, PrivateMethodTester}
 
-class CommandUtilsSuite extends SparkFunSuite with Matchers {
+class CommandUtilsSuite extends SparkFunSuite with Matchers with PrivateMethodTester {
 
   test("set libraryPath correctly") {
     val appId = "12345-worker321-9876"
     val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
     val cmd = new Command("mainClass", Seq(), Map(), Seq(), Seq("libraryPathToB"), Seq())
-    val builder = CommandUtils.buildProcessBuilder(cmd, 512, sparkHome, t => t)
+    val builder = CommandUtils.buildProcessBuilder(
+      cmd, new SecurityManager(new SparkConf), 512, sparkHome, t => t)
     val libraryPath = Utils.libraryPathEnvName
     val env = builder.environment
     env.keySet should contain(libraryPath)
     assert(env.get(libraryPath).startsWith("libraryPathToB"))
   }
+
+  test("auth secret shouldn't appear in java opts") {
+    val buildLocalCommand = PrivateMethod[Command]('buildLocalCommand)
+    val conf = new SparkConf
+    val secret = "This is the secret sauce"
+    // set auth secret
+    conf.set(SecurityManager.SPARK_AUTH_SECRET_CONF, secret)
+    val command = new Command("mainClass", Seq(), Map(), Seq(), Seq("lib"),
+      Seq("-D" + SecurityManager.SPARK_AUTH_SECRET_CONF + "=" + secret))
+
+    // auth is not set
+    var cmd = CommandUtils invokePrivate buildLocalCommand(
+      command, new SecurityManager(conf), (t: String) => t, Seq(), Map())
+    assert(!cmd.javaOpts.exists(_.startsWith("-D" + SecurityManager.SPARK_AUTH_SECRET_CONF)))
+    assert(!cmd.environment.contains(SecurityManager.ENV_AUTH_SECRET))
+
+    // auth is set to false
+    conf.set(SecurityManager.SPARK_AUTH_CONF, "false")
+    cmd = CommandUtils invokePrivate buildLocalCommand(
+      command, new SecurityManager(conf), (t: String) => t, Seq(), Map())
+    assert(!cmd.javaOpts.exists(_.startsWith("-D" + SecurityManager.SPARK_AUTH_SECRET_CONF)))
+    assert(!cmd.environment.contains(SecurityManager.ENV_AUTH_SECRET))
+
+    // auth is set to true
+    conf.set(SecurityManager.SPARK_AUTH_CONF, "true")
+    cmd = CommandUtils invokePrivate buildLocalCommand(
+      command, new SecurityManager(conf), (t: String) => t, Seq(), Map())
+    assert(!cmd.javaOpts.exists(_.startsWith("-D" + SecurityManager.SPARK_AUTH_SECRET_CONF)))
+    assert(cmd.environment(SecurityManager.ENV_AUTH_SECRET) === secret)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/658814c8/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index 3da9927..bed6f3e 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -22,19 +22,20 @@ import java.io.File
 import scala.collection.JavaConversions._
 
 import org.apache.spark.deploy.{ApplicationDescription, Command, ExecutorState}
-import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
 
 class ExecutorRunnerTest extends SparkFunSuite {
   test("command includes appId") {
     val appId = "12345-worker321-9876"
+    val conf = new SparkConf
     val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
     val appDesc = new ApplicationDescription("app name", Some(8), 500,
       Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl")
     val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", 123,
-      "publicAddr", new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"),
+      "publicAddr", new File(sparkHome), new File("ooga"), "blah", conf, Seq("localDir"),
       ExecutorState.RUNNING)
     val builder = CommandUtils.buildProcessBuilder(
-      appDesc.command, 512, sparkHome, er.substituteVariables)
+      appDesc.command, new SecurityManager(conf), 512, sparkHome, er.substituteVariables)
     assert(builder.command().last === appId)
   }
 }

