Repository: spark
Updated Branches:
  refs/heads/master 192e42a29 -> df3d559b3


[SPARK-5801] [core] Avoid creating nested directories.

Cache the value of the local root dirs to use for storing local data,
so that the same directories are reused.

Also, to avoid an extra level of nesting, use a different environment
variable (SPARK_EXECUTOR_DIRS) to propagate the local dirs from the
Worker to the executors, and give the executor directories a distinct
name prefix.
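
A minimal sketch of the caching pattern this introduces (a @volatile field
guarded by double-checked locking, mirroring the Utils.scala change below;
resolveDirs() is a hypothetical stand-in for the real resolution logic):

  // Sketch only: shows the shape of the cache, not the actual Utils code.
  object LocalRootDirsSketch {
    @volatile private var cached: Array[String] = null

    def getOrCreate(conf: Map[String, String]): Array[String] = {
      if (cached == null) {
        this.synchronized {
          // Re-check inside the lock so concurrent callers resolve the dirs only once.
          if (cached == null) {
            cached = resolveDirs(conf)
          }
        }
      }
      cached
    }

    // Hypothetical stand-in: the real logic consults YARN dirs, SPARK_EXECUTOR_DIRS,
    // or creates fresh subdirectories under the configured local dirs.
    private def resolveDirs(conf: Map[String, String]): Array[String] =
      conf.getOrElse("spark.local.dir", System.getProperty("java.io.tmpdir")).split(",")
  }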

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #4747 from vanzin/SPARK-5801 and squashes the following commits:

e0114e1 [Marcelo Vanzin] Update unit test.
18ee0a7 [Marcelo Vanzin] [SPARK-5801] [core] Avoid creating nested directories.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/df3d559b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/df3d559b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/df3d559b

Branch: refs/heads/master
Commit: df3d559b32f1ceb8ca3491e2a1169c56a6faab58
Parents: 192e42a
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Thu Feb 26 17:35:03 2015 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu Feb 26 17:35:03 2015 +0000

----------------------------------------------------------------------
 .../spark/deploy/worker/ExecutorRunner.scala    |  2 +-
 .../org/apache/spark/deploy/worker/Worker.scala |  4 ++--
 .../scala/org/apache/spark/util/Utils.scala     | 23 ++++++++++++++++++++
 .../apache/spark/storage/LocalDirsSuite.scala   |  8 +++++--
 4 files changed, 32 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/df3d559b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index bea04cd..6653aca 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -135,7 +135,7 @@ private[spark] class ExecutorRunner(
       logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
 
       builder.directory(executorDir)
-      builder.environment.put("SPARK_LOCAL_DIRS", appLocalDirs.mkString(","))
+      builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
       // In case we are running this from within the Spark Shell, avoid creating a "scala"
       // parent process for the executor command
       builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

http://git-wip-us.apache.org/repos/asf/spark/blob/df3d559b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 10929eb..2473a90 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -345,11 +345,11 @@ private[spark] class Worker(
           }
 
           // Create local dirs for the executor. These are passed to the executor via the
-          // SPARK_LOCAL_DIRS environment variable, and deleted by the Worker when the
+          // SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
           // application finishes.
           val appLocalDirs = appDirectories.get(appId).getOrElse {
             Utils.getOrCreateLocalRootDirs(conf).map { dir =>
-              Utils.createDirectory(dir).getAbsolutePath()
+              Utils.createDirectory(dir, namePrefix = "executor").getAbsolutePath()
             }.toSeq
           }
           appDirectories(appId) = appLocalDirs
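
A simplified sketch (not the actual Worker/executor code) of the hand-off
changed above: the Worker joins the per-application directories with the
platform path separator into SPARK_EXECUTOR_DIRS on the child process, and
the executor side splits the value back into individual paths:

  import java.io.File

  // Worker side (sketch): pass the per-app local dirs to the executor process.
  def launchExecutorSketch(appLocalDirs: Seq[String], command: Seq[String]): Process = {
    val builder = new ProcessBuilder(command: _*)
    builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
    builder.start()
  }

  // Executor side (sketch, cf. the Utils.scala change below): recover the dirs.
  def readExecutorDirs(): Option[Array[String]] =
    Option(System.getenv("SPARK_EXECUTOR_DIRS")).map(_.split(File.pathSeparator))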

http://git-wip-us.apache.org/repos/asf/spark/blob/df3d559b/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 1396f16..4644088 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -63,6 +63,7 @@ private[spark] object Utils extends Logging {
   val random = new Random()
 
   private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
+  @volatile private var localRootDirs: Array[String] = null
 
   /** Serialize an object using Java serialization */
   def serialize[T](o: T): Array[Byte] = {
@@ -683,14 +684,31 @@ private[spark] object Utils extends Logging {
    * and returns only the directories that exist / could be created.
    *
    * If no directories could be created, this will return an empty list.
+   *
+   * This method will cache the local directories for the application when it's first invoked.
+   * So calling it multiple times with a different configuration will always return the same
+   * set of directories.
    */
   private[spark] def getOrCreateLocalRootDirs(conf: SparkConf): Array[String] = {
+    if (localRootDirs == null) {
+      this.synchronized {
+        if (localRootDirs == null) {
+          localRootDirs = getOrCreateLocalRootDirsImpl(conf)
+        }
+      }
+    }
+    localRootDirs
+  }
+
+  private def getOrCreateLocalRootDirsImpl(conf: SparkConf): Array[String] = {
     if (isRunningInYarnContainer(conf)) {
       // If we are in yarn mode, systems can have different disk layouts so we must set it
       // to what Yarn on this system said was available. Note this assumes that Yarn has
       // created the directories already, and that they are secured so that only the
       // user has access to them.
       getYarnLocalDirs(conf).split(",")
+    } else if (conf.getenv("SPARK_EXECUTOR_DIRS") != null) {
+      conf.getenv("SPARK_EXECUTOR_DIRS").split(File.pathSeparator)
     } else {
      // In non-Yarn mode (or for the driver in yarn-client mode), we cannot trust the user
      // configuration to point to a secure directory. So create a subdirectory with restricted
@@ -734,6 +752,11 @@ private[spark] object Utils extends Logging {
     localDirs
   }
 
+  /** Used by unit tests. Do not call from other places. */
+  private[spark] def clearLocalRootDirs(): Unit = {
+    localRootDirs = null
+  }
+
   /**
    * Shuffle the elements of a collection into a random order, returning the
    * result in a new collection. Unlike scala.util.Random.shuffle, this method

http://git-wip-us.apache.org/repos/asf/spark/blob/df3d559b/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala b/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
index 8cf951a..82a82e2 100644
--- a/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.storage
 import java.io.File
 
 import org.apache.spark.util.Utils
-import org.scalatest.FunSuite
+import org.scalatest.{BeforeAndAfter, FunSuite}
 
 import org.apache.spark.SparkConf
 
@@ -28,7 +28,11 @@ import org.apache.spark.SparkConf
 /**
  * Tests for the spark.local.dir and SPARK_LOCAL_DIRS configuration options.
  */
-class LocalDirsSuite extends FunSuite {
+class LocalDirsSuite extends FunSuite with BeforeAndAfter {
+
+  before {
+    Utils.clearLocalRootDirs()
+  }
 
   test("Utils.getLocalDir() returns a valid directory, even if some local dirs are missing") {
     // Regression test for SPARK-2974
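
For illustration, a sketch of a test exercising the new caching behaviour
(hypothetical test, not part of this commit; only clearLocalRootDirs(), the
BeforeAndAfter hook and getOrCreateLocalRootDirs() come from the patch):

  package org.apache.spark.storage

  import org.scalatest.{BeforeAndAfter, FunSuite}

  import org.apache.spark.SparkConf
  import org.apache.spark.util.Utils

  class CachedLocalDirsSketch extends FunSuite with BeforeAndAfter {

    // Reset the process-wide cache so each test resolves its dirs from scratch.
    before {
      Utils.clearLocalRootDirs()
    }

    test("local root dirs are cached after the first lookup") {
      val conf = new SparkConf(false).set("spark.local.dir", System.getProperty("java.io.tmpdir"))
      val first = Utils.getOrCreateLocalRootDirs(conf)
      // A different SparkConf afterwards still returns the cached directories.
      val second = Utils.getOrCreateLocalRootDirs(new SparkConf(false))
      assert(first.sameElements(second))
    }
  }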

