spark git commit: Revert "[SPARK-4808] Removing minimum number of elements read before spill check"
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 eed7389cf -> 4186dd3dd

Revert "[SPARK-4808] Removing minimum number of elements read before spill check"

This reverts commit 0382dcc0a94f8e619fd11ec2cc0b18459a690c2b.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4186dd3d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4186dd3d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4186dd3d

Branch: refs/heads/branch-1.3
Commit: 4186dd3dd074a41b3a1d6a4279b683fb355da092
Parents: eed7389
Author: Andrew Or <and...@databricks.com>
Authored: Sun Feb 22 09:44:52 2015 -0800
Committer: Andrew Or <and...@databricks.com>
Committed: Sun Feb 22 09:44:52 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/util/collection/Spillable.scala | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/4186dd3d/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala

diff --git a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
index 747ecf0..9f54312 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
@@ -42,6 +42,9 @@ private[spark] trait Spillable[C] extends Logging {
   // Memory manager that can be used to acquire/release memory
   private[this] val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager
 
+  // Threshold for `elementsRead` before we start tracking this collection's memory usage
+  private[this] val trackMemoryThreshold = 1000
+
   // Initial threshold for the size of a collection before we start tracking its memory usage
   // Exposed for testing
   private[this] val initialMemoryThreshold: Long =
@@ -69,7 +72,8 @@ private[spark] trait Spillable[C] extends Logging {
    * @return true if `collection` was spilled to disk; false otherwise
    */
  protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
-    if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
+    if (elementsRead > trackMemoryThreshold && elementsRead % 32 == 0 &&
+        currentMemory >= myMemoryThreshold) {
       // Claim up to double our current memory from the shuffle memory pool
       val amountToRequest = 2 * currentMemory - myMemoryThreshold
       val granted = shuffleMemoryManager.tryToAcquire(amountToRequest)
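For context, the restored guard means a spill is even considered only after a minimum number of elements has been read, and then only on every 32nd element. A minimal, self-contained sketch of that sampling cadence follows (hypothetical object and values, not the actual Spillable trait):

```scala
// Sketch of the restored spill-check cadence: ignore the first
// `trackMemoryThreshold` elements entirely, then consult memory only on
// every 32nd element to keep the check off the per-record hot path.
object SpillCheckSketch {
  val trackMemoryThreshold = 1000           // matches the reverted constant
  val myMemoryThreshold = 5L * 1024 * 1024  // assumed memory threshold

  def shouldCheck(elementsRead: Long, currentMemory: Long): Boolean =
    elementsRead > trackMemoryThreshold && elementsRead % 32 == 0 &&
      currentMemory >= myMemoryThreshold

  def main(args: Array[String]): Unit = {
    val checks = (1L to 2000L).count(i => shouldCheck(i, 10L * 1024 * 1024))
    println(s"spill checks in the first 2000 elements: $checks") // 31, not 2000
  }
}
```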
spark git commit: SPARK-5669 [BUILD] Reverse exclusion of JBLAS libs for 1.3
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 04d3b328f -> eed7389cf

SPARK-5669 [BUILD] Reverse exclusion of JBLAS libs for 1.3

CC mengxr

Author: Sean Owen <so...@cloudera.com>

Closes #4715 from srowen/SPARK-5669.3 and squashes the following commits:

b27ffa9 [Sean Owen] Reverse exclusion of JBLAS libs for 1.3

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eed7389c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eed7389c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eed7389c

Branch: refs/heads/branch-1.3
Commit: eed7389cf80d0930da16f77d6ccb39a82fe976c2
Parents: 04d3b32
Author: Sean Owen <so...@cloudera.com>
Authored: Sun Feb 22 09:09:06 2015 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Sun Feb 22 09:09:06 2015 +0000

----------------------------------------------------------------------
 assembly/pom.xml | 10 ----------
 1 file changed, 10 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/eed7389c/assembly/pom.xml

diff --git a/assembly/pom.xml b/assembly/pom.xml
index 8a92c8f..87b3e6f 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -118,16 +118,6 @@
               <exclude>META-INF/*.RSA</exclude>
             </excludes>
           </filter>
-          <filter>
-            <!-- Exclude libgfortran, libgcc for license issues -->
-            <artifact>org.jblas:jblas</artifact>
-            <excludes>
-              <!-- Linux amd64 is OK; not statically linked -->
-              <exclude>lib/static/Linux/i386/**</exclude>
-              <exclude>lib/static/Mac OS X/**</exclude>
-              <exclude>lib/static/Windows/**</exclude>
-            </excludes>
-          </filter>
         </filters>
       </configuration>
       <executions>
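For context, jblas is the BLAS-backed linear algebra library MLlib depended on at the time; the removed filter had been stripping its statically linked native binaries from the assembly jar for license reasons, and this commit restores them on branch-1.3. A small, hedged sketch of the jblas API that needs those native libs at runtime (assuming org.jblas:jblas is on the classpath):

```scala
import org.jblas.DoubleMatrix

object JblasSketch {
  def main(args: Array[String]): Unit = {
    // 2x2 matrix in column-major order: [[1.0, 3.0], [2.0, 4.0]]
    val a = new DoubleMatrix(2, 2, 1.0, 2.0, 3.0, 4.0)
    val b = DoubleMatrix.eye(2)  // 2x2 identity
    println(a.mmul(b))           // matrix multiply, delegates to native BLAS
  }
}
```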
spark git commit: [DOCS] Fix typo in API for custom InputFormats based on the “new” MapReduce API
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 76e3e6527 -> c5a5c6f61

[DOCS] Fix typo in API for custom InputFormats based on the “new” MapReduce API

This looks like a simple typo ```SparkContext.newHadoopRDD``` instead of ```SparkContext.newAPIHadoopRDD``` as in actual http://spark.apache.org/docs/1.2.1/api/scala/index.html#org.apache.spark.SparkContext

Author: Alexander <abezzu...@nflabs.com>

Closes #4718 from bzz/hadoop-InputFormats-doc-fix and squashes the following commits:

680a4c4 [Alexander] Fix typo in docs on custom Hadoop InputFormats

(cherry picked from commit a7f90390251ff62a0e10edf4c2eb876538597791)
Signed-off-by: Sean Owen <so...@cloudera.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c5a5c6f6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c5a5c6f6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c5a5c6f6

Branch: refs/heads/branch-1.3
Commit: c5a5c6f618b89d712c13a236388fa67c136691ee
Parents: 76e3e65
Author: Alexander <abezzu...@nflabs.com>
Authored: Sun Feb 22 08:53:05 2015 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Sun Feb 22 08:53:14 2015 +0000

----------------------------------------------------------------------
 docs/programming-guide.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/c5a5c6f6/docs/programming-guide.md

diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index 4e4af76..7b07018 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -335,7 +335,7 @@ Apart from text files, Spark's Scala API also supports several other data format
 
 * For [SequenceFiles](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html), use SparkContext's `sequenceFile[K, V]` method where `K` and `V` are the types of key and values in the file. These should be subclasses of Hadoop's [Writable](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/Writable.html) interface, like [IntWritable](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/IntWritable.html) and [Text](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/Text.html). In addition, Spark allows you to specify native types for a few common Writables; for example, `sequenceFile[Int, String]` will automatically read IntWritables and Texts.
 
-* For other Hadoop InputFormats, you can use the `SparkContext.hadoopRDD` method, which takes an arbitrary `JobConf` and input format class, key class and value class. Set these the same way you would for a Hadoop job with your input source. You can also use `SparkContext.newHadoopRDD` for InputFormats based on the new MapReduce API (`org.apache.hadoop.mapreduce`).
+* For other Hadoop InputFormats, you can use the `SparkContext.hadoopRDD` method, which takes an arbitrary `JobConf` and input format class, key class and value class. Set these the same way you would for a Hadoop job with your input source. You can also use `SparkContext.newAPIHadoopRDD` for InputFormats based on the new MapReduce API (`org.apache.hadoop.mapreduce`).
 
 * `RDD.saveAsObjectFile` and `SparkContext.objectFile` support saving an RDD in a simple format consisting of serialized Java objects. While this is not as efficient as specialized formats like Avro, it offers an easy way to save any RDD.
@@ -367,7 +367,7 @@ Apart from text files, Spark's Java API also supports several other data formats
 
 * For [SequenceFiles](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html), use SparkContext's `sequenceFile[K, V]` method where `K` and `V` are the types of key and values in the file. These should be subclasses of Hadoop's [Writable](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/Writable.html) interface, like [IntWritable](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/IntWritable.html) and [Text](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/Text.html).
 
-* For other Hadoop InputFormats, you can use the `JavaSparkContext.hadoopRDD` method, which takes an arbitrary `JobConf` and input format class, key class and value class. Set these the same way you would for a Hadoop job with your input source. You can also use `JavaSparkContext.newHadoopRDD` for InputFormats based on the new MapReduce API (`org.apache.hadoop.mapreduce`).
+* For other Hadoop InputFormats, you can use the `JavaSparkContext.hadoopRDD` method, which takes an arbitrary `JobConf` and input format class, key class and value class. Set these the same way you would for a Hadoop job with your input source. You can also use `JavaSparkContext.newAPIHadoopRDD` for InputFormats based on the new MapReduce API (`org.apache.hadoop.mapreduce`).
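To illustrate the corrected name, a hedged usage sketch of `SparkContext.newAPIHadoopRDD` (the input path is a placeholder, `sc` is assumed to be an existing SparkContext as in the spark-shell, and the Hadoop 2 config key is assumed):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Read a text file through the "new" MapReduce API (org.apache.hadoop.mapreduce).
val conf = new Configuration()
conf.set("mapreduce.input.fileinputformat.inputdir", "hdfs:///path/to/input")

val lines = sc.newAPIHadoopRDD(
  conf,
  classOf[TextInputFormat],  // InputFormat from the new API
  classOf[LongWritable],     // key: byte offset of each line
  classOf[Text])             // value: the line contents

lines.map(_._2.toString).take(5).foreach(println)
```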
spark git commit: [DOCS] Fix typo in API for custom InputFormats based on the “new” MapReduce API
Repository: spark
Updated Branches:
  refs/heads/master 46462ff25 -> a7f903902

[DOCS] Fix typo in API for custom InputFormats based on the “new” MapReduce API

This looks like a simple typo ```SparkContext.newHadoopRDD``` instead of ```SparkContext.newAPIHadoopRDD``` as in actual http://spark.apache.org/docs/1.2.1/api/scala/index.html#org.apache.spark.SparkContext

Author: Alexander <abezzu...@nflabs.com>

Closes #4718 from bzz/hadoop-InputFormats-doc-fix and squashes the following commits:

680a4c4 [Alexander] Fix typo in docs on custom Hadoop InputFormats

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a7f90390
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a7f90390
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a7f90390

Branch: refs/heads/master
Commit: a7f90390251ff62a0e10edf4c2eb876538597791
Parents: 46462ff
Author: Alexander <abezzu...@nflabs.com>
Authored: Sun Feb 22 08:53:05 2015 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Sun Feb 22 08:53:05 2015 +0000

----------------------------------------------------------------------
 docs/programming-guide.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/a7f90390/docs/programming-guide.md

diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index 4e4af76..7b07018 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -335,7 +335,7 @@ Apart from text files, Spark's Scala API also supports several other data format
 
 * For [SequenceFiles](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html), use SparkContext's `sequenceFile[K, V]` method where `K` and `V` are the types of key and values in the file. These should be subclasses of Hadoop's [Writable](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/Writable.html) interface, like [IntWritable](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/IntWritable.html) and [Text](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/Text.html). In addition, Spark allows you to specify native types for a few common Writables; for example, `sequenceFile[Int, String]` will automatically read IntWritables and Texts.
 
-* For other Hadoop InputFormats, you can use the `SparkContext.hadoopRDD` method, which takes an arbitrary `JobConf` and input format class, key class and value class. Set these the same way you would for a Hadoop job with your input source. You can also use `SparkContext.newHadoopRDD` for InputFormats based on the new MapReduce API (`org.apache.hadoop.mapreduce`).
+* For other Hadoop InputFormats, you can use the `SparkContext.hadoopRDD` method, which takes an arbitrary `JobConf` and input format class, key class and value class. Set these the same way you would for a Hadoop job with your input source. You can also use `SparkContext.newAPIHadoopRDD` for InputFormats based on the new MapReduce API (`org.apache.hadoop.mapreduce`).
 
 * `RDD.saveAsObjectFile` and `SparkContext.objectFile` support saving an RDD in a simple format consisting of serialized Java objects. While this is not as efficient as specialized formats like Avro, it offers an easy way to save any RDD.
@@ -367,7 +367,7 @@ Apart from text files, Spark's Java API also supports several other data formats
 
 * For [SequenceFiles](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html), use SparkContext's `sequenceFile[K, V]` method where `K` and `V` are the types of key and values in the file. These should be subclasses of Hadoop's [Writable](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/Writable.html) interface, like [IntWritable](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/IntWritable.html) and [Text](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/Text.html).
 
-* For other Hadoop InputFormats, you can use the `JavaSparkContext.hadoopRDD` method, which takes an arbitrary `JobConf` and input format class, key class and value class. Set these the same way you would for a Hadoop job with your input source. You can also use `JavaSparkContext.newHadoopRDD` for InputFormats based on the new MapReduce API (`org.apache.hadoop.mapreduce`).
+* For other Hadoop InputFormats, you can use the `JavaSparkContext.hadoopRDD` method, which takes an arbitrary `JobConf` and input format class, key class and value class. Set these the same way you would for a Hadoop job with your input source. You can also use `JavaSparkContext.newAPIHadoopRDD` for InputFormats based on the new MapReduce API (`org.apache.hadoop.mapreduce`).
 
 * `JavaRDD.saveAsObjectFile` and `JavaSparkContext.objectFile` support saving an RDD in a simple format consisting of serialized Java objects. While this is not as efficient as specialized formats like Avro, it offers an easy way to save any RDD.
spark git commit: [DataFrame] [Typo] Fix the typo
Repository: spark
Updated Branches:
  refs/heads/master a7f903902 -> 275b1bef8

[DataFrame] [Typo] Fix the typo

Author: Cheng Hao <hao.ch...@intel.com>

Closes #4717 from chenghao-intel/typo1 and squashes the following commits:

858d7b0 [Cheng Hao] update the typo

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/275b1bef
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/275b1bef
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/275b1bef

Branch: refs/heads/master
Commit: 275b1bef897d775f1f7743378ca3e09e36160136
Parents: a7f9039
Author: Cheng Hao <hao.ch...@intel.com>
Authored: Sun Feb 22 08:56:30 2015 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Sun Feb 22 08:56:30 2015 +0000

----------------------------------------------------------------------
 sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/275b1bef/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 810f7c7..69e5f6a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -138,7 +138,7 @@ class DataFrame protected[sql](
 
   /**
    * An implicit conversion function internal to this class for us to avoid doing
-   * new DataFrameImpl(...) everywhere.
+   * new DataFrame(...) everywhere.
    */
   @inline private implicit def logicalPlanToDataFrame(logicalPlan: LogicalPlan): DataFrame = {
     new DataFrame(sqlContext, logicalPlan)
spark git commit: [SPARK-3885] Provide mechanism to remove accumulators once they are no longer used
Repository: spark
Updated Branches:
  refs/heads/master e4f9d03d7 -> 95cd643aa

[SPARK-3885] Provide mechanism to remove accumulators once they are no longer used

Instead of storing a strong reference to accumulators, I've replaced this with a weak reference and updated any code that uses these accumulators to check whether the reference resolves before using the accumulator. A weak reference will be cleared when there is no longer an existing copy of the variable versus using a soft reference in which case accumulators would only be cleared when the GC explicitly ran out of memory.

Author: Ilya Ganelin <ilya.gane...@capitalone.com>

Closes #4021 from ilganeli/SPARK-3885 and squashes the following commits:

4ba9575 [Ilya Ganelin] Fixed error in test suite
8510943 [Ilya Ganelin] Extra code
bb76ef0 [Ilya Ganelin] File deleted somehow
283a333 [Ilya Ganelin] Added cleanup method for accumulators to remove stale references within Accumulators.original to accumulators that are now out of scope
345fd4f [Ilya Ganelin] Merge remote-tracking branch 'upstream/master' into SPARK-3885
7485a82 [Ilya Ganelin] Fixed build error
c8e0f2b [Ilya Ganelin] Added working test for accumulator garbage collection
94ce754 [Ilya Ganelin] Still not being properly garbage collected
8722b63 [Ilya Ganelin] Fixing gc test
7414a9c [Ilya Ganelin] Added test for accumulator garbage collection
18d62ec [Ilya Ganelin] Updated to throw Exception when accessing a GCd accumulator
9a81928 [Ilya Ganelin] Reverting permissions changes
28f705c [Ilya Ganelin] Merge remote-tracking branch 'upstream/master' into SPARK-3885
b820ab4b [Ilya Ganelin] reset
d78f4bf [Ilya Ganelin] Removed obsolete comment
0746e61 [Ilya Ganelin] Updated DAGSchedulerSUite to fix bug
3350852 [Ilya Ganelin] Updated DAGScheduler and Suite to correctly use new implementation of WeakRef Accumulator storage
c49066a [Ilya Ganelin] Merge remote-tracking branch 'upstream/master' into SPARK-3885
cbb9023 [Ilya Ganelin] Merge remote-tracking branch 'upstream/master' into SPARK-3885
a77d11b [Ilya Ganelin] Updated Accumulators class to store weak references instead of strong references to allow garbage collection of old accumulators

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/95cd643a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/95cd643a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/95cd643a

Branch: refs/heads/master
Commit: 95cd643aa954b7e4229e94fa8bdc99bf3b2bb1da
Parents: e4f9d03
Author: Ilya Ganelin <ilya.gane...@capitalone.com>
Authored: Sun Feb 22 22:43:04 2015 -0800
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Sun Feb 22 22:57:26 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/Accumulators.scala   | 36 +++-
 .../scala/org/apache/spark/ContextCleaner.scala | 20 +++
 .../scala/org/apache/spark/SparkContext.scala   | 28 +++
 .../apache/spark/scheduler/DAGScheduler.scala   | 10 +-
 .../org/apache/spark/AccumulatorSuite.scala     | 20 +++
 .../org/apache/spark/ContextCleanerSuite.scala  |  4 +++
 .../spark/scheduler/DAGSchedulerSuite.scala     |  6 +++-
 7 files changed, 107 insertions(+), 17 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/95cd643a/core/src/main/scala/org/apache/spark/Accumulators.scala

diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index 5f31bfb..30f0ccd 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -23,6 +23,7 @@ import java.lang.ThreadLocal
 
 import scala.collection.generic.Growable
 import scala.collection.mutable.Map
+import scala.ref.WeakReference
 import scala.reflect.ClassTag
 
 import org.apache.spark.serializer.JavaSerializer
@@ -280,10 +281,12 @@ object AccumulatorParam {
 // TODO: The multi-thread support in accumulators is kind of lame; check
 // if there's a more intuitive way of doing it right
 private[spark] object Accumulators {
-  // TODO: Use soft references? => need to make readObject work properly then
-  val originals = Map[Long, Accumulable[_, _]]()
-  val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
-    override protected def initialValue() = Map[Long, Accumulable[_, _]]()
+  // Store a WeakReference instead of a StrongReference because this way accumulators can be
+  // appropriately garbage collected during long-running jobs and release memory
+  type WeakAcc = WeakReference[Accumulable[_, _]]
+  val originals = Map[Long, WeakAcc]()
+  val localAccums = new ThreadLocal[Map[Long, WeakAcc]]() {
+    override protected def initialValue() = Map[Long, WeakAcc]()
   }
   var
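The core idea is a registry keyed by ID whose values are weak references, so a registered object can be collected once user code drops it; lookups then have to handle a cleared reference. A minimal, hypothetical sketch of that pattern (not the actual Accumulators object):

```scala
import scala.collection.mutable
import scala.ref.WeakReference

// Hypothetical registry illustrating the weak-reference pattern: the map
// does not keep its values alive, so an entry whose object is no longer
// strongly referenced elsewhere becomes eligible for garbage collection.
object WeakRegistry {
  private val entries = mutable.Map[Long, WeakReference[AnyRef]]()

  def register(id: Long, value: AnyRef): Unit =
    entries(id) = new WeakReference(value)

  // None either if the id is unknown or if the referent was collected.
  def lookup(id: Long): Option[AnyRef] =
    entries.get(id).flatMap(_.get)

  // Drop entries whose referents have been collected (cf. the cleanup
  // hook this commit wires into ContextCleaner).
  def removeDeadEntries(): Unit =
    entries.retain { case (_, ref) => ref.get.isDefined }
}
```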
spark git commit: [SPARK-911] allow efficient queries for a range if RDD is partitioned wi...
Repository: spark
Updated Branches:
  refs/heads/master 275b1bef8 -> e4f9d03d7

[SPARK-911] allow efficient queries for a range if RDD is partitioned wi...

...th RangePartitioner

Author: Aaron Josephs <ajose...@binghamton.edu>

Closes #1381 from aaronjosephs/PLAT-911 and squashes the following commits:

e30ade5 [Aaron Josephs] [SPARK-911] allow efficient queries for a range if RDD is partitioned with RangePartitioner

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e4f9d03d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e4f9d03d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e4f9d03d

Branch: refs/heads/master
Commit: e4f9d03d728bc6fbfb6ebc7d15b4ba328f98f3dc
Parents: 275b1be
Author: Aaron Josephs <ajose...@binghamton.edu>
Authored: Sun Feb 22 22:09:06 2015 -0800
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Sun Feb 22 22:09:06 2015 -0800

----------------------------------------------------------------------
 .../apache/spark/rdd/OrderedRDDFunctions.scala | 23
 .../org/apache/spark/rdd/SortingSuite.scala    | 28
 2 files changed, 51 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/e4f9d03d/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala

diff --git a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
index 144f679..6fdfdb7 100644
--- a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
@@ -75,4 +75,27 @@ class OrderedRDDFunctions[K : Ordering : ClassTag,
     new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)
   }
 
+  /**
+   * Returns an RDD containing only the elements in the inclusive range `lower` to `upper`.
+   * If the RDD has been partitioned using a `RangePartitioner`, then this operation can be
+   * performed efficiently by only scanning the partitions that might contain matching elements.
+   * Otherwise, a standard `filter` is applied to all partitions.
+   */
+  def filterByRange(lower: K, upper: K): RDD[P] = {
+
+    def inRange(k: K): Boolean = ordering.gteq(k, lower) && ordering.lteq(k, upper)
+
+    val rddToFilter: RDD[P] = self.partitioner match {
+      case Some(rp: RangePartitioner[K, V]) => {
+        val partitionIndicies = (rp.getPartition(lower), rp.getPartition(upper)) match {
+          case (l, u) => Math.min(l, u) to Math.max(l, u)
+        }
+        PartitionPruningRDD.create(self, partitionIndicies.contains)
+      }
+      case _ =>
+        self
+    }
+    rddToFilter.filter { case (k, v) => inRange(k) }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e4f9d03d/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala

diff --git a/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala b/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala
index a40f2ff..64b1c24 100644
--- a/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala
@@ -119,5 +119,33 @@ class SortingSuite extends FunSuite with SharedSparkContext with Matchers with L
     partitions(1).last should be < partitions(2).head
     partitions(2).last should be < partitions(3).head
   }
+
+  test("get a range of elements in a sorted RDD that is on one partition") {
+    val pairArr = (1 to 1000).map(x => (x, x)).toArray
+    val sorted = sc.parallelize(pairArr, 10).sortByKey()
+    val range = sorted.filterByRange(20, 40).collect()
+    assert((20 to 40).toArray === range.map(_._1))
+  }
+
+  test("get a range of elements over multiple partitions in a descendingly sorted RDD") {
+    val pairArr = (1000 to 1 by -1).map(x => (x, x)).toArray
+    val sorted = sc.parallelize(pairArr, 10).sortByKey(false)
+    val range = sorted.filterByRange(200, 800).collect()
+    assert((800 to 200 by -1).toArray === range.map(_._1))
+  }
+
+  test("get a range of elements in an array not partitioned by a range partitioner") {
+    val pairArr = util.Random.shuffle((1 to 1000).toList).map(x => (x, x))
+    val pairs = sc.parallelize(pairArr, 10)
+    val range = pairs.filterByRange(200, 800).collect()
+    assert((800 to 200 by -1).toArray.sorted === range.map(_._1).sorted)
+  }
+
+  test("get a range of elements over multiple partitions but not taking up full partitions") {
+    val pairArr = (1000 to 1 by -1).map(x => (x, x)).toArray
+    val sorted = sc.parallelize(pairArr, 10).sortByKey(false)
+    val range = sorted.filterByRange(250, 850).collect()
+    assert((850 to 250 by -1).toArray === range.map(_._1))
+  }
 }
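A hedged usage sketch of the new method (`sc` is assumed to be an existing SparkContext, values are illustrative, and the `OrderedRDDFunctions` implicits are assumed to be in scope as they are in the spark-shell):

```scala
// After sortByKey(), the RDD carries a RangePartitioner, so filterByRange
// can prune partitions instead of filtering every element of every partition.
val pairs  = sc.parallelize((1 to 100000).map(x => (x, x)))
val sorted = pairs.sortByKey()               // installs a RangePartitioner
val slice  = sorted.filterByRange(200, 400)  // scans only candidate partitions
println(slice.count())                       // 201 keys fall in [200, 400]
```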