spark git commit: Revert [SPARK-4808] Removing minimum number of elements read before spill check

2015-02-22 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 eed7389cf -> 4186dd3dd


Revert [SPARK-4808] Removing minimum number of elements read before spill 
check

This reverts commit 0382dcc0a94f8e619fd11ec2cc0b18459a690c2b.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4186dd3d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4186dd3d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4186dd3d

Branch: refs/heads/branch-1.3
Commit: 4186dd3dd074a41b3a1d6a4279b683fb355da092
Parents: eed7389
Author: Andrew Or and...@databricks.com
Authored: Sun Feb 22 09:44:52 2015 -0800
Committer: Andrew Or and...@databricks.com
Committed: Sun Feb 22 09:44:52 2015 -0800

--
 .../scala/org/apache/spark/util/collection/Spillable.scala | 6 +-
 1 file changed, 5 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4186dd3d/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala 
b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
index 747ecf0..9f54312 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
@@ -42,6 +42,9 @@ private[spark] trait Spillable[C] extends Logging {
   // Memory manager that can be used to acquire/release memory
   private[this] val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager
 
+  // Threshold for `elementsRead` before we start tracking this collection's memory usage
+  private[this] val trackMemoryThreshold = 1000
+
   // Initial threshold for the size of a collection before we start tracking its memory usage
   // Exposed for testing
   private[this] val initialMemoryThreshold: Long =
@@ -69,7 +72,8 @@ private[spark] trait Spillable[C] extends Logging {
* @return true if `collection` was spilled to disk; false otherwise
*/
   protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
-    if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
+    if (elementsRead > trackMemoryThreshold && elementsRead % 32 == 0 &&
+        currentMemory >= myMemoryThreshold) {
   // Claim up to double our current memory from the shuffle memory pool
   val amountToRequest = 2 * currentMemory - myMemoryThreshold
   val granted = shuffleMemoryManager.tryToAcquire(amountToRequest)
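[Editor's note] For readers skimming the diff, here is a minimal, self-contained sketch of the gate this revert restores. The names `trackMemoryThreshold`, `elementsRead`, `currentMemory` and `myMemoryThreshold` mirror the diff above; the wrapper object and the sample values are illustrative assumptions, not Spark code.

object SpillGateSketch {
  // Restored constant from the diff: ignore the first 1000 elements entirely.
  private val trackMemoryThreshold = 1000

  // A spill check is only attempted after the first 1000 elements, then only
  // every 32 elements, and only once the collection's estimated size has
  // reached the current memory threshold.
  def shouldCheckSpill(elementsRead: Long, currentMemory: Long, myMemoryThreshold: Long): Boolean =
    elementsRead > trackMemoryThreshold &&
      elementsRead % 32 == 0 &&
      currentMemory >= myMemoryThreshold

  def main(args: Array[String]): Unit = {
    println(shouldCheckSpill(960, 10 << 20, 5 << 20))   // false: still below the 1000-element threshold
    println(shouldCheckSpill(1024, 10 << 20, 5 << 20))  // true: past the threshold, on a 32-element boundary, over budget
  }
}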


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: SPARK-5669 [BUILD] Reverse exclusion of JBLAS libs for 1.3

2015-02-22 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 04d3b328f -> eed7389cf


SPARK-5669 [BUILD] Reverse exclusion of JBLAS libs for 1.3

CC mengxr

Author: Sean Owen so...@cloudera.com

Closes #4715 from srowen/SPARK-5669.3 and squashes the following commits:

b27ffa9 [Sean Owen] Reverse exclusion of JBLAS libs for 1.3


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eed7389c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eed7389c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eed7389c

Branch: refs/heads/branch-1.3
Commit: eed7389cf80d0930da16f77d6ccb39a82fe976c2
Parents: 04d3b32
Author: Sean Owen so...@cloudera.com
Authored: Sun Feb 22 09:09:06 2015 +
Committer: Sean Owen so...@cloudera.com
Committed: Sun Feb 22 09:09:06 2015 +

--
 assembly/pom.xml | 10 --
 1 file changed, 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/eed7389c/assembly/pom.xml
--
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 8a92c8f..87b3e6f 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -118,16 +118,6 @@
                <exclude>META-INF/*.RSA</exclude>
              </excludes>
            </filter>
-           <filter>
-             <!-- Exclude libgfortran, libgcc for license issues -->
-             <artifact>org.jblas:jblas</artifact>
-             <excludes>
-               <!-- Linux amd64 is OK; not statically linked -->
-               <exclude>lib/static/Linux/i386/**</exclude>
-               <exclude>lib/static/Mac OS X/**</exclude>
-               <exclude>lib/static/Windows/**</exclude>
-             </excludes>
-           </filter>
          </filters>
        </configuration>
        <executions>


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [DOCS] Fix typo in API for custom InputFormats based on the “new” MapReduce API

2015-02-22 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 76e3e6527 -> c5a5c6f61


[DOCS] Fix typo in API for custom InputFormats based on the “new” MapReduce 
API

This looks like a simple typo: ```SparkContext.newHadoopRDD``` instead of 
```SparkContext.newAPIHadoopRDD```, as in the actual API at 
http://spark.apache.org/docs/1.2.1/api/scala/index.html#org.apache.spark.SparkContext

Author: Alexander abezzu...@nflabs.com

Closes #4718 from bzz/hadoop-InputFormats-doc-fix and squashes the following 
commits:

680a4c4 [Alexander] Fix typo in docs on custom Hadoop InputFormats

(cherry picked from commit a7f90390251ff62a0e10edf4c2eb876538597791)
Signed-off-by: Sean Owen so...@cloudera.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c5a5c6f6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c5a5c6f6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c5a5c6f6

Branch: refs/heads/branch-1.3
Commit: c5a5c6f618b89d712c13a236388fa67c136691ee
Parents: 76e3e65
Author: Alexander abezzu...@nflabs.com
Authored: Sun Feb 22 08:53:05 2015 +
Committer: Sean Owen so...@cloudera.com
Committed: Sun Feb 22 08:53:14 2015 +

--
 docs/programming-guide.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c5a5c6f6/docs/programming-guide.md
--
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index 4e4af76..7b07018 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -335,7 +335,7 @@ Apart from text files, Spark's Scala API also supports 
several other data format
 
 * For 
[SequenceFiles](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html),
 use SparkContext's `sequenceFile[K, V]` method where `K` and `V` are the types 
of key and values in the file. These should be subclasses of Hadoop's 
[Writable](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/Writable.html)
 interface, like 
[IntWritable](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/IntWritable.html)
 and 
[Text](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/Text.html).
 In addition, Spark allows you to specify native types for a few common 
Writables; for example, `sequenceFile[Int, String]` will automatically read 
IntWritables and Texts.
 
-* For other Hadoop InputFormats, you can use the `SparkContext.hadoopRDD` 
method, which takes an arbitrary `JobConf` and input format class, key class 
and value class. Set these the same way you would for a Hadoop job with your 
input source. You can also use `SparkContext.newHadoopRDD` for InputFormats 
based on the new MapReduce API (`org.apache.hadoop.mapreduce`).
+* For other Hadoop InputFormats, you can use the `SparkContext.hadoopRDD` 
method, which takes an arbitrary `JobConf` and input format class, key class 
and value class. Set these the same way you would for a Hadoop job with your 
input source. You can also use `SparkContext.newAPIHadoopRDD` for InputFormats 
based on the new MapReduce API (`org.apache.hadoop.mapreduce`).
 
 * `RDD.saveAsObjectFile` and `SparkContext.objectFile` support saving an RDD 
in a simple format consisting of serialized Java objects. While this is not as 
efficient as specialized formats like Avro, it offers an easy way to save any 
RDD.
 
@@ -367,7 +367,7 @@ Apart from text files, Spark's Java API also supports 
several other data formats
 
 * For 
[SequenceFiles](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html),
 use SparkContext's `sequenceFile[K, V]` method where `K` and `V` are the types 
of key and values in the file. These should be subclasses of Hadoop's 
[Writable](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/Writable.html)
 interface, like 
[IntWritable](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/IntWritable.html)
 and 
[Text](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/Text.html).
 
-* For other Hadoop InputFormats, you can use the `JavaSparkContext.hadoopRDD` 
method, which takes an arbitrary `JobConf` and input format class, key class 
and value class. Set these the same way you would for a Hadoop job with your 
input source. You can also use `JavaSparkContext.newHadoopRDD` for InputFormats 
based on the new MapReduce API (`org.apache.hadoop.mapreduce`).
+* For other Hadoop InputFormats, you can use the `JavaSparkContext.hadoopRDD` 
method, which takes an arbitrary `JobConf` and input format class, key class 
and value class. Set these the same way you would for a Hadoop job with your 
input source. You can also use 

spark git commit: [DOCS] Fix typo in API for custom InputFormats based on the “new” MapReduce API

2015-02-22 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 46462ff25 -> a7f903902


[DOCS] Fix typo in API for custom InputFormats based on the “new” MapReduce 
API

This looks like a simple typo: ```SparkContext.newHadoopRDD``` instead of 
```SparkContext.newAPIHadoopRDD```, as in the actual API at 
http://spark.apache.org/docs/1.2.1/api/scala/index.html#org.apache.spark.SparkContext

Author: Alexander abezzu...@nflabs.com

Closes #4718 from bzz/hadoop-InputFormats-doc-fix and squashes the following 
commits:

680a4c4 [Alexander] Fix typo in docs on custom Hadoop InputFormats


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a7f90390
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a7f90390
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a7f90390

Branch: refs/heads/master
Commit: a7f90390251ff62a0e10edf4c2eb876538597791
Parents: 46462ff
Author: Alexander abezzu...@nflabs.com
Authored: Sun Feb 22 08:53:05 2015 +
Committer: Sean Owen so...@cloudera.com
Committed: Sun Feb 22 08:53:05 2015 +

--
 docs/programming-guide.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a7f90390/docs/programming-guide.md
--
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index 4e4af76..7b07018 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -335,7 +335,7 @@ Apart from text files, Spark's Scala API also supports 
several other data format
 
 * For 
[SequenceFiles](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html),
 use SparkContext's `sequenceFile[K, V]` method where `K` and `V` are the types 
of key and values in the file. These should be subclasses of Hadoop's 
[Writable](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/Writable.html)
 interface, like 
[IntWritable](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/IntWritable.html)
 and 
[Text](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/Text.html).
 In addition, Spark allows you to specify native types for a few common 
Writables; for example, `sequenceFile[Int, String]` will automatically read 
IntWritables and Texts.
 
-* For other Hadoop InputFormats, you can use the `SparkContext.hadoopRDD` 
method, which takes an arbitrary `JobConf` and input format class, key class 
and value class. Set these the same way you would for a Hadoop job with your 
input source. You can also use `SparkContext.newHadoopRDD` for InputFormats 
based on the new MapReduce API (`org.apache.hadoop.mapreduce`).
+* For other Hadoop InputFormats, you can use the `SparkContext.hadoopRDD` 
method, which takes an arbitrary `JobConf` and input format class, key class 
and value class. Set these the same way you would for a Hadoop job with your 
input source. You can also use `SparkContext.newAPIHadoopRDD` for InputFormats 
based on the new MapReduce API (`org.apache.hadoop.mapreduce`).
 
 * `RDD.saveAsObjectFile` and `SparkContext.objectFile` support saving an RDD 
in a simple format consisting of serialized Java objects. While this is not as 
efficient as specialized formats like Avro, it offers an easy way to save any 
RDD.
 
@@ -367,7 +367,7 @@ Apart from text files, Spark's Java API also supports 
several other data formats
 
 * For 
[SequenceFiles](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html),
 use SparkContext's `sequenceFile[K, V]` method where `K` and `V` are the types 
of key and values in the file. These should be subclasses of Hadoop's 
[Writable](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/Writable.html)
 interface, like 
[IntWritable](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/IntWritable.html)
 and 
[Text](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/Text.html).
 
-* For other Hadoop InputFormats, you can use the `JavaSparkContext.hadoopRDD` 
method, which takes an arbitrary `JobConf` and input format class, key class 
and value class. Set these the same way you would for a Hadoop job with your 
input source. You can also use `JavaSparkContext.newHadoopRDD` for InputFormats 
based on the new MapReduce API (`org.apache.hadoop.mapreduce`).
+* For other Hadoop InputFormats, you can use the `JavaSparkContext.hadoopRDD` 
method, which takes an arbitrary `JobConf` and input format class, key class 
and value class. Set these the same way you would for a Hadoop job with your 
input source. You can also use `JavaSparkContext.newAPIHadoopRDD` for 
InputFormats based on the new MapReduce API (`org.apache.hadoop.mapreduce`).
 
 * `JavaRDD.saveAsObjectFile` 
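[Editor's note] The corrected sentence is the guide's only mention of `SparkContext.newAPIHadoopRDD`, so a hedged usage sketch may help; it assumes Spark 1.x with Hadoop 2 classes on the classpath, and the application name and input path are placeholders, not taken from the docs.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat  // "new" MapReduce API
import org.apache.spark.{SparkConf, SparkContext}

object NewApiHadoopRddExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("newAPIHadoopRDD example").setMaster("local[*]"))

    // A plain Configuration plays the role that JobConf plays for the old-API hadoopRDD.
    val conf = new Configuration()
    // New-API FileInputFormat input path property (Hadoop 2 name); the path is a placeholder.
    conf.set("mapreduce.input.fileinputformat.inputdir", "hdfs:///tmp/input")

    // InputFormat class, key class and value class are passed explicitly,
    // exactly as the guide describes for hadoopRDD.
    val lines = sc.newAPIHadoopRDD(conf, classOf[TextInputFormat], classOf[LongWritable], classOf[Text])

    lines.map(_._2.toString).take(5).foreach(println)
    sc.stop()
  }
}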

spark git commit: [DataFrame] [Typo] Fix the typo

2015-02-22 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master a7f903902 -> 275b1bef8


[DataFrame] [Typo] Fix the typo

Author: Cheng Hao hao.ch...@intel.com

Closes #4717 from chenghao-intel/typo1 and squashes the following commits:

858d7b0 [Cheng Hao] update the typo


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/275b1bef
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/275b1bef
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/275b1bef

Branch: refs/heads/master
Commit: 275b1bef897d775f1f7743378ca3e09e36160136
Parents: a7f9039
Author: Cheng Hao hao.ch...@intel.com
Authored: Sun Feb 22 08:56:30 2015 +
Committer: Sean Owen so...@cloudera.com
Committed: Sun Feb 22 08:56:30 2015 +

--
 sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/275b1bef/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 810f7c7..69e5f6a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -138,7 +138,7 @@ class DataFrame protected[sql](
 
   /**
    * An implicit conversion function internal to this class for us to avoid doing
-   * new DataFrameImpl(...) everywhere.
+   * new DataFrame(...) everywhere.
    */
   @inline private implicit def logicalPlanToDataFrame(logicalPlan: LogicalPlan): DataFrame = {
     new DataFrame(sqlContext, logicalPlan)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-3885] Provide mechanism to remove accumulators once they are no longer used

2015-02-22 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/master e4f9d03d7 -> 95cd643aa


[SPARK-3885] Provide mechanism to remove accumulators once they are no longer 
used

Instead of storing a strong reference to accumulators, I've replaced this with 
a weak reference and updated any code that uses these accumulators to check 
whether the reference resolves before using the accumulator. A weak reference 
is cleared once no other live reference to the variable remains; with a soft 
reference, accumulators would only be cleared when the GC was actually running 
out of memory.
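[Editor's note] A minimal sketch of the weak-reference registry pattern described above; the object and method names here are hypothetical, not Spark's (Spark's actual change is in the Accumulators diff below).

import scala.collection.mutable
import scala.ref.WeakReference

object WeakRegistrySketch {
  // Values are held only through WeakReference, so once user code drops its own
  // reference the entry becomes collectable; readers must check that the
  // reference still resolves before using it.
  private val originals = mutable.Map[Long, WeakReference[AnyRef]]()

  def register(id: Long, value: AnyRef): Unit = originals(id) = new WeakReference(value)

  // Returns None if the referent has already been garbage collected.
  def lookup(id: Long): Option[AnyRef] = originals.get(id).flatMap(_.get)

  def main(args: Array[String]): Unit = {
    var acc: AnyRef = new Array[Byte](1 << 20)
    register(1L, acc)
    println(lookup(1L).isDefined)  // true: a strong reference (acc) still exists
    acc = null                     // drop the only strong reference
    System.gc()                    // a hint only; collection is not guaranteed immediately
    println(lookup(1L).isDefined)  // usually false once the referent has been collected
  }
}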

Author: Ilya Ganelin ilya.gane...@capitalone.com

Closes #4021 from ilganeli/SPARK-3885 and squashes the following commits:

4ba9575 [Ilya Ganelin]  Fixed error in test suite
8510943 [Ilya Ganelin] Extra code
bb76ef0 [Ilya Ganelin] File deleted somehow
283a333 [Ilya Ganelin] Added cleanup method for accumulators to remove stale 
references within Accumulators.original to accumulators that are now out of 
scope
345fd4f [Ilya Ganelin] Merge remote-tracking branch 'upstream/master' into 
SPARK-3885
7485a82 [Ilya Ganelin] Fixed build error
c8e0f2b [Ilya Ganelin] Added working test for accumulator garbage collection
94ce754 [Ilya Ganelin] Still not being properly garbage collected
8722b63 [Ilya Ganelin] Fixing gc test
7414a9c [Ilya Ganelin] Added test for accumulator garbage collection
18d62ec [Ilya Ganelin] Updated to throw Exception when accessing a GCd 
accumulator
9a81928 [Ilya Ganelin] Reverting permissions changes
28f705c [Ilya Ganelin] Merge remote-tracking branch 'upstream/master' into 
SPARK-3885
b820ab4b [Ilya Ganelin] reset
d78f4bf [Ilya Ganelin] Removed obsolete comment
0746e61 [Ilya Ganelin] Updated DAGSchedulerSUite to fix bug
3350852 [Ilya Ganelin] Updated DAGScheduler and Suite to correctly use new 
implementation of WeakRef Accumulator storage
c49066a [Ilya Ganelin] Merge remote-tracking branch 'upstream/master' into 
SPARK-3885
cbb9023 [Ilya Ganelin] Merge remote-tracking branch 'upstream/master' into 
SPARK-3885
a77d11b [Ilya Ganelin] Updated Accumulators class to store weak references 
instead of strong references to allow garbage collection of old accumulators


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/95cd643a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/95cd643a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/95cd643a

Branch: refs/heads/master
Commit: 95cd643aa954b7e4229e94fa8bdc99bf3b2bb1da
Parents: e4f9d03
Author: Ilya Ganelin ilya.gane...@capitalone.com
Authored: Sun Feb 22 22:43:04 2015 -0800
Committer: Josh Rosen joshro...@databricks.com
Committed: Sun Feb 22 22:57:26 2015 -0800

--
 .../scala/org/apache/spark/Accumulators.scala   | 36 +++-
 .../scala/org/apache/spark/ContextCleaner.scala | 20 +++
 .../scala/org/apache/spark/SparkContext.scala   | 28 +++
 .../apache/spark/scheduler/DAGScheduler.scala   | 10 +-
 .../org/apache/spark/AccumulatorSuite.scala | 20 +++
 .../org/apache/spark/ContextCleanerSuite.scala  |  4 +++
 .../spark/scheduler/DAGSchedulerSuite.scala |  6 +++-
 7 files changed, 107 insertions(+), 17 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/95cd643a/core/src/main/scala/org/apache/spark/Accumulators.scala
--
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala 
b/core/src/main/scala/org/apache/spark/Accumulators.scala
index 5f31bfb..30f0ccd 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -23,6 +23,7 @@ import java.lang.ThreadLocal
 
 import scala.collection.generic.Growable
 import scala.collection.mutable.Map
+import scala.ref.WeakReference
 import scala.reflect.ClassTag
 
 import org.apache.spark.serializer.JavaSerializer
@@ -280,10 +281,12 @@ object AccumulatorParam {
 // TODO: The multi-thread support in accumulators is kind of lame; check
 // if there's a more intuitive way of doing it right
 private[spark] object Accumulators {
-  // TODO: Use soft references? => need to make readObject work properly then
-  val originals = Map[Long, Accumulable[_, _]]()
-  val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
-override protected def initialValue() = Map[Long, Accumulable[_, _]]()
+  // Store a WeakReference instead of a StrongReference because this way accumulators can be
+  // appropriately garbage collected during long-running jobs and release memory
+  type WeakAcc = WeakReference[Accumulable[_, _]]
+  val originals = Map[Long, WeakAcc]()
+  val localAccums = new ThreadLocal[Map[Long, WeakAcc]]() {
+override protected def initialValue() = Map[Long, WeakAcc]()
   }
   var 

spark git commit: [SPARK-911] allow efficient queries for a range if RDD is partitioned wi...

2015-02-22 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/master 275b1bef8 -> e4f9d03d7


[SPARK-911] allow efficient queries for a range if RDD is partitioned wi...

...th RangePartitioner

Author: Aaron Josephs ajose...@binghamton.edu

Closes #1381 from aaronjosephs/PLAT-911 and squashes the following commits:

e30ade5 [Aaron Josephs] [SPARK-911] allow efficient queries for a range if RDD 
is partitioned with RangePartitioner


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e4f9d03d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e4f9d03d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e4f9d03d

Branch: refs/heads/master
Commit: e4f9d03d728bc6fbfb6ebc7d15b4ba328f98f3dc
Parents: 275b1be
Author: Aaron Josephs ajose...@binghamton.edu
Authored: Sun Feb 22 22:09:06 2015 -0800
Committer: Josh Rosen joshro...@databricks.com
Committed: Sun Feb 22 22:09:06 2015 -0800

--
 .../apache/spark/rdd/OrderedRDDFunctions.scala  | 23 
 .../org/apache/spark/rdd/SortingSuite.scala | 28 
 2 files changed, 51 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e4f9d03d/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala 
b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
index 144f679..6fdfdb7 100644
--- a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
@@ -75,4 +75,27 @@ class OrderedRDDFunctions[K : Ordering : ClassTag,
 new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)
   }
 
+  /**
+   * Returns an RDD containing only the elements in the inclusive range `lower` to `upper`.
+   * If the RDD has been partitioned using a `RangePartitioner`, then this operation can be
+   * performed efficiently by only scanning the partitions that might contain matching elements.
+   * Otherwise, a standard `filter` is applied to all partitions.
+   */
+  def filterByRange(lower: K, upper: K): RDD[P] = {
+
+    def inRange(k: K): Boolean = ordering.gteq(k, lower) && ordering.lteq(k, upper)
+
+    val rddToFilter: RDD[P] = self.partitioner match {
+      case Some(rp: RangePartitioner[K, V]) => {
+        val partitionIndicies = (rp.getPartition(lower), rp.getPartition(upper)) match {
+          case (l, u) => Math.min(l, u) to Math.max(l, u)
+        }
+        PartitionPruningRDD.create(self, partitionIndicies.contains)
+      }
+      case _ =>
+        self
+    }
+    rddToFilter.filter { case (k, v) => inRange(k) }
+  }
+
 }
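[Editor's note] Before the test changes below, a short usage sketch of the new method; it assumes an already-constructed SparkContext named `sc` and is not part of this commit. The SortingSuite tests that follow exercise the same calls.

// Sorting by key installs a RangePartitioner, so filterByRange can prune down to
// the partitions that may contain keys in [200, 400] instead of filtering them all.
val pairs = sc.parallelize((1 to 1000).map(x => (x, x)), 10)
val ranged = pairs.sortByKey().filterByRange(200, 400)
println(ranged.count())  // 201 keys: 200 through 400 inclusive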

http://git-wip-us.apache.org/repos/asf/spark/blob/e4f9d03d/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala 
b/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala
index a40f2ff..64b1c24 100644
--- a/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala
@@ -119,5 +119,33 @@ class SortingSuite extends FunSuite with 
SharedSparkContext with Matchers with L
 partitions(1).last should be > partitions(2).head
 partitions(2).last should be > partitions(3).head
   }
+
+  test("get a range of elements in a sorted RDD that is on one partition") {
+    val pairArr = (1 to 1000).map(x => (x, x)).toArray
+    val sorted = sc.parallelize(pairArr, 10).sortByKey()
+    val range = sorted.filterByRange(20, 40).collect()
+    assert((20 to 40).toArray === range.map(_._1))
+  }
+
+  test("get a range of elements over multiple partitions in a descendingly sorted RDD") {
+    val pairArr = (1000 to 1 by -1).map(x => (x, x)).toArray
+    val sorted = sc.parallelize(pairArr, 10).sortByKey(false)
+    val range = sorted.filterByRange(200, 800).collect()
+    assert((800 to 200 by -1).toArray === range.map(_._1))
+  }
+
+  test("get a range of elements in an array not partitioned by a range partitioner") {
+    val pairArr = util.Random.shuffle((1 to 1000).toList).map(x => (x, x))
+    val pairs = sc.parallelize(pairArr, 10)
+    val range = pairs.filterByRange(200, 800).collect()
+    assert((800 to 200 by -1).toArray.sorted === range.map(_._1).sorted)
+  }
+
+  test("get a range of elements over multiple partitions but not taking up full partitions") {
+    val pairArr = (1000 to 1 by -1).map(x => (x, x)).toArray
+    val sorted = sc.parallelize(pairArr, 10).sortByKey(false)
+    val range = sorted.filterByRange(250, 850).collect()
+    assert((850 to 250 by -1).toArray === range.map(_._1))
+  }
 }