Repository: spark
Updated Branches:
  refs/heads/master ba4aab9b8 -> c6b8eb71a


[SPARK-18842][TESTS][LAUNCHER] De-duplicate paths in classpaths in commands for 
local-cluster mode to work around the path length limitation on Windows

## What changes were proposed in this pull request?

Currently, some tests fail or hang on Windows because of this problem. As 
described in SPARK-18718, some tests using `local-cluster` mode were disabled 
on Windows because the classpath passed on the command line exceeds the 
length limit.

The limit seems to be roughly 32K characters (see this [Microsoft blog 
post](https://blogs.msdn.microsoft.com/oldnewthing/20031210-00/?p=41553/) and 
[another 
reference](https://support.thoughtworks.com/hc/en-us/articles/213248526-Getting-around-maximum-command-line-length-is-32767-characters-on-Windows)),
but in `local-cluster` mode, executors are launched (in tests only) as 
processes with a command such as 
[this one](https://gist.github.com/HyukjinKwon/5bc81061c250d4af5a180869b59d42ea).

That command is roughly 40K characters long because of the classpath passed to 
the `java` command. However, almost half of the classpath entries appear to be 
duplicates, so de-duplicating the paths reduces the command to roughly 20K, as 
shown [here](https://gist.github.com/HyukjinKwon/dad0c8db897e5e094684a2dc6a417790).

We may need to revisit this if more paths are added in the future, but for now 
it seems better than disabling all the tests, and it keeps the changes minimal.

Therefore, this PR proposes to de-duplicate the paths in the classpath when 
launching executors as processes in `local-cluster` mode.

## How was this patch tested?

Ran the existing tests in `ShuffleSuite` and `BroadcastJoinSuite` manually via AppVeyor.

Author: hyukjinkwon <gurwls...@gmail.com>

Closes #16266 from HyukjinKwon/disable-local-cluster-tests.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c6b8eb71
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c6b8eb71
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c6b8eb71

Branch: refs/heads/master
Commit: c6b8eb71a9638c9a8ce02d11d5fe26f4c5be531e
Parents: ba4aab9
Author: hyukjinkwon <gurwls...@gmail.com>
Authored: Wed Dec 14 19:24:24 2016 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Wed Dec 14 19:24:24 2016 +0000

----------------------------------------------------------------------
 core/src/test/scala/org/apache/spark/ShuffleSuite.scala | 12 ------------
 .../apache/spark/launcher/AbstractCommandBuilder.java   |  8 +++++---
 project/SparkBuild.scala                                |  3 ++-
 .../spark/sql/execution/joins/BroadcastJoinSuite.scala  | 10 ----------
 4 files changed, 7 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c6b8eb71/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala 
b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index dc3a28e..e626ed3 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -51,10 +51,7 @@ abstract class ShuffleSuite extends SparkFunSuite with 
Matchers with LocalSparkC
     assert(valuesFor2.toList.sorted === List(1))
   }
 
-  // Some tests using `local-cluster` here are failed on Windows due to the 
failure of initiating
-  // executors by the path length limitation. See SPARK-18718.
   test("shuffle non-zero block size") {
-    assume(!Utils.isWindows)
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
     val NUM_BLOCKS = 3
 
@@ -80,7 +77,6 @@ abstract class ShuffleSuite extends SparkFunSuite with 
Matchers with LocalSparkC
   }
 
   test("shuffle serializer") {
-    assume(!Utils.isWindows)
     // Use a local cluster with 2 processes to make sure there are both local 
and remote blocks
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
     val a = sc.parallelize(1 to 10, 2)
@@ -97,7 +93,6 @@ abstract class ShuffleSuite extends SparkFunSuite with 
Matchers with LocalSparkC
   }
 
   test("zero sized blocks") {
-    assume(!Utils.isWindows)
     // Use a local cluster with 2 processes to make sure there are both local 
and remote blocks
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
 
@@ -125,7 +120,6 @@ abstract class ShuffleSuite extends SparkFunSuite with 
Matchers with LocalSparkC
   }
 
   test("zero sized blocks without kryo") {
-    assume(!Utils.isWindows)
     // Use a local cluster with 2 processes to make sure there are both local 
and remote blocks
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
 
@@ -151,7 +145,6 @@ abstract class ShuffleSuite extends SparkFunSuite with 
Matchers with LocalSparkC
   }
 
   test("shuffle on mutable pairs") {
-    assume(!Utils.isWindows)
     // Use a local cluster with 2 processes to make sure there are both local 
and remote blocks
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
     def p[T1, T2](_1: T1, _2: T2): MutablePair[T1, T2] = MutablePair(_1, _2)
@@ -164,7 +157,6 @@ abstract class ShuffleSuite extends SparkFunSuite with 
Matchers with LocalSparkC
   }
 
   test("sorting on mutable pairs") {
-    assume(!Utils.isWindows)
     // This is not in SortingSuite because of the local cluster setup.
     // Use a local cluster with 2 processes to make sure there are both local 
and remote blocks
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
@@ -180,7 +172,6 @@ abstract class ShuffleSuite extends SparkFunSuite with 
Matchers with LocalSparkC
   }
 
   test("cogroup using mutable pairs") {
-    assume(!Utils.isWindows)
     // Use a local cluster with 2 processes to make sure there are both local 
and remote blocks
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
     def p[T1, T2](_1: T1, _2: T2): MutablePair[T1, T2] = MutablePair(_1, _2)
@@ -208,7 +199,6 @@ abstract class ShuffleSuite extends SparkFunSuite with 
Matchers with LocalSparkC
   }
 
   test("subtract mutable pairs") {
-    assume(!Utils.isWindows)
     // Use a local cluster with 2 processes to make sure there are both local 
and remote blocks
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
     def p[T1, T2](_1: T1, _2: T2): MutablePair[T1, T2] = MutablePair(_1, _2)
@@ -223,7 +213,6 @@ abstract class ShuffleSuite extends SparkFunSuite with 
Matchers with LocalSparkC
   }
 
   test("sort with Java non serializable class - Kryo") {
-    assume(!Utils.isWindows)
     // Use a local cluster with 2 processes to make sure there are both local 
and remote blocks
     val myConf = conf.clone().set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")
     sc = new SparkContext("local-cluster[2,1,1024]", "test", myConf)
@@ -238,7 +227,6 @@ abstract class ShuffleSuite extends SparkFunSuite with 
Matchers with LocalSparkC
   }
 
   test("sort with Java non serializable class - Java") {
-    assume(!Utils.isWindows)
     // Use a local cluster with 2 processes to make sure there are both local 
and remote blocks
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
     val a = sc.parallelize(1 to 10, 2)

http://git-wip-us.apache.org/repos/asf/spark/blob/c6b8eb71/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
----------------------------------------------------------------------
diff --git 
a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java 
b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
index ba43659..0622fef 100644
--- 
a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
+++ 
b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
@@ -26,9 +26,11 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.regex.Pattern;
 
 import static org.apache.spark.launcher.CommandBuilderUtils.*;
@@ -135,7 +137,7 @@ abstract class AbstractCommandBuilder {
   List<String> buildClassPath(String appClassPath) throws IOException {
     String sparkHome = getSparkHome();
 
-    List<String> cp = new ArrayList<>();
+    Set<String> cp = new LinkedHashSet<>();
     addToClassPath(cp, getenv("SPARK_CLASSPATH"));
     addToClassPath(cp, appClassPath);
 
@@ -201,7 +203,7 @@ abstract class AbstractCommandBuilder {
     addToClassPath(cp, getenv("HADOOP_CONF_DIR"));
     addToClassPath(cp, getenv("YARN_CONF_DIR"));
     addToClassPath(cp, getenv("SPARK_DIST_CLASSPATH"));
-    return cp;
+    return new ArrayList<>(cp);
   }
 
   /**
@@ -210,7 +212,7 @@ abstract class AbstractCommandBuilder {
    * @param cp List to which the new entries are appended.
    * @param entries New classpath entries (separated by File.pathSeparator).
    */
-  private void addToClassPath(List<String> cp, String entries) {
+  private void addToClassPath(Set<String> cp, String entries) {
     if (isEmpty(entries)) {
       return;
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/c6b8eb71/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index fdc33c7..74edd53 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -824,7 +824,8 @@ object TestSettings {
     // launched by the tests have access to the correct test-time classpath.
     envVars in Test ++= Map(
       "SPARK_DIST_CLASSPATH" ->
-        (fullClasspath in 
Test).value.files.map(_.getAbsolutePath).mkString(":").stripSuffix(":"),
+        (fullClasspath in Test).value.files.map(_.getAbsolutePath)
+          .mkString(File.pathSeparator).stripSuffix(File.pathSeparator),
       "SPARK_PREPEND_CLASSES" -> "1",
       "SPARK_SCALA_VERSION" -> scalaBinaryVersion,
       "SPARK_TESTING" -> "1",

http://git-wip-us.apache.org/repos/asf/spark/blob/c6b8eb71/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
index 0783935..119d6e2 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
@@ -86,39 +86,31 @@ class BroadcastJoinSuite extends QueryTest with 
SQLTestUtils {
     plan
   }
 
-  // The tests here are failed on Windows due to the failure of initiating 
executors
-  // by the path length limitation. See SPARK-18718.
   test("unsafe broadcast hash join updates peak execution memory") {
-    assume(!Utils.isWindows)
     testBroadcastJoinPeak[BroadcastHashJoinExec]("unsafe broadcast hash join", 
"inner")
   }
 
   test("unsafe broadcast hash outer join updates peak execution memory") {
-    assume(!Utils.isWindows)
     testBroadcastJoinPeak[BroadcastHashJoinExec]("unsafe broadcast hash outer 
join", "left_outer")
   }
 
   test("unsafe broadcast left semi join updates peak execution memory") {
-    assume(!Utils.isWindows)
     testBroadcastJoinPeak[BroadcastHashJoinExec]("unsafe broadcast left semi 
join", "leftsemi")
   }
 
   test("broadcast hint isn't bothered by authBroadcastJoinThreshold set to low 
values") {
-    assume(!Utils.isWindows)
     withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
       testBroadcastJoin[BroadcastHashJoinExec]("inner", true)
     }
   }
 
   test("broadcast hint isn't bothered by a disabled 
authBroadcastJoinThreshold") {
-    assume(!Utils.isWindows)
     withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
       testBroadcastJoin[BroadcastHashJoinExec]("inner", true)
     }
   }
 
   test("broadcast hint isn't propagated after a join") {
-    assume(!Utils.isWindows)
     withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
       val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", 
"value")
       val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", 
"value")
@@ -146,7 +138,6 @@ class BroadcastJoinSuite extends QueryTest with 
SQLTestUtils {
   }
 
   test("broadcast hint is propagated correctly") {
-    assume(!Utils.isWindows)
     withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
       val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"), (3, 
"2"))).toDF("key", "value")
       val broadcasted = broadcast(df2)
@@ -167,7 +158,6 @@ class BroadcastJoinSuite extends QueryTest with 
SQLTestUtils {
   }
 
   test("join key rewritten") {
-    assume(!Utils.isWindows)
     val l = Literal(1L)
     val i = Literal(2)
     val s = Literal.create(3, ShortType)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to