Github user linbojin commented on a diff in the pull request:
https://github.com/apache/spark/pull/16276#discussion_r92546374
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -381,6 +381,14 @@ abstract class RDD[T: ClassTag](
}
/**
+ * Return a new RDD by flattening all elements from RDD with traversable elements
+ */
+ def flatten[U: ClassTag](implicit asTraversable: T => TraversableOnce[U]): RDD[U] = withScope {
--- End diff --
Hi @srowen, thanks for your suggestion. One option is to use Scala's flatMap, as
follows:
```
def flatten[U: ClassTag](implicit asTraversable: T => TraversableOnce[U]): RDD[U] = withScope {
  val f = (x: T) => asTraversable(x)
  // Clean the closure before it is shipped to executors, and pass the cleaned version on.
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
```
Alternatively, I can implement the logic myself:
```
def flatten[U: ClassTag](implicit asTraversable: T => TraversableOnce[U]): RDD[U] = withScope {
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => new Iterator[U] {
    private val empty = Iterator.empty
    private var cur: Iterator[U] = empty
    private def nextCur(): Unit = { cur = asTraversable(iter.next()).toIterator }
    def hasNext: Boolean = {
      // Advance past empty inner collections until an element or the end is found.
      while (!cur.hasNext) {
        if (!iter.hasNext) return false
        nextCur()
      }
      true
    }
    def next(): U = (if (hasNext) cur else empty).next()
  })
}
```
ref:
https://github.com/scala/scala/blob/v2.11.8/src/library/scala/collection/Iterator.scala#L432
Which one do you think is better?
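
To make the comparison easier to check outside Spark, here is a minimal sketch of both options on plain Scala iterators. The names `FlattenSketch`, `viaFlatMap`, `viaManual`, and `toIter` are hypothetical and not part of the PR; `toIter` stands in for `asTraversable(_).toIterator`, and closure cleaning and `MapPartitionsRDD` are left out on purpose.

```scala
object FlattenSketch {
  // Option 1: delegate to the standard library's Iterator.flatMap.
  def viaFlatMap[T, U](iter: Iterator[T])(toIter: T => Iterator[U]): Iterator[U] =
    iter.flatMap(toIter)

  // Option 2: hand-written lazy flattening, mirroring the second snippet above.
  def viaManual[T, U](iter: Iterator[T])(toIter: T => Iterator[U]): Iterator[U] =
    new Iterator[U] {
      private var cur: Iterator[U] = Iterator.empty

      def hasNext: Boolean = {
        // Skip empty inner iterators until an element or the end is found.
        while (!cur.hasNext) {
          if (!iter.hasNext) return false
          cur = toIter(iter.next())
        }
        true
      }

      def next(): U =
        if (hasNext) cur.next()
        else throw new NoSuchElementException("next on empty iterator")
    }
}
```

On inputs such as `List(List(1, 2), Nil, List(3))`, both functions should yield the same elements in the same order, which suggests the choice is mostly about maintenance cost rather than behavior.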