spark git commit: [SPARK-7316][MLLIB] RDD sliding window with step
Repository: spark Updated Branches: refs/heads/branch-1.6 80641c4fa -> f0180106a [SPARK-7316][MLLIB] RDD sliding window with step Implementation of step capability for sliding window function in MLlib's RDD. Though one can use current sliding window with step 1 and then filter every Nth window, it will take more time and space (N*data.count times more than needed). For example, below are the results for various windows and steps on 10M data points: Window | Step | Time | Windows produced -- | -- | -- | -- 128 | 1 | 6.38 | 9999873 128 | 10 | 0.9 | 999988 128 | 100 | 0.41 | 99999 1024 | 1 | 44.67 | 9998977 1024 | 10 | 4.74 | 999898 1024 | 100 | 0.78 | 99990 ``` import org.apache.spark.mllib.rdd.RDDFunctions._ val rdd = sc.parallelize(1 to 10000000, 10) rdd.count val window = 1024 val step = 1 val t = System.nanoTime(); val windows = rdd.sliding(window, step); println(windows.count); println((System.nanoTime() - t) / 1e9) ``` Author: unknown Author: Alexander Ulanov Author: Xiangrui Meng Closes #5855 from avulanov/SPARK-7316-sliding. 
(cherry picked from commit dba1a62cf1baa9ae1ee665d592e01dfad78331a2) Signed-off-by: Xiangrui Meng Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f0180106 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f0180106 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f0180106 Branch: refs/heads/branch-1.6 Commit: f0180106a044a7e700d7c0d3818805968819946d Parents: 80641c4 Author: unknown Authored: Tue Nov 10 14:25:06 2015 -0800 Committer: Xiangrui Meng Committed: Tue Nov 10 14:25:19 2015 -0800 -- .../apache/spark/mllib/rdd/RDDFunctions.scala | 11 ++- .../org/apache/spark/mllib/rdd/SlidingRDD.scala | 71 +++- .../spark/mllib/rdd/RDDFunctionsSuite.scala | 11 +-- 3 files changed, 54 insertions(+), 39 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f0180106/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala index 7817284..19a047d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala @@ -37,16 +37,21 @@ class RDDFunctions[T: ClassTag](self: RDD[T]) extends Serializable { * trigger a Spark job if the parent RDD has more than one partitions and the window size is * greater than 1. */ - def sliding(windowSize: Int): RDD[Array[T]] = { + def sliding(windowSize: Int, step: Int): RDD[Array[T]] = { require(windowSize > 0, s"Sliding window size must be positive, but got $windowSize.") -if (windowSize == 1) { +if (windowSize == 1 && step == 1) { self.map(Array(_)) } else { - new SlidingRDD[T](self, windowSize) + new SlidingRDD[T](self, windowSize, step) } } /** + * [[sliding(Int, Int)*]] with step = 1. 
+ */ + def sliding(windowSize: Int): RDD[Array[T]] = sliding(windowSize, 1) + + /** * Reduces the elements of this RDD in a multi-level tree pattern. * * @param depth suggested depth of the tree (default: 2) http://git-wip-us.apache.org/repos/asf/spark/blob/f0180106/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala index 1facf83..ead8db6 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala @@ -24,13 +24,13 @@ import org.apache.spark.{TaskContext, Partition} import org.apache.spark.rdd.RDD private[mllib] -class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: Seq[T]) +class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: Seq[T], val offset: Int) extends Partition with Serializable { override val index: Int = idx } /** - * Represents a RDD from grouping items of its parent RDD in fixed size blocks by passing a sliding + * Represents an RDD from grouping items of its parent RDD in fixed size blocks by passing a sliding * window over them. The ordering is first based on the partition index and then the ordering of * items within each partition. This is similar to sliding in Scala collections, except that it * becomes an empty RDD if the window size is greater than the total number of items. It needs to @@ -40,19 +40,24 @@ class SlidingRDDPartition[T](val idx: Int, val prev: Partition, va
spark git commit: [SPARK-7316][MLLIB] RDD sliding window with step
Repository: spark Updated Branches: refs/heads/master 18350a570 -> dba1a62cf [SPARK-7316][MLLIB] RDD sliding window with step Implementation of step capability for sliding window function in MLlib's RDD. Though one can use current sliding window with step 1 and then filter every Nth window, it will take more time and space (N*data.count times more than needed). For example, below are the results for various windows and steps on 10M data points: Window | Step | Time | Windows produced -- | -- | -- | -- 128 | 1 | 6.38 | 9999873 128 | 10 | 0.9 | 999988 128 | 100 | 0.41 | 99999 1024 | 1 | 44.67 | 9998977 1024 | 10 | 4.74 | 999898 1024 | 100 | 0.78 | 99990 ``` import org.apache.spark.mllib.rdd.RDDFunctions._ val rdd = sc.parallelize(1 to 10000000, 10) rdd.count val window = 1024 val step = 1 val t = System.nanoTime(); val windows = rdd.sliding(window, step); println(windows.count); println((System.nanoTime() - t) / 1e9) ``` Author: unknown Author: Alexander Ulanov Author: Xiangrui Meng Closes #5855 from avulanov/SPARK-7316-sliding. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dba1a62c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dba1a62c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dba1a62c Branch: refs/heads/master Commit: dba1a62cf1baa9ae1ee665d592e01dfad78331a2 Parents: 18350a5 Author: unknown Authored: Tue Nov 10 14:25:06 2015 -0800 Committer: Xiangrui Meng Committed: Tue Nov 10 14:25:06 2015 -0800 -- .../apache/spark/mllib/rdd/RDDFunctions.scala | 11 ++- .../org/apache/spark/mllib/rdd/SlidingRDD.scala | 71 +++- .../spark/mllib/rdd/RDDFunctionsSuite.scala | 11 +-- 3 files changed, 54 insertions(+), 39 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/dba1a62c/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala index 7817284..19a047d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala @@ -37,16 +37,21 @@ class RDDFunctions[T: ClassTag](self: RDD[T]) extends Serializable { * trigger a Spark job if the parent RDD has more than one partitions and the window size is * greater than 1. */ - def sliding(windowSize: Int): RDD[Array[T]] = { + def sliding(windowSize: Int, step: Int): RDD[Array[T]] = { require(windowSize > 0, s"Sliding window size must be positive, but got $windowSize.") -if (windowSize == 1) { +if (windowSize == 1 && step == 1) { self.map(Array(_)) } else { - new SlidingRDD[T](self, windowSize) + new SlidingRDD[T](self, windowSize, step) } } /** + * [[sliding(Int, Int)*]] with step = 1. + */ + def sliding(windowSize: Int): RDD[Array[T]] = sliding(windowSize, 1) + + /** * Reduces the elements of this RDD in a multi-level tree pattern. 
* * @param depth suggested depth of the tree (default: 2) http://git-wip-us.apache.org/repos/asf/spark/blob/dba1a62c/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala index 1facf83..ead8db6 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala @@ -24,13 +24,13 @@ import org.apache.spark.{TaskContext, Partition} import org.apache.spark.rdd.RDD private[mllib] -class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: Seq[T]) +class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: Seq[T], val offset: Int) extends Partition with Serializable { override val index: Int = idx } /** - * Represents a RDD from grouping items of its parent RDD in fixed size blocks by passing a sliding + * Represents an RDD from grouping items of its parent RDD in fixed size blocks by passing a sliding * window over them. The ordering is first based on the partition index and then the ordering of * items within each partition. This is similar to sliding in Scala collections, except that it * becomes an empty RDD if the window size is greater than the total number of items. It needs to @@ -40,19 +40,24 @@ class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: Seq[T] * * @param parent the parent RDD * @param windowSize the window size, must be greater t