Github user mengxr commented on a diff in the pull request:
https://github.com/apache/spark/pull/506#discussion_r11921949
--- Diff:
mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala ---
@@ -44,6 +44,54 @@ class RDDFunctions[T: ClassTag](self: RDD[T]) {
new SlidingRDD[T](self, windowSize)
}
}
+
+ /**
+ * Returns an RDD with the specified slice of partitions.
+ */
+ def slicePartitions(slice: Seq[Int]): RDD[T] = {
+ new PartitionSlicingRDD(self, slice)
+ }
+
+ /**
+ * Computes the all-reduced RDD of the parent RDD, which has the same number of partitions and
+ * locality information as its parent RDD. Each partition contains only one record, which is the
+ * same as calling `RDD#reduce` on its parent RDD.
+ *
+ * @param f reducer
+ * @return all-reduced RDD
+ */
+ def allReduce(f: (T, T) => T): RDD[T] = {
+ val numPartitions = self.partitions.size
+ require(numPartitions > 0, "Parent RDD does not have any partitions.")
+ val nextPowerOfTwo = {
+ var i = 0
+ while ((numPartitions >> i) > 0) {
+ i += 1
+ }
+ 1 << i
+ }
+ var butterfly = self.mapPartitions( (iter) =>
+ Iterator(iter.reduce(f)),
+ preservesPartitioning = true
+ ).cache()
+
+ if (nextPowerOfTwo > numPartitions) {
+ val padding = self.context.parallelize(Seq.empty[T], nextPowerOfTwo - numPartitions)
+ butterfly = butterfly.union(padding)
+ }
+
+ var offset = nextPowerOfTwo >> 1
+ while (offset > 0) {
+ butterfly = new ButterflyReducedRDD[T](butterfly, f, offset).cache()
--- End diff --
Each partition will be visited twice in a butterfly step. If the previous
stage is not cached, or falls out of the cache, the cost is huge. I'm looking
at `RangeDependency` now; maybe it can help.
Btw, I don't quite understand what you mean by `hold references to
them`. Could you elaborate?
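For anyone following along, the butterfly exchange the diff implements can be simulated outside Spark with plain collections. This is only an illustrative sketch, not the PR's code: `ButterflySketch`, its `nextPowerOfTwo` helper (using the usual "smallest power of two >= n" definition), and the `None` padding are assumptions for the example; the `None` slots stand in for the empty partitions the diff pads with `parallelize(Seq.empty[T], ...)`.

```scala
object ButterflySketch {
  // Smallest power of two >= n (assumed helper for this sketch).
  def nextPowerOfTwo(n: Int): Int = {
    var p = 1
    while (p < n) p <<= 1
    p
  }

  // Simulate butterfly all-reduce over in-memory "partitions":
  // pad to a power of two with None, then at each step combine
  // slot i with slot (i ^ offset), halving offset until it hits 0.
  // Assumes f is associative and commutative, as RDD#reduce does.
  def allReduce[T](parts: Vector[T], f: (T, T) => T): Vector[T] = {
    val size = nextPowerOfTwo(parts.size)
    var vals: Vector[Option[T]] =
      parts.map(Option(_)) ++ Vector.fill(size - parts.size)(None)
    var offset = size >> 1
    while (offset > 0) {
      vals = vals.indices.toVector.map { i =>
        (vals(i), vals(i ^ offset)) match {
          case (Some(a), Some(b)) => Some(f(a, b))
          case (a, b)             => a.orElse(b) // partner was padding
        }
      }
      offset >>= 1
    }
    // Every original slot now holds the reduction of all inputs.
    vals.take(parts.size).map(_.get)
  }
}
```

After the final step every slot holds the full reduction, which is why each partition of the real RDD ends up with the single `reduce` result; the RDD version additionally `cache()`s each intermediate stage because every partition is read twice per step.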