[GitHub] spark pull request #16276: [SPARK-18855][CORE] Add RDD flatten function

2016-12-15 Thread linbojin
Github user linbojin closed the pull request at:

https://github.com/apache/spark/pull/16276


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16276: [SPARK-18855][CORE] Add RDD flatten function

2016-12-14 Thread linbojin
Github user linbojin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16276#discussion_r92550699
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -381,6 +381,14 @@ abstract class RDD[T: ClassTag](
   }
 
   /**
+   * Return a new RDD by flattening all elements from an RDD of traversable elements
+   */
+  def flatten[U: ClassTag](implicit asTraversable: T => TraversableOnce[U]): RDD[U] = withScope {
--- End diff --

@srowen I think I figured out a simpler way:
```
  def flatten[U: ClassTag](implicit asTraversable: T => TraversableOnce[U]): RDD[U] = withScope {
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => {
      var newIter: Iterator[U] = Iterator.empty
      for (x <- iter) newIter ++= asTraversable(x)
      newIter
    })
  }
```
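Outside Spark, the accumulation pattern above can be sanity-checked on plain Scala iterators. This is a minimal standalone sketch: `FlattenAccumulate` and `flattenIter` are illustrative names, the implicit `TraversableOnce` view is simplified to `Iterable`, and the Spark internals (`MapPartitionsRDD`, `withScope`) are omitted:

```scala
object FlattenAccumulate {
  // Concatenate each element's collection onto a growing iterator,
  // mirroring the body passed to MapPartitionsRDD above.
  // Iterator's ++ is lazy, so no intermediate collection is built.
  def flattenIter[U](iter: Iterator[Iterable[U]]): Iterator[U] = {
    var newIter: Iterator[U] = Iterator.empty
    for (xs <- iter) newIter = newIter ++ xs
    newIter
  }

  def main(args: Array[String]): Unit = {
    val parts = Iterator(List(1, 2, 3), List(4, 5), List(6))
    println(flattenIter(parts).toList)  // List(1, 2, 3, 4, 5, 6)
  }
}
```

One caveat with this variant: the `for` loop walks the whole source iterator up front to build the concatenation chain, whereas `Iterator.flatMap` pulls from the source lazily, one element at a time.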





[GitHub] spark pull request #16276: [SPARK-18855][CORE] Add RDD flatten function

2016-12-14 Thread linbojin
Github user linbojin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16276#discussion_r92546374
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -381,6 +381,14 @@ abstract class RDD[T: ClassTag](
   }
 
   /**
+   * Return a new RDD by flattening all elements from an RDD of traversable elements
+   */
+  def flatten[U: ClassTag](implicit asTraversable: T => TraversableOnce[U]): RDD[U] = withScope {
--- End diff --

Hi @srowen, thanks for your suggestion. Here is one way, reusing Scala's flatMap:
```
  def flatten[U: ClassTag](implicit asTraversable: T => TraversableOnce[U]): RDD[U] = withScope {
    val f = (x: T) => asTraversable(x)
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
  }
```
Or I can implement the logic myself:
```
  def flatten[U: ClassTag](implicit asTraversable: T => TraversableOnce[U]): RDD[U] = withScope {
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => new Iterator[U] {
      private val empty = Iterator.empty
      private var cur: Iterator[U] = empty
      private def nextCur() { cur = asTraversable(iter.next).toIterator }
      def hasNext: Boolean = {
        while (!cur.hasNext) {
          if (!iter.hasNext) return false
          nextCur()
        }
        true
      }
      def next(): U = (if (hasNext) cur else empty).next()
    })
  }
```
ref: https://github.com/scala/scala/blob/v2.11.8/src/library/scala/collection/Iterator.scala#L432

Which one do you think is better?
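The second, hand-rolled variant can likewise be exercised without Spark. This is a standalone sketch: `LazyFlatten` is an illustrative name, the implicit view is simplified to `Iterable`, and the Spark internals are omitted:

```scala
object LazyFlatten {
  // Flatten lazily: advance to the next non-empty inner iterator only
  // when the consumer asks for more, as scala's Iterator.flatMap does.
  def flattenIter[U](iter: Iterator[Iterable[U]]): Iterator[U] = new Iterator[U] {
    private var cur: Iterator[U] = Iterator.empty
    def hasNext: Boolean = {
      while (!cur.hasNext) {
        if (!iter.hasNext) return false
        cur = iter.next().iterator
      }
      true
    }
    def next(): U =
      if (hasNext) cur.next() else Iterator.empty.next()  // throws when exhausted
  }

  def main(args: Array[String]): Unit = {
    // Empty inner collections are skipped transparently.
    println(flattenIter(Iterator(List(1, 2), Nil, List(3))).toList)  // List(1, 2, 3)
  }
}
```

Unlike the accumulation approach, nothing is consumed from the source until the resulting iterator is actually pulled, which matters for large partitions.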






[GitHub] spark pull request #16276: [SPARK-18855][CORE] Add RDD flatten function

2016-12-14 Thread linbojin
Github user linbojin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16276#discussion_r92531327
  
--- Diff: core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala ---
@@ -88,6 +88,13 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
 }
   }
 
+  test("flatten") {
+    val nums = sc.makeRDD(Array(Array(1, 2, 3), Array(4, 5), Array(6)), 2)
+    assert(nums.flatten.collect().toList === List(1, 2, 3, 4, 5, 6))
+    val strs = sc.makeRDD(Array(Array("a", "b", "c"), Array("d", "e"), Array("f")), 2)
+    assert(strs.flatten.collect().toList === List("a", "b", "c", "d", "e", "f"))
--- End diff --

Thanks, I will move the test code into "basic operations".





[GitHub] spark pull request #16276: [SPARK-18855][CORE] Add RDD flatten function

2016-12-14 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/16276#discussion_r92450063
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -381,6 +381,14 @@ abstract class RDD[T: ClassTag](
   }
 
   /**
+   * Return a new RDD by flattening all elements from an RDD of traversable elements
+   */
+  def flatten[U: ClassTag](implicit asTraversable: T => TraversableOnce[U]): RDD[U] = withScope {
--- End diff --

I think that if this is done, it's worth implementing it more efficiently, not just by calling flatMap.





[GitHub] spark pull request #16276: [SPARK-18855][CORE] Add RDD flatten function

2016-12-14 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/16276#discussion_r92450069
  
--- Diff: core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala ---
@@ -88,6 +88,13 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
 }
   }
 
+  test("flatten") {
+    val nums = sc.makeRDD(Array(Array(1, 2, 3), Array(4, 5), Array(6)), 2)
+    assert(nums.flatten.collect().toList === List(1, 2, 3, 4, 5, 6))
+    val strs = sc.makeRDD(Array(Array("a", "b", "c"), Array("d", "e"), Array("f")), 2)
+    assert(strs.flatten.collect().toList === List("a", "b", "c", "d", "e", "f"))
--- End diff --

I don't think it's necessary to test both nums and strings; one would be fine. You should just move it into the "basic operations" test above and reuse an existing RDD if possible.





[GitHub] spark pull request #16276: [SPARK-18855][CORE] Add RDD flatten function

2016-12-13 Thread linbojin
GitHub user linbojin opened a pull request:

https://github.com/apache/spark/pull/16276

[SPARK-18855][CORE] Add RDD flatten function

## What changes were proposed in this pull request?

Added a new flatten function for RDD.

## How was this patch tested?

Unit tests inside RDDSuite and manual tests:
```
scala> val rdd = sc.makeRDD(List(List(1, 2, 3), List(4, 5), List(6)))
rdd: org.apache.spark.rdd.RDD[List[Int]] = ParallelCollectionRDD[0] at makeRDD at <console>:24

scala> rdd.flatten.collect
res0: Array[Int] = Array(1, 2, 3, 4, 5, 6)
```


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/linbojin/spark SPARK-18855-add-rdd-flatten

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16276.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16276


commit 2c0903ac07367cf203e4b1ed6bf4ac1894976ec9
Author: linbojin 
Date:   2016-12-14T06:04:48Z

add RDD flatten function and tests



