spark git commit: [SPARK-7316][MLLIB] RDD sliding window with step

2015-11-10 Thread meng
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 80641c4fa -> f0180106a


[SPARK-7316][MLLIB] RDD sliding window with step

Implementation of a step capability for the sliding window function on MLlib's RDDs.

Although one can use the current sliding window with step 1 and then keep only every 
Nth window, that approach takes far more time and space than needed: it materializes 
roughly data.count windows instead of about data.count / N. For example, below are 
the results for various window sizes and steps on 10M data points:

Window | Step | Time (s) | Windows produced
------ | ---- | -------- | ----------------
128 | 1 | 6.38 | 9999873
128 | 10 | 0.9 | 999988
128 | 100 | 0.41 | 99999
1024 | 1 | 44.67 | 9998977
1024 | 10 | 4.74 | 999898
1024 | 100 | 0.78 | 99990
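
For n data points, window size w, and step s, the number of complete windows is 
floor((n - w) / s) + 1; the counts above match this formula for n = 10^7. The 
timings were measured with the following snippet: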
```
import org.apache.spark.mllib.rdd.RDDFunctions._
// 10M data points in 10 partitions; materialize them before timing.
val rdd = sc.parallelize(1 to 10000000, 10)
rdd.count
val window = 1024
val step = 1
val t = System.nanoTime()
val windows = rdd.sliding(window, step)
println(windows.count)
println((System.nanoTime() - t) / 1e9)  // elapsed seconds
```
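
To make the equivalence concrete, here is a minimal sketch (on a small, hypothetical 
RDD) of sliding with a step versus producing every window and keeping each Nth one; 
both yield the same windows, but the second materializes N times as many 
intermediate arrays:

```
import org.apache.spark.mllib.rdd.RDDFunctions._

val small = sc.parallelize(1 to 10, 2)

// Direct: windows of 3 elements, advancing by 2.
val stepped = small.sliding(3, 2).collect()

// Naive: all windows with step 1, then keep every 2nd one.
val filtered = small.sliding(3).zipWithIndex()
  .filter { case (_, i) => i % 2 == 0L }
  .map(_._1)
  .collect()

// Both: Array(1, 2, 3), Array(3, 4, 5), Array(5, 6, 7), Array(7, 8, 9)
```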

Author: unknown 
Author: Alexander Ulanov 
Author: Xiangrui Meng 

Closes #5855 from avulanov/SPARK-7316-sliding.

(cherry picked from commit dba1a62cf1baa9ae1ee665d592e01dfad78331a2)
Signed-off-by: Xiangrui Meng 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f0180106
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f0180106
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f0180106

Branch: refs/heads/branch-1.6
Commit: f0180106a044a7e700d7c0d3818805968819946d
Parents: 80641c4
Author: unknown 
Authored: Tue Nov 10 14:25:06 2015 -0800
Committer: Xiangrui Meng 
Committed: Tue Nov 10 14:25:19 2015 -0800

--
 .../apache/spark/mllib/rdd/RDDFunctions.scala   | 11 ++-
 .../org/apache/spark/mllib/rdd/SlidingRDD.scala | 71 +++-
 .../spark/mllib/rdd/RDDFunctionsSuite.scala | 11 +--
 3 files changed, 54 insertions(+), 39 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f0180106/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala
index 7817284..19a047d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala
@@ -37,16 +37,21 @@ class RDDFunctions[T: ClassTag](self: RDD[T]) extends Serializable {
    * trigger a Spark job if the parent RDD has more than one partitions and the window size is
    * greater than 1.
    */
-  def sliding(windowSize: Int): RDD[Array[T]] = {
+  def sliding(windowSize: Int, step: Int): RDD[Array[T]] = {
     require(windowSize > 0, s"Sliding window size must be positive, but got $windowSize.")
-    if (windowSize == 1) {
+    if (windowSize == 1 && step == 1) {
       self.map(Array(_))
     } else {
-      new SlidingRDD[T](self, windowSize)
+      new SlidingRDD[T](self, windowSize, step)
     }
   }
 
   /**
+   * [[sliding(Int, Int)*]] with step = 1.
+   */
+  def sliding(windowSize: Int): RDD[Array[T]] = sliding(windowSize, 1)
+
+  /**
    * Reduces the elements of this RDD in a multi-level tree pattern.
    *
    * @param depth suggested depth of the tree (default: 2)
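
Note the fast path preserved by this hunk: when both windowSize and step are 1, 
every element maps directly to a one-element array, so no SlidingRDD (and hence no 
extra Spark job) is created. For example:

```
import org.apache.spark.mllib.rdd.RDDFunctions._

sc.parallelize(1 to 3).sliding(1).collect()
// Array(Array(1), Array(2), Array(3)) -- a plain map, no SlidingRDD
```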

http://git-wip-us.apache.org/repos/asf/spark/blob/f0180106/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala
index 1facf83..ead8db6 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala
@@ -24,13 +24,13 @@ import org.apache.spark.{TaskContext, Partition}
 import org.apache.spark.rdd.RDD
 
 private[mllib]
-class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: Seq[T])
+class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: Seq[T], val offset: Int)
   extends Partition with Serializable {
   override val index: Int = idx
 }
 
 /**
- * Represents a RDD from grouping items of its parent RDD in fixed size blocks by passing a sliding
+ * Represents an RDD from grouping items of its parent RDD in fixed size blocks by passing a sliding
  * window over them. The ordering is first based on the partition index and then the ordering of
  * items within each partition. This is similar to sliding in Scala collections, except that it
  * becomes an empty RDD if the window size is greater than the total number of items. It needs to
@@ -40,19 +40,24 @@ class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: Seq[T]
  *
  * @param parent the parent RDD
  * @param windowSize the window size, must be greater than 1
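
The new `offset` field on `SlidingRDDPartition` records how many leading elements of 
a partition must be skipped so that window start positions stay aligned to the 
global step across partition boundaries. Below is a minimal local sketch of the 
per-partition idea; the function name is hypothetical and the exact stitching of 
`tail` across partitions is simplified from what the full diff implements:

```
// Sketch: each partition's iterator is extended with `tail` elements copied
// from the following partition(s), then `offset` leading elements are dropped
// so the first window in this partition starts on a global step boundary.
def computeSketch[T](elems: Iterator[T], tail: Seq[T], offset: Int,
                     windowSize: Int, step: Int): Iterator[Seq[T]] =
  (elems ++ tail)
    .drop(offset)
    .sliding(windowSize, step)
    .withPartial(false)  // drop the trailing incomplete window, if any
    .map(_.toSeq)

// With offset = 0, window 3, step 2:
// computeSketch(Iterator(1, 2, 3, 4), Seq(5, 6), 0, 3, 2).toList
// => List(Seq(1, 2, 3), Seq(3, 4, 5))
```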
