[GitHub] spark pull request #16276: [SPARK-18855][CORE] Add RDD flatten function
Github user linbojin closed the pull request at:

    https://github.com/apache/spark/pull/16276

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16276: [SPARK-18855][CORE] Add RDD flatten function
Github user linbojin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16276#discussion_r92550699

--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -381,6 +381,14 @@ abstract class RDD[T: ClassTag](
   }

   /**
+   * Return a new RDD by flattening all elements from RDD with traversable elements
+   */
+  def flatten[U: ClassTag](implicit asTraversable: T => TraversableOnce[U]): RDD[U] = withScope {
--- End diff --

@srowen I think I figured out a simpler way:

```
def flatten[U: ClassTag](implicit asTraversable: T => TraversableOnce[U]): RDD[U] = withScope {
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => {
    var newIter: Iterator[U] = Iterator.empty
    for (x <- iter) newIter ++= asTraversable(x)
    newIter
  })
}
```
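One property of the loop-and-concatenate version worth checking is how much of the input it pulls before producing anything: `for (x <- iter)` is a `foreach`, so it walks the whole partition iterator up front, while `flatMap` pulls elements on demand. The following is a minimal plain-Scala sketch of that difference, with no Spark dependency. The names `EagerVsLazy`, `pulledBeforeFirst`, `concatLoop` and `viaFlatMap` are hypothetical and are not part of the pull request.

```scala
object EagerVsLazy {
  // Counts how many outer elements have been pulled from the source
  // by the time the first flattened element has been read.
  def pulledBeforeFirst(build: Iterator[List[Int]] => Iterator[Int]): Int = {
    var pulled = 0
    val source = Iterator(List(1, 2), List(3), List(4, 5)).map { xs =>
      pulled += 1
      xs
    }
    val out = build(source)
    out.next() // read only the first flattened element
    pulled
  }

  // Same shape as the loop in the comment above: the for loop is a foreach,
  // so it exhausts `iter` before the concatenated iterator is returned.
  def concatLoop(iter: Iterator[List[Int]]): Iterator[Int] = {
    var newIter: Iterator[Int] = Iterator.empty
    for (x <- iter) newIter ++= x
    newIter
  }

  // flatMap only advances the outer iterator when the current inner one runs out.
  def viaFlatMap(iter: Iterator[List[Int]]): Iterator[Int] =
    iter.flatMap(x => x)
}
```

With the three-element source above, `concatLoop` has pulled every outer element before the first result is read, whereas `viaFlatMap` has pulled only one. On a large partition that difference could matter for memory, although this sketch does not measure it.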
[GitHub] spark pull request #16276: [SPARK-18855][CORE] Add RDD flatten function
Github user linbojin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16276#discussion_r92546374

--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -381,6 +381,14 @@ abstract class RDD[T: ClassTag](
   }

   /**
+   * Return a new RDD by flattening all elements from RDD with traversable elements
+   */
+  def flatten[U: ClassTag](implicit asTraversable: T => TraversableOnce[U]): RDD[U] = withScope {
--- End diff --

Hi @srowen, thanks for your suggestion. One option is to use Scala's flatMap on the partition iterator:

```
def flatten[U: ClassTag](implicit asTraversable: T => TraversableOnce[U]): RDD[U] = withScope {
  val f = (x: T) => asTraversable(x)
  val cleanF = sc.clean(f)
  // Pass the cleaned closure, not the original f.
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
```

Or I can implement the logic myself:

```
def flatten[U: ClassTag](implicit asTraversable: T => TraversableOnce[U]): RDD[U] = withScope {
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => new Iterator[U] {
    private val empty = Iterator.empty
    private var cur: Iterator[U] = empty
    private def nextCur(): Unit = { cur = asTraversable(iter.next()).toIterator }
    def hasNext: Boolean = {
      // Advance past exhausted inner iterators until an element is available.
      while (!cur.hasNext) {
        if (!iter.hasNext) return false
        nextCur()
      }
      true
    }
    def next(): U = (if (hasNext) cur else empty).next()
  })
}
```

ref: https://github.com/scala/scala/blob/v2.11.8/src/library/scala/collection/Iterator.scala#L432

Which one do you think is better?
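To compare the two options outside Spark, the per-partition logic can be tried on ordinary Scala iterators. This is a minimal sketch under stated assumptions: `FlattenSketch`, `flattenManual`, `flattenWithFlatMap` and the explicit `asIter` parameter are hypothetical stand-ins (for the `MapPartitionsRDD` body and the implicit `asTraversable` view), not code from the pull request.

```scala
object FlattenSketch {
  // Hand-written lazy flatten over a partition-like iterator.
  // `asIter` plays the role of the PR's implicit `asTraversable` (assumed name).
  def flattenManual[T, U](iter: Iterator[T])(asIter: T => Iterator[U]): Iterator[U] =
    new Iterator[U] {
      private var cur: Iterator[U] = Iterator.empty

      def hasNext: Boolean = {
        // Skip exhausted or empty inner iterators until an element is found.
        while (!cur.hasNext) {
          if (!iter.hasNext) return false
          cur = asIter(iter.next())
        }
        true
      }

      // Delegates to the empty iterator when exhausted, which throws
      // NoSuchElementException as the Iterator contract expects.
      def next(): U =
        if (hasNext) cur.next() else Iterator.empty.next()
    }

  // The same behaviour using the standard library's Iterator.flatMap.
  def flattenWithFlatMap[T, U](iter: Iterator[T])(asIter: T => Iterator[U]): Iterator[U] =
    iter.flatMap(asIter)
}
```

Both functions yield the same elements in the same order, including when some inner collections are empty. Since the hand-written iterator mirrors what the linked `Iterator.flatMap` source already does, the sketch suggests the choice between them is mostly about code size rather than behaviour.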
[GitHub] spark pull request #16276: [SPARK-18855][CORE] Add RDD flatten function
Github user linbojin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16276#discussion_r92531327

--- Diff: core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala ---
@@ -88,6 +88,13 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
     }
   }

+  test("flatten") {
+    val nums = sc.makeRDD(Array(Array(1, 2, 3), Array(4, 5), Array(6)), 2)
+    assert(nums.flatten.collect().toList === List(1, 2, 3, 4, 5, 6))
+    val strs = sc.makeRDD(Array(Array("a", "b", "c"), Array("d", "e"), Array("f")), 2)
+    assert(strs.flatten.collect().toList === List("a", "b", "c", "d", "e", "f"))
--- End diff --

Thanks, I will move the test code into "basic operations".
[GitHub] spark pull request #16276: [SPARK-18855][CORE] Add RDD flatten function
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16276#discussion_r92450063

--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -381,6 +381,14 @@ abstract class RDD[T: ClassTag](
   }

   /**
+   * Return a new RDD by flattening all elements from RDD with traversable elements
+   */
+  def flatten[U: ClassTag](implicit asTraversable: T => TraversableOnce[U]): RDD[U] = withScope {
--- End diff --

I think that if this is done, it's worth implementing more efficiently and not just by calling flatMap.
[GitHub] spark pull request #16276: [SPARK-18855][CORE] Add RDD flatten function
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16276#discussion_r92450069

--- Diff: core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala ---
@@ -88,6 +88,13 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
     }
   }

+  test("flatten") {
+    val nums = sc.makeRDD(Array(Array(1, 2, 3), Array(4, 5), Array(6)), 2)
+    assert(nums.flatten.collect().toList === List(1, 2, 3, 4, 5, 6))
+    val strs = sc.makeRDD(Array(Array("a", "b", "c"), Array("d", "e"), Array("f")), 2)
+    assert(strs.flatten.collect().toList === List("a", "b", "c", "d", "e", "f"))
--- End diff --

I don't think it's necessary to test both nums and strings; one would be fine. You should just move it to the "basic operations" test above and reuse an existing RDD if possible.
[GitHub] spark pull request #16276: [SPARK-18855][CORE] Add RDD flatten function
GitHub user linbojin opened a pull request:

    https://github.com/apache/spark/pull/16276

[SPARK-18855][CORE] Add RDD flatten function

## What changes were proposed in this pull request?

Added a new flatten function for RDD.

## How was this patch tested?

Unit tests inside RDDSuite and manual tests:

```
scala> val rdd = sc.makeRDD(List(List(1, 2, 3), List(4, 5), List(6)))
rdd: org.apache.spark.rdd.RDD[List[Int]] = ParallelCollectionRDD[0] at makeRDD at :24

scala> rdd.flatten.collect
res0: Array[Int] = Array(1, 2, 3, 4, 5, 6)
```

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/linbojin/spark SPARK-18855-add-rdd-flatten

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/16276.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #16276

commit 2c0903ac07367cf203e4b1ed6bf4ac1894976ec9
Author: linbojin
Date:   2016-12-14T06:04:48Z

    add RDD flatten function and tests
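The proposed signature, `flatten[U](implicit asTraversable: T => TraversableOnce[U])`, follows the same pattern as `flatten` on Scala's own collections: the method only compiles when the element type can be viewed as a collection of `U`. Here is a minimal sketch of that pattern without Spark. `Box` is a hypothetical stand-in for `RDD`, and `Iterable` is used in place of `TraversableOnce` so the snippet compiles on newer Scala versions as well.

```scala
object ImplicitViewSketch {
  // Hypothetical container standing in for RDD[T]; not a Spark class.
  final case class Box[T](items: List[T]) {
    // Only callable when the compiler can find evidence that each T
    // can be viewed as an Iterable[U]; U is inferred from that evidence.
    def flatten[U](implicit asIterable: T => Iterable[U]): Box[U] =
      Box(items.flatMap(x => asIterable(x)))
  }
}
```

For example, `ImplicitViewSketch.Box(List(List(1, 2, 3), List(4, 5), List(6))).flatten` yields a `Box[Int]` holding the six elements in order, while calling `flatten` on a `Box[Int]` is rejected at compile time because no `Int => Iterable[U]` view is available.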