spark git commit: [SPARK-4808] Removing minimum number of elements read before spill check

2015-02-19 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 18fbed5b5 -> 5cea859fd


[SPARK-4808] Removing minimum number of elements read before spill check

In the general case, Spillable's heuristic of checking for memory stress
on every 32nd item after 1000 items have been read is good enough: we do
not want to start the spill checks too early in the job, because checking
for disk spilling too soon can impose an unacceptable performance penalty
in trivial cases.

However, there are non-trivial cases, particularly when each serialized
object is large, where checking for the need to spill too late lets the
memory overflow. Consider a job where every item is 1.5 MB and the heap
is 1000 MB. If we only start checking whether to spill the in-memory
contents to disk after 1000 items have been read, we would already have
accumulated 1500 MB of data and overflowed the heap.

Patch #3656 attempted to address this by checking the need to spill on
every single item read, but that would cause unacceptable performance in
the general case. At the same time, jobs that hit the non-trivial cases
above should not be forced to refactor their data to shrink individual
items. Therefore it makes sense for the memory-spilling thresholds to be
configurable.
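
To make the trade-off concrete, here is a small standalone Scala sketch of the
two spill-check conditions. The names elementsRead and myMemoryThreshold mirror
the Spillable diff below, but the threshold value and the driving loop are
illustrative assumptions, not Spark code.

object SpillCheckSketch {
  val itemSizeBytes   = (1.5 * 1024 * 1024).toLong  // 1.5 MB per item, as in the example above
  val heapBytes       = 1000L * 1024 * 1024         // 1000 MB heap
  val memoryThreshold = 5L * 1024 * 1024            // assumed per-collection threshold

  // Old condition: never checks before 1000 elements have been read.
  def oldCheck(elementsRead: Long, currentMemory: Long): Boolean =
    elementsRead > 1000 && elementsRead % 32 == 0 && currentMemory >= memoryThreshold

  // New condition: checks every 32nd element from the start.
  def newCheck(elementsRead: Long, currentMemory: Long): Boolean =
    elementsRead % 32 == 0 && currentMemory >= memoryThreshold

  def main(args: Array[String]): Unit = {
    var elementsRead = 0L
    var inMemory = 0L
    // Simulate reading items until the heap would overflow or the old check fires.
    while (inMemory + itemSizeBytes <= heapBytes && !oldCheck(elementsRead, inMemory)) {
      elementsRead += 1
      inMemory += itemSizeBytes
    }
    // The old check never fires before 1000 items, so the loop stops only because the
    // heap is exhausted: roughly 666 items and ~999 MB accumulated with no spill attempt.
    println(s"old check: stopped after $elementsRead items, ${inMemory >> 20} MB in memory")
    // The new check would already have fired at item 32 (32 items * 1.5 MB >= 5 MB threshold).
    println(s"new check fires at 32 items: ${newCheck(32, 32 * itemSizeBytes)}")
  }
}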

Author: mcheah 

Closes #4420 from mingyukim/memory-spill-configurable and squashes the 
following commits:

6e2509f [mcheah] [SPARK-4808] Removing minimum number of elements read before 
spill check


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5cea859f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5cea859f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5cea859f

Branch: refs/heads/branch-1.2
Commit: 5cea859fd27dc6a216fa9d31d293c93407fbff01
Parents: 18fbed5
Author: mcheah 
Authored: Thu Feb 19 18:09:22 2015 -0800
Committer: Andrew Or 
Committed: Thu Feb 19 18:10:02 2015 -0800

--
 .../scala/org/apache/spark/util/collection/Spillable.scala | 6 +-
 1 file changed, 1 insertion(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5cea859f/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala 
b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
index 9f54312..747ecf0 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
@@ -42,9 +42,6 @@ private[spark] trait Spillable[C] extends Logging {
   // Memory manager that can be used to acquire/release memory
   private[this] val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager
 
-  // Threshold for `elementsRead` before we start tracking this collection's memory usage
-  private[this] val trackMemoryThreshold = 1000
-
   // Initial threshold for the size of a collection before we start tracking its memory usage
   // Exposed for testing
   private[this] val initialMemoryThreshold: Long =
@@ -72,8 +69,7 @@ private[spark] trait Spillable[C] extends Logging {
* @return true if `collection` was spilled to disk; false otherwise
*/
   protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
-    if (elementsRead > trackMemoryThreshold && elementsRead % 32 == 0 &&
-        currentMemory >= myMemoryThreshold) {
+    if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
   // Claim up to double our current memory from the shuffle memory pool
   val amountToRequest = 2 * currentMemory - myMemoryThreshold
   val granted = shuffleMemoryManager.tryToAcquire(amountToRequest)





spark git commit: [SPARK-4808] Removing minimum number of elements read before spill check

2015-02-19 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 0cfd2cebd -> 3be92cdac


[SPARK-4808] Removing minimum number of elements read before spill check

In the general case, Spillable's heuristic of checking for memory stress
on every 32nd item after 1000 items have been read is good enough: we do
not want to start the spill checks too early in the job, because checking
for disk spilling too soon can impose an unacceptable performance penalty
in trivial cases.

However, there are non-trivial cases, particularly when each serialized
object is large, where checking for the need to spill too late lets the
memory overflow. Consider a job where every item is 1.5 MB and the heap
is 1000 MB. If we only start checking whether to spill the in-memory
contents to disk after 1000 items have been read, we would already have
accumulated 1500 MB of data and overflowed the heap.

Patch #3656 attempted to address this by checking the need to spill on
every single item read, but that would cause unacceptable performance in
the general case. At the same time, jobs that hit the non-trivial cases
above should not be forced to refactor their data to shrink individual
items. Therefore it makes sense for the memory-spilling thresholds to be
configurable.

Author: mcheah 

Closes #4420 from mingyukim/memory-spill-configurable and squashes the 
following commits:

6e2509f [mcheah] [SPARK-4808] Removing minimum number of elements read before 
spill check


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3be92cda
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3be92cda
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3be92cda

Branch: refs/heads/master
Commit: 3be92cdac30cf488e09dbdaaa70e5c4cdaa9a099
Parents: 0cfd2ce
Author: mcheah 
Authored: Thu Feb 19 18:09:22 2015 -0800
Committer: Andrew Or 
Committed: Thu Feb 19 18:09:22 2015 -0800

--
 .../scala/org/apache/spark/util/collection/Spillable.scala | 6 +-
 1 file changed, 1 insertion(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3be92cda/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala 
b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
index 9f54312..747ecf0 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
@@ -42,9 +42,6 @@ private[spark] trait Spillable[C] extends Logging {
   // Memory manager that can be used to acquire/release memory
   private[this] val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager
 
-  // Threshold for `elementsRead` before we start tracking this collection's memory usage
-  private[this] val trackMemoryThreshold = 1000
-
   // Initial threshold for the size of a collection before we start tracking its memory usage
   // Exposed for testing
   private[this] val initialMemoryThreshold: Long =
@@ -72,8 +69,7 @@ private[spark] trait Spillable[C] extends Logging {
* @return true if `collection` was spilled to disk; false otherwise
*/
   protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
-    if (elementsRead > trackMemoryThreshold && elementsRead % 32 == 0 &&
-        currentMemory >= myMemoryThreshold) {
+    if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
   // Claim up to double our current memory from the shuffle memory pool
   val amountToRequest = 2 * currentMemory - myMemoryThreshold
   val granted = shuffleMemoryManager.tryToAcquire(amountToRequest)





spark git commit: [SPARK-4808] Removing minimum number of elements read before spill check

2015-02-19 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 ba941ceb1 -> 0382dcc0a


[SPARK-4808] Removing minimum number of elements read before spill check

In the general case, Spillable's heuristic of checking for memory stress
on every 32nd item after 1000 items have been read is good enough: we do
not want to start the spill checks too early in the job, because checking
for disk spilling too soon can impose an unacceptable performance penalty
in trivial cases.

However, there are non-trivial cases, particularly when each serialized
object is large, where checking for the need to spill too late lets the
memory overflow. Consider a job where every item is 1.5 MB and the heap
is 1000 MB. If we only start checking whether to spill the in-memory
contents to disk after 1000 items have been read, we would already have
accumulated 1500 MB of data and overflowed the heap.

Patch #3656 attempted to address this by checking the need to spill on
every single item read, but that would cause unacceptable performance in
the general case. At the same time, jobs that hit the non-trivial cases
above should not be forced to refactor their data to shrink individual
items. Therefore it makes sense for the memory-spilling thresholds to be
configurable.

Author: mcheah 

Closes #4420 from mingyukim/memory-spill-configurable and squashes the 
following commits:

6e2509f [mcheah] [SPARK-4808] Removing minimum number of elements read before 
spill check

(cherry picked from commit 3be92cdac30cf488e09dbdaaa70e5c4cdaa9a099)
Signed-off-by: Andrew Or 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0382dcc0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0382dcc0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0382dcc0

Branch: refs/heads/branch-1.3
Commit: 0382dcc0a94f8e619fd11ec2cc0b18459a690c2b
Parents: ba941ce
Author: mcheah 
Authored: Thu Feb 19 18:09:22 2015 -0800
Committer: Andrew Or 
Committed: Thu Feb 19 18:09:26 2015 -0800

--
 .../scala/org/apache/spark/util/collection/Spillable.scala | 6 +-
 1 file changed, 1 insertion(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0382dcc0/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala 
b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
index 9f54312..747ecf0 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
@@ -42,9 +42,6 @@ private[spark] trait Spillable[C] extends Logging {
   // Memory manager that can be used to acquire/release memory
   private[this] val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager
 
-  // Threshold for `elementsRead` before we start tracking this collection's memory usage
-  private[this] val trackMemoryThreshold = 1000
-
   // Initial threshold for the size of a collection before we start tracking its memory usage
   // Exposed for testing
   private[this] val initialMemoryThreshold: Long =
@@ -72,8 +69,7 @@ private[spark] trait Spillable[C] extends Logging {
* @return true if `collection` was spilled to disk; false otherwise
*/
   protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
-    if (elementsRead > trackMemoryThreshold && elementsRead % 32 == 0 &&
-        currentMemory >= myMemoryThreshold) {
+    if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
   // Claim up to double our current memory from the shuffle memory pool
   val amountToRequest = 2 * currentMemory - myMemoryThreshold
   val granted = shuffleMemoryManager.tryToAcquire(amountToRequest)





spark git commit: [SPARK-5900][MLLIB] make PIC and FPGrowth Java-friendly

2015-02-19 Thread meng
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 c5f3b9e02 -> ba941ceb1


[SPARK-5900][MLLIB] make PIC and FPGrowth Java-friendly

In the previous version, PIC stores clustering assignments as an
`RDD[(Long, Int)]`. This is mapped to `RDD<Tuple2<Object, Object>>` in
Java, so Java users have to cast types manually. We should either create
a new method called `javaAssignments` that returns
`JavaRDD[(java.lang.Long, java.lang.Int)]` or wrap the result pair in a
class. I chose the latter approach in this PR. Assignments are now stored
as an `RDD[Assignment]`, where `Assignment` is a class with `id` and
`cluster` fields.

Similarly, in FPGrowth the frequent itemsets are stored as an
`RDD[(Array[Item], Long)]`, which is mapped to
`RDD<Tuple2<Object, Object>>` in Java. We do provide a "Java-friendly"
method `javaFreqItemsets` that returns
`JavaRDD[(Array[Item], java.lang.Long)]`, but it doesn't really work
because `Array[Item]` is mapped to `Object` in Java. So in this PR I
created a class `FreqItemset` to wrap the results. It has `items` and
`freq`, as well as a `javaItems` method that returns a `List<Item>` in
Java.

I'm not certain that the names I chose are proper: `Assignment`/`id`/`cluster` 
and `FreqItemset`/`items`/`freq`. Please let me know if there are better 
suggestions.
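
As a usage illustration (not part of this patch), here is a minimal standalone
Scala sketch that consumes the new wrapper classes. It assumes the Spark 1.3
MLlib API and a local SparkContext; the tiny input data is made up for the
example.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.clustering.PowerIterationClustering
import org.apache.spark.mllib.fpm.FPGrowth

object JavaFriendlyResultsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[2]"))

    // PIC: assignments are now an RDD[Assignment] instead of RDD[(Long, Int)].
    val similarities = sc.parallelize(Seq(
      (0L, 1L, 1.0), (1L, 2L, 1.0), (2L, 3L, 1.0), (3L, 4L, 1.0), (4L, 0L, 0.1)))
    val picModel = new PowerIterationClustering().setK(2).setMaxIterations(10).run(similarities)
    picModel.assignments.collect().foreach(a => println(s"${a.id} -> ${a.cluster}"))

    // FPGrowth: frequent itemsets are now an RDD of FreqItemset objects
    // instead of RDD[(Array[Item], Long)].
    val transactions = sc.parallelize(Seq(Array("a", "b"), Array("a", "c"), Array("a", "b", "c")))
    val fpModel = new FPGrowth().setMinSupport(0.5).run(transactions)
    fpModel.freqItemsets.collect().foreach { fi =>
      println(fi.items.mkString("[", ",", "]") + ", " + fi.freq)
    }

    sc.stop()
  }
}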

CC: jkbradley

Author: Xiangrui Meng 

Closes #4695 from mengxr/SPARK-5900 and squashes the following commits:

865b5ca [Xiangrui Meng] make Assignment serializable
cffa96e [Xiangrui Meng] fix test
9c0e590 [Xiangrui Meng] remove unused Tuple2
1b9db3d [Xiangrui Meng] make PIC and FPGrowth Java-friendly

(cherry picked from commit 0cfd2cebde0b7fac3779eda80d6e42223f8a3d9f)
Signed-off-by: Xiangrui Meng 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ba941ceb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ba941ceb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ba941ceb

Branch: refs/heads/branch-1.3
Commit: ba941ceb1f78b28ca5cfb18c770f4171b9c74b0a
Parents: c5f3b9e
Author: Xiangrui Meng 
Authored: Thu Feb 19 18:06:16 2015 -0800
Committer: Xiangrui Meng 
Committed: Thu Feb 19 18:06:26 2015 -0800

--
 docs/mllib-clustering.md|  8 ++--
 docs/mllib-frequent-pattern-mining.md   | 12 +++---
 .../examples/mllib/JavaFPGrowthExample.java |  8 ++--
 .../JavaPowerIterationClusteringExample.java|  5 +--
 .../spark/examples/mllib/FPGrowthExample.scala  |  4 +-
 .../mllib/PowerIterationClusteringExample.scala |  8 +---
 .../clustering/PowerIterationClustering.scala   | 33 +---
 .../org/apache/spark/mllib/fpm/FPGrowth.scala   | 41 ++--
 .../spark/mllib/fpm/JavaFPGrowthSuite.java  | 30 +-
 .../PowerIterationClusteringSuite.scala |  8 ++--
 .../apache/spark/mllib/fpm/FPGrowthSuite.scala  | 10 ++---
 11 files changed, 93 insertions(+), 74 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ba941ceb/docs/mllib-clustering.md
--
diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md
index 6e46a47..0b6db4f 100644
--- a/docs/mllib-clustering.md
+++ b/docs/mllib-clustering.md
@@ -314,8 +314,8 @@ val pic = new PowerIteartionClustering()
   .setMaxIterations(20)
 val model = pic.run(similarities)
 
-model.assignments.foreach { case (vertexId, clusterId) =>
-  println(s"$vertexId -> $clusterId")
+model.assignments.foreach { a =>
+  println(s"${a.id} -> ${a.cluster}")
 }
 {% endhighlight %}
 
@@ -349,8 +349,8 @@ PowerIterationClustering pic = new 
PowerIterationClustering()
   .setMaxIterations(10);
 PowerIterationClusteringModel model = pic.run(similarities);
 
-for (Tuple2 assignment: 
model.assignments().toJavaRDD().collect()) {
-  System.out.println(assignment._1() + " -> " + assignment._2());
+for (PowerIterationClustering.Assignment a: 
model.assignments().toJavaRDD().collect()) {
+  System.out.println(a.id() + " -> " + a.cluster());
 }
 {% endhighlight %}
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ba941ceb/docs/mllib-frequent-pattern-mining.md
--
diff --git a/docs/mllib-frequent-pattern-mining.md 
b/docs/mllib-frequent-pattern-mining.md
index 0ff9738..9fd9be0 100644
--- a/docs/mllib-frequent-pattern-mining.md
+++ b/docs/mllib-frequent-pattern-mining.md
@@ -57,8 +57,8 @@ val fpg = new FPGrowth()
   .setNumPartitions(10)
 val model = fpg.run(transactions)
 
-model.freqItemsets.collect().foreach { case (itemset, freq) =>
-  println(itemset.mkString("[", ",", "]") + ", " + freq)
+model.freqItemsets.collect().foreach { itemset =>
+  println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
 }
 {% endhighlight %}
 
@@ -74,10 +74,9 @@ Calling `FPGrowth.run` with transactions returns an
 that stores the frequent 

spark git commit: [SPARK-5900][MLLIB] make PIC and FPGrowth Java-friendly

2015-02-19 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master 6bddc4035 -> 0cfd2cebd


[SPARK-5900][MLLIB] make PIC and FPGrowth Java-friendly

In the previous version, PIC stores clustering assignments as an
`RDD[(Long, Int)]`. This is mapped to `RDD<Tuple2<Object, Object>>` in
Java, so Java users have to cast types manually. We should either create
a new method called `javaAssignments` that returns
`JavaRDD[(java.lang.Long, java.lang.Int)]` or wrap the result pair in a
class. I chose the latter approach in this PR. Assignments are now stored
as an `RDD[Assignment]`, where `Assignment` is a class with `id` and
`cluster` fields.

Similarly, in FPGrowth the frequent itemsets are stored as an
`RDD[(Array[Item], Long)]`, which is mapped to
`RDD<Tuple2<Object, Object>>` in Java. We do provide a "Java-friendly"
method `javaFreqItemsets` that returns
`JavaRDD[(Array[Item], java.lang.Long)]`, but it doesn't really work
because `Array[Item]` is mapped to `Object` in Java. So in this PR I
created a class `FreqItemset` to wrap the results. It has `items` and
`freq`, as well as a `javaItems` method that returns a `List<Item>` in
Java.

I'm not certain that the names I chose are proper: `Assignment`/`id`/`cluster` 
and `FreqItemset`/`items`/`freq`. Please let me know if there are better 
suggestions.

CC: jkbradley

Author: Xiangrui Meng 

Closes #4695 from mengxr/SPARK-5900 and squashes the following commits:

865b5ca [Xiangrui Meng] make Assignment serializable
cffa96e [Xiangrui Meng] fix test
9c0e590 [Xiangrui Meng] remove unused Tuple2
1b9db3d [Xiangrui Meng] make PIC and FPGrowth Java-friendly


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0cfd2ceb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0cfd2ceb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0cfd2ceb

Branch: refs/heads/master
Commit: 0cfd2cebde0b7fac3779eda80d6e42223f8a3d9f
Parents: 6bddc40
Author: Xiangrui Meng 
Authored: Thu Feb 19 18:06:16 2015 -0800
Committer: Xiangrui Meng 
Committed: Thu Feb 19 18:06:16 2015 -0800

--
 docs/mllib-clustering.md|  8 ++--
 docs/mllib-frequent-pattern-mining.md   | 12 +++---
 .../examples/mllib/JavaFPGrowthExample.java |  8 ++--
 .../JavaPowerIterationClusteringExample.java|  5 +--
 .../spark/examples/mllib/FPGrowthExample.scala  |  4 +-
 .../mllib/PowerIterationClusteringExample.scala |  8 +---
 .../clustering/PowerIterationClustering.scala   | 33 +---
 .../org/apache/spark/mllib/fpm/FPGrowth.scala   | 41 ++--
 .../spark/mllib/fpm/JavaFPGrowthSuite.java  | 30 +-
 .../PowerIterationClusteringSuite.scala |  8 ++--
 .../apache/spark/mllib/fpm/FPGrowthSuite.scala  | 10 ++---
 11 files changed, 93 insertions(+), 74 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0cfd2ceb/docs/mllib-clustering.md
--
diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md
index 6e46a47..0b6db4f 100644
--- a/docs/mllib-clustering.md
+++ b/docs/mllib-clustering.md
@@ -314,8 +314,8 @@ val pic = new PowerIteartionClustering()
   .setMaxIterations(20)
 val model = pic.run(similarities)
 
-model.assignments.foreach { case (vertexId, clusterId) =>
-  println(s"$vertexId -> $clusterId")
+model.assignments.foreach { a =>
+  println(s"${a.id} -> ${a.cluster}")
 }
 {% endhighlight %}
 
@@ -349,8 +349,8 @@ PowerIterationClustering pic = new 
PowerIterationClustering()
   .setMaxIterations(10);
 PowerIterationClusteringModel model = pic.run(similarities);
 
-for (Tuple2 assignment: 
model.assignments().toJavaRDD().collect()) {
-  System.out.println(assignment._1() + " -> " + assignment._2());
+for (PowerIterationClustering.Assignment a: 
model.assignments().toJavaRDD().collect()) {
+  System.out.println(a.id() + " -> " + a.cluster());
 }
 {% endhighlight %}
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0cfd2ceb/docs/mllib-frequent-pattern-mining.md
--
diff --git a/docs/mllib-frequent-pattern-mining.md 
b/docs/mllib-frequent-pattern-mining.md
index 0ff9738..9fd9be0 100644
--- a/docs/mllib-frequent-pattern-mining.md
+++ b/docs/mllib-frequent-pattern-mining.md
@@ -57,8 +57,8 @@ val fpg = new FPGrowth()
   .setNumPartitions(10)
 val model = fpg.run(transactions)
 
-model.freqItemsets.collect().foreach { case (itemset, freq) =>
-  println(itemset.mkString("[", ",", "]") + ", " + freq)
+model.freqItemsets.collect().foreach { itemset =>
+  println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
 }
 {% endhighlight %}
 
@@ -74,10 +74,9 @@ Calling `FPGrowth.run` with transactions returns an
 that stores the frequent itemsets with their frequencies.
 
 {% highlight java %}
-import java.util.Arrays;
 import java.util.List;
 

spark git commit: SPARK-5570: No docs stating that `new SparkConf().set("spark.driver.memory", ...) will not work

2015-02-19 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 34b7c3538 -> 6bddc4035


SPARK-5570: No docs stating that `new SparkConf().set("spark.driver.memory", 
...) will not work

I've updated the documentation to reflect the true behavior of this setting in
client vs. cluster mode.
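
To illustrate the client-mode caveat described in the notes added below (this
sketch is not part of the patch; the application and jar names are
hypothetical):

import org.apache.spark.{SparkConf, SparkContext}

object DriverMemorySketch {
  def main(args: Array[String]): Unit = {
    // In client mode this has no effect on the driver's heap: the driver JVM
    // (and its -Xmx) already exists by the time this line runs.
    val conf = new SparkConf().setAppName("driver-memory-sketch").set("spark.driver.memory", "4g")
    val sc = new SparkContext(conf)
    // The config value is recorded in the SparkConf, but the running JVM was not resized.
    println(sc.getConf.get("spark.driver.memory"))
    sc.stop()
  }
}

// Instead, supply the setting before the driver JVM starts, e.g.
//   spark-submit --driver-memory 4g --class DriverMemorySketch sketch.jar
// or add "spark.driver.memory  4g" to conf/spark-defaults.conf.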

Author: Ilya Ganelin 

Closes #4665 from ilganeli/SPARK-5570 and squashes the following commits:

5d1c8dd [Ilya Ganelin] Added example configuration code
a51700a [Ilya Ganelin] Getting rid of extra spaces
85f7a08 [Ilya Ganelin] Reworded note
5889d43 [Ilya Ganelin] Formatting adjustment
f149ba1 [Ilya Ganelin] Minor updates
1fec7a5 [Ilya Ganelin] Updated to add clarification for other driver properties
db47595 [Ilya Ganelin] Slight formatting update
c899564 [Ilya Ganelin] Merge remote-tracking branch 'upstream/master' into 
SPARK-5570
17b751d [Ilya Ganelin] Updated documentation for driver-memory to reflect its 
true behavior in client vs cluster mode


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6bddc403
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6bddc403
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6bddc403

Branch: refs/heads/master
Commit: 6bddc40353057a562c78e75c5549c79a0d7d5f8b
Parents: 34b7c35
Author: Ilya Ganelin 
Authored: Thu Feb 19 15:50:58 2015 -0800
Committer: Andrew Or 
Committed: Thu Feb 19 15:53:20 2015 -0800

--
 docs/configuration.md | 23 ++-
 1 file changed, 22 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6bddc403/docs/configuration.md
--
diff --git a/docs/configuration.md b/docs/configuration.md
index eb0d6d3..541695c 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -115,7 +115,11 @@ of the most common options to set are:
   
 Amount of memory to use for the driver process, i.e. where SparkContext is 
initialized.
 (e.g. 512m, 2g).
-  
+
+Note: In client mode, this config must not be set through 
the SparkConf
+directly in your application, because the driver JVM has already started 
at that point.
+Instead, please set this through the --driver-memory command 
line option
+or in your default properties file.
 
 
   spark.executor.memory
@@ -214,6 +218,11 @@ Apart from these, the following properties are also 
available, and may be useful
   (none)
   
 A string of extra JVM options to pass to the driver. For instance, GC 
settings or other logging.
+
+Note: In client mode, this config must not be set through 
the SparkConf
+directly in your application, because the driver JVM has already started 
at that point.
+Instead, please set this through the --driver-java-options 
command line option or in 
+your default properties file.
   
 
 
@@ -221,6 +230,11 @@ Apart from these, the following properties are also 
available, and may be useful
   (none)
   
 Extra classpath entries to append to the classpath of the driver.
+
+Note: In client mode, this config must not be set through 
the SparkConf
+directly in your application, because the driver JVM has already started 
at that point.
+Instead, please set this through the --driver-class-path 
command line option or in 
+your default properties file.
   
 
 
@@ -228,6 +242,11 @@ Apart from these, the following properties are also 
available, and may be useful
   (none)
   
 Set a special library path to use when launching the driver JVM.
+
+Note: In client mode, this config must not be set through 
the SparkConf
+directly in your application, because the driver JVM has already started 
at that point.
+Instead, please set this through the --driver-library-path 
command line option or in 
+your default properties file.
   
 
 
@@ -237,6 +256,8 @@ Apart from these, the following properties are also 
available, and may be useful
 (Experimental) Whether to give user-added jars precedence over Spark's own 
jars when loading
 classes in the the driver. This feature can be used to mitigate conflicts 
between Spark's
 dependencies and user dependencies. It is currently an experimental 
feature.
+
+This is used in cluster mode only.
   
 
 





spark git commit: SPARK-5570: No docs stating that `new SparkConf().set("spark.driver.memory", ...) will not work

2015-02-19 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 bd49e8b96 -> c5f3b9e02


SPARK-5570: No docs stating that `new SparkConf().set("spark.driver.memory", 
...) will not work

I've updated the documentation to reflect the true behavior of this setting in
client vs. cluster mode.

Author: Ilya Ganelin 

Closes #4665 from ilganeli/SPARK-5570 and squashes the following commits:

5d1c8dd [Ilya Ganelin] Added example configuration code
a51700a [Ilya Ganelin] Getting rid of extra spaces
85f7a08 [Ilya Ganelin] Reworded note
5889d43 [Ilya Ganelin] Formatting adjustment
f149ba1 [Ilya Ganelin] Minor updates
1fec7a5 [Ilya Ganelin] Updated to add clarification for other driver properties
db47595 [Ilya Ganelin] Slight formatting update
c899564 [Ilya Ganelin] Merge remote-tracking branch 'upstream/master' into 
SPARK-5570
17b751d [Ilya Ganelin] Updated documentation for driver-memory to reflect its 
true behavior in client vs cluster mode

(cherry picked from commit 6bddc40353057a562c78e75c5549c79a0d7d5f8b)
Signed-off-by: Andrew Or 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c5f3b9e0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c5f3b9e0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c5f3b9e0

Branch: refs/heads/branch-1.3
Commit: c5f3b9e02f8b1d1b09d4309df9a2c8633da82910
Parents: bd49e8b
Author: Ilya Ganelin 
Authored: Thu Feb 19 15:50:58 2015 -0800
Committer: Andrew Or 
Committed: Thu Feb 19 15:53:33 2015 -0800

--
 docs/configuration.md | 23 ++-
 1 file changed, 22 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c5f3b9e0/docs/configuration.md
--
diff --git a/docs/configuration.md b/docs/configuration.md
index eb0d6d3..541695c 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -115,7 +115,11 @@ of the most common options to set are:
   
 Amount of memory to use for the driver process, i.e. where SparkContext is 
initialized.
 (e.g. 512m, 2g).
-  
+
+Note: In client mode, this config must not be set through 
the SparkConf
+directly in your application, because the driver JVM has already started 
at that point.
+Instead, please set this through the --driver-memory command 
line option
+or in your default properties file.
 
 
   spark.executor.memory
@@ -214,6 +218,11 @@ Apart from these, the following properties are also 
available, and may be useful
   (none)
   
 A string of extra JVM options to pass to the driver. For instance, GC 
settings or other logging.
+
+Note: In client mode, this config must not be set through 
the SparkConf
+directly in your application, because the driver JVM has already started 
at that point.
+Instead, please set this through the --driver-java-options 
command line option or in 
+your default properties file.
   
 
 
@@ -221,6 +230,11 @@ Apart from these, the following properties are also 
available, and may be useful
   (none)
   
 Extra classpath entries to append to the classpath of the driver.
+
+Note: In client mode, this config must not be set through 
the SparkConf
+directly in your application, because the driver JVM has already started 
at that point.
+Instead, please set this through the --driver-class-path 
command line option or in 
+your default properties file.
   
 
 
@@ -228,6 +242,11 @@ Apart from these, the following properties are also 
available, and may be useful
   (none)
   
 Set a special library path to use when launching the driver JVM.
+
+Note: In client mode, this config must not be set through 
the SparkConf
+directly in your application, because the driver JVM has already started 
at that point.
+Instead, please set this through the --driver-library-path 
command line option or in 
+your default properties file.
   
 
 
@@ -237,6 +256,8 @@ Apart from these, the following properties are also 
available, and may be useful
 (Experimental) Whether to give user-added jars precedence over Spark's own 
jars when loading
 classes in the the driver. This feature can be used to mitigate conflicts 
between Spark's
 dependencies and user dependencies. It is currently an experimental 
feature.
+
+This is used in cluster mode only.
   
 
 





[1/2] spark git commit: SPARK-4682 [CORE] Consolidate various 'Clock' classes

2015-02-19 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 ff8976ec7 -> bd49e8b96


http://git-wip-us.apache.org/repos/asf/spark/blob/bd49e8b9/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
--
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 132ff24..818f551 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -39,7 +39,7 @@ import org.apache.spark.shuffle.hash.HashShuffleManager
 import org.apache.spark.storage._
 import org.apache.spark.streaming.receiver._
 import org.apache.spark.streaming.util._
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.util.{AkkaUtils, ManualClock}
 import WriteAheadLogBasedBlockHandler._
 import WriteAheadLogSuite._
 
@@ -165,7 +165,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with 
BeforeAndAfter with Matche
   preCleanupLogFiles.size should be > 1
 
   // this depends on the number of blocks inserted using 
generateAndStoreData()
-  manualClock.currentTime() shouldEqual 5000L
+  manualClock.getTimeMillis() shouldEqual 5000L
 
   val cleanupThreshTime = 3000L
   handler.cleanupOldBlocks(cleanupThreshTime)
@@ -243,7 +243,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with 
BeforeAndAfter with Matche
 val blockIds = Seq.fill(blocks.size)(generateBlockId())
 val storeResults = blocks.zip(blockIds).map {
   case (block, id) =>
-manualClock.addToTime(500) // log rolling interval set to 1000 ms 
through SparkConf
+manualClock.advance(500) // log rolling interval set to 1000 ms 
through SparkConf
 logDebug("Inserting block " + id)
 receivedBlockHandler.storeBlock(id, block)
 }.toList

http://git-wip-us.apache.org/repos/asf/spark/blob/bd49e8b9/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
--
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index fbb7b0b..a3a0fd5 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -34,9 +34,9 @@ import org.apache.spark.{Logging, SparkConf, SparkException}
 import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
 import org.apache.spark.streaming.scheduler._
-import org.apache.spark.streaming.util.{Clock, ManualClock, SystemClock, 
WriteAheadLogReader}
+import org.apache.spark.streaming.util.WriteAheadLogReader
 import org.apache.spark.streaming.util.WriteAheadLogSuite._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils}
 
 class ReceivedBlockTrackerSuite
   extends FunSuite with BeforeAndAfter with Matchers with Logging {
@@ -100,7 +100,7 @@ class ReceivedBlockTrackerSuite
 
 def incrementTime() {
   val timeIncrementMillis = 2000L
-  manualClock.addToTime(timeIncrementMillis)
+  manualClock.advance(timeIncrementMillis)
 }
 
 // Generate and add blocks to the given tracker
@@ -138,13 +138,13 @@ class ReceivedBlockTrackerSuite
 tracker2.getUnallocatedBlocks(streamId).toList shouldEqual blockInfos1
 
 // Allocate blocks to batch and verify whether the unallocated blocks got 
allocated
-val batchTime1 = manualClock.currentTime
+val batchTime1 = manualClock.getTimeMillis()
 tracker2.allocateBlocksToBatch(batchTime1)
 tracker2.getBlocksOfBatchAndStream(batchTime1, streamId) shouldEqual 
blockInfos1
 
 // Add more blocks and allocate to another batch
 incrementTime()
-val batchTime2 = manualClock.currentTime
+val batchTime2 = manualClock.getTimeMillis()
 val blockInfos2 = addBlockInfos(tracker2)
 tracker2.allocateBlocksToBatch(batchTime2)
 tracker2.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual 
blockInfos2

http://git-wip-us.apache.org/repos/asf/spark/blob/bd49e8b9/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
--
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala 
b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 7d82c3e..c2375ff 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -31,10 +31,9 @@ import org.scalatest.concurre

[1/2] spark git commit: SPARK-4682 [CORE] Consolidate various 'Clock' classes

2015-02-19 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master ad6b169de -> 34b7c3538


http://git-wip-us.apache.org/repos/asf/spark/blob/34b7c353/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
--
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 132ff24..818f551 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -39,7 +39,7 @@ import org.apache.spark.shuffle.hash.HashShuffleManager
 import org.apache.spark.storage._
 import org.apache.spark.streaming.receiver._
 import org.apache.spark.streaming.util._
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.util.{AkkaUtils, ManualClock}
 import WriteAheadLogBasedBlockHandler._
 import WriteAheadLogSuite._
 
@@ -165,7 +165,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with 
BeforeAndAfter with Matche
   preCleanupLogFiles.size should be > 1
 
   // this depends on the number of blocks inserted using 
generateAndStoreData()
-  manualClock.currentTime() shouldEqual 5000L
+  manualClock.getTimeMillis() shouldEqual 5000L
 
   val cleanupThreshTime = 3000L
   handler.cleanupOldBlocks(cleanupThreshTime)
@@ -243,7 +243,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with 
BeforeAndAfter with Matche
 val blockIds = Seq.fill(blocks.size)(generateBlockId())
 val storeResults = blocks.zip(blockIds).map {
   case (block, id) =>
-manualClock.addToTime(500) // log rolling interval set to 1000 ms 
through SparkConf
+manualClock.advance(500) // log rolling interval set to 1000 ms 
through SparkConf
 logDebug("Inserting block " + id)
 receivedBlockHandler.storeBlock(id, block)
 }.toList

http://git-wip-us.apache.org/repos/asf/spark/blob/34b7c353/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
--
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index fbb7b0b..a3a0fd5 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -34,9 +34,9 @@ import org.apache.spark.{Logging, SparkConf, SparkException}
 import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
 import org.apache.spark.streaming.scheduler._
-import org.apache.spark.streaming.util.{Clock, ManualClock, SystemClock, 
WriteAheadLogReader}
+import org.apache.spark.streaming.util.WriteAheadLogReader
 import org.apache.spark.streaming.util.WriteAheadLogSuite._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils}
 
 class ReceivedBlockTrackerSuite
   extends FunSuite with BeforeAndAfter with Matchers with Logging {
@@ -100,7 +100,7 @@ class ReceivedBlockTrackerSuite
 
 def incrementTime() {
   val timeIncrementMillis = 2000L
-  manualClock.addToTime(timeIncrementMillis)
+  manualClock.advance(timeIncrementMillis)
 }
 
 // Generate and add blocks to the given tracker
@@ -138,13 +138,13 @@ class ReceivedBlockTrackerSuite
 tracker2.getUnallocatedBlocks(streamId).toList shouldEqual blockInfos1
 
 // Allocate blocks to batch and verify whether the unallocated blocks got 
allocated
-val batchTime1 = manualClock.currentTime
+val batchTime1 = manualClock.getTimeMillis()
 tracker2.allocateBlocksToBatch(batchTime1)
 tracker2.getBlocksOfBatchAndStream(batchTime1, streamId) shouldEqual 
blockInfos1
 
 // Add more blocks and allocate to another batch
 incrementTime()
-val batchTime2 = manualClock.currentTime
+val batchTime2 = manualClock.getTimeMillis()
 val blockInfos2 = addBlockInfos(tracker2)
 tracker2.allocateBlocksToBatch(batchTime2)
 tracker2.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual 
blockInfos2

http://git-wip-us.apache.org/repos/asf/spark/blob/34b7c353/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
--
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala 
b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 7d82c3e..c2375ff 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -31,10 +31,9 @@ import org.scalatest.concurrent.P

[2/2] spark git commit: SPARK-4682 [CORE] Consolidate various 'Clock' classes

2015-02-19 Thread andrewor14
SPARK-4682 [CORE] Consolidate various 'Clock' classes

Another one from JoshRosen's wish list. The first commit is much smaller and
removes two of the four Clock classes. The second is much larger, as it is
needed to consolidate the streaming one. I put the implementations together in
the way that seemed simplest. Almost all of the change is standardizing class
and method names.
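
To illustrate the consolidated pattern, a standalone Scala sketch follows. The
method names (getTimeMillis, advance) follow this commit, but the classes here
are a simplified model written for this example, not the actual private[spark]
implementations, and the Throttler component is hypothetical.

trait Clock {
  def getTimeMillis(): Long
}

class SystemClock extends Clock {
  override def getTimeMillis(): Long = System.currentTimeMillis()
}

class ManualClock(private var time: Long = 0L) extends Clock {
  override def getTimeMillis(): Long = time
  def setTime(timeToSet: Long): Unit = { time = timeToSet }
  def advance(timeToAdd: Long): Unit = { time += timeToAdd }
}

// A component that takes a Clock, so tests can inject a ManualClock and
// advance time deterministically instead of sleeping.
class Throttler(clock: Clock, intervalMs: Long) {
  private var last = clock.getTimeMillis()
  def ready(): Boolean = {
    val now = clock.getTimeMillis()
    if (now - last >= intervalMs) { last = now; true } else false
  }
}

object ClockSketch {
  def main(args: Array[String]): Unit = {
    val clock = new ManualClock()
    val throttler = new Throttler(clock, intervalMs = 100)
    println(throttler.ready())  // false: no simulated time has passed
    clock.advance(100)
    println(throttler.ready())  // true: the ManualClock was advanced by 100 ms
  }
}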

Author: Sean Owen 

Closes #4514 from srowen/SPARK-4682 and squashes the following commits:

5ed3a03 [Sean Owen] Javadoc Clock classes; make ManualClock private[spark]
169dd13 [Sean Owen] Add support for legacy org.apache.spark.streaming clock 
class names
277785a [Sean Owen] Reduce the net change in this patch by reversing some 
unnecessary syntax changes along the way
b5e53df [Sean Owen] FakeClock -> ManualClock; getTime() -> getTimeMillis()
160863a [Sean Owen] Consolidate Streaming Clock class into common util Clock
7c956b2 [Sean Owen] Consolidate Clocks except for Streaming Clock

(cherry picked from commit 34b7c35380c88569a1396fb4ed991a0bed4288e7)
Signed-off-by: Andrew Or 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bd49e8b9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bd49e8b9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bd49e8b9

Branch: refs/heads/branch-1.3
Commit: bd49e8b962b397b8fb8b22f980739021cf1a195e
Parents: ff8976e
Author: Sean Owen 
Authored: Thu Feb 19 15:35:23 2015 -0800
Committer: Andrew Or 
Committed: Thu Feb 19 15:35:31 2015 -0800

--
 .../spark/ExecutorAllocationManager.scala   | 28 +-
 .../spark/deploy/worker/DriverRunner.scala  | 17 ++--
 .../apache/spark/scheduler/DAGScheduler.scala   | 20 ++---
 .../apache/spark/scheduler/TaskSetManager.scala | 16 ++--
 .../scala/org/apache/spark/util/Clock.scala | 44 +-
 .../org/apache/spark/util/ManualClock.scala | 69 +++
 .../spark/ExecutorAllocationManagerSuite.scala  | 65 +++---
 .../spark/deploy/worker/DriverRunnerTest.scala  |  3 +-
 .../spark/scheduler/TaskSetManagerSuite.scala   | 36 
 .../scala/org/apache/spark/util/FakeClock.scala | 26 --
 .../streaming/LocalJavaStreamingContext.java|  2 +-
 .../flume/FlumePollingStreamSuite.scala |  7 +-
 .../streaming/LocalJavaStreamingContext.java|  2 +-
 .../streaming/LocalJavaStreamingContext.java|  2 +-
 .../streaming/LocalJavaStreamingContext.java|  2 +-
 .../kinesis/KinesisCheckpointState.scala| 10 +--
 .../kinesis/KinesisRecordProcessor.scala|  2 +-
 .../kinesis/KinesisReceiverSuite.scala  | 25 +++---
 .../JavaStreamingLogisticRegressionSuite.java   |  2 +-
 .../JavaStreamingLinearRegressionSuite.java |  2 +-
 project/MimaExcludes.scala  |  5 ++
 .../streaming/dstream/FileInputDStream.scala|  6 +-
 .../streaming/receiver/BlockGenerator.scala |  3 +-
 .../receiver/ReceivedBlockHandler.scala |  4 +-
 .../streaming/scheduler/JobGenerator.scala  | 13 ++-
 .../scheduler/ReceivedBlockTracker.scala|  6 +-
 .../org/apache/spark/streaming/util/Clock.scala | 89 
 .../spark/streaming/util/RecurringTimer.scala   |  5 +-
 .../streaming/util/WriteAheadLogManager.scala   |  5 +-
 .../streaming/LocalJavaStreamingContext.java|  2 +-
 .../spark/streaming/BasicOperationsSuite.scala  |  9 +-
 .../spark/streaming/CheckpointSuite.scala   | 33 
 .../spark/streaming/InputStreamsSuite.scala | 37 
 .../streaming/ReceivedBlockHandlerSuite.scala   |  6 +-
 .../streaming/ReceivedBlockTrackerSuite.scala   | 10 +--
 .../apache/spark/streaming/TestSuiteBase.scala  | 15 ++--
 .../streaming/util/WriteAheadLogSuite.scala | 10 +--
 37 files changed, 301 insertions(+), 337 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/bd49e8b9/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala 
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 998695b..21c6e6f 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -20,6 +20,7 @@ package org.apache.spark
 import scala.collection.mutable
 
 import org.apache.spark.scheduler._
+import org.apache.spark.util.{SystemClock, Clock}
 
 /**
  * An agent that dynamically allocates and removes executors based on the 
workload.
@@ -123,7 +124,7 @@ private[spark] class ExecutorAllocationManager(
   private val intervalMillis: Long = 100
 
   // Clock used to schedule when executors should be added and removed
-  private var clock: Clock = new RealClock
+  private var clock: Clock = new S

[2/2] spark git commit: SPARK-4682 [CORE] Consolidate various 'Clock' classes

2015-02-19 Thread andrewor14
SPARK-4682 [CORE] Consolidate various 'Clock' classes

Another one from JoshRosen's wish list. The first commit is much smaller and
removes two of the four Clock classes. The second is much larger, as it is
needed to consolidate the streaming one. I put the implementations together in
the way that seemed simplest. Almost all of the change is standardizing class
and method names.

Author: Sean Owen 

Closes #4514 from srowen/SPARK-4682 and squashes the following commits:

5ed3a03 [Sean Owen] Javadoc Clock classes; make ManualClock private[spark]
169dd13 [Sean Owen] Add support for legacy org.apache.spark.streaming clock 
class names
277785a [Sean Owen] Reduce the net change in this patch by reversing some 
unnecessary syntax changes along the way
b5e53df [Sean Owen] FakeClock -> ManualClock; getTime() -> getTimeMillis()
160863a [Sean Owen] Consolidate Streaming Clock class into common util Clock
7c956b2 [Sean Owen] Consolidate Clocks except for Streaming Clock


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/34b7c353
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/34b7c353
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/34b7c353

Branch: refs/heads/master
Commit: 34b7c35380c88569a1396fb4ed991a0bed4288e7
Parents: ad6b169
Author: Sean Owen 
Authored: Thu Feb 19 15:35:23 2015 -0800
Committer: Andrew Or 
Committed: Thu Feb 19 15:35:23 2015 -0800

--
 .../spark/ExecutorAllocationManager.scala   | 28 +-
 .../spark/deploy/worker/DriverRunner.scala  | 17 ++--
 .../apache/spark/scheduler/DAGScheduler.scala   | 20 ++---
 .../apache/spark/scheduler/TaskSetManager.scala | 16 ++--
 .../scala/org/apache/spark/util/Clock.scala | 44 +-
 .../org/apache/spark/util/ManualClock.scala | 69 +++
 .../spark/ExecutorAllocationManagerSuite.scala  | 65 +++---
 .../spark/deploy/worker/DriverRunnerTest.scala  |  3 +-
 .../spark/scheduler/TaskSetManagerSuite.scala   | 36 
 .../scala/org/apache/spark/util/FakeClock.scala | 26 --
 .../streaming/LocalJavaStreamingContext.java|  2 +-
 .../flume/FlumePollingStreamSuite.scala |  7 +-
 .../streaming/LocalJavaStreamingContext.java|  2 +-
 .../streaming/LocalJavaStreamingContext.java|  2 +-
 .../streaming/LocalJavaStreamingContext.java|  2 +-
 .../kinesis/KinesisCheckpointState.scala| 10 +--
 .../kinesis/KinesisRecordProcessor.scala|  2 +-
 .../kinesis/KinesisReceiverSuite.scala  | 25 +++---
 .../JavaStreamingLogisticRegressionSuite.java   |  2 +-
 .../JavaStreamingLinearRegressionSuite.java |  2 +-
 project/MimaExcludes.scala  |  5 ++
 .../streaming/dstream/FileInputDStream.scala|  6 +-
 .../streaming/receiver/BlockGenerator.scala |  3 +-
 .../receiver/ReceivedBlockHandler.scala |  4 +-
 .../streaming/scheduler/JobGenerator.scala  | 13 ++-
 .../scheduler/ReceivedBlockTracker.scala|  6 +-
 .../org/apache/spark/streaming/util/Clock.scala | 89 
 .../spark/streaming/util/RecurringTimer.scala   |  5 +-
 .../streaming/util/WriteAheadLogManager.scala   |  5 +-
 .../streaming/LocalJavaStreamingContext.java|  2 +-
 .../spark/streaming/BasicOperationsSuite.scala  |  9 +-
 .../spark/streaming/CheckpointSuite.scala   | 33 
 .../spark/streaming/InputStreamsSuite.scala | 37 
 .../streaming/ReceivedBlockHandlerSuite.scala   |  6 +-
 .../streaming/ReceivedBlockTrackerSuite.scala   | 10 +--
 .../apache/spark/streaming/TestSuiteBase.scala  | 15 ++--
 .../streaming/util/WriteAheadLogSuite.scala | 10 +--
 37 files changed, 301 insertions(+), 337 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/34b7c353/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala 
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 998695b..21c6e6f 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -20,6 +20,7 @@ package org.apache.spark
 import scala.collection.mutable
 
 import org.apache.spark.scheduler._
+import org.apache.spark.util.{SystemClock, Clock}
 
 /**
  * An agent that dynamically allocates and removes executors based on the 
workload.
@@ -123,7 +124,7 @@ private[spark] class ExecutorAllocationManager(
   private val intervalMillis: Long = 100
 
   // Clock used to schedule when executors should be added and removed
-  private var clock: Clock = new RealClock
+  private var clock: Clock = new SystemClock()
 
   // Listener for Spark events that impact the allocation policy
   private val list

spark git commit: [Spark-5889] Remove pid file after stopping service.

2015-02-19 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 856fdcb65 -> 18fbed5b5


[Spark-5889] Remove pid file after stopping service.

Currently the pid file is not deleted, which can potentially cause problems
after the service is stopped. The fix removes the pid file once the service has
stopped.

Author: Zhan Zhang 

Closes #4676 from zhzhan/spark-5889 and squashes the following commits:

eb01be1 [Zhan Zhang] solve review comments
b4c009e [Zhan Zhang] solve review comments
018110a [Zhan Zhang] spark-5889: remove pid file after stopping service
088d2a2 [Zhan Zhang] squash all commits
c1f1fa5 [Zhan Zhang] test

(cherry picked from commit ad6b169dee84df175b51933b7a3ad7f0bbc52cf3)
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/18fbed5b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/18fbed5b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/18fbed5b

Branch: refs/heads/branch-1.2
Commit: 18fbed5b54818a84671b726ae952736f3251f9ff
Parents: 856fdcb
Author: Zhan Zhang 
Authored: Thu Feb 19 23:13:02 2015 +
Committer: Sean Owen 
Committed: Thu Feb 19 23:14:56 2015 +

--
 sbin/spark-daemon.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/18fbed5b/sbin/spark-daemon.sh
--
diff --git a/sbin/spark-daemon.sh b/sbin/spark-daemon.sh
index e1bcc7d..5e812a1 100755
--- a/sbin/spark-daemon.sh
+++ b/sbin/spark-daemon.sh
@@ -168,7 +168,7 @@ case $option in
   TARGET_ID="$(cat "$pid")"
   if [[ $(ps -p "$TARGET_ID" -o comm=) =~ "java" ]]; then
 echo "stopping $command"
-kill "$TARGET_ID"
+kill "$TARGET_ID" && rm -f "$pid"
   else
 echo "no $command to stop"
   fi





spark git commit: [Spark-5889] Remove pid file after stopping service.

2015-02-19 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master a5fed3435 -> ad6b169de


[Spark-5889] Remove pid file after stopping service.

Currently the pid file is not deleted, which can potentially cause problems
after the service is stopped. The fix removes the pid file once the service has
stopped.

Author: Zhan Zhang 

Closes #4676 from zhzhan/spark-5889 and squashes the following commits:

eb01be1 [Zhan Zhang] solve review comments
b4c009e [Zhan Zhang] solve review comments
018110a [Zhan Zhang] spark-5889: remove pid file after stopping service
088d2a2 [Zhan Zhang] squash all commits
c1f1fa5 [Zhan Zhang] test


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ad6b169d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ad6b169d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ad6b169d

Branch: refs/heads/master
Commit: ad6b169dee84df175b51933b7a3ad7f0bbc52cf3
Parents: a5fed34
Author: Zhan Zhang 
Authored: Thu Feb 19 23:13:02 2015 +
Committer: Sean Owen 
Committed: Thu Feb 19 23:13:02 2015 +

--
 sbin/spark-daemon.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ad6b169d/sbin/spark-daemon.sh
--
diff --git a/sbin/spark-daemon.sh b/sbin/spark-daemon.sh
index e1bcc7d..5e812a1 100755
--- a/sbin/spark-daemon.sh
+++ b/sbin/spark-daemon.sh
@@ -168,7 +168,7 @@ case $option in
   TARGET_ID="$(cat "$pid")"
   if [[ $(ps -p "$TARGET_ID" -o comm=) =~ "java" ]]; then
 echo "stopping $command"
-kill "$TARGET_ID"
+kill "$TARGET_ID" && rm -f "$pid"
   else
 echo "no $command to stop"
   fi





spark git commit: [Spark-5889] Remove pid file after stopping service.

2015-02-19 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 0c494cf9a -> ff8976ec7


[Spark-5889] Remove pid file after stopping service.

Currently the pid file is not deleted, which can potentially cause problems
after the service is stopped. The fix removes the pid file once the service has
stopped.

Author: Zhan Zhang 

Closes #4676 from zhzhan/spark-5889 and squashes the following commits:

eb01be1 [Zhan Zhang] solve review comments
b4c009e [Zhan Zhang] solve review comments
018110a [Zhan Zhang] spark-5889: remove pid file after stopping service
088d2a2 [Zhan Zhang] squash all commits
c1f1fa5 [Zhan Zhang] test

(cherry picked from commit ad6b169dee84df175b51933b7a3ad7f0bbc52cf3)
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ff8976ec
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ff8976ec
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ff8976ec

Branch: refs/heads/branch-1.3
Commit: ff8976ec7825bec0f0c30bf763a8ad3dcc14542b
Parents: 0c494cf
Author: Zhan Zhang 
Authored: Thu Feb 19 23:13:02 2015 +
Committer: Sean Owen 
Committed: Thu Feb 19 23:14:08 2015 +

--
 sbin/spark-daemon.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ff8976ec/sbin/spark-daemon.sh
--
diff --git a/sbin/spark-daemon.sh b/sbin/spark-daemon.sh
index e1bcc7d..5e812a1 100755
--- a/sbin/spark-daemon.sh
+++ b/sbin/spark-daemon.sh
@@ -168,7 +168,7 @@ case $option in
   TARGET_ID="$(cat "$pid")"
   if [[ $(ps -p "$TARGET_ID" -o comm=) =~ "java" ]]; then
 echo "stopping $command"
-kill "$TARGET_ID"
+kill "$TARGET_ID" && rm -f "$pid"
   else
 echo "no $command to stop"
   fi





spark git commit: [SPARK-5902] [ml] Made PipelineStage.transformSchema public instead of private to ml

2015-02-19 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master 8ca3418e1 -> a5fed3435


[SPARK-5902] [ml] Made PipelineStage.transformSchema public instead of private 
to ml

For users to implement their own PipelineStages, we need to make
PipelineStage.transformSchema public instead of private to ml. It would be
nice to include this in Spark 1.3.
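
As a rough sketch of what this enables (not taken from the patch), here is a
user-defined Transformer overriding the now-public transformSchema. The
Transformer.transform signature and the column names are assumptions based on
the Spark 1.3 ml API and may differ in other versions.

import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType

// Hypothetical stage that renames a "text" column to "text_out".
class ColumnRenamer extends Transformer {

  override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame =
    dataset.withColumnRenamed("text", "text_out")

  // Now public: declares the output schema this stage produces for a given input
  // schema, so a Pipeline can validate the whole chain before running it.
  override def transformSchema(schema: StructType, paramMap: ParamMap): StructType =
    StructType(schema.fields.map { f =>
      if (f.name == "text") f.copy(name = "text_out") else f
    })
}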

CC: mengxr

Author: Joseph K. Bradley 

Closes #4682 from jkbradley/SPARK-5902 and squashes the following commits:

6f02357 [Joseph K. Bradley] Made transformSchema public
0e6d0a0 [Joseph K. Bradley] made implementations of transformSchema protected 
as well
fdaf26a [Joseph K. Bradley] Made PipelineStage.transformSchema protected 
instead of private[ml]


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a5fed343
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a5fed343
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a5fed343

Branch: refs/heads/master
Commit: a5fed34355b403188ad50b567ab62ee54597b493
Parents: 8ca3418
Author: Joseph K. Bradley 
Authored: Thu Feb 19 12:46:27 2015 -0800
Committer: Xiangrui Meng 
Committed: Thu Feb 19 12:46:27 2015 -0800

--
 .../main/scala/org/apache/spark/ml/Pipeline.scala   | 16 
 .../apache/spark/ml/feature/StandardScaler.scala|  4 ++--
 .../apache/spark/ml/impl/estimator/Predictor.scala  |  4 ++--
 .../org/apache/spark/ml/recommendation/ALS.scala|  4 ++--
 .../org/apache/spark/ml/tuning/CrossValidator.scala |  4 ++--
 5 files changed, 20 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a5fed343/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala 
b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
index 5607ed2..5bbcd2e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
@@ -20,7 +20,7 @@ package org.apache.spark.ml
 import scala.collection.mutable.ListBuffer
 
 import org.apache.spark.Logging
-import org.apache.spark.annotation.AlphaComponent
+import org.apache.spark.annotation.{AlphaComponent, DeveloperApi}
 import org.apache.spark.ml.param.{Param, ParamMap}
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.types.StructType
@@ -33,9 +33,17 @@ import org.apache.spark.sql.types.StructType
 abstract class PipelineStage extends Serializable with Logging {
 
   /**
+   * :: DeveloperAPI ::
+   *
* Derives the output schema from the input schema and parameters.
+   * The schema describes the columns and types of the data.
+   *
+   * @param schema  Input schema to this stage
+   * @param paramMap  Parameters passed to this stage
+   * @return  Output schema from this stage
*/
-  private[ml] def transformSchema(schema: StructType, paramMap: ParamMap): 
StructType
+  @DeveloperApi
+  def transformSchema(schema: StructType, paramMap: ParamMap): StructType
 
   /**
* Derives the output schema from the input schema and parameters, 
optionally with logging.
@@ -126,7 +134,7 @@ class Pipeline extends Estimator[PipelineModel] {
 new PipelineModel(this, map, transformers.toArray)
   }
 
-  private[ml] override def transformSchema(schema: StructType, paramMap: 
ParamMap): StructType = {
+  override def transformSchema(schema: StructType, paramMap: ParamMap): 
StructType = {
 val map = this.paramMap ++ paramMap
 val theStages = map(stages)
 require(theStages.toSet.size == theStages.size,
@@ -171,7 +179,7 @@ class PipelineModel private[ml] (
 stages.foldLeft(dataset)((cur, transformer) => transformer.transform(cur, 
map))
   }
 
-  private[ml] override def transformSchema(schema: StructType, paramMap: 
ParamMap): StructType = {
+  override def transformSchema(schema: StructType, paramMap: ParamMap): 
StructType = {
 // Precedence of ParamMaps: paramMap > this.paramMap > fittingParamMap
 val map = (fittingParamMap ++ this.paramMap) ++ paramMap
 stages.foldLeft(schema)((cur, transformer) => 
transformer.transformSchema(cur, map))

http://git-wip-us.apache.org/repos/asf/spark/blob/a5fed343/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
index ddbd648..1142aa4 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
@@ -55,7 +55,7 @@ class StandardScaler extends Estimator[StandardScalerModel] 
with StandardScalerP
 model
   }
 
-  private[ml]

spark git commit: [SPARK-5902] [ml] Made PipelineStage.transformSchema public instead of private to ml

2015-02-19 Thread meng
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 55d91d92b -> 0c494cf9a


[SPARK-5902] [ml] Made PipelineStage.transformSchema public instead of private 
to ml

For users to implement their own PipelineStages, we need to make 
PipelineStage.transformSchema public instead of private to ml. This would 
be nice to include in Spark 1.3.

CC: mengxr

Author: Joseph K. Bradley 

Closes #4682 from jkbradley/SPARK-5902 and squashes the following commits:

6f02357 [Joseph K. Bradley] Made transformSchema public
0e6d0a0 [Joseph K. Bradley] made implementations of transformSchema protected 
as well
fdaf26a [Joseph K. Bradley] Made PipelineStage.transformSchema protected 
instead of private[ml]

(cherry picked from commit a5fed34355b403188ad50b567ab62ee54597b493)
Signed-off-by: Xiangrui Meng 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c494cf9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c494cf9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c494cf9

Branch: refs/heads/branch-1.3
Commit: 0c494cf9a3d2b717d86f53445b35f725afa89ac8
Parents: 55d91d9
Author: Joseph K. Bradley 
Authored: Thu Feb 19 12:46:27 2015 -0800
Committer: Xiangrui Meng 
Committed: Thu Feb 19 12:46:37 2015 -0800

--
 .../main/scala/org/apache/spark/ml/Pipeline.scala   | 16 
 .../apache/spark/ml/feature/StandardScaler.scala|  4 ++--
 .../apache/spark/ml/impl/estimator/Predictor.scala  |  4 ++--
 .../org/apache/spark/ml/recommendation/ALS.scala|  4 ++--
 .../org/apache/spark/ml/tuning/CrossValidator.scala |  4 ++--
 5 files changed, 20 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0c494cf9/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala 
b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
index 5607ed2..5bbcd2e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
@@ -20,7 +20,7 @@ package org.apache.spark.ml
 import scala.collection.mutable.ListBuffer
 
 import org.apache.spark.Logging
-import org.apache.spark.annotation.AlphaComponent
+import org.apache.spark.annotation.{AlphaComponent, DeveloperApi}
 import org.apache.spark.ml.param.{Param, ParamMap}
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.types.StructType
@@ -33,9 +33,17 @@ import org.apache.spark.sql.types.StructType
 abstract class PipelineStage extends Serializable with Logging {
 
   /**
+   * :: DeveloperAPI ::
+   *
* Derives the output schema from the input schema and parameters.
+   * The schema describes the columns and types of the data.
+   *
+   * @param schema  Input schema to this stage
+   * @param paramMap  Parameters passed to this stage
+   * @return  Output schema from this stage
*/
-  private[ml] def transformSchema(schema: StructType, paramMap: ParamMap): 
StructType
+  @DeveloperApi
+  def transformSchema(schema: StructType, paramMap: ParamMap): StructType
 
   /**
* Derives the output schema from the input schema and parameters, 
optionally with logging.
@@ -126,7 +134,7 @@ class Pipeline extends Estimator[PipelineModel] {
 new PipelineModel(this, map, transformers.toArray)
   }
 
-  private[ml] override def transformSchema(schema: StructType, paramMap: 
ParamMap): StructType = {
+  override def transformSchema(schema: StructType, paramMap: ParamMap): 
StructType = {
 val map = this.paramMap ++ paramMap
 val theStages = map(stages)
 require(theStages.toSet.size == theStages.size,
@@ -171,7 +179,7 @@ class PipelineModel private[ml] (
 stages.foldLeft(dataset)((cur, transformer) => transformer.transform(cur, 
map))
   }
 
-  private[ml] override def transformSchema(schema: StructType, paramMap: 
ParamMap): StructType = {
+  override def transformSchema(schema: StructType, paramMap: ParamMap): 
StructType = {
 // Precedence of ParamMaps: paramMap > this.paramMap > fittingParamMap
 val map = (fittingParamMap ++ this.paramMap) ++ paramMap
 stages.foldLeft(schema)((cur, transformer) => 
transformer.transformSchema(cur, map))

http://git-wip-us.apache.org/repos/asf/spark/blob/0c494cf9/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
index ddbd648..1142aa4 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
@@ -55,7 +55,7 @@ class 

spark git commit: [SPARK-5904][SQL] DataFrame API fixes.

2015-02-19 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 fe00eb66e -> 55d91d92b


[SPARK-5904][SQL] DataFrame API fixes.

1. Column is no longer a DataFrame, to simplify the class hierarchy.
2. Don't use varargs on abstract methods (see Scala compiler bug SI-9013).

Author: Reynold Xin 

Closes #4686 from rxin/SPARK-5904 and squashes the following commits:

fd9b199 [Reynold Xin] Fixed Python tests.
df25cef [Reynold Xin] Non final.
5221530 [Reynold Xin] [SPARK-5904][SQL] DataFrame API fixes.

Conflicts:
sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/55d91d92
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/55d91d92
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/55d91d92

Branch: refs/heads/branch-1.3
Commit: 55d91d92bcdd56c479138cbdd9d2354bb8db52a7
Parents: fe00eb6
Author: Reynold Xin 
Authored: Thu Feb 19 12:09:44 2015 -0800
Committer: Michael Armbrust 
Committed: Thu Feb 19 12:15:18 2015 -0800

--
 python/pyspark/sql/dataframe.py |  56 +--
 .../scala/org/apache/spark/sql/Column.scala | 223 +++--
 .../org/apache/spark/sql/ComputableColumn.scala |  33 --
 .../scala/org/apache/spark/sql/DataFrame.scala  | 420 
 .../org/apache/spark/sql/DataFrameImpl.scala| 483 ---
 .../org/apache/spark/sql/GroupedData.scala  |   2 +-
 .../apache/spark/sql/IncomputableColumn.scala   | 183 ---
 .../spark/sql/ColumnExpressionSuite.scala   |  44 +-
 .../org/apache/spark/sql/DataFrameSuite.scala   |   7 +-
 9 files changed, 427 insertions(+), 1024 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/55d91d92/python/pyspark/sql/dataframe.py
--
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index c68c97e..010c38f 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -546,7 +546,7 @@ class DataFrame(object):
 def __getitem__(self, item):
 """ Return the column by given name
 
->>> df['age'].collect()
+>>> df.select(df['age']).collect()
 [Row(age=2), Row(age=5)]
 >>> df[ ["name", "age"]].collect()
 [Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)]
@@ -555,7 +555,7 @@ class DataFrame(object):
 """
 if isinstance(item, basestring):
 jc = self._jdf.apply(item)
-return Column(jc, self.sql_ctx)
+return Column(jc)
 elif isinstance(item, Column):
 return self.filter(item)
 elif isinstance(item, list):
@@ -566,13 +566,13 @@ class DataFrame(object):
 def __getattr__(self, name):
 """ Return the column by given name
 
->>> df.age.collect()
+>>> df.select(df.age).collect()
 [Row(age=2), Row(age=5)]
 """
 if name.startswith("__"):
 raise AttributeError(name)
 jc = self._jdf.apply(name)
-return Column(jc, self.sql_ctx)
+return Column(jc)
 
 def select(self, *cols):
 """ Selecting a set of expressions.
@@ -698,7 +698,7 @@ class DataFrame(object):
 >>> df.withColumnRenamed('age', 'age2').collect()
 [Row(age2=2, name=u'Alice'), Row(age2=5, name=u'Bob')]
 """
-cols = [Column(_to_java_column(c), self.sql_ctx).alias(new)
+cols = [Column(_to_java_column(c)).alias(new)
 if c == existing else c
 for c in self.columns]
 return self.select(*cols)
@@ -873,15 +873,16 @@ def _unary_op(name, doc="unary operator"):
 """ Create a method for given unary operator """
 def _(self):
 jc = getattr(self._jc, name)()
-return Column(jc, self.sql_ctx)
+return Column(jc)
 _.__doc__ = doc
 return _
 
 
 def _func_op(name, doc=''):
 def _(self):
-jc = getattr(self._sc._jvm.functions, name)(self._jc)
-return Column(jc, self.sql_ctx)
+sc = SparkContext._active_spark_context
+jc = getattr(sc._jvm.functions, name)(self._jc)
+return Column(jc)
 _.__doc__ = doc
 return _
 
@@ -892,7 +893,7 @@ def _bin_op(name, doc="binary operator"):
 def _(self, other):
 jc = other._jc if isinstance(other, Column) else other
 njc = getattr(self._jc, name)(jc)
-return Column(njc, self.sql_ctx)
+return Column(njc)
 _.__doc__ = doc
 return _
 
@@ -903,12 +904,12 @@ def _reverse_op(name, doc="binary operator"):
 def _(self, other):
 jother = _create_column_from_literal(other)
 jc = getattr(jother, name)(self._jc)
-return Column(jc, self.sql_ctx)
+return Column(jc)
 _.__doc__ = doc
 return _
 
 
-class Column(

spark git commit: [SPARK-5904][SQL] DataFrame API fixes.

2015-02-19 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 94cdb05ff -> 8ca3418e1


[SPARK-5904][SQL] DataFrame API fixes.

1. Column is no longer a DataFrame, to simplify the class hierarchy.
2. Don't use varargs on abstract methods (see Scala compiler bug SI-9013).

Author: Reynold Xin 

Closes #4686 from rxin/SPARK-5904 and squashes the following commits:

fd9b199 [Reynold Xin] Fixed Python tests.
df25cef [Reynold Xin] Non final.
5221530 [Reynold Xin] [SPARK-5904][SQL] DataFrame API fixes.
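
A hedged sketch of the user-visible side of point 1 (the data, column names, 
and local-mode setup below are made up for illustration, not taken from the 
patch): because Column no longer extends DataFrame, a column expression has to 
be wrapped in select() before an action such as collect() can run on it.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("spark-5904-sketch").setMaster("local[*]"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val df = sc.parallelize(Seq(("Alice", 2), ("Bob", 5))).toDF("name", "age")

// A Column is now just an expression, not a queryable DataFrame:
val ages = df.select(df("age")).collect()   // OK: select() returns a DataFrame
// df("age").collect()                      // no longer compiles: Column has no collect()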


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8ca3418e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8ca3418e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8ca3418e

Branch: refs/heads/master
Commit: 8ca3418e1b3e2687e75a08c185d17045a97279fb
Parents: 94cdb05
Author: Reynold Xin 
Authored: Thu Feb 19 12:09:44 2015 -0800
Committer: Michael Armbrust 
Committed: Thu Feb 19 12:09:44 2015 -0800

--
 python/pyspark/sql/dataframe.py |  56 +--
 .../scala/org/apache/spark/sql/Column.scala | 223 +++--
 .../org/apache/spark/sql/ComputableColumn.scala |  33 --
 .../scala/org/apache/spark/sql/DataFrame.scala  | 420 
 .../org/apache/spark/sql/DataFrameImpl.scala| 483 ---
 .../org/apache/spark/sql/GroupedData.scala  |   2 +-
 .../apache/spark/sql/IncomputableColumn.scala   | 183 ---
 .../spark/sql/ColumnExpressionSuite.scala   |  44 +-
 .../org/apache/spark/sql/DataFrameSuite.scala   |   7 +-
 9 files changed, 427 insertions(+), 1024 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8ca3418e/python/pyspark/sql/dataframe.py
--
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index c68c97e..010c38f 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -546,7 +546,7 @@ class DataFrame(object):
 def __getitem__(self, item):
 """ Return the column by given name
 
->>> df['age'].collect()
+>>> df.select(df['age']).collect()
 [Row(age=2), Row(age=5)]
 >>> df[ ["name", "age"]].collect()
 [Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)]
@@ -555,7 +555,7 @@ class DataFrame(object):
 """
 if isinstance(item, basestring):
 jc = self._jdf.apply(item)
-return Column(jc, self.sql_ctx)
+return Column(jc)
 elif isinstance(item, Column):
 return self.filter(item)
 elif isinstance(item, list):
@@ -566,13 +566,13 @@ class DataFrame(object):
 def __getattr__(self, name):
 """ Return the column by given name
 
->>> df.age.collect()
+>>> df.select(df.age).collect()
 [Row(age=2), Row(age=5)]
 """
 if name.startswith("__"):
 raise AttributeError(name)
 jc = self._jdf.apply(name)
-return Column(jc, self.sql_ctx)
+return Column(jc)
 
 def select(self, *cols):
 """ Selecting a set of expressions.
@@ -698,7 +698,7 @@ class DataFrame(object):
 >>> df.withColumnRenamed('age', 'age2').collect()
 [Row(age2=2, name=u'Alice'), Row(age2=5, name=u'Bob')]
 """
-cols = [Column(_to_java_column(c), self.sql_ctx).alias(new)
+cols = [Column(_to_java_column(c)).alias(new)
 if c == existing else c
 for c in self.columns]
 return self.select(*cols)
@@ -873,15 +873,16 @@ def _unary_op(name, doc="unary operator"):
 """ Create a method for given unary operator """
 def _(self):
 jc = getattr(self._jc, name)()
-return Column(jc, self.sql_ctx)
+return Column(jc)
 _.__doc__ = doc
 return _
 
 
 def _func_op(name, doc=''):
 def _(self):
-jc = getattr(self._sc._jvm.functions, name)(self._jc)
-return Column(jc, self.sql_ctx)
+sc = SparkContext._active_spark_context
+jc = getattr(sc._jvm.functions, name)(self._jc)
+return Column(jc)
 _.__doc__ = doc
 return _
 
@@ -892,7 +893,7 @@ def _bin_op(name, doc="binary operator"):
 def _(self, other):
 jc = other._jc if isinstance(other, Column) else other
 njc = getattr(self._jc, name)(jc)
-return Column(njc, self.sql_ctx)
+return Column(njc)
 _.__doc__ = doc
 return _
 
@@ -903,12 +904,12 @@ def _reverse_op(name, doc="binary operator"):
 def _(self, other):
 jother = _create_column_from_literal(other)
 jc = getattr(jother, name)(self._jc)
-return Column(jc, self.sql_ctx)
+return Column(jc)
 _.__doc__ = doc
 return _
 
 
-class Column(DataFrame):
+class Column(object):
 
 """
 A column in a DataFrame.
@@ -924,9 +925,8 

spark git commit: [SPARK-5825] [Spark Submit] Remove the double checking instance name when stopping the service

2015-02-19 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 90095bf3c -> 94cdb05ff


[SPARK-5825] [Spark Submit] Remove the double checking instance name when 
stopping the service

`spark-daemon.sh` confirms the process id by fuzzy-matching the class name 
while stopping the service; however, this fails if the java process arguments 
are very long (greater than 4096 characters).
This PR loosens the check for the service process.

Author: Cheng Hao 

Closes #4611 from chenghao-intel/stopping_service and squashes the following 
commits:

a0051f6 [Cheng Hao] loosen the process checking while stopping a service


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/94cdb05f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/94cdb05f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/94cdb05f

Branch: refs/heads/master
Commit: 94cdb05ff7e6b8fc5b3a574202ba8bc8e5bbe689
Parents: 90095bf
Author: Cheng Hao 
Authored: Thu Feb 19 12:07:51 2015 -0800
Committer: Andrew Or 
Committed: Thu Feb 19 12:07:51 2015 -0800

--
 sbin/spark-daemon.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/94cdb05f/sbin/spark-daemon.sh
--
diff --git a/sbin/spark-daemon.sh b/sbin/spark-daemon.sh
index ec6d0b5..e1bcc7d 100755
--- a/sbin/spark-daemon.sh
+++ b/sbin/spark-daemon.sh
@@ -166,7 +166,7 @@ case $option in
 
 if [ -f $pid ]; then
   TARGET_ID="$(cat "$pid")"
-  if [[ $(ps -p "$TARGET_ID" -o args=) =~ $command ]]; then
+  if [[ $(ps -p "$TARGET_ID" -o comm=) =~ "java" ]]; then
 echo "stopping $command"
 kill "$TARGET_ID"
   else





spark git commit: [SPARK-5825] [Spark Submit] Remove the double checking instance name when stopping the service

2015-02-19 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 61bde0049 -> 856fdcb65


[SPARK-5825] [Spark Submit] Remove the double checking instance name when 
stopping the service

`spark-daemon.sh` confirms the process id by fuzzy-matching the class name 
while stopping the service; however, this fails if the java process arguments 
are very long (greater than 4096 characters).
This PR loosens the check for the service process.

Author: Cheng Hao 

Closes #4611 from chenghao-intel/stopping_service and squashes the following 
commits:

a0051f6 [Cheng Hao] loosen the process checking while stopping a service

(cherry picked from commit 94cdb05ff7e6b8fc5b3a574202ba8bc8e5bbe689)
Signed-off-by: Andrew Or 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/856fdcb6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/856fdcb6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/856fdcb6

Branch: refs/heads/branch-1.2
Commit: 856fdcb654184f4d4ea3d620e48093ce3ada4cbb
Parents: 61bde00
Author: Cheng Hao 
Authored: Thu Feb 19 12:07:51 2015 -0800
Committer: Andrew Or 
Committed: Thu Feb 19 12:08:03 2015 -0800

--
 sbin/spark-daemon.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/856fdcb6/sbin/spark-daemon.sh
--
diff --git a/sbin/spark-daemon.sh b/sbin/spark-daemon.sh
index ec6d0b5..e1bcc7d 100755
--- a/sbin/spark-daemon.sh
+++ b/sbin/spark-daemon.sh
@@ -166,7 +166,7 @@ case $option in
 
 if [ -f $pid ]; then
   TARGET_ID="$(cat "$pid")"
-  if [[ $(ps -p "$TARGET_ID" -o args=) =~ $command ]]; then
+  if [[ $(ps -p "$TARGET_ID" -o comm=) =~ "java" ]]; then
 echo "stopping $command"
 kill "$TARGET_ID"
   else





spark git commit: [SPARK-5825] [Spark Submit] Remove the double checking instance name when stopping the service

2015-02-19 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 25fae8e7e -> fe00eb66e


[SPARK-5825] [Spark Submit] Remove the double checking instance name when 
stopping the service

`spark-daemon.sh` confirms the process id by fuzzy-matching the class name 
while stopping the service; however, this fails if the java process arguments 
are very long (greater than 4096 characters).
This PR loosens the check for the service process.

Author: Cheng Hao 

Closes #4611 from chenghao-intel/stopping_service and squashes the following 
commits:

a0051f6 [Cheng Hao] loosen the process checking while stopping a service

(cherry picked from commit 94cdb05ff7e6b8fc5b3a574202ba8bc8e5bbe689)
Signed-off-by: Andrew Or 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fe00eb66
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fe00eb66
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fe00eb66

Branch: refs/heads/branch-1.3
Commit: fe00eb66e8a65e23e81d401da32c5d5de8180394
Parents: 25fae8e
Author: Cheng Hao 
Authored: Thu Feb 19 12:07:51 2015 -0800
Committer: Andrew Or 
Committed: Thu Feb 19 12:07:56 2015 -0800

--
 sbin/spark-daemon.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fe00eb66/sbin/spark-daemon.sh
--
diff --git a/sbin/spark-daemon.sh b/sbin/spark-daemon.sh
index ec6d0b5..e1bcc7d 100755
--- a/sbin/spark-daemon.sh
+++ b/sbin/spark-daemon.sh
@@ -166,7 +166,7 @@ case $option in
 
 if [ -f $pid ]; then
   TARGET_ID="$(cat "$pid")"
-  if [[ $(ps -p "$TARGET_ID" -o args=) =~ $command ]]; then
+  if [[ $(ps -p "$TARGET_ID" -o comm=) =~ "java" ]]; then
 echo "stopping $command"
 kill "$TARGET_ID"
   else





spark git commit: [SPARK-5423][Core] Cleanup resources in DiskMapIterator.finalize to ensure deleting the temp file

2015-02-19 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 f6ee80b18 -> 61bde0049


[SPARK-5423][Core] Cleanup resources in DiskMapIterator.finalize to ensure 
deleting the temp file

This PR adds a `finalize` method in DiskMapIterator to clean up the resources 
even if an exception happens while processing the data.

Author: zsxwing 

Closes #4219 from zsxwing/SPARK-5423 and squashes the following commits:

d4b2ca6 [zsxwing] Cleanup resources in DiskMapIterator.finalize to ensure 
deleting the temp file
(cherry picked from commit 90095bf3ce9304d09a32ceffaa99069079071b59)

Signed-off-by: Ubuntu 
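
A minimal, self-contained illustration of the pattern being applied here (the 
class and field names are hypothetical, not the real DiskMapIterator): cleanup 
is made idempotent behind a volatile flag so that finalize() can safely delete 
the temp file even when the iterator is abandoned before being fully drained.

import java.io.File

class TempFileIterator(file: File) {

  @volatile private var closed = false

  // Idempotent: safe to call when the last batch is read and again from finalize().
  def cleanup(): Unit = {
    if (!closed) {
      closed = true
      if (file.exists()) {
        file.delete()
      }
    }
  }

  // Runs even if an exception aborts iteration before cleanup() is reached normally.
  override def finalize(): Unit = {
    try {
      cleanup()
    } finally {
      super.finalize()
    }
  }
}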


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/61bde004
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/61bde004
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/61bde004

Branch: refs/heads/branch-1.2
Commit: 61bde0049fac324b5004eadfa22b02cd76cf2187
Parents: f6ee80b
Author: zsxwing 
Authored: Thu Feb 19 18:37:31 2015 +
Committer: Ubuntu 
Committed: Thu Feb 19 18:38:00 2015 +

--
 .../util/collection/ExternalAppendOnlyMap.scala | 52 
 1 file changed, 43 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/61bde004/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 
b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 8a0f5a6..fc7e86e 100644
--- 
a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ 
b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -387,6 +387,15 @@ class ExternalAppendOnlyMap[K, V, C](
 private var batchIndex = 0  // Which batch we're in
 private var fileStream: FileInputStream = null
 
+@volatile private var closed = false
+
+// A volatile variable to remember which DeserializationStream is using. 
Need to set it when we
+// open a DeserializationStream. But we should use `deserializeStream` 
rather than
+// `deserializeStreamToBeClosed` to read the content because touching a 
volatile variable will
+// reduce the performance. It must be volatile so that we can see its 
correct value in the
+// `finalize` method, which could run in any thread.
+@volatile private var deserializeStreamToBeClosed: DeserializationStream = 
null
+
 // An intermediate stream that reads from exactly one batch
 // This guards against pre-fetching and other arbitrary behavior of higher 
level streams
 private var deserializeStream = nextBatchStream()
@@ -401,6 +410,7 @@ class ExternalAppendOnlyMap[K, V, C](
   // we're still in a valid batch.
   if (batchIndex < batchOffsets.length - 1) {
 if (deserializeStream != null) {
+  deserializeStreamToBeClosed = null
   deserializeStream.close()
   fileStream.close()
   deserializeStream = null
@@ -419,7 +429,11 @@ class ExternalAppendOnlyMap[K, V, C](
 
 val bufferedStream = new 
BufferedInputStream(ByteStreams.limit(fileStream, end - start))
 val compressedStream = blockManager.wrapForCompression(blockId, 
bufferedStream)
-ser.deserializeStream(compressedStream)
+// Before returning the stream, assign it to 
`deserializeStreamToBeClosed` so that we can
+// close it in `finalize` and also avoid to touch the volatile 
`deserializeStreamToBeClosed`
+// during reading the (K, C) pairs.
+deserializeStreamToBeClosed = ser.deserializeStream(compressedStream)
+deserializeStreamToBeClosed
   } else {
 // No more batches left
 cleanup()
@@ -468,14 +482,34 @@ class ExternalAppendOnlyMap[K, V, C](
   item
 }
 
-// TODO: Ensure this gets called even if the iterator isn't drained.
-private def cleanup() {
-  batchIndex = batchOffsets.length  // Prevent reading any other batch
-  val ds = deserializeStream
-  deserializeStream = null
-  fileStream = null
-  ds.close()
-  file.delete()
+// TODO: Now only use `finalize` to ensure `close` gets called to clean up 
the resources. In the
+// future, we need some mechanism to ensure this gets called once the 
resources are not used.
+private def cleanup(): Unit = {
+  if (!closed) {
+closed = true
+batchIndex = batchOffsets.length  // Prevent reading any other batch
+fileStream = null
+try {
+  val ds = deserializeStreamToBeClosed
+  deserializeStreamToBeClosed = null
+  deserializeStream = null
+  if (ds != null) {
+ds.close()
+  }
+} finally {
+ 

spark git commit: [SPARK-5423][Core] Cleanup resources in DiskMapIterator.finalize to ensure deleting the temp file

2015-02-19 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-1.1 651ceaeb3 -> 36f3c499f


[SPARK-5423][Core] Cleanup resources in DiskMapIterator.finalize to ensure 
deleting the temp file

This PR adds a `finalize` method in DiskMapIterator to clean up the resources 
even if an exception happens while processing the data.

Author: zsxwing 

Closes #4219 from zsxwing/SPARK-5423 and squashes the following commits:

d4b2ca6 [zsxwing] Cleanup resources in DiskMapIterator.finalize to ensure 
deleting the temp file
(cherry picked from commit 90095bf3ce9304d09a32ceffaa99069079071b59)

Signed-off-by: Ubuntu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/36f3c499
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/36f3c499
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/36f3c499

Branch: refs/heads/branch-1.1
Commit: 36f3c499fd1ad53a68a084d6a16a2c68099e7049
Parents: 651ceae
Author: zsxwing 
Authored: Thu Feb 19 18:37:31 2015 +
Committer: Ubuntu 
Committed: Thu Feb 19 18:38:27 2015 +

--
 .../util/collection/ExternalAppendOnlyMap.scala | 52 
 1 file changed, 43 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/36f3c499/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 
b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 5619b30..abb1f11 100644
--- 
a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ 
b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -434,6 +434,15 @@ class ExternalAppendOnlyMap[K, V, C](
 private var batchIndex = 0  // Which batch we're in
 private var fileStream: FileInputStream = null
 
+@volatile private var closed = false
+
+// A volatile variable to remember which DeserializationStream is using. 
Need to set it when we
+// open a DeserializationStream. But we should use `deserializeStream` 
rather than
+// `deserializeStreamToBeClosed` to read the content because touching a 
volatile variable will
+// reduce the performance. It must be volatile so that we can see its 
correct value in the
+// `finalize` method, which could run in any thread.
+@volatile private var deserializeStreamToBeClosed: DeserializationStream = 
null
+
 // An intermediate stream that reads from exactly one batch
 // This guards against pre-fetching and other arbitrary behavior of higher 
level streams
 private var deserializeStream = nextBatchStream()
@@ -448,6 +457,7 @@ class ExternalAppendOnlyMap[K, V, C](
   // we're still in a valid batch.
   if (batchIndex < batchOffsets.length - 1) {
 if (deserializeStream != null) {
+  deserializeStreamToBeClosed = null
   deserializeStream.close()
   fileStream.close()
   deserializeStream = null
@@ -466,7 +476,11 @@ class ExternalAppendOnlyMap[K, V, C](
 
 val bufferedStream = new 
BufferedInputStream(ByteStreams.limit(fileStream, end - start))
 val compressedStream = blockManager.wrapForCompression(blockId, 
bufferedStream)
-ser.deserializeStream(compressedStream)
+// Before returning the stream, assign it to 
`deserializeStreamToBeClosed` so that we can
+// close it in `finalize` and also avoid to touch the volatile 
`deserializeStreamToBeClosed`
+// during reading the (K, C) pairs.
+deserializeStreamToBeClosed = ser.deserializeStream(compressedStream)
+deserializeStreamToBeClosed
   } else {
 // No more batches left
 cleanup()
@@ -515,14 +529,34 @@ class ExternalAppendOnlyMap[K, V, C](
   item
 }
 
-// TODO: Ensure this gets called even if the iterator isn't drained.
-private def cleanup() {
-  batchIndex = batchOffsets.length  // Prevent reading any other batch
-  val ds = deserializeStream
-  deserializeStream = null
-  fileStream = null
-  ds.close()
-  file.delete()
+// TODO: Now only use `finalize` to ensure `close` gets called to clean up 
the resources. In the
+// future, we need some mechanism to ensure this gets called once the 
resources are not used.
+private def cleanup(): Unit = {
+  if (!closed) {
+closed = true
+batchIndex = batchOffsets.length  // Prevent reading any other batch
+fileStream = null
+try {
+  val ds = deserializeStreamToBeClosed
+  deserializeStreamToBeClosed = null
+  deserializeStream = null
+  if (ds != null) {
+ds.close()
+  }
+} finally {
+ 

spark git commit: [SPARK-5423][Core] Cleanup resources in DiskMapIterator.finalize to ensure deleting the temp file

2015-02-19 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 f93d4d992 -> 25fae8e7e


[SPARK-5423][Core] Cleanup resources in DiskMapIterator.finalize to ensure 
deleting the temp file

This PR adds a `finalize` method in DiskMapIterator to clean up the resources 
even if an exception happens while processing the data.

Author: zsxwing 

Closes #4219 from zsxwing/SPARK-5423 and squashes the following commits:

d4b2ca6 [zsxwing] Cleanup resources in DiskMapIterator.finalize to ensure 
deleting the temp file
(cherry picked from commit 90095bf3ce9304d09a32ceffaa99069079071b59)

Signed-off-by: Ubuntu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/25fae8e7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/25fae8e7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/25fae8e7

Branch: refs/heads/branch-1.3
Commit: 25fae8e7e6c93b7817771342d370b73b40dcf92e
Parents: f93d4d9
Author: zsxwing 
Authored: Thu Feb 19 18:37:31 2015 +
Committer: Ubuntu 
Committed: Thu Feb 19 18:37:47 2015 +

--
 .../util/collection/ExternalAppendOnlyMap.scala | 52 
 1 file changed, 43 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/25fae8e7/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 
b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 8a0f5a6..fc7e86e 100644
--- 
a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ 
b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -387,6 +387,15 @@ class ExternalAppendOnlyMap[K, V, C](
 private var batchIndex = 0  // Which batch we're in
 private var fileStream: FileInputStream = null
 
+@volatile private var closed = false
+
+// A volatile variable to remember which DeserializationStream is using. 
Need to set it when we
+// open a DeserializationStream. But we should use `deserializeStream` 
rather than
+// `deserializeStreamToBeClosed` to read the content because touching a 
volatile variable will
+// reduce the performance. It must be volatile so that we can see its 
correct value in the
+// `finalize` method, which could run in any thread.
+@volatile private var deserializeStreamToBeClosed: DeserializationStream = 
null
+
 // An intermediate stream that reads from exactly one batch
 // This guards against pre-fetching and other arbitrary behavior of higher 
level streams
 private var deserializeStream = nextBatchStream()
@@ -401,6 +410,7 @@ class ExternalAppendOnlyMap[K, V, C](
   // we're still in a valid batch.
   if (batchIndex < batchOffsets.length - 1) {
 if (deserializeStream != null) {
+  deserializeStreamToBeClosed = null
   deserializeStream.close()
   fileStream.close()
   deserializeStream = null
@@ -419,7 +429,11 @@ class ExternalAppendOnlyMap[K, V, C](
 
 val bufferedStream = new 
BufferedInputStream(ByteStreams.limit(fileStream, end - start))
 val compressedStream = blockManager.wrapForCompression(blockId, 
bufferedStream)
-ser.deserializeStream(compressedStream)
+// Before returning the stream, assign it to 
`deserializeStreamToBeClosed` so that we can
+// close it in `finalize` and also avoid to touch the volatile 
`deserializeStreamToBeClosed`
+// during reading the (K, C) pairs.
+deserializeStreamToBeClosed = ser.deserializeStream(compressedStream)
+deserializeStreamToBeClosed
   } else {
 // No more batches left
 cleanup()
@@ -468,14 +482,34 @@ class ExternalAppendOnlyMap[K, V, C](
   item
 }
 
-// TODO: Ensure this gets called even if the iterator isn't drained.
-private def cleanup() {
-  batchIndex = batchOffsets.length  // Prevent reading any other batch
-  val ds = deserializeStream
-  deserializeStream = null
-  fileStream = null
-  ds.close()
-  file.delete()
+// TODO: Now only use `finalize` to ensure `close` gets called to clean up 
the resources. In the
+// future, we need some mechanism to ensure this gets called once the 
resources are not used.
+private def cleanup(): Unit = {
+  if (!closed) {
+closed = true
+batchIndex = batchOffsets.length  // Prevent reading any other batch
+fileStream = null
+try {
+  val ds = deserializeStreamToBeClosed
+  deserializeStreamToBeClosed = null
+  deserializeStream = null
+  if (ds != null) {
+ds.close()
+  }
+} finally {
+ 

spark git commit: [SPARK-5423][Core] Cleanup resources in DiskMapIterator.finalize to ensure deleting the temp file

2015-02-19 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 38e624a73 -> 90095bf3c


[SPARK-5423][Core] Cleanup resources in DiskMapIterator.finalize to ensure 
deleting the temp file

This PR adds a `finalize` method in DiskMapIterator to clean up the resources 
even if an exception happens while processing the data.

Author: zsxwing 

Closes #4219 from zsxwing/SPARK-5423 and squashes the following commits:

d4b2ca6 [zsxwing] Cleanup resources in DiskMapIterator.finalize to ensure 
deleting the temp file


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/90095bf3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/90095bf3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/90095bf3

Branch: refs/heads/master
Commit: 90095bf3ce9304d09a32ceffaa99069079071b59
Parents: 38e624a
Author: zsxwing 
Authored: Thu Feb 19 18:37:31 2015 +
Committer: Ubuntu 
Committed: Thu Feb 19 18:37:31 2015 +

--
 .../util/collection/ExternalAppendOnlyMap.scala | 52 
 1 file changed, 43 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/90095bf3/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 
b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 8a0f5a6..fc7e86e 100644
--- 
a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ 
b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -387,6 +387,15 @@ class ExternalAppendOnlyMap[K, V, C](
 private var batchIndex = 0  // Which batch we're in
 private var fileStream: FileInputStream = null
 
+@volatile private var closed = false
+
+// A volatile variable to remember which DeserializationStream is using. 
Need to set it when we
+// open a DeserializationStream. But we should use `deserializeStream` 
rather than
+// `deserializeStreamToBeClosed` to read the content because touching a 
volatile variable will
+// reduce the performance. It must be volatile so that we can see its 
correct value in the
+// `finalize` method, which could run in any thread.
+@volatile private var deserializeStreamToBeClosed: DeserializationStream = 
null
+
 // An intermediate stream that reads from exactly one batch
 // This guards against pre-fetching and other arbitrary behavior of higher 
level streams
 private var deserializeStream = nextBatchStream()
@@ -401,6 +410,7 @@ class ExternalAppendOnlyMap[K, V, C](
   // we're still in a valid batch.
   if (batchIndex < batchOffsets.length - 1) {
 if (deserializeStream != null) {
+  deserializeStreamToBeClosed = null
   deserializeStream.close()
   fileStream.close()
   deserializeStream = null
@@ -419,7 +429,11 @@ class ExternalAppendOnlyMap[K, V, C](
 
 val bufferedStream = new 
BufferedInputStream(ByteStreams.limit(fileStream, end - start))
 val compressedStream = blockManager.wrapForCompression(blockId, 
bufferedStream)
-ser.deserializeStream(compressedStream)
+// Before returning the stream, assign it to 
`deserializeStreamToBeClosed` so that we can
+// close it in `finalize` and also avoid to touch the volatile 
`deserializeStreamToBeClosed`
+// during reading the (K, C) pairs.
+deserializeStreamToBeClosed = ser.deserializeStream(compressedStream)
+deserializeStreamToBeClosed
   } else {
 // No more batches left
 cleanup()
@@ -468,14 +482,34 @@ class ExternalAppendOnlyMap[K, V, C](
   item
 }
 
-// TODO: Ensure this gets called even if the iterator isn't drained.
-private def cleanup() {
-  batchIndex = batchOffsets.length  // Prevent reading any other batch
-  val ds = deserializeStream
-  deserializeStream = null
-  fileStream = null
-  ds.close()
-  file.delete()
+// TODO: Now only use `finalize` to ensure `close` gets called to clean up 
the resources. In the
+// future, we need some mechanism to ensure this gets called once the 
resources are not used.
+private def cleanup(): Unit = {
+  if (!closed) {
+closed = true
+batchIndex = batchOffsets.length  // Prevent reading any other batch
+fileStream = null
+try {
+  val ds = deserializeStreamToBeClosed
+  deserializeStreamToBeClosed = null
+  deserializeStream = null
+  if (ds != null) {
+ds.close()
+  }
+} finally {
+  if (file.exists()) {
+file.delete()
+  }
+}
+  }
+}
+
+o

spark git commit: [SPARK-5816] Add huge compatibility warning in DriverWrapper

2015-02-19 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master fb87f4492 -> 38e624a73


[SPARK-5816] Add huge compatibility warning in DriverWrapper

The stability of the new submission gateway assumes that the arguments in 
`DriverWrapper` are consistent across multiple Spark versions. However, this is 
not at all clear from the code itself. In fact, this was broken in 
20a6013106b56a1a1cc3e8cda092330ffbe77cc3, which is fortunately OK because both 
that commit and the original commit that added this gateway are part of the 
same release.

To prevent this from happening again, we should at the very least add a huge 
warning where appropriate.

Author: Andrew Or 

Closes #4687 from andrewor14/driver-wrapper-warning and squashes the following 
commits:

7989b56 [Andrew Or] Add huge compatibility warning
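
To make the contract concrete, here is a hedged sketch (not the gateway's 
actual code) of the argument shape that DriverWrapper.main pattern-matches on, 
and which must therefore stay stable across releases; the helper name below is 
hypothetical.

// Any launcher that starts org.apache.spark.deploy.worker.DriverWrapper is
// assumed to pass its arguments in exactly this order.
def driverWrapperArgs(
    workerUrl: String,         // URL of the worker that supervises the driver
    userJar: String,           // path to the user's application jar
    mainClass: String,         // fully qualified name of the user's main class
    extraArgs: Seq[String]     // everything else is forwarded to the user application
  ): Seq[String] = {
  Seq(workerUrl, userJar, mainClass) ++ extraArgs
}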


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/38e624a7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/38e624a7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/38e624a7

Branch: refs/heads/master
Commit: 38e624a732b18e01ad2e7a499ce0bb0d7acdcdf6
Parents: fb87f44
Author: Andrew Or 
Authored: Thu Feb 19 09:56:25 2015 -0800
Committer: Andrew Or 
Committed: Thu Feb 19 09:56:25 2015 -0800

--
 .../scala/org/apache/spark/deploy/worker/DriverWrapper.scala  | 7 +++
 1 file changed, 7 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/38e624a7/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
index ab467a5..deef6ef 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -26,10 +26,17 @@ import org.apache.spark.util.{AkkaUtils, 
ChildFirstURLClassLoader, MutableURLCla
 
 /**
  * Utility object for launching driver programs such that they share fate with 
the Worker process.
+ * This is used in standalone cluster mode only.
  */
 object DriverWrapper {
   def main(args: Array[String]) {
 args.toList match {
+  /*
+   * IMPORTANT: Spark 1.3 provides a stable application submission gateway 
that is both
+   * backward and forward compatible across future Spark versions. Because 
this gateway
+   * uses this class to launch the driver, the ordering and semantics of 
the arguments
+   * here must also remain consistent across versions.
+   */
   case workerUrl :: userJar :: mainClass :: extraArgs =>
 val conf = new SparkConf()
 val (actorSystem, _) = AkkaUtils.createActorSystem("Driver",





spark git commit: [SPARK-5816] Add huge compatibility warning in DriverWrapper

2015-02-19 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 fbcb949c5 -> f93d4d992


[SPARK-5816] Add huge compatibility warning in DriverWrapper

The stability of the new submission gateway assumes that the arguments in 
`DriverWrapper` are consistent across multiple Spark versions. However, this is 
not at all clear from the code itself. In fact, this was broken in 
20a6013106b56a1a1cc3e8cda092330ffbe77cc3, which is fortunately OK because both 
that commit and the original commit that added this gateway are part of the 
same release.

To prevent this from happening again, we should at the very least add a huge 
warning where appropriate.

Author: Andrew Or 

Closes #4687 from andrewor14/driver-wrapper-warning and squashes the following 
commits:

7989b56 [Andrew Or] Add huge compatibility warning

(cherry picked from commit 38e624a732b18e01ad2e7a499ce0bb0d7acdcdf6)
Signed-off-by: Andrew Or 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f93d4d99
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f93d4d99
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f93d4d99

Branch: refs/heads/branch-1.3
Commit: f93d4d99204e3d83e802e326c55416e1b7e2
Parents: fbcb949
Author: Andrew Or 
Authored: Thu Feb 19 09:56:25 2015 -0800
Committer: Andrew Or 
Committed: Thu Feb 19 09:56:37 2015 -0800

--
 .../scala/org/apache/spark/deploy/worker/DriverWrapper.scala  | 7 +++
 1 file changed, 7 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f93d4d99/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
index ab467a5..deef6ef 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -26,10 +26,17 @@ import org.apache.spark.util.{AkkaUtils, 
ChildFirstURLClassLoader, MutableURLCla
 
 /**
  * Utility object for launching driver programs such that they share fate with 
the Worker process.
+ * This is used in standalone cluster mode only.
  */
 object DriverWrapper {
   def main(args: Array[String]) {
 args.toList match {
+  /*
+   * IMPORTANT: Spark 1.3 provides a stable application submission gateway 
that is both
+   * backward and forward compatible across future Spark versions. Because 
this gateway
+   * uses this class to launch the driver, the ordering and semantics of 
the arguments
+   * here must also remain consistent across versions.
+   */
   case workerUrl :: userJar :: mainClass :: extraArgs =>
 val conf = new SparkConf()
 val (actorSystem, _) = AkkaUtils.createActorSystem("Driver",





spark git commit: SPARK-5548: Fix for AkkaUtilsSuite failure - attempt 2

2015-02-19 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 092b45f69 -> fbcb949c5


SPARK-5548: Fix for AkkaUtilsSuite failure - attempt 2

Author: Jacek Lewandowski 

Closes #4653 from jacek-lewandowski/SPARK-5548-2-master and squashes the 
following commits:

0e199b6 [Jacek Lewandowski] SPARK-5548: applied reviewer's comments
843eafb [Jacek Lewandowski] SPARK-5548: Fix for AkkaUtilsSuite failure - 
attempt 2

(cherry picked from commit fb87f449219c673a16bc46f85c1ef7a6e3f22736)
Signed-off-by: Andrew Or 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fbcb949c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fbcb949c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fbcb949c

Branch: refs/heads/branch-1.3
Commit: fbcb949c545a1e17411403b23bb6bc20881fb26a
Parents: 092b45f
Author: Jacek Lewandowski 
Authored: Thu Feb 19 09:53:36 2015 -0800
Committer: Andrew Or 
Committed: Thu Feb 19 09:53:49 2015 -0800

--
 .../test/scala/org/apache/spark/util/AkkaUtilsSuite.scala   | 9 +++--
 1 file changed, 7 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fbcb949c/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala 
b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
index 2cc5817..6250d50 100644
--- a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.util
 import java.util.concurrent.TimeoutException
 
 import scala.concurrent.Await
+import scala.util.{Failure, Try}
 
 import akka.actor._
 
@@ -370,8 +371,12 @@ class AkkaUtilsSuite extends FunSuite with 
LocalSparkContext with ResetSystemPro
 val selection = slaveSystem.actorSelection(
   AkkaUtils.address(AkkaUtils.protocol(slaveSystem), "spark", "localhost", 
boundPort, "MapOutputTracker"))
 val timeout = AkkaUtils.lookupTimeout(conf)
-intercept[TimeoutException] {
-  slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout * 
2), timeout)
+val result = Try(Await.result(selection.resolveOne(timeout * 2), timeout))
+
+result match {
+  case Failure(ex: ActorNotFound) =>
+  case Failure(ex: TimeoutException) =>
+  case r => fail(s"$r is neither Failure(ActorNotFound) nor 
Failure(TimeoutException)")
 }
 
 actorSystem.shutdown()





spark git commit: SPARK-5548: Fix for AkkaUtilsSuite failure - attempt 2

2015-02-19 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master e945aa613 -> fb87f4492


SPARK-5548: Fix for AkkaUtilsSuite failure - attempt 2

Author: Jacek Lewandowski 

Closes #4653 from jacek-lewandowski/SPARK-5548-2-master and squashes the 
following commits:

0e199b6 [Jacek Lewandowski] SPARK-5548: applied reviewer's comments
843eafb [Jacek Lewandowski] SPARK-5548: Fix for AkkaUtilsSuite failure - 
attempt 2


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fb87f449
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fb87f449
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fb87f449

Branch: refs/heads/master
Commit: fb87f449219c673a16bc46f85c1ef7a6e3f22736
Parents: e945aa6
Author: Jacek Lewandowski 
Authored: Thu Feb 19 09:53:36 2015 -0800
Committer: Andrew Or 
Committed: Thu Feb 19 09:53:36 2015 -0800

--
 .../test/scala/org/apache/spark/util/AkkaUtilsSuite.scala   | 9 +++--
 1 file changed, 7 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fb87f449/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala 
b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
index 2cc5817..6250d50 100644
--- a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.util
 import java.util.concurrent.TimeoutException
 
 import scala.concurrent.Await
+import scala.util.{Failure, Try}
 
 import akka.actor._
 
@@ -370,8 +371,12 @@ class AkkaUtilsSuite extends FunSuite with 
LocalSparkContext with ResetSystemPro
 val selection = slaveSystem.actorSelection(
   AkkaUtils.address(AkkaUtils.protocol(slaveSystem), "spark", "localhost", 
boundPort, "MapOutputTracker"))
 val timeout = AkkaUtils.lookupTimeout(conf)
-intercept[TimeoutException] {
-  slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout * 
2), timeout)
+val result = Try(Await.result(selection.resolveOne(timeout * 2), timeout))
+
+result match {
+  case Failure(ex: ActorNotFound) =>
+  case Failure(ex: TimeoutException) =>
+  case r => fail(s"$r is neither Failure(ActorNotFound) nor 
Failure(TimeoutException)")
 }
 
 actorSystem.shutdown()

