spark git commit: [SPARK-4808] Removing minimum number of elements read before spill check
Repository: spark Updated Branches: refs/heads/branch-1.2 18fbed5b5 -> 5cea859fd [SPARK-4808] Removing minimum number of elements read before spill check In the general case, Spillable's heuristic of checking for memory stress on every 32nd item after 1000 items are read is good enough. In general, we do not want to be enacting the spilling checks until later on in the job; checking for disk-spilling too early can produce unacceptable performance impact in trivial cases. However, there are non-trivial cases, particularly if each serialized object is large, where checking for the necessity to spill too late would allow the memory to overflow. Consider if every item is 1.5 MB in size, and the heap size is 1000 MB. Then clearly if we only try to spill the in-memory contents to disk after 1000 items are read, we would have already accumulated 1500 MB of RAM and overflowed the heap. Patch #3656 attempted to circumvent this by checking the need to spill on every single item read, but that would cause unacceptable performance in the general case. However, the convoluted cases above should not be forced to be refactored to shrink the data items. Therefore it makes sense that the memory spilling thresholds be configurable. Author: mcheah Closes #4420 from mingyukim/memory-spill-configurable and squashes the following commits: 6e2509f [mcheah] [SPARK-4808] Removing minimum number of elements read before spill check Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5cea859f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5cea859f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5cea859f Branch: refs/heads/branch-1.2 Commit: 5cea859fd27dc6a216fa9d31d293c93407fbff01 Parents: 18fbed5 Author: mcheah Authored: Thu Feb 19 18:09:22 2015 -0800 Committer: Andrew Or Committed: Thu Feb 19 18:10:02 2015 -0800 -- .../scala/org/apache/spark/util/collection/Spillable.scala | 6 +- 1 file changed, 1 insertion(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5cea859f/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala index 9f54312..747ecf0 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala @@ -42,9 +42,6 @@ private[spark] trait Spillable[C] extends Logging { // Memory manager that can be used to acquire/release memory private[this] val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager - // Threshold for `elementsRead` before we start tracking this collection's memory usage - private[this] val trackMemoryThreshold = 1000 - // Initial threshold for the size of a collection before we start tracking its memory usage // Exposed for testing private[this] val initialMemoryThreshold: Long = @@ -72,8 +69,7 @@ private[spark] trait Spillable[C] extends Logging { * @return true if `collection` was spilled to disk; false otherwise */ protected def maybeSpill(collection: C, currentMemory: Long): Boolean = { -if (elementsRead > trackMemoryThreshold && elementsRead % 32 == 0 && -currentMemory >= myMemoryThreshold) { +if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) { // Claim up to double our current memory from the shuffle memory pool val amountToRequest = 2 * currentMemory - myMemoryThreshold val granted = 
shuffleMemoryManager.tryToAcquire(amountToRequest)
spark git commit: [SPARK-4808] Removing minimum number of elements read before spill check
Repository: spark Updated Branches: refs/heads/master 0cfd2cebd -> 3be92cdac [SPARK-4808] Removing minimum number of elements read before spill check In the general case, Spillable's heuristic of checking for memory stress on every 32nd item after 1000 items are read is good enough. In general, we do not want to be enacting the spilling checks until later on in the job; checking for disk-spilling too early can produce unacceptable performance impact in trivial cases. However, there are non-trivial cases, particularly if each serialized object is large, where checking for the necessity to spill too late would allow the memory to overflow. Consider if every item is 1.5 MB in size, and the heap size is 1000 MB. Then clearly if we only try to spill the in-memory contents to disk after 1000 items are read, we would have already accumulated 1500 MB of RAM and overflowed the heap. Patch #3656 attempted to circumvent this by checking the need to spill on every single item read, but that would cause unacceptable performance in the general case. However, the convoluted cases above should not be forced to be refactored to shrink the data items. Therefore it makes sense that the memory spilling thresholds be configurable. Author: mcheah Closes #4420 from mingyukim/memory-spill-configurable and squashes the following commits: 6e2509f [mcheah] [SPARK-4808] Removing minimum number of elements read before spill check Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3be92cda Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3be92cda Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3be92cda Branch: refs/heads/master Commit: 3be92cdac30cf488e09dbdaaa70e5c4cdaa9a099 Parents: 0cfd2ce Author: mcheah Authored: Thu Feb 19 18:09:22 2015 -0800 Committer: Andrew Or Committed: Thu Feb 19 18:09:22 2015 -0800 -- .../scala/org/apache/spark/util/collection/Spillable.scala | 6 +- 1 file changed, 1 insertion(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3be92cda/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala index 9f54312..747ecf0 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala @@ -42,9 +42,6 @@ private[spark] trait Spillable[C] extends Logging { // Memory manager that can be used to acquire/release memory private[this] val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager - // Threshold for `elementsRead` before we start tracking this collection's memory usage - private[this] val trackMemoryThreshold = 1000 - // Initial threshold for the size of a collection before we start tracking its memory usage // Exposed for testing private[this] val initialMemoryThreshold: Long = @@ -72,8 +69,7 @@ private[spark] trait Spillable[C] extends Logging { * @return true if `collection` was spilled to disk; false otherwise */ protected def maybeSpill(collection: C, currentMemory: Long): Boolean = { -if (elementsRead > trackMemoryThreshold && elementsRead % 32 == 0 && -currentMemory >= myMemoryThreshold) { +if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) { // Claim up to double our current memory from the shuffle memory pool val amountToRequest = 2 * currentMemory - myMemoryThreshold val granted = 
shuffleMemoryManager.tryToAcquire(amountToRequest)
spark git commit: [SPARK-4808] Removing minimum number of elements read before spill check
Repository: spark Updated Branches: refs/heads/branch-1.3 ba941ceb1 -> 0382dcc0a [SPARK-4808] Removing minimum number of elements read before spill check In the general case, Spillable's heuristic of checking for memory stress on every 32nd item after 1000 items are read is good enough. In general, we do not want to be enacting the spilling checks until later on in the job; checking for disk-spilling too early can produce unacceptable performance impact in trivial cases. However, there are non-trivial cases, particularly if each serialized object is large, where checking for the necessity to spill too late would allow the memory to overflow. Consider if every item is 1.5 MB in size, and the heap size is 1000 MB. Then clearly if we only try to spill the in-memory contents to disk after 1000 items are read, we would have already accumulated 1500 MB of RAM and overflowed the heap. Patch #3656 attempted to circumvent this by checking the need to spill on every single item read, but that would cause unacceptable performance in the general case. However, the convoluted cases above should not be forced to be refactored to shrink the data items. Therefore it makes sense that the memory spilling thresholds be configurable. Author: mcheah Closes #4420 from mingyukim/memory-spill-configurable and squashes the following commits: 6e2509f [mcheah] [SPARK-4808] Removing minimum number of elements read before spill check (cherry picked from commit 3be92cdac30cf488e09dbdaaa70e5c4cdaa9a099) Signed-off-by: Andrew Or Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0382dcc0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0382dcc0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0382dcc0 Branch: refs/heads/branch-1.3 Commit: 0382dcc0a94f8e619fd11ec2cc0b18459a690c2b Parents: ba941ce Author: mcheah Authored: Thu Feb 19 18:09:22 2015 -0800 Committer: Andrew Or Committed: Thu Feb 19 18:09:26 2015 -0800 -- .../scala/org/apache/spark/util/collection/Spillable.scala | 6 +- 1 file changed, 1 insertion(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0382dcc0/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala index 9f54312..747ecf0 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala @@ -42,9 +42,6 @@ private[spark] trait Spillable[C] extends Logging { // Memory manager that can be used to acquire/release memory private[this] val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager - // Threshold for `elementsRead` before we start tracking this collection's memory usage - private[this] val trackMemoryThreshold = 1000 - // Initial threshold for the size of a collection before we start tracking its memory usage // Exposed for testing private[this] val initialMemoryThreshold: Long = @@ -72,8 +69,7 @@ private[spark] trait Spillable[C] extends Logging { * @return true if `collection` was spilled to disk; false otherwise */ protected def maybeSpill(collection: C, currentMemory: Long): Boolean = { -if (elementsRead > trackMemoryThreshold && elementsRead % 32 == 0 && -currentMemory >= myMemoryThreshold) { +if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) { // Claim up to double our current memory from the shuffle memory pool 
val amountToRequest = 2 * currentMemory - myMemoryThreshold val granted = shuffleMemoryManager.tryToAcquire(amountToRequest)
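For context, the spill heuristic that remains after this change can be summarized in a short, self-contained sketch. The names (`elementsRead`, `myMemoryThreshold`, `initialMemoryThreshold`, `maybeSpill`) mirror the diff above, but the shuffle memory pool is stubbed out as a plain function, so this is an illustration of the logic rather than the actual `Spillable` trait:

```scala
// Minimal sketch of the spill heuristic after this change: the 1000-element
// minimum is gone, so memory is checked on every 32nd element read.
// The shuffle memory pool is stubbed as a function; this is not the real trait.
class SpillCheck(initialMemoryThreshold: Long, tryToAcquire: Long => Long) {
  private var myMemoryThreshold: Long = initialMemoryThreshold
  private var elementsRead: Long = 0L

  /** Returns true if the in-memory collection should be spilled to disk. */
  def maybeSpill(currentMemory: Long): Boolean = {
    elementsRead += 1
    if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
      // Claim up to double our current memory from the (stubbed) memory pool.
      val amountToRequest = 2 * currentMemory - myMemoryThreshold
      val granted = tryToAcquire(amountToRequest)
      myMemoryThreshold += granted
      // Spill only if the pool could not grant enough room to keep growing.
      currentMemory >= myMemoryThreshold
    } else {
      false
    }
  }
}
```

In the 1.5 MB-per-record scenario from the commit message, this check can now fire by the 32nd element (roughly 48 MB accumulated) instead of only after 1000 elements (roughly 1500 MB), which is the overflow the change is meant to avoid.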
spark git commit: [SPARK-5900][MLLIB] make PIC and FPGrowth Java-friendly
Repository: spark Updated Branches: refs/heads/branch-1.3 c5f3b9e02 -> ba941ceb1 [SPARK-5900][MLLIB] make PIC and FPGrowth Java-friendly In the previous version, PIC stores clustering assignments as an `RDD[(Long, Int)]`. This is mapped to `RDD>` in Java and hence Java users have to cast types manually. We should either create a new method called `javaAssignments` that returns `JavaRDD[(java.lang.Long, java.lang.Int)]` or wrap the result pair in a class. I chose the latter approach in this PR. Now assignments are stored as an `RDD[Assignment]`, where `Assignment` is a class with `id` and `cluster`. Similarly, in FPGrowth, the frequent itemsets are stored as an `RDD[(Array[Item], Long)]`, which is mapped to `RDD>`. Though we provide a "Java-friendly" method `javaFreqItemsets` that returns `JavaRDD[(Array[Item], java.lang.Long)]`. It doesn't really work because `Array[Item]` is mapped to `Object` in Java. So in this PR I created a class `FreqItemset` to wrap the results. It has `items` and `freq`, as well as a `javaItems` method that returns `List` in Java. I'm not certain that the names I chose are proper: `Assignment`/`id`/`cluster` and `FreqItemset`/`items`/`freq`. Please let me know if there are better suggestions. CC: jkbradley Author: Xiangrui Meng Closes #4695 from mengxr/SPARK-5900 and squashes the following commits: 865b5ca [Xiangrui Meng] make Assignment serializable cffa96e [Xiangrui Meng] fix test 9c0e590 [Xiangrui Meng] remove unused Tuple2 1b9db3d [Xiangrui Meng] make PIC and FPGrowth Java-friendly (cherry picked from commit 0cfd2cebde0b7fac3779eda80d6e42223f8a3d9f) Signed-off-by: Xiangrui Meng Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ba941ceb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ba941ceb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ba941ceb Branch: refs/heads/branch-1.3 Commit: ba941ceb1f78b28ca5cfb18c770f4171b9c74b0a Parents: c5f3b9e Author: Xiangrui Meng Authored: Thu Feb 19 18:06:16 2015 -0800 Committer: Xiangrui Meng Committed: Thu Feb 19 18:06:26 2015 -0800 -- docs/mllib-clustering.md| 8 ++-- docs/mllib-frequent-pattern-mining.md | 12 +++--- .../examples/mllib/JavaFPGrowthExample.java | 8 ++-- .../JavaPowerIterationClusteringExample.java| 5 +-- .../spark/examples/mllib/FPGrowthExample.scala | 4 +- .../mllib/PowerIterationClusteringExample.scala | 8 +--- .../clustering/PowerIterationClustering.scala | 33 +--- .../org/apache/spark/mllib/fpm/FPGrowth.scala | 41 ++-- .../spark/mllib/fpm/JavaFPGrowthSuite.java | 30 +- .../PowerIterationClusteringSuite.scala | 8 ++-- .../apache/spark/mllib/fpm/FPGrowthSuite.scala | 10 ++--- 11 files changed, 93 insertions(+), 74 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ba941ceb/docs/mllib-clustering.md -- diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md index 6e46a47..0b6db4f 100644 --- a/docs/mllib-clustering.md +++ b/docs/mllib-clustering.md @@ -314,8 +314,8 @@ val pic = new PowerIteartionClustering() .setMaxIterations(20) val model = pic.run(similarities) -model.assignments.foreach { case (vertexId, clusterId) => - println(s"$vertexId -> $clusterId") +model.assignments.foreach { a => + println(s"${a.id} -> ${a.cluster}") } {% endhighlight %} @@ -349,8 +349,8 @@ PowerIterationClustering pic = new PowerIterationClustering() .setMaxIterations(10); PowerIterationClusteringModel model = pic.run(similarities); -for (Tuple2 assignment: model.assignments().toJavaRDD().collect()) { - 
System.out.println(assignment._1() + " -> " + assignment._2()); +for (PowerIterationClustering.Assignment a: model.assignments().toJavaRDD().collect()) { + System.out.println(a.id() + " -> " + a.cluster()); } {% endhighlight %} http://git-wip-us.apache.org/repos/asf/spark/blob/ba941ceb/docs/mllib-frequent-pattern-mining.md -- diff --git a/docs/mllib-frequent-pattern-mining.md b/docs/mllib-frequent-pattern-mining.md index 0ff9738..9fd9be0 100644 --- a/docs/mllib-frequent-pattern-mining.md +++ b/docs/mllib-frequent-pattern-mining.md @@ -57,8 +57,8 @@ val fpg = new FPGrowth() .setNumPartitions(10) val model = fpg.run(transactions) -model.freqItemsets.collect().foreach { case (itemset, freq) => - println(itemset.mkString("[", ",", "]") + ", " + freq) +model.freqItemsets.collect().foreach { itemset => + println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq) } {% endhighlight %} @@ -74,10 +74,9 @@ Calling `FPGrowth.run` with transactions returns an that stores the frequent
spark git commit: [SPARK-5900][MLLIB] make PIC and FPGrowth Java-friendly
Repository: spark Updated Branches: refs/heads/master 6bddc4035 -> 0cfd2cebd [SPARK-5900][MLLIB] make PIC and FPGrowth Java-friendly In the previous version, PIC stores clustering assignments as an `RDD[(Long, Int)]`. This is mapped to `RDD>` in Java and hence Java users have to cast types manually. We should either create a new method called `javaAssignments` that returns `JavaRDD[(java.lang.Long, java.lang.Int)]` or wrap the result pair in a class. I chose the latter approach in this PR. Now assignments are stored as an `RDD[Assignment]`, where `Assignment` is a class with `id` and `cluster`. Similarly, in FPGrowth, the frequent itemsets are stored as an `RDD[(Array[Item], Long)]`, which is mapped to `RDD>`. Though we provide a "Java-friendly" method `javaFreqItemsets` that returns `JavaRDD[(Array[Item], java.lang.Long)]`. It doesn't really work because `Array[Item]` is mapped to `Object` in Java. So in this PR I created a class `FreqItemset` to wrap the results. It has `items` and `freq`, as well as a `javaItems` method that returns `List` in Java. I'm not certain that the names I chose are proper: `Assignment`/`id`/`cluster` and `FreqItemset`/`items`/`freq`. Please let me know if there are better suggestions. CC: jkbradley Author: Xiangrui Meng Closes #4695 from mengxr/SPARK-5900 and squashes the following commits: 865b5ca [Xiangrui Meng] make Assignment serializable cffa96e [Xiangrui Meng] fix test 9c0e590 [Xiangrui Meng] remove unused Tuple2 1b9db3d [Xiangrui Meng] make PIC and FPGrowth Java-friendly Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0cfd2ceb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0cfd2ceb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0cfd2ceb Branch: refs/heads/master Commit: 0cfd2cebde0b7fac3779eda80d6e42223f8a3d9f Parents: 6bddc40 Author: Xiangrui Meng Authored: Thu Feb 19 18:06:16 2015 -0800 Committer: Xiangrui Meng Committed: Thu Feb 19 18:06:16 2015 -0800 -- docs/mllib-clustering.md| 8 ++-- docs/mllib-frequent-pattern-mining.md | 12 +++--- .../examples/mllib/JavaFPGrowthExample.java | 8 ++-- .../JavaPowerIterationClusteringExample.java| 5 +-- .../spark/examples/mllib/FPGrowthExample.scala | 4 +- .../mllib/PowerIterationClusteringExample.scala | 8 +--- .../clustering/PowerIterationClustering.scala | 33 +--- .../org/apache/spark/mllib/fpm/FPGrowth.scala | 41 ++-- .../spark/mllib/fpm/JavaFPGrowthSuite.java | 30 +- .../PowerIterationClusteringSuite.scala | 8 ++-- .../apache/spark/mllib/fpm/FPGrowthSuite.scala | 10 ++--- 11 files changed, 93 insertions(+), 74 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0cfd2ceb/docs/mllib-clustering.md -- diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md index 6e46a47..0b6db4f 100644 --- a/docs/mllib-clustering.md +++ b/docs/mllib-clustering.md @@ -314,8 +314,8 @@ val pic = new PowerIteartionClustering() .setMaxIterations(20) val model = pic.run(similarities) -model.assignments.foreach { case (vertexId, clusterId) => - println(s"$vertexId -> $clusterId") +model.assignments.foreach { a => + println(s"${a.id} -> ${a.cluster}") } {% endhighlight %} @@ -349,8 +349,8 @@ PowerIterationClustering pic = new PowerIterationClustering() .setMaxIterations(10); PowerIterationClusteringModel model = pic.run(similarities); -for (Tuple2 assignment: model.assignments().toJavaRDD().collect()) { - System.out.println(assignment._1() + " -> " + assignment._2()); +for (PowerIterationClustering.Assignment a: 
model.assignments().toJavaRDD().collect()) { + System.out.println(a.id() + " -> " + a.cluster()); } {% endhighlight %} http://git-wip-us.apache.org/repos/asf/spark/blob/0cfd2ceb/docs/mllib-frequent-pattern-mining.md -- diff --git a/docs/mllib-frequent-pattern-mining.md b/docs/mllib-frequent-pattern-mining.md index 0ff9738..9fd9be0 100644 --- a/docs/mllib-frequent-pattern-mining.md +++ b/docs/mllib-frequent-pattern-mining.md @@ -57,8 +57,8 @@ val fpg = new FPGrowth() .setNumPartitions(10) val model = fpg.run(transactions) -model.freqItemsets.collect().foreach { case (itemset, freq) => - println(itemset.mkString("[", ",", "]") + ", " + freq) +model.freqItemsets.collect().foreach { itemset => + println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq) } {% endhighlight %} @@ -74,10 +74,9 @@ Calling `FPGrowth.run` with transactions returns an that stores the frequent itemsets with their frequencies. {% highlight java %} -import java.util.Arrays; import java.util.List;
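The Java-friendliness pattern used for both PIC and FPGrowth — returning an RDD of a small result class instead of an RDD of tuples, so Java callers get typed accessors rather than erased `Tuple2` fields — can be sketched independently of MLlib. The names `Assignment`, `id`, and `cluster` follow the commit message; a plain `Seq` stands in for the RDD so the snippet is self-contained:

```scala
// Sketch of the wrapping pattern: expose Assignment(id, cluster) instead of
// (Long, Int) pairs, so Java code can call a.id() and a.cluster() directly.
case class Assignment(id: Long, cluster: Int)

object AssignmentExample {
  def main(args: Array[String]): Unit = {
    // Raw (vertexId, clusterId) pairs; in MLlib these would live in an RDD.
    val rawPairs: Seq[(Long, Int)] = Seq((0L, 1), (1L, 0), (2L, 1))

    // Wrap each pair in the result class before returning it to callers.
    val assignments: Seq[Assignment] = rawPairs.map { case (v, c) => Assignment(v, c) }

    assignments.foreach(a => println(s"${a.id} -> ${a.cluster}"))
  }
}
```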
spark git commit: SPARK-5570: No docs stating that `new SparkConf().set("spark.driver.memory", ...) will not work
Repository: spark Updated Branches: refs/heads/master 34b7c3538 -> 6bddc4035 SPARK-5570: No docs stating that `new SparkConf().set("spark.driver.memory", ...) will not work I've updated documentation to reflect true behavior of this setting in client vs. cluster mode. Author: Ilya Ganelin Closes #4665 from ilganeli/SPARK-5570 and squashes the following commits: 5d1c8dd [Ilya Ganelin] Added example configuration code a51700a [Ilya Ganelin] Getting rid of extra spaces 85f7a08 [Ilya Ganelin] Reworded note 5889d43 [Ilya Ganelin] Formatting adjustment f149ba1 [Ilya Ganelin] Minor updates 1fec7a5 [Ilya Ganelin] Updated to add clarification for other driver properties db47595 [Ilya Ganelin] Slight formatting update c899564 [Ilya Ganelin] Merge remote-tracking branch 'upstream/master' into SPARK-5570 17b751d [Ilya Ganelin] Updated documentation for driver-memory to reflect its true behavior in client vs cluster mode Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6bddc403 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6bddc403 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6bddc403 Branch: refs/heads/master Commit: 6bddc40353057a562c78e75c5549c79a0d7d5f8b Parents: 34b7c35 Author: Ilya Ganelin Authored: Thu Feb 19 15:50:58 2015 -0800 Committer: Andrew Or Committed: Thu Feb 19 15:53:20 2015 -0800 -- docs/configuration.md | 23 ++- 1 file changed, 22 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6bddc403/docs/configuration.md -- diff --git a/docs/configuration.md b/docs/configuration.md index eb0d6d3..541695c 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -115,7 +115,11 @@ of the most common options to set are: Amount of memory to use for the driver process, i.e. where SparkContext is initialized. (e.g. 512m, 2g). - + +Note: In client mode, this config must not be set through the SparkConf +directly in your application, because the driver JVM has already started at that point. +Instead, please set this through the --driver-memory command line option +or in your default properties file. spark.executor.memory @@ -214,6 +218,11 @@ Apart from these, the following properties are also available, and may be useful (none) A string of extra JVM options to pass to the driver. For instance, GC settings or other logging. + +Note: In client mode, this config must not be set through the SparkConf +directly in your application, because the driver JVM has already started at that point. +Instead, please set this through the --driver-java-options command line option or in +your default properties file. @@ -221,6 +230,11 @@ Apart from these, the following properties are also available, and may be useful (none) Extra classpath entries to append to the classpath of the driver. + +Note: In client mode, this config must not be set through the SparkConf +directly in your application, because the driver JVM has already started at that point. +Instead, please set this through the --driver-class-path command line option or in +your default properties file. @@ -228,6 +242,11 @@ Apart from these, the following properties are also available, and may be useful (none) Set a special library path to use when launching the driver JVM. + +Note: In client mode, this config must not be set through the SparkConf +directly in your application, because the driver JVM has already started at that point. 
+Instead, please set this through the --driver-library-path command line option or in +your default properties file. @@ -237,6 +256,8 @@ Apart from these, the following properties are also available, and may be useful (Experimental) Whether to give user-added jars precedence over Spark's own jars when loading classes in the the driver. This feature can be used to mitigate conflicts between Spark's dependencies and user dependencies. It is currently an experimental feature. + +This is used in cluster mode only.
spark git commit: SPARK-5570: No docs stating that `new SparkConf().set("spark.driver.memory", ...) will not work
Repository: spark Updated Branches: refs/heads/branch-1.3 bd49e8b96 -> c5f3b9e02 SPARK-5570: No docs stating that `new SparkConf().set("spark.driver.memory", ...) will not work I've updated documentation to reflect true behavior of this setting in client vs. cluster mode. Author: Ilya Ganelin Closes #4665 from ilganeli/SPARK-5570 and squashes the following commits: 5d1c8dd [Ilya Ganelin] Added example configuration code a51700a [Ilya Ganelin] Getting rid of extra spaces 85f7a08 [Ilya Ganelin] Reworded note 5889d43 [Ilya Ganelin] Formatting adjustment f149ba1 [Ilya Ganelin] Minor updates 1fec7a5 [Ilya Ganelin] Updated to add clarification for other driver properties db47595 [Ilya Ganelin] Slight formatting update c899564 [Ilya Ganelin] Merge remote-tracking branch 'upstream/master' into SPARK-5570 17b751d [Ilya Ganelin] Updated documentation for driver-memory to reflect its true behavior in client vs cluster mode (cherry picked from commit 6bddc40353057a562c78e75c5549c79a0d7d5f8b) Signed-off-by: Andrew Or Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c5f3b9e0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c5f3b9e0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c5f3b9e0 Branch: refs/heads/branch-1.3 Commit: c5f3b9e02f8b1d1b09d4309df9a2c8633da82910 Parents: bd49e8b Author: Ilya Ganelin Authored: Thu Feb 19 15:50:58 2015 -0800 Committer: Andrew Or Committed: Thu Feb 19 15:53:33 2015 -0800 -- docs/configuration.md | 23 ++- 1 file changed, 22 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c5f3b9e0/docs/configuration.md -- diff --git a/docs/configuration.md b/docs/configuration.md index eb0d6d3..541695c 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -115,7 +115,11 @@ of the most common options to set are: Amount of memory to use for the driver process, i.e. where SparkContext is initialized. (e.g. 512m, 2g). - + +Note: In client mode, this config must not be set through the SparkConf +directly in your application, because the driver JVM has already started at that point. +Instead, please set this through the --driver-memory command line option +or in your default properties file. spark.executor.memory @@ -214,6 +218,11 @@ Apart from these, the following properties are also available, and may be useful (none) A string of extra JVM options to pass to the driver. For instance, GC settings or other logging. + +Note: In client mode, this config must not be set through the SparkConf +directly in your application, because the driver JVM has already started at that point. +Instead, please set this through the --driver-java-options command line option or in +your default properties file. @@ -221,6 +230,11 @@ Apart from these, the following properties are also available, and may be useful (none) Extra classpath entries to append to the classpath of the driver. + +Note: In client mode, this config must not be set through the SparkConf +directly in your application, because the driver JVM has already started at that point. +Instead, please set this through the --driver-class-path command line option or in +your default properties file. @@ -228,6 +242,11 @@ Apart from these, the following properties are also available, and may be useful (none) Set a special library path to use when launching the driver JVM. 
+ +Note: In client mode, this config must not be set through the SparkConf +directly in your application, because the driver JVM has already started at that point. +Instead, please set this through the --driver-library-path command line option or in +your default properties file. @@ -237,6 +256,8 @@ Apart from these, the following properties are also available, and may be useful (Experimental) Whether to give user-added jars precedence over Spark's own jars when loading classes in the the driver. This feature can be used to mitigate conflicts between Spark's dependencies and user dependencies. It is currently an experimental feature. + +This is used in cluster mode only.
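The pitfall that the new note documents can be shown in a few lines. This is a sketch of the anti-pattern, not recommended code: the `SparkConf` call compiles and records the value, but in client mode the driver JVM's heap was already fixed when it launched, so the effective setting has to come from `--driver-memory` or the defaults file. The local master is only there so the snippet runs standalone:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object DriverMemoryPitfall {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("driver-memory-pitfall")
      .setMaster("local[*]") // local master just so the sketch runs standalone
      // Has no effect on the driver heap in client mode: the driver JVM is
      // already running when this line executes.
      .set("spark.driver.memory", "4g")

    val sc = new SparkContext(conf)
    // The setting that actually takes effect must come from the launcher:
    //   spark-submit --driver-memory 4g ...
    // or from conf/spark-defaults.conf:
    //   spark.driver.memory  4g
    // The conf still records "4g", but the heap was sized at JVM launch:
    println(s"configured: ${sc.getConf.get("spark.driver.memory", "<not set>")}")
    println(s"actual max heap bytes: ${Runtime.getRuntime.maxMemory}")
    sc.stop()
  }
}
```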
[1/2] spark git commit: SPARK-4682 [CORE] Consolidate various 'Clock' classes
Repository: spark Updated Branches: refs/heads/branch-1.3 ff8976ec7 -> bd49e8b96 http://git-wip-us.apache.org/repos/asf/spark/blob/bd49e8b9/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala -- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index 132ff24..818f551 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -39,7 +39,7 @@ import org.apache.spark.shuffle.hash.HashShuffleManager import org.apache.spark.storage._ import org.apache.spark.streaming.receiver._ import org.apache.spark.streaming.util._ -import org.apache.spark.util.AkkaUtils +import org.apache.spark.util.{AkkaUtils, ManualClock} import WriteAheadLogBasedBlockHandler._ import WriteAheadLogSuite._ @@ -165,7 +165,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche preCleanupLogFiles.size should be > 1 // this depends on the number of blocks inserted using generateAndStoreData() - manualClock.currentTime() shouldEqual 5000L + manualClock.getTimeMillis() shouldEqual 5000L val cleanupThreshTime = 3000L handler.cleanupOldBlocks(cleanupThreshTime) @@ -243,7 +243,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche val blockIds = Seq.fill(blocks.size)(generateBlockId()) val storeResults = blocks.zip(blockIds).map { case (block, id) => -manualClock.addToTime(500) // log rolling interval set to 1000 ms through SparkConf +manualClock.advance(500) // log rolling interval set to 1000 ms through SparkConf logDebug("Inserting block " + id) receivedBlockHandler.storeBlock(id, block) }.toList http://git-wip-us.apache.org/repos/asf/spark/blob/bd49e8b9/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala -- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala index fbb7b0b..a3a0fd5 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala @@ -34,9 +34,9 @@ import org.apache.spark.{Logging, SparkConf, SparkException} import org.apache.spark.storage.StreamBlockId import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult import org.apache.spark.streaming.scheduler._ -import org.apache.spark.streaming.util.{Clock, ManualClock, SystemClock, WriteAheadLogReader} +import org.apache.spark.streaming.util.WriteAheadLogReader import org.apache.spark.streaming.util.WriteAheadLogSuite._ -import org.apache.spark.util.Utils +import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils} class ReceivedBlockTrackerSuite extends FunSuite with BeforeAndAfter with Matchers with Logging { @@ -100,7 +100,7 @@ class ReceivedBlockTrackerSuite def incrementTime() { val timeIncrementMillis = 2000L - manualClock.addToTime(timeIncrementMillis) + manualClock.advance(timeIncrementMillis) } // Generate and add blocks to the given tracker @@ -138,13 +138,13 @@ class ReceivedBlockTrackerSuite tracker2.getUnallocatedBlocks(streamId).toList shouldEqual blockInfos1 // Allocate blocks to batch and verify whether the unallocated blocks got allocated -val batchTime1 = manualClock.currentTime +val batchTime1 = 
manualClock.getTimeMillis() tracker2.allocateBlocksToBatch(batchTime1) tracker2.getBlocksOfBatchAndStream(batchTime1, streamId) shouldEqual blockInfos1 // Add more blocks and allocate to another batch incrementTime() -val batchTime2 = manualClock.currentTime +val batchTime2 = manualClock.getTimeMillis() val blockInfos2 = addBlockInfos(tracker2) tracker2.allocateBlocksToBatch(batchTime2) tracker2.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2 http://git-wip-us.apache.org/repos/asf/spark/blob/bd49e8b9/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala -- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 7d82c3e..c2375ff 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -31,10 +31,9 @@ import org.scalatest.concurre
[1/2] spark git commit: SPARK-4682 [CORE] Consolidate various 'Clock' classes
Repository: spark Updated Branches: refs/heads/master ad6b169de -> 34b7c3538 http://git-wip-us.apache.org/repos/asf/spark/blob/34b7c353/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala -- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index 132ff24..818f551 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -39,7 +39,7 @@ import org.apache.spark.shuffle.hash.HashShuffleManager import org.apache.spark.storage._ import org.apache.spark.streaming.receiver._ import org.apache.spark.streaming.util._ -import org.apache.spark.util.AkkaUtils +import org.apache.spark.util.{AkkaUtils, ManualClock} import WriteAheadLogBasedBlockHandler._ import WriteAheadLogSuite._ @@ -165,7 +165,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche preCleanupLogFiles.size should be > 1 // this depends on the number of blocks inserted using generateAndStoreData() - manualClock.currentTime() shouldEqual 5000L + manualClock.getTimeMillis() shouldEqual 5000L val cleanupThreshTime = 3000L handler.cleanupOldBlocks(cleanupThreshTime) @@ -243,7 +243,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche val blockIds = Seq.fill(blocks.size)(generateBlockId()) val storeResults = blocks.zip(blockIds).map { case (block, id) => -manualClock.addToTime(500) // log rolling interval set to 1000 ms through SparkConf +manualClock.advance(500) // log rolling interval set to 1000 ms through SparkConf logDebug("Inserting block " + id) receivedBlockHandler.storeBlock(id, block) }.toList http://git-wip-us.apache.org/repos/asf/spark/blob/34b7c353/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala -- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala index fbb7b0b..a3a0fd5 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala @@ -34,9 +34,9 @@ import org.apache.spark.{Logging, SparkConf, SparkException} import org.apache.spark.storage.StreamBlockId import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult import org.apache.spark.streaming.scheduler._ -import org.apache.spark.streaming.util.{Clock, ManualClock, SystemClock, WriteAheadLogReader} +import org.apache.spark.streaming.util.WriteAheadLogReader import org.apache.spark.streaming.util.WriteAheadLogSuite._ -import org.apache.spark.util.Utils +import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils} class ReceivedBlockTrackerSuite extends FunSuite with BeforeAndAfter with Matchers with Logging { @@ -100,7 +100,7 @@ class ReceivedBlockTrackerSuite def incrementTime() { val timeIncrementMillis = 2000L - manualClock.addToTime(timeIncrementMillis) + manualClock.advance(timeIncrementMillis) } // Generate and add blocks to the given tracker @@ -138,13 +138,13 @@ class ReceivedBlockTrackerSuite tracker2.getUnallocatedBlocks(streamId).toList shouldEqual blockInfos1 // Allocate blocks to batch and verify whether the unallocated blocks got allocated -val batchTime1 = manualClock.currentTime +val batchTime1 = 
manualClock.getTimeMillis() tracker2.allocateBlocksToBatch(batchTime1) tracker2.getBlocksOfBatchAndStream(batchTime1, streamId) shouldEqual blockInfos1 // Add more blocks and allocate to another batch incrementTime() -val batchTime2 = manualClock.currentTime +val batchTime2 = manualClock.getTimeMillis() val blockInfos2 = addBlockInfos(tracker2) tracker2.allocateBlocksToBatch(batchTime2) tracker2.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2 http://git-wip-us.apache.org/repos/asf/spark/blob/34b7c353/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala -- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 7d82c3e..c2375ff 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -31,10 +31,9 @@ import org.scalatest.concurrent.P
[2/2] spark git commit: SPARK-4682 [CORE] Consolidate various 'Clock' classes
SPARK-4682 [CORE] Consolidate various 'Clock' classes Another one from JoshRosen 's wish list. The first commit is much smaller and removes 2 of the 4 Clock classes. The second is much larger, necessary for consolidating the streaming one. I put together implementations in the way that seemed simplest. Almost all the change is standardizing class and method names. Author: Sean Owen Closes #4514 from srowen/SPARK-4682 and squashes the following commits: 5ed3a03 [Sean Owen] Javadoc Clock classes; make ManualClock private[spark] 169dd13 [Sean Owen] Add support for legacy org.apache.spark.streaming clock class names 277785a [Sean Owen] Reduce the net change in this patch by reversing some unnecessary syntax changes along the way b5e53df [Sean Owen] FakeClock -> ManualClock; getTime() -> getTimeMillis() 160863a [Sean Owen] Consolidate Streaming Clock class into common util Clock 7c956b2 [Sean Owen] Consolidate Clocks except for Streaming Clock (cherry picked from commit 34b7c35380c88569a1396fb4ed991a0bed4288e7) Signed-off-by: Andrew Or Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bd49e8b9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bd49e8b9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bd49e8b9 Branch: refs/heads/branch-1.3 Commit: bd49e8b962b397b8fb8b22f980739021cf1a195e Parents: ff8976e Author: Sean Owen Authored: Thu Feb 19 15:35:23 2015 -0800 Committer: Andrew Or Committed: Thu Feb 19 15:35:31 2015 -0800 -- .../spark/ExecutorAllocationManager.scala | 28 +- .../spark/deploy/worker/DriverRunner.scala | 17 ++-- .../apache/spark/scheduler/DAGScheduler.scala | 20 ++--- .../apache/spark/scheduler/TaskSetManager.scala | 16 ++-- .../scala/org/apache/spark/util/Clock.scala | 44 +- .../org/apache/spark/util/ManualClock.scala | 69 +++ .../spark/ExecutorAllocationManagerSuite.scala | 65 +++--- .../spark/deploy/worker/DriverRunnerTest.scala | 3 +- .../spark/scheduler/TaskSetManagerSuite.scala | 36 .../scala/org/apache/spark/util/FakeClock.scala | 26 -- .../streaming/LocalJavaStreamingContext.java| 2 +- .../flume/FlumePollingStreamSuite.scala | 7 +- .../streaming/LocalJavaStreamingContext.java| 2 +- .../streaming/LocalJavaStreamingContext.java| 2 +- .../streaming/LocalJavaStreamingContext.java| 2 +- .../kinesis/KinesisCheckpointState.scala| 10 +-- .../kinesis/KinesisRecordProcessor.scala| 2 +- .../kinesis/KinesisReceiverSuite.scala | 25 +++--- .../JavaStreamingLogisticRegressionSuite.java | 2 +- .../JavaStreamingLinearRegressionSuite.java | 2 +- project/MimaExcludes.scala | 5 ++ .../streaming/dstream/FileInputDStream.scala| 6 +- .../streaming/receiver/BlockGenerator.scala | 3 +- .../receiver/ReceivedBlockHandler.scala | 4 +- .../streaming/scheduler/JobGenerator.scala | 13 ++- .../scheduler/ReceivedBlockTracker.scala| 6 +- .../org/apache/spark/streaming/util/Clock.scala | 89 .../spark/streaming/util/RecurringTimer.scala | 5 +- .../streaming/util/WriteAheadLogManager.scala | 5 +- .../streaming/LocalJavaStreamingContext.java| 2 +- .../spark/streaming/BasicOperationsSuite.scala | 9 +- .../spark/streaming/CheckpointSuite.scala | 33 .../spark/streaming/InputStreamsSuite.scala | 37 .../streaming/ReceivedBlockHandlerSuite.scala | 6 +- .../streaming/ReceivedBlockTrackerSuite.scala | 10 +-- .../apache/spark/streaming/TestSuiteBase.scala | 15 ++-- .../streaming/util/WriteAheadLogSuite.scala | 10 +-- 37 files changed, 301 insertions(+), 337 deletions(-) -- 
http://git-wip-us.apache.org/repos/asf/spark/blob/bd49e8b9/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 998695b..21c6e6f 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -20,6 +20,7 @@ package org.apache.spark import scala.collection.mutable import org.apache.spark.scheduler._ +import org.apache.spark.util.{SystemClock, Clock} /** * An agent that dynamically allocates and removes executors based on the workload. @@ -123,7 +124,7 @@ private[spark] class ExecutorAllocationManager( private val intervalMillis: Long = 100 // Clock used to schedule when executors should be added and removed - private var clock: Clock = new RealClock + private var clock: Clock = new S
[2/2] spark git commit: SPARK-4682 [CORE] Consolidate various 'Clock' classes
SPARK-4682 [CORE] Consolidate various 'Clock' classes Another one from JoshRosen 's wish list. The first commit is much smaller and removes 2 of the 4 Clock classes. The second is much larger, necessary for consolidating the streaming one. I put together implementations in the way that seemed simplest. Almost all the change is standardizing class and method names. Author: Sean Owen Closes #4514 from srowen/SPARK-4682 and squashes the following commits: 5ed3a03 [Sean Owen] Javadoc Clock classes; make ManualClock private[spark] 169dd13 [Sean Owen] Add support for legacy org.apache.spark.streaming clock class names 277785a [Sean Owen] Reduce the net change in this patch by reversing some unnecessary syntax changes along the way b5e53df [Sean Owen] FakeClock -> ManualClock; getTime() -> getTimeMillis() 160863a [Sean Owen] Consolidate Streaming Clock class into common util Clock 7c956b2 [Sean Owen] Consolidate Clocks except for Streaming Clock Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/34b7c353 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/34b7c353 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/34b7c353 Branch: refs/heads/master Commit: 34b7c35380c88569a1396fb4ed991a0bed4288e7 Parents: ad6b169 Author: Sean Owen Authored: Thu Feb 19 15:35:23 2015 -0800 Committer: Andrew Or Committed: Thu Feb 19 15:35:23 2015 -0800 -- .../spark/ExecutorAllocationManager.scala | 28 +- .../spark/deploy/worker/DriverRunner.scala | 17 ++-- .../apache/spark/scheduler/DAGScheduler.scala | 20 ++--- .../apache/spark/scheduler/TaskSetManager.scala | 16 ++-- .../scala/org/apache/spark/util/Clock.scala | 44 +- .../org/apache/spark/util/ManualClock.scala | 69 +++ .../spark/ExecutorAllocationManagerSuite.scala | 65 +++--- .../spark/deploy/worker/DriverRunnerTest.scala | 3 +- .../spark/scheduler/TaskSetManagerSuite.scala | 36 .../scala/org/apache/spark/util/FakeClock.scala | 26 -- .../streaming/LocalJavaStreamingContext.java| 2 +- .../flume/FlumePollingStreamSuite.scala | 7 +- .../streaming/LocalJavaStreamingContext.java| 2 +- .../streaming/LocalJavaStreamingContext.java| 2 +- .../streaming/LocalJavaStreamingContext.java| 2 +- .../kinesis/KinesisCheckpointState.scala| 10 +-- .../kinesis/KinesisRecordProcessor.scala| 2 +- .../kinesis/KinesisReceiverSuite.scala | 25 +++--- .../JavaStreamingLogisticRegressionSuite.java | 2 +- .../JavaStreamingLinearRegressionSuite.java | 2 +- project/MimaExcludes.scala | 5 ++ .../streaming/dstream/FileInputDStream.scala| 6 +- .../streaming/receiver/BlockGenerator.scala | 3 +- .../receiver/ReceivedBlockHandler.scala | 4 +- .../streaming/scheduler/JobGenerator.scala | 13 ++- .../scheduler/ReceivedBlockTracker.scala| 6 +- .../org/apache/spark/streaming/util/Clock.scala | 89 .../spark/streaming/util/RecurringTimer.scala | 5 +- .../streaming/util/WriteAheadLogManager.scala | 5 +- .../streaming/LocalJavaStreamingContext.java| 2 +- .../spark/streaming/BasicOperationsSuite.scala | 9 +- .../spark/streaming/CheckpointSuite.scala | 33 .../spark/streaming/InputStreamsSuite.scala | 37 .../streaming/ReceivedBlockHandlerSuite.scala | 6 +- .../streaming/ReceivedBlockTrackerSuite.scala | 10 +-- .../apache/spark/streaming/TestSuiteBase.scala | 15 ++-- .../streaming/util/WriteAheadLogSuite.scala | 10 +-- 37 files changed, 301 insertions(+), 337 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/34b7c353/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala -- diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 998695b..21c6e6f 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -20,6 +20,7 @@ package org.apache.spark import scala.collection.mutable import org.apache.spark.scheduler._ +import org.apache.spark.util.{SystemClock, Clock} /** * An agent that dynamically allocates and removes executors based on the workload. @@ -123,7 +124,7 @@ private[spark] class ExecutorAllocationManager( private val intervalMillis: Long = 100 // Clock used to schedule when executors should be added and removed - private var clock: Clock = new RealClock + private var clock: Clock = new SystemClock() // Listener for Spark events that impact the allocation policy private val list
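The consolidated API that the diffs above migrate to can be summarized in a short sketch. The method names (`getTimeMillis`, `advance`, `setTime`) are taken from the patch, but the real `ManualClock` is `private[spark]` and also supports blocking waits, so this is a simplified illustration rather than the actual classes:

```scala
// Simplified sketch of the consolidated Clock hierarchy: one trait with
// getTimeMillis(), a wall-clock implementation, and a manual clock that
// tests advance explicitly instead of sleeping.
trait Clock {
  def getTimeMillis(): Long
}

class SystemClock extends Clock {
  override def getTimeMillis(): Long = System.currentTimeMillis()
}

class ManualClock(private var time: Long = 0L) extends Clock {
  override def getTimeMillis(): Long = time

  /** Advance the clock by the given number of milliseconds. */
  def advance(timeToAdd: Long): Unit = { time += timeToAdd }

  /** Set the clock to an absolute time in milliseconds. */
  def setTime(timeToSet: Long): Unit = { time = timeToSet }
}

object ClockDemo {
  def main(args: Array[String]): Unit = {
    val clock = new ManualClock()
    clock.advance(500)              // e.g. simulate one log-rolling interval
    println(clock.getTimeMillis())  // prints 500
  }
}
```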
spark git commit: [Spark-5889] Remove pid file after stopping service.
Repository: spark Updated Branches: refs/heads/branch-1.2 856fdcb65 -> 18fbed5b5 [Spark-5889] Remove pid file after stopping service. Currently the pid file is not deleted, and potentially may cause some problem after service is stopped. The fix remove the pid file after service stopped. Author: Zhan Zhang Closes #4676 from zhzhan/spark-5889 and squashes the following commits: eb01be1 [Zhan Zhang] solve review comments b4c009e [Zhan Zhang] solve review comments 018110a [Zhan Zhang] spark-5889: remove pid file after stopping service 088d2a2 [Zhan Zhang] squash all commits c1f1fa5 [Zhan Zhang] test (cherry picked from commit ad6b169dee84df175b51933b7a3ad7f0bbc52cf3) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/18fbed5b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/18fbed5b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/18fbed5b Branch: refs/heads/branch-1.2 Commit: 18fbed5b54818a84671b726ae952736f3251f9ff Parents: 856fdcb Author: Zhan Zhang Authored: Thu Feb 19 23:13:02 2015 + Committer: Sean Owen Committed: Thu Feb 19 23:14:56 2015 + -- sbin/spark-daemon.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/18fbed5b/sbin/spark-daemon.sh -- diff --git a/sbin/spark-daemon.sh b/sbin/spark-daemon.sh index e1bcc7d..5e812a1 100755 --- a/sbin/spark-daemon.sh +++ b/sbin/spark-daemon.sh @@ -168,7 +168,7 @@ case $option in TARGET_ID="$(cat "$pid")" if [[ $(ps -p "$TARGET_ID" -o comm=) =~ "java" ]]; then echo "stopping $command" -kill "$TARGET_ID" +kill "$TARGET_ID" && rm -f "$pid" else echo "no $command to stop" fi
spark git commit: [Spark-5889] Remove pid file after stopping service.
Repository: spark Updated Branches: refs/heads/master a5fed3435 -> ad6b169de [Spark-5889] Remove pid file after stopping service. Currently the pid file is not deleted, and potentially may cause some problem after service is stopped. The fix remove the pid file after service stopped. Author: Zhan Zhang Closes #4676 from zhzhan/spark-5889 and squashes the following commits: eb01be1 [Zhan Zhang] solve review comments b4c009e [Zhan Zhang] solve review comments 018110a [Zhan Zhang] spark-5889: remove pid file after stopping service 088d2a2 [Zhan Zhang] squash all commits c1f1fa5 [Zhan Zhang] test Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ad6b169d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ad6b169d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ad6b169d Branch: refs/heads/master Commit: ad6b169dee84df175b51933b7a3ad7f0bbc52cf3 Parents: a5fed34 Author: Zhan Zhang Authored: Thu Feb 19 23:13:02 2015 + Committer: Sean Owen Committed: Thu Feb 19 23:13:02 2015 + -- sbin/spark-daemon.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ad6b169d/sbin/spark-daemon.sh -- diff --git a/sbin/spark-daemon.sh b/sbin/spark-daemon.sh index e1bcc7d..5e812a1 100755 --- a/sbin/spark-daemon.sh +++ b/sbin/spark-daemon.sh @@ -168,7 +168,7 @@ case $option in TARGET_ID="$(cat "$pid")" if [[ $(ps -p "$TARGET_ID" -o comm=) =~ "java" ]]; then echo "stopping $command" -kill "$TARGET_ID" +kill "$TARGET_ID" && rm -f "$pid" else echo "no $command to stop" fi
spark git commit: [Spark-5889] Remove pid file after stopping service.
Repository: spark Updated Branches: refs/heads/branch-1.3 0c494cf9a -> ff8976ec7 [Spark-5889] Remove pid file after stopping service. Currently the pid file is not deleted, and potentially may cause some problem after service is stopped. The fix remove the pid file after service stopped. Author: Zhan Zhang Closes #4676 from zhzhan/spark-5889 and squashes the following commits: eb01be1 [Zhan Zhang] solve review comments b4c009e [Zhan Zhang] solve review comments 018110a [Zhan Zhang] spark-5889: remove pid file after stopping service 088d2a2 [Zhan Zhang] squash all commits c1f1fa5 [Zhan Zhang] test (cherry picked from commit ad6b169dee84df175b51933b7a3ad7f0bbc52cf3) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ff8976ec Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ff8976ec Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ff8976ec Branch: refs/heads/branch-1.3 Commit: ff8976ec7825bec0f0c30bf763a8ad3dcc14542b Parents: 0c494cf Author: Zhan Zhang Authored: Thu Feb 19 23:13:02 2015 + Committer: Sean Owen Committed: Thu Feb 19 23:14:08 2015 + -- sbin/spark-daemon.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ff8976ec/sbin/spark-daemon.sh -- diff --git a/sbin/spark-daemon.sh b/sbin/spark-daemon.sh index e1bcc7d..5e812a1 100755 --- a/sbin/spark-daemon.sh +++ b/sbin/spark-daemon.sh @@ -168,7 +168,7 @@ case $option in TARGET_ID="$(cat "$pid")" if [[ $(ps -p "$TARGET_ID" -o comm=) =~ "java" ]]; then echo "stopping $command" -kill "$TARGET_ID" +kill "$TARGET_ID" && rm -f "$pid" else echo "no $command to stop" fi
spark git commit: [SPARK-5902] [ml] Made PipelineStage.transformSchema public instead of private to ml
Repository: spark Updated Branches: refs/heads/master 8ca3418e1 -> a5fed3435 [SPARK-5902] [ml] Made PipelineStage.transformSchema public instead of private to ml For users to implement their own PipelineStages, we need to make PipelineStage.transformSchema be public instead of private to ml. This would be nice to include in Spark 1.3 CC: mengxr Author: Joseph K. Bradley Closes #4682 from jkbradley/SPARK-5902 and squashes the following commits: 6f02357 [Joseph K. Bradley] Made transformSchema public 0e6d0a0 [Joseph K. Bradley] made implementations of transformSchema protected as well fdaf26a [Joseph K. Bradley] Made PipelineStage.transformSchema protected instead of private[ml] Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a5fed343 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a5fed343 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a5fed343 Branch: refs/heads/master Commit: a5fed34355b403188ad50b567ab62ee54597b493 Parents: 8ca3418 Author: Joseph K. Bradley Authored: Thu Feb 19 12:46:27 2015 -0800 Committer: Xiangrui Meng Committed: Thu Feb 19 12:46:27 2015 -0800 -- .../main/scala/org/apache/spark/ml/Pipeline.scala | 16 .../apache/spark/ml/feature/StandardScaler.scala| 4 ++-- .../apache/spark/ml/impl/estimator/Predictor.scala | 4 ++-- .../org/apache/spark/ml/recommendation/ALS.scala| 4 ++-- .../org/apache/spark/ml/tuning/CrossValidator.scala | 4 ++-- 5 files changed, 20 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a5fed343/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala index 5607ed2..5bbcd2e 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala @@ -20,7 +20,7 @@ package org.apache.spark.ml import scala.collection.mutable.ListBuffer import org.apache.spark.Logging -import org.apache.spark.annotation.AlphaComponent +import org.apache.spark.annotation.{AlphaComponent, DeveloperApi} import org.apache.spark.ml.param.{Param, ParamMap} import org.apache.spark.sql.DataFrame import org.apache.spark.sql.types.StructType @@ -33,9 +33,17 @@ import org.apache.spark.sql.types.StructType abstract class PipelineStage extends Serializable with Logging { /** + * :: DeveloperAPI :: + * * Derives the output schema from the input schema and parameters. + * The schema describes the columns and types of the data. + * + * @param schema Input schema to this stage + * @param paramMap Parameters passed to this stage + * @return Output schema from this stage */ - private[ml] def transformSchema(schema: StructType, paramMap: ParamMap): StructType + @DeveloperApi + def transformSchema(schema: StructType, paramMap: ParamMap): StructType /** * Derives the output schema from the input schema and parameters, optionally with logging. 
@@ -126,7 +134,7 @@ class Pipeline extends Estimator[PipelineModel] { new PipelineModel(this, map, transformers.toArray) } - private[ml] override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = { + override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = { val map = this.paramMap ++ paramMap val theStages = map(stages) require(theStages.toSet.size == theStages.size, @@ -171,7 +179,7 @@ class PipelineModel private[ml] ( stages.foldLeft(dataset)((cur, transformer) => transformer.transform(cur, map)) } - private[ml] override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = { + override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = { // Precedence of ParamMaps: paramMap > this.paramMap > fittingParamMap val map = (fittingParamMap ++ this.paramMap) ++ paramMap stages.foldLeft(schema)((cur, transformer) => transformer.transformSchema(cur, map)) http://git-wip-us.apache.org/repos/asf/spark/blob/a5fed343/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala index ddbd648..1142aa4 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala @@ -55,7 +55,7 @@ class StandardScaler extends Estimator[StandardScalerModel] with StandardScalerP model } - private[ml]
spark git commit: [SPARK-5902] [ml] Made PipelineStage.transformSchema public instead of private to ml
Repository: spark Updated Branches: refs/heads/branch-1.3 55d91d92b -> 0c494cf9a [SPARK-5902] [ml] Made PipelineStage.transformSchema public instead of private to ml For users to implement their own PipelineStages, we need to make PipelineStage.transformSchema be public instead of private to ml. This would be nice to include in Spark 1.3 CC: mengxr Author: Joseph K. Bradley Closes #4682 from jkbradley/SPARK-5902 and squashes the following commits: 6f02357 [Joseph K. Bradley] Made transformSchema public 0e6d0a0 [Joseph K. Bradley] made implementations of transformSchema protected as well fdaf26a [Joseph K. Bradley] Made PipelineStage.transformSchema protected instead of private[ml] (cherry picked from commit a5fed34355b403188ad50b567ab62ee54597b493) Signed-off-by: Xiangrui Meng Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c494cf9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c494cf9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c494cf9 Branch: refs/heads/branch-1.3 Commit: 0c494cf9a3d2b717d86f53445b35f725afa89ac8 Parents: 55d91d9 Author: Joseph K. Bradley Authored: Thu Feb 19 12:46:27 2015 -0800 Committer: Xiangrui Meng Committed: Thu Feb 19 12:46:37 2015 -0800 -- .../main/scala/org/apache/spark/ml/Pipeline.scala | 16 .../apache/spark/ml/feature/StandardScaler.scala| 4 ++-- .../apache/spark/ml/impl/estimator/Predictor.scala | 4 ++-- .../org/apache/spark/ml/recommendation/ALS.scala| 4 ++-- .../org/apache/spark/ml/tuning/CrossValidator.scala | 4 ++-- 5 files changed, 20 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0c494cf9/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala index 5607ed2..5bbcd2e 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala @@ -20,7 +20,7 @@ package org.apache.spark.ml import scala.collection.mutable.ListBuffer import org.apache.spark.Logging -import org.apache.spark.annotation.AlphaComponent +import org.apache.spark.annotation.{AlphaComponent, DeveloperApi} import org.apache.spark.ml.param.{Param, ParamMap} import org.apache.spark.sql.DataFrame import org.apache.spark.sql.types.StructType @@ -33,9 +33,17 @@ import org.apache.spark.sql.types.StructType abstract class PipelineStage extends Serializable with Logging { /** + * :: DeveloperAPI :: + * * Derives the output schema from the input schema and parameters. + * The schema describes the columns and types of the data. + * + * @param schema Input schema to this stage + * @param paramMap Parameters passed to this stage + * @return Output schema from this stage */ - private[ml] def transformSchema(schema: StructType, paramMap: ParamMap): StructType + @DeveloperApi + def transformSchema(schema: StructType, paramMap: ParamMap): StructType /** * Derives the output schema from the input schema and parameters, optionally with logging. 
@@ -126,7 +134,7 @@ class Pipeline extends Estimator[PipelineModel] { new PipelineModel(this, map, transformers.toArray) } - private[ml] override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = { + override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = { val map = this.paramMap ++ paramMap val theStages = map(stages) require(theStages.toSet.size == theStages.size, @@ -171,7 +179,7 @@ class PipelineModel private[ml] ( stages.foldLeft(dataset)((cur, transformer) => transformer.transform(cur, map)) } - private[ml] override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = { + override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = { // Precedence of ParamMaps: paramMap > this.paramMap > fittingParamMap val map = (fittingParamMap ++ this.paramMap) ++ paramMap stages.foldLeft(schema)((cur, transformer) => transformer.transformSchema(cur, map)) http://git-wip-us.apache.org/repos/asf/spark/blob/0c494cf9/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala index ddbd648..1142aa4 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala @@ -55,7 +55,7 @@ class
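Editor's note on the change above: with transformSchema now public and annotated @DeveloperApi, code outside the org.apache.spark.ml package can implement its own pipeline stages and take part in up-front schema validation. Below is a minimal, hypothetical Scala sketch against the 1.3-era API; the ColumnCopier name, the "value"/"valueCopy" columns, and the exact Transformer contract assumed here are illustrative only and are not part of the commit.

import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType

// Hypothetical third-party stage: copies an assumed "value" column into "valueCopy".
class ColumnCopier extends Transformer {

  // Now that transformSchema is public, an external stage can declare its output schema
  // so a Pipeline can validate every stage against the input schema before running.
  override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = {
    require(schema.fieldNames.contains("value"), "Input must contain a 'value' column.")
    StructType(schema.fields :+ schema("value").copy(name = "valueCopy"))
  }

  override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = {
    dataset.withColumn("valueCopy", dataset("value"))
  }
}

Before this change a class like the sketch above could not be written outside org.apache.spark.ml at all, because the abstract transformSchema it must implement was private[ml].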
spark git commit: [SPARK-5904][SQL] DataFrame API fixes.
Repository: spark Updated Branches: refs/heads/branch-1.3 fe00eb66e -> 55d91d92b [SPARK-5904][SQL] DataFrame API fixes. 1. Column is no longer a DataFrame to simplify class hierarchy. 2. Don't use varargs on abstract methods (see Scala compiler bug SI-9013). Author: Reynold Xin Closes #4686 from rxin/SPARK-5904 and squashes the following commits: fd9b199 [Reynold Xin] Fixed Python tests. df25cef [Reynold Xin] Non final. 5221530 [Reynold Xin] [SPARK-5904][SQL] DataFrame API fixes. Conflicts: sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/55d91d92 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/55d91d92 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/55d91d92 Branch: refs/heads/branch-1.3 Commit: 55d91d92bcdd56c479138cbdd9d2354bb8db52a7 Parents: fe00eb6 Author: Reynold Xin Authored: Thu Feb 19 12:09:44 2015 -0800 Committer: Michael Armbrust Committed: Thu Feb 19 12:15:18 2015 -0800 -- python/pyspark/sql/dataframe.py | 56 +-- .../scala/org/apache/spark/sql/Column.scala | 223 +++-- .../org/apache/spark/sql/ComputableColumn.scala | 33 -- .../scala/org/apache/spark/sql/DataFrame.scala | 420 .../org/apache/spark/sql/DataFrameImpl.scala| 483 --- .../org/apache/spark/sql/GroupedData.scala | 2 +- .../apache/spark/sql/IncomputableColumn.scala | 183 --- .../spark/sql/ColumnExpressionSuite.scala | 44 +- .../org/apache/spark/sql/DataFrameSuite.scala | 7 +- 9 files changed, 427 insertions(+), 1024 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/55d91d92/python/pyspark/sql/dataframe.py -- diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index c68c97e..010c38f 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -546,7 +546,7 @@ class DataFrame(object): def __getitem__(self, item): """ Return the column by given name ->>> df['age'].collect() +>>> df.select(df['age']).collect() [Row(age=2), Row(age=5)] >>> df[ ["name", "age"]].collect() [Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)] @@ -555,7 +555,7 @@ class DataFrame(object): """ if isinstance(item, basestring): jc = self._jdf.apply(item) -return Column(jc, self.sql_ctx) +return Column(jc) elif isinstance(item, Column): return self.filter(item) elif isinstance(item, list): @@ -566,13 +566,13 @@ class DataFrame(object): def __getattr__(self, name): """ Return the column by given name ->>> df.age.collect() +>>> df.select(df.age).collect() [Row(age=2), Row(age=5)] """ if name.startswith("__"): raise AttributeError(name) jc = self._jdf.apply(name) -return Column(jc, self.sql_ctx) +return Column(jc) def select(self, *cols): """ Selecting a set of expressions. 
@@ -698,7 +698,7 @@ class DataFrame(object): >>> df.withColumnRenamed('age', 'age2').collect() [Row(age2=2, name=u'Alice'), Row(age2=5, name=u'Bob')] """ -cols = [Column(_to_java_column(c), self.sql_ctx).alias(new) +cols = [Column(_to_java_column(c)).alias(new) if c == existing else c for c in self.columns] return self.select(*cols) @@ -873,15 +873,16 @@ def _unary_op(name, doc="unary operator"): """ Create a method for given unary operator """ def _(self): jc = getattr(self._jc, name)() -return Column(jc, self.sql_ctx) +return Column(jc) _.__doc__ = doc return _ def _func_op(name, doc=''): def _(self): -jc = getattr(self._sc._jvm.functions, name)(self._jc) -return Column(jc, self.sql_ctx) +sc = SparkContext._active_spark_context +jc = getattr(sc._jvm.functions, name)(self._jc) +return Column(jc) _.__doc__ = doc return _ @@ -892,7 +893,7 @@ def _bin_op(name, doc="binary operator"): def _(self, other): jc = other._jc if isinstance(other, Column) else other njc = getattr(self._jc, name)(jc) -return Column(njc, self.sql_ctx) +return Column(njc) _.__doc__ = doc return _ @@ -903,12 +904,12 @@ def _reverse_op(name, doc="binary operator"): def _(self, other): jother = _create_column_from_literal(other) jc = getattr(jother, name)(self._jc) -return Column(jc, self.sql_ctx) +return Column(jc) _.__doc__ = doc return _ -class Column(
spark git commit: [SPARK-5904][SQL] DataFrame API fixes.
Repository: spark Updated Branches: refs/heads/master 94cdb05ff -> 8ca3418e1 [SPARK-5904][SQL] DataFrame API fixes. 1. Column is no longer a DataFrame to simplify class hierarchy. 2. Don't use varargs on abstract methods (see Scala compiler bug SI-9013). Author: Reynold Xin Closes #4686 from rxin/SPARK-5904 and squashes the following commits: fd9b199 [Reynold Xin] Fixed Python tests. df25cef [Reynold Xin] Non final. 5221530 [Reynold Xin] [SPARK-5904][SQL] DataFrame API fixes. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8ca3418e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8ca3418e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8ca3418e Branch: refs/heads/master Commit: 8ca3418e1b3e2687e75a08c185d17045a97279fb Parents: 94cdb05 Author: Reynold Xin Authored: Thu Feb 19 12:09:44 2015 -0800 Committer: Michael Armbrust Committed: Thu Feb 19 12:09:44 2015 -0800 -- python/pyspark/sql/dataframe.py | 56 +-- .../scala/org/apache/spark/sql/Column.scala | 223 +++-- .../org/apache/spark/sql/ComputableColumn.scala | 33 -- .../scala/org/apache/spark/sql/DataFrame.scala | 420 .../org/apache/spark/sql/DataFrameImpl.scala| 483 --- .../org/apache/spark/sql/GroupedData.scala | 2 +- .../apache/spark/sql/IncomputableColumn.scala | 183 --- .../spark/sql/ColumnExpressionSuite.scala | 44 +- .../org/apache/spark/sql/DataFrameSuite.scala | 7 +- 9 files changed, 427 insertions(+), 1024 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8ca3418e/python/pyspark/sql/dataframe.py -- diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index c68c97e..010c38f 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -546,7 +546,7 @@ class DataFrame(object): def __getitem__(self, item): """ Return the column by given name ->>> df['age'].collect() +>>> df.select(df['age']).collect() [Row(age=2), Row(age=5)] >>> df[ ["name", "age"]].collect() [Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)] @@ -555,7 +555,7 @@ class DataFrame(object): """ if isinstance(item, basestring): jc = self._jdf.apply(item) -return Column(jc, self.sql_ctx) +return Column(jc) elif isinstance(item, Column): return self.filter(item) elif isinstance(item, list): @@ -566,13 +566,13 @@ class DataFrame(object): def __getattr__(self, name): """ Return the column by given name ->>> df.age.collect() +>>> df.select(df.age).collect() [Row(age=2), Row(age=5)] """ if name.startswith("__"): raise AttributeError(name) jc = self._jdf.apply(name) -return Column(jc, self.sql_ctx) +return Column(jc) def select(self, *cols): """ Selecting a set of expressions. 
@@ -698,7 +698,7 @@ class DataFrame(object): >>> df.withColumnRenamed('age', 'age2').collect() [Row(age2=2, name=u'Alice'), Row(age2=5, name=u'Bob')] """ -cols = [Column(_to_java_column(c), self.sql_ctx).alias(new) +cols = [Column(_to_java_column(c)).alias(new) if c == existing else c for c in self.columns] return self.select(*cols) @@ -873,15 +873,16 @@ def _unary_op(name, doc="unary operator"): """ Create a method for given unary operator """ def _(self): jc = getattr(self._jc, name)() -return Column(jc, self.sql_ctx) +return Column(jc) _.__doc__ = doc return _ def _func_op(name, doc=''): def _(self): -jc = getattr(self._sc._jvm.functions, name)(self._jc) -return Column(jc, self.sql_ctx) +sc = SparkContext._active_spark_context +jc = getattr(sc._jvm.functions, name)(self._jc) +return Column(jc) _.__doc__ = doc return _ @@ -892,7 +893,7 @@ def _bin_op(name, doc="binary operator"): def _(self, other): jc = other._jc if isinstance(other, Column) else other njc = getattr(self._jc, name)(jc) -return Column(njc, self.sql_ctx) +return Column(njc) _.__doc__ = doc return _ @@ -903,12 +904,12 @@ def _reverse_op(name, doc="binary operator"): def _(self, other): jother = _create_column_from_literal(other) jc = getattr(jother, name)(self._jc) -return Column(jc, self.sql_ctx) +return Column(jc) _.__doc__ = doc return _ -class Column(DataFrame): +class Column(object): """ A column in a DataFrame. @@ -924,9 +925,8
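Editor's note on the change above: the Python doctest updates are the visible symptom of the main change, namely that a Column is now just an expression rather than a DataFrame, so it can no longer be collected on its own and must be wrapped in select() first. A small, hedged Scala sketch of the post-change idiom, assuming a DataFrame that has an "age" column:

import org.apache.spark.sql.{DataFrame, Row}

// After SPARK-5904 a Column is only an expression, so it cannot be collected directly;
// select() turns it back into a DataFrame, which can then be collected.
def agesOf(df: DataFrame): Array[Row] = {
  val ageCol = df("age")        // Column: an expression over df, not a DataFrame
  df.select(ageCol).collect()   // a DataFrame again, so collect() is available
}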
spark git commit: [SPARK-5825] [Spark Submit] Remove the double checking instance name when stopping the service
Repository: spark Updated Branches: refs/heads/master 90095bf3c -> 94cdb05ff [SPARK-5825] [Spark Submit] Remove the double checking instance name when stopping the service `spark-daemon.sh` will confirm the process id by fuzzy matching the class name while stopping the service, however, it will fail if the java process arguments are very long (greater than 4096 characters). This PR loosens the check for the service process. Author: Cheng Hao Closes #4611 from chenghao-intel/stopping_service and squashes the following commits: a0051f6 [Cheng Hao] loosen the process checking while stopping a service Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/94cdb05f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/94cdb05f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/94cdb05f Branch: refs/heads/master Commit: 94cdb05ff7e6b8fc5b3a574202ba8bc8e5bbe689 Parents: 90095bf Author: Cheng Hao Authored: Thu Feb 19 12:07:51 2015 -0800 Committer: Andrew Or Committed: Thu Feb 19 12:07:51 2015 -0800 -- sbin/spark-daemon.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/94cdb05f/sbin/spark-daemon.sh -- diff --git a/sbin/spark-daemon.sh b/sbin/spark-daemon.sh index ec6d0b5..e1bcc7d 100755 --- a/sbin/spark-daemon.sh +++ b/sbin/spark-daemon.sh @@ -166,7 +166,7 @@ case $option in if [ -f $pid ]; then TARGET_ID="$(cat "$pid")" - if [[ $(ps -p "$TARGET_ID" -o args=) =~ $command ]]; then + if [[ $(ps -p "$TARGET_ID" -o comm=) =~ "java" ]]; then echo "stopping $command" kill "$TARGET_ID" else
spark git commit: [SPARK-5825] [Spark Submit] Remove the double checking instance name when stopping the service
Repository: spark Updated Branches: refs/heads/branch-1.2 61bde0049 -> 856fdcb65 [SPARK-5825] [Spark Submit] Remove the double checking instance name when stopping the service `spark-daemon.sh` will confirm the process id by fuzzy matching the class name while stopping the service, however, it will fail if the java process arguments are very long (greater than 4096 characters). This PR loosens the check for the service process. Author: Cheng Hao Closes #4611 from chenghao-intel/stopping_service and squashes the following commits: a0051f6 [Cheng Hao] loosen the process checking while stopping a service (cherry picked from commit 94cdb05ff7e6b8fc5b3a574202ba8bc8e5bbe689) Signed-off-by: Andrew Or Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/856fdcb6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/856fdcb6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/856fdcb6 Branch: refs/heads/branch-1.2 Commit: 856fdcb654184f4d4ea3d620e48093ce3ada4cbb Parents: 61bde00 Author: Cheng Hao Authored: Thu Feb 19 12:07:51 2015 -0800 Committer: Andrew Or Committed: Thu Feb 19 12:08:03 2015 -0800 -- sbin/spark-daemon.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/856fdcb6/sbin/spark-daemon.sh -- diff --git a/sbin/spark-daemon.sh b/sbin/spark-daemon.sh index ec6d0b5..e1bcc7d 100755 --- a/sbin/spark-daemon.sh +++ b/sbin/spark-daemon.sh @@ -166,7 +166,7 @@ case $option in if [ -f $pid ]; then TARGET_ID="$(cat "$pid")" - if [[ $(ps -p "$TARGET_ID" -o args=) =~ $command ]]; then + if [[ $(ps -p "$TARGET_ID" -o comm=) =~ "java" ]]; then echo "stopping $command" kill "$TARGET_ID" else
spark git commit: [SPARK-5825] [Spark Submit] Remove the double checking instance name when stopping the service
Repository: spark Updated Branches: refs/heads/branch-1.3 25fae8e7e -> fe00eb66e [SPARK-5825] [Spark Submit] Remove the double checking instance name when stopping the service `spark-daemon.sh` will confirm the process id by fuzzy matching the class name while stopping the service, however, it will fail if the java process arguments are very long (greater than 4096 characters). This PR loosens the check for the service process. Author: Cheng Hao Closes #4611 from chenghao-intel/stopping_service and squashes the following commits: a0051f6 [Cheng Hao] loosen the process checking while stopping a service (cherry picked from commit 94cdb05ff7e6b8fc5b3a574202ba8bc8e5bbe689) Signed-off-by: Andrew Or Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fe00eb66 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fe00eb66 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fe00eb66 Branch: refs/heads/branch-1.3 Commit: fe00eb66e8a65e23e81d401da32c5d5de8180394 Parents: 25fae8e Author: Cheng Hao Authored: Thu Feb 19 12:07:51 2015 -0800 Committer: Andrew Or Committed: Thu Feb 19 12:07:56 2015 -0800 -- sbin/spark-daemon.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fe00eb66/sbin/spark-daemon.sh -- diff --git a/sbin/spark-daemon.sh b/sbin/spark-daemon.sh index ec6d0b5..e1bcc7d 100755 --- a/sbin/spark-daemon.sh +++ b/sbin/spark-daemon.sh @@ -166,7 +166,7 @@ case $option in if [ -f $pid ]; then TARGET_ID="$(cat "$pid")" - if [[ $(ps -p "$TARGET_ID" -o args=) =~ $command ]]; then + if [[ $(ps -p "$TARGET_ID" -o comm=) =~ "java" ]]; then echo "stopping $command" kill "$TARGET_ID" else
spark git commit: [SPARK-5423][Core] Cleanup resources in DiskMapIterator.finalize to ensure deleting the temp file
Repository: spark Updated Branches: refs/heads/branch-1.2 f6ee80b18 -> 61bde0049 [SPARK-5423][Core] Cleanup resources in DiskMapIterator.finalize to ensure deleting the temp file This PR adds a `finalize` method in DiskMapIterator to clean up the resources even if some exception happens during processing data. Author: zsxwing Closes #4219 from zsxwing/SPARK-5423 and squashes the following commits: d4b2ca6 [zsxwing] Cleanup resources in DiskMapIterator.finalize to ensure deleting the temp file (cherry picked from commit 90095bf3ce9304d09a32ceffaa99069079071b59) Signed-off-by: Ubuntu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/61bde004 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/61bde004 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/61bde004 Branch: refs/heads/branch-1.2 Commit: 61bde0049fac324b5004eadfa22b02cd76cf2187 Parents: f6ee80b Author: zsxwing Authored: Thu Feb 19 18:37:31 2015 + Committer: Ubuntu Committed: Thu Feb 19 18:38:00 2015 + -- .../util/collection/ExternalAppendOnlyMap.scala | 52 1 file changed, 43 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/61bde004/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 8a0f5a6..fc7e86e 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -387,6 +387,15 @@ class ExternalAppendOnlyMap[K, V, C]( private var batchIndex = 0 // Which batch we're in private var fileStream: FileInputStream = null +@volatile private var closed = false + +// A volatile variable to remember which DeserializationStream is using. Need to set it when we +// open a DeserializationStream. But we should use `deserializeStream` rather than +// `deserializeStreamToBeClosed` to read the content because touching a volatile variable will +// reduce the performance. It must be volatile so that we can see its correct value in the +// `finalize` method, which could run in any thread. +@volatile private var deserializeStreamToBeClosed: DeserializationStream = null + // An intermediate stream that reads from exactly one batch // This guards against pre-fetching and other arbitrary behavior of higher level streams private var deserializeStream = nextBatchStream() @@ -401,6 +410,7 @@ class ExternalAppendOnlyMap[K, V, C]( // we're still in a valid batch. if (batchIndex < batchOffsets.length - 1) { if (deserializeStream != null) { + deserializeStreamToBeClosed = null deserializeStream.close() fileStream.close() deserializeStream = null @@ -419,7 +429,11 @@ class ExternalAppendOnlyMap[K, V, C]( val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start)) val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream) -ser.deserializeStream(compressedStream) +// Before returning the stream, assign it to `deserializeStreamToBeClosed` so that we can +// close it in `finalize` and also avoid to touch the volatile `deserializeStreamToBeClosed` +// during reading the (K, C) pairs. 
+deserializeStreamToBeClosed = ser.deserializeStream(compressedStream) +deserializeStreamToBeClosed } else { // No more batches left cleanup() @@ -468,14 +482,34 @@ class ExternalAppendOnlyMap[K, V, C]( item } -// TODO: Ensure this gets called even if the iterator isn't drained. -private def cleanup() { - batchIndex = batchOffsets.length // Prevent reading any other batch - val ds = deserializeStream - deserializeStream = null - fileStream = null - ds.close() - file.delete() +// TODO: Now only use `finalize` to ensure `close` gets called to clean up the resources. In the +// future, we need some mechanism to ensure this gets called once the resources are not used. +private def cleanup(): Unit = { + if (!closed) { +closed = true +batchIndex = batchOffsets.length // Prevent reading any other batch +fileStream = null +try { + val ds = deserializeStreamToBeClosed + deserializeStreamToBeClosed = null + deserializeStream = null + if (ds != null) { +ds.close() + } +} finally { +
spark git commit: [SPARK-5423][Core] Cleanup resources in DiskMapIterator.finalize to ensure deleting the temp file
Repository: spark Updated Branches: refs/heads/branch-1.1 651ceaeb3 -> 36f3c499f [SPARK-5423][Core] Cleanup resources in DiskMapIterator.finalize to ensure deleting the temp file This PR adds a `finalize` method in DiskMapIterator to clean up the resources even if some exception happens during processing data. Author: zsxwing Closes #4219 from zsxwing/SPARK-5423 and squashes the following commits: d4b2ca6 [zsxwing] Cleanup resources in DiskMapIterator.finalize to ensure deleting the temp file (cherry picked from commit 90095bf3ce9304d09a32ceffaa99069079071b59) Signed-off-by: Ubuntu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/36f3c499 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/36f3c499 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/36f3c499 Branch: refs/heads/branch-1.1 Commit: 36f3c499fd1ad53a68a084d6a16a2c68099e7049 Parents: 651ceae Author: zsxwing Authored: Thu Feb 19 18:37:31 2015 + Committer: Ubuntu Committed: Thu Feb 19 18:38:27 2015 + -- .../util/collection/ExternalAppendOnlyMap.scala | 52 1 file changed, 43 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/36f3c499/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 5619b30..abb1f11 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -434,6 +434,15 @@ class ExternalAppendOnlyMap[K, V, C]( private var batchIndex = 0 // Which batch we're in private var fileStream: FileInputStream = null +@volatile private var closed = false + +// A volatile variable to remember which DeserializationStream is using. Need to set it when we +// open a DeserializationStream. But we should use `deserializeStream` rather than +// `deserializeStreamToBeClosed` to read the content because touching a volatile variable will +// reduce the performance. It must be volatile so that we can see its correct value in the +// `finalize` method, which could run in any thread. +@volatile private var deserializeStreamToBeClosed: DeserializationStream = null + // An intermediate stream that reads from exactly one batch // This guards against pre-fetching and other arbitrary behavior of higher level streams private var deserializeStream = nextBatchStream() @@ -448,6 +457,7 @@ class ExternalAppendOnlyMap[K, V, C]( // we're still in a valid batch. if (batchIndex < batchOffsets.length - 1) { if (deserializeStream != null) { + deserializeStreamToBeClosed = null deserializeStream.close() fileStream.close() deserializeStream = null @@ -466,7 +476,11 @@ class ExternalAppendOnlyMap[K, V, C]( val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start)) val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream) -ser.deserializeStream(compressedStream) +// Before returning the stream, assign it to `deserializeStreamToBeClosed` so that we can +// close it in `finalize` and also avoid to touch the volatile `deserializeStreamToBeClosed` +// during reading the (K, C) pairs. 
+deserializeStreamToBeClosed = ser.deserializeStream(compressedStream) +deserializeStreamToBeClosed } else { // No more batches left cleanup() @@ -515,14 +529,34 @@ class ExternalAppendOnlyMap[K, V, C]( item } -// TODO: Ensure this gets called even if the iterator isn't drained. -private def cleanup() { - batchIndex = batchOffsets.length // Prevent reading any other batch - val ds = deserializeStream - deserializeStream = null - fileStream = null - ds.close() - file.delete() +// TODO: Now only use `finalize` to ensure `close` gets called to clean up the resources. In the +// future, we need some mechanism to ensure this gets called once the resources are not used. +private def cleanup(): Unit = { + if (!closed) { +closed = true +batchIndex = batchOffsets.length // Prevent reading any other batch +fileStream = null +try { + val ds = deserializeStreamToBeClosed + deserializeStreamToBeClosed = null + deserializeStream = null + if (ds != null) { +ds.close() + } +} finally { +
spark git commit: [SPARK-5423][Core] Cleanup resources in DiskMapIterator.finalize to ensure deleting the temp file
Repository: spark Updated Branches: refs/heads/branch-1.3 f93d4d992 -> 25fae8e7e [SPARK-5423][Core] Cleanup resources in DiskMapIterator.finalize to ensure deleting the temp file This PR adds a `finalize` method in DiskMapIterator to clean up the resources even if some exception happens during processing data. Author: zsxwing Closes #4219 from zsxwing/SPARK-5423 and squashes the following commits: d4b2ca6 [zsxwing] Cleanup resources in DiskMapIterator.finalize to ensure deleting the temp file (cherry picked from commit 90095bf3ce9304d09a32ceffaa99069079071b59) Signed-off-by: Ubuntu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/25fae8e7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/25fae8e7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/25fae8e7 Branch: refs/heads/branch-1.3 Commit: 25fae8e7e6c93b7817771342d370b73b40dcf92e Parents: f93d4d9 Author: zsxwing Authored: Thu Feb 19 18:37:31 2015 + Committer: Ubuntu Committed: Thu Feb 19 18:37:47 2015 + -- .../util/collection/ExternalAppendOnlyMap.scala | 52 1 file changed, 43 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/25fae8e7/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 8a0f5a6..fc7e86e 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -387,6 +387,15 @@ class ExternalAppendOnlyMap[K, V, C]( private var batchIndex = 0 // Which batch we're in private var fileStream: FileInputStream = null +@volatile private var closed = false + +// A volatile variable to remember which DeserializationStream is using. Need to set it when we +// open a DeserializationStream. But we should use `deserializeStream` rather than +// `deserializeStreamToBeClosed` to read the content because touching a volatile variable will +// reduce the performance. It must be volatile so that we can see its correct value in the +// `finalize` method, which could run in any thread. +@volatile private var deserializeStreamToBeClosed: DeserializationStream = null + // An intermediate stream that reads from exactly one batch // This guards against pre-fetching and other arbitrary behavior of higher level streams private var deserializeStream = nextBatchStream() @@ -401,6 +410,7 @@ class ExternalAppendOnlyMap[K, V, C]( // we're still in a valid batch. if (batchIndex < batchOffsets.length - 1) { if (deserializeStream != null) { + deserializeStreamToBeClosed = null deserializeStream.close() fileStream.close() deserializeStream = null @@ -419,7 +429,11 @@ class ExternalAppendOnlyMap[K, V, C]( val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start)) val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream) -ser.deserializeStream(compressedStream) +// Before returning the stream, assign it to `deserializeStreamToBeClosed` so that we can +// close it in `finalize` and also avoid to touch the volatile `deserializeStreamToBeClosed` +// during reading the (K, C) pairs. 
+deserializeStreamToBeClosed = ser.deserializeStream(compressedStream) +deserializeStreamToBeClosed } else { // No more batches left cleanup() @@ -468,14 +482,34 @@ class ExternalAppendOnlyMap[K, V, C]( item } -// TODO: Ensure this gets called even if the iterator isn't drained. -private def cleanup() { - batchIndex = batchOffsets.length // Prevent reading any other batch - val ds = deserializeStream - deserializeStream = null - fileStream = null - ds.close() - file.delete() +// TODO: Now only use `finalize` to ensure `close` gets called to clean up the resources. In the +// future, we need some mechanism to ensure this gets called once the resources are not used. +private def cleanup(): Unit = { + if (!closed) { +closed = true +batchIndex = batchOffsets.length // Prevent reading any other batch +fileStream = null +try { + val ds = deserializeStreamToBeClosed + deserializeStreamToBeClosed = null + deserializeStream = null + if (ds != null) { +ds.close() + } +} finally { +
spark git commit: [SPARK-5423][Core] Cleanup resources in DiskMapIterator.finalize to ensure deleting the temp file
Repository: spark Updated Branches: refs/heads/master 38e624a73 -> 90095bf3c [SPARK-5423][Core] Cleanup resources in DiskMapIterator.finalize to ensure deleting the temp file This PR adds a `finalize` method in DiskMapIterator to clean up the resources even if some exception happens during processing data. Author: zsxwing Closes #4219 from zsxwing/SPARK-5423 and squashes the following commits: d4b2ca6 [zsxwing] Cleanup resources in DiskMapIterator.finalize to ensure deleting the temp file Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/90095bf3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/90095bf3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/90095bf3 Branch: refs/heads/master Commit: 90095bf3ce9304d09a32ceffaa99069079071b59 Parents: 38e624a Author: zsxwing Authored: Thu Feb 19 18:37:31 2015 + Committer: Ubuntu Committed: Thu Feb 19 18:37:31 2015 + -- .../util/collection/ExternalAppendOnlyMap.scala | 52 1 file changed, 43 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/90095bf3/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 8a0f5a6..fc7e86e 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -387,6 +387,15 @@ class ExternalAppendOnlyMap[K, V, C]( private var batchIndex = 0 // Which batch we're in private var fileStream: FileInputStream = null +@volatile private var closed = false + +// A volatile variable to remember which DeserializationStream is using. Need to set it when we +// open a DeserializationStream. But we should use `deserializeStream` rather than +// `deserializeStreamToBeClosed` to read the content because touching a volatile variable will +// reduce the performance. It must be volatile so that we can see its correct value in the +// `finalize` method, which could run in any thread. +@volatile private var deserializeStreamToBeClosed: DeserializationStream = null + // An intermediate stream that reads from exactly one batch // This guards against pre-fetching and other arbitrary behavior of higher level streams private var deserializeStream = nextBatchStream() @@ -401,6 +410,7 @@ class ExternalAppendOnlyMap[K, V, C]( // we're still in a valid batch. if (batchIndex < batchOffsets.length - 1) { if (deserializeStream != null) { + deserializeStreamToBeClosed = null deserializeStream.close() fileStream.close() deserializeStream = null @@ -419,7 +429,11 @@ class ExternalAppendOnlyMap[K, V, C]( val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start)) val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream) -ser.deserializeStream(compressedStream) +// Before returning the stream, assign it to `deserializeStreamToBeClosed` so that we can +// close it in `finalize` and also avoid to touch the volatile `deserializeStreamToBeClosed` +// during reading the (K, C) pairs. +deserializeStreamToBeClosed = ser.deserializeStream(compressedStream) +deserializeStreamToBeClosed } else { // No more batches left cleanup() @@ -468,14 +482,34 @@ class ExternalAppendOnlyMap[K, V, C]( item } -// TODO: Ensure this gets called even if the iterator isn't drained. 
-private def cleanup() { - batchIndex = batchOffsets.length // Prevent reading any other batch - val ds = deserializeStream - deserializeStream = null - fileStream = null - ds.close() - file.delete() +// TODO: Now only use `finalize` to ensure `close` gets called to clean up the resources. In the +// future, we need some mechanism to ensure this gets called once the resources are not used. +private def cleanup(): Unit = { + if (!closed) { +closed = true +batchIndex = batchOffsets.length // Prevent reading any other batch +fileStream = null +try { + val ds = deserializeStreamToBeClosed + deserializeStreamToBeClosed = null + deserializeStream = null + if (ds != null) { +ds.close() + } +} finally { + if (file.exists()) { +file.delete() + } +} + } +} + +o
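Editor's note on the change above: all four branches receive the same resource-safety pattern — keep the stream that must be closed in a @volatile field, make cleanup() idempotent behind a @volatile closed flag, and override finalize() as a last resort so the temporary file is deleted even when the iterator is never fully drained. A stripped-down, hypothetical sketch of that pattern in plain Scala (not the Spark class itself):

import java.io.{File, FileInputStream}

// Minimal illustration of the cleanup pattern; names and structure are simplified.
class TempFileIterator(file: File) {

  @volatile private var closed = false
  // Volatile so that finalize(), which may run on any thread, sees the latest stream.
  @volatile private var streamToBeClosed: FileInputStream = null

  def open(): Unit = {
    streamToBeClosed = new FileInputStream(file)
  }

  // Idempotent: safe to call from normal completion, error paths, and finalize().
  def cleanup(): Unit = {
    if (!closed) {
      closed = true
      try {
        val s = streamToBeClosed
        streamToBeClosed = null
        if (s != null) {
          s.close()
        }
      } finally {
        if (file.exists()) {
          file.delete()
        }
      }
    }
  }

  // Last-resort guarantee that the temp file is removed if cleanup() was never reached.
  override protected def finalize(): Unit = {
    try {
      cleanup()
    } finally {
      super.finalize()
    }
  }
}

As the patch's own TODO notes, finalize() is only a stopgap until there is a mechanism that guarantees cleanup once the resources are no longer used.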
spark git commit: [SPARK-5816] Add huge compatibility warning in DriverWrapper
Repository: spark Updated Branches: refs/heads/master fb87f4492 -> 38e624a73 [SPARK-5816] Add huge compatibility warning in DriverWrapper The stability of the new submission gateway assumes that the arguments in `DriverWrapper` are consistent across multiple Spark versions. However, this is not at all clear from the code itself. In fact, this was broken in 20a6013106b56a1a1cc3e8cda092330ffbe77cc3, which is fortunately OK because both that commit and the original commit that added this gateway are part of the same release. To prevent this from happening again we should at the very least add a huge warning where appropriate. Author: Andrew Or Closes #4687 from andrewor14/driver-wrapper-warning and squashes the following commits: 7989b56 [Andrew Or] Add huge compatibility warning Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/38e624a7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/38e624a7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/38e624a7 Branch: refs/heads/master Commit: 38e624a732b18e01ad2e7a499ce0bb0d7acdcdf6 Parents: fb87f44 Author: Andrew Or Authored: Thu Feb 19 09:56:25 2015 -0800 Committer: Andrew Or Committed: Thu Feb 19 09:56:25 2015 -0800 -- .../scala/org/apache/spark/deploy/worker/DriverWrapper.scala | 7 +++ 1 file changed, 7 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/38e624a7/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala index ab467a5..deef6ef 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala @@ -26,10 +26,17 @@ import org.apache.spark.util.{AkkaUtils, ChildFirstURLClassLoader, MutableURLCla /** * Utility object for launching driver programs such that they share fate with the Worker process. + * This is used in standalone cluster mode only. */ object DriverWrapper { def main(args: Array[String]) { args.toList match { + /* + * IMPORTANT: Spark 1.3 provides a stable application submission gateway that is both + * backward and forward compatible across future Spark versions. Because this gateway + * uses this class to launch the driver, the ordering and semantics of the arguments + * here must also remain consistent across versions. + */ case workerUrl :: userJar :: mainClass :: extraArgs => val conf = new SparkConf() val (actorSystem, _) = AkkaUtils.createActorSystem("Driver", - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-5816] Add huge compatibility warning in DriverWrapper
Repository: spark Updated Branches: refs/heads/branch-1.3 fbcb949c5 -> f93d4d992 [SPARK-5816] Add huge compatibility warning in DriverWrapper The stability of the new submission gateway assumes that the arguments in `DriverWrapper` are consistent across multiple Spark versions. However, this is not at all clear from the code itself. In fact, this was broken in 20a6013106b56a1a1cc3e8cda092330ffbe77cc3, which is fortunately OK because both that commit and the original commit that added this gateway are part of the same release. To prevent this from happening again we should at the very least add a huge warning where appropriate. Author: Andrew Or Closes #4687 from andrewor14/driver-wrapper-warning and squashes the following commits: 7989b56 [Andrew Or] Add huge compatibility warning (cherry picked from commit 38e624a732b18e01ad2e7a499ce0bb0d7acdcdf6) Signed-off-by: Andrew Or Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f93d4d99 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f93d4d99 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f93d4d99 Branch: refs/heads/branch-1.3 Commit: f93d4d99204e3d83e802e326c55416e1b7e2 Parents: fbcb949 Author: Andrew Or Authored: Thu Feb 19 09:56:25 2015 -0800 Committer: Andrew Or Committed: Thu Feb 19 09:56:37 2015 -0800 -- .../scala/org/apache/spark/deploy/worker/DriverWrapper.scala | 7 +++ 1 file changed, 7 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f93d4d99/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala index ab467a5..deef6ef 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala @@ -26,10 +26,17 @@ import org.apache.spark.util.{AkkaUtils, ChildFirstURLClassLoader, MutableURLCla /** * Utility object for launching driver programs such that they share fate with the Worker process. + * This is used in standalone cluster mode only. */ object DriverWrapper { def main(args: Array[String]) { args.toList match { + /* + * IMPORTANT: Spark 1.3 provides a stable application submission gateway that is both + * backward and forward compatible across future Spark versions. Because this gateway + * uses this class to launch the driver, the ordering and semantics of the arguments + * here must also remain consistent across versions. + */ case workerUrl :: userJar :: mainClass :: extraArgs => val conf = new SparkConf() val (actorSystem, _) = AkkaUtils.createActorSystem("Driver", - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
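Editor's note on the change above: the added comment documents a contract rather than changing behavior — whatever constructs the DriverWrapper command line must emit arguments in exactly the order the workerUrl :: userJar :: mainClass :: extraArgs match destructures them, across Spark versions. A purely hypothetical sketch of the producing side (this is not Spark's actual submission code):

// Hypothetical builder for the DriverWrapper argument list; kept in one place so the
// positional contract named in the warning is easy to keep in sync.
object DriverWrapperArgs {
  def build(
      workerUrl: String,
      userJar: String,
      mainClass: String,
      extraArgs: Seq[String]): Seq[String] = {
    // Order matters: it must mirror the `workerUrl :: userJar :: mainClass :: extraArgs`
    // pattern match in DriverWrapper.main.
    Seq(workerUrl, userJar, mainClass) ++ extraArgs
  }
}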
spark git commit: SPARK-5548: Fix for AkkaUtilsSuite failure - attempt 2
Repository: spark Updated Branches: refs/heads/branch-1.3 092b45f69 -> fbcb949c5 SPARK-5548: Fix for AkkaUtilsSuite failure - attempt 2 Author: Jacek Lewandowski Closes #4653 from jacek-lewandowski/SPARK-5548-2-master and squashes the following commits: 0e199b6 [Jacek Lewandowski] SPARK-5548: applied reviewer's comments 843eafb [Jacek Lewandowski] SPARK-5548: Fix for AkkaUtilsSuite failure - attempt 2 (cherry picked from commit fb87f449219c673a16bc46f85c1ef7a6e3f22736) Signed-off-by: Andrew Or Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fbcb949c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fbcb949c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fbcb949c Branch: refs/heads/branch-1.3 Commit: fbcb949c545a1e17411403b23bb6bc20881fb26a Parents: 092b45f Author: Jacek Lewandowski Authored: Thu Feb 19 09:53:36 2015 -0800 Committer: Andrew Or Committed: Thu Feb 19 09:53:49 2015 -0800 -- .../test/scala/org/apache/spark/util/AkkaUtilsSuite.scala | 9 +++-- 1 file changed, 7 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fbcb949c/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala index 2cc5817..6250d50 100644 --- a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.util import java.util.concurrent.TimeoutException import scala.concurrent.Await +import scala.util.{Failure, Try} import akka.actor._ @@ -370,8 +371,12 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro val selection = slaveSystem.actorSelection( AkkaUtils.address(AkkaUtils.protocol(slaveSystem), "spark", "localhost", boundPort, "MapOutputTracker")) val timeout = AkkaUtils.lookupTimeout(conf) -intercept[TimeoutException] { - slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout * 2), timeout) +val result = Try(Await.result(selection.resolveOne(timeout * 2), timeout)) + +result match { + case Failure(ex: ActorNotFound) => + case Failure(ex: TimeoutException) => + case r => fail(s"$r is neither Failure(ActorNotFound) nor Failure(TimeoutException)") } actorSystem.shutdown() - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: SPARK-5548: Fix for AkkaUtilsSuite failure - attempt 2
Repository: spark Updated Branches: refs/heads/master e945aa613 -> fb87f4492 SPARK-5548: Fix for AkkaUtilsSuite failure - attempt 2 Author: Jacek Lewandowski Closes #4653 from jacek-lewandowski/SPARK-5548-2-master and squashes the following commits: 0e199b6 [Jacek Lewandowski] SPARK-5548: applied reviewer's comments 843eafb [Jacek Lewandowski] SPARK-5548: Fix for AkkaUtilsSuite failure - attempt 2 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fb87f449 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fb87f449 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fb87f449 Branch: refs/heads/master Commit: fb87f449219c673a16bc46f85c1ef7a6e3f22736 Parents: e945aa6 Author: Jacek Lewandowski Authored: Thu Feb 19 09:53:36 2015 -0800 Committer: Andrew Or Committed: Thu Feb 19 09:53:36 2015 -0800 -- .../test/scala/org/apache/spark/util/AkkaUtilsSuite.scala | 9 +++-- 1 file changed, 7 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fb87f449/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala index 2cc5817..6250d50 100644 --- a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.util import java.util.concurrent.TimeoutException import scala.concurrent.Await +import scala.util.{Failure, Try} import akka.actor._ @@ -370,8 +371,12 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro val selection = slaveSystem.actorSelection( AkkaUtils.address(AkkaUtils.protocol(slaveSystem), "spark", "localhost", boundPort, "MapOutputTracker")) val timeout = AkkaUtils.lookupTimeout(conf) -intercept[TimeoutException] { - slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout * 2), timeout) +val result = Try(Await.result(selection.resolveOne(timeout * 2), timeout)) + +result match { + case Failure(ex: ActorNotFound) => + case Failure(ex: TimeoutException) => + case r => fail(s"$r is neither Failure(ActorNotFound) nor Failure(TimeoutException)") } actorSystem.shutdown() - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
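Editor's note on the change above: the rewritten assertion accepts either of two legitimate failure modes — ActorNotFound or TimeoutException — where intercept[TimeoutException] insisted on exactly one. A generic, hedged sketch of that Try-based assertion style, using stand-in exception types so it does not depend on Akka:

import java.util.concurrent.TimeoutException
import scala.util.{Failure, Try}

// Accepts two distinct failure types for one call; anything else fails the test.
// IllegalStateException stands in here for a library-specific exception such as
// akka.actor.ActorNotFound.
def assertFailsAsExpected(body: => Unit): Unit = {
  Try(body) match {
    case Failure(_: IllegalStateException) => // first acceptable outcome
    case Failure(_: TimeoutException) =>      // second acceptable outcome
    case other =>
      throw new AssertionError(s"$other is neither of the expected failures")
  }
}

Matching on Failure rather than intercepting keeps the test from depending on which of the two errors the actor lookup happens to surface.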