Github user shivaram commented on a diff in the pull request:
https://github.com/apache/spark/pull/506#discussion_r11921440
--- Diff:
mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala ---
@@ -44,6 +44,54 @@ class RDDFunctions[T: ClassTag](self: RDD[T]) {
new SlidingRDD[T](self, windowSize)
}
}
+
+ /**
+ * Returns an RDD with the specified slice of partitions.
+ */
+ def slicePartitions(slice: Seq[Int]): RDD[T] = {
+ new PartitionSlicingRDD(self, slice)
+ }
+
+ /**
+ * Computes the all-reduced RDD of the parent RDD, which has the same number of
+ * partitions and locality information as its parent. Each partition contains a
+ * single record: the result of calling `RDD#reduce` on the parent RDD.
+ *
+ * @param f reducer
+ * @return all-reduced RDD
+ */
+ def allReduce(f: (T, T) => T): RDD[T] = {
+ val numPartitions = self.partitions.size
+ require(numPartitions > 0, "Parent RDD does not have any partitions.")
+ val nextPowerOfTwo = {
+ var i = 0
+ while ((numPartitions >> i) > 0) {
+ i += 1
+ }
+ 1 << i
+ }
+ var butterfly = self.mapPartitions( (iter) =>
+ Iterator(iter.reduce(f)),
+ preservesPartitioning = true
+ ).cache()
+
+ if (nextPowerOfTwo > numPartitions) {
+ val padding = self.context.parallelize(Seq.empty[T], nextPowerOfTwo - numPartitions)
+ butterfly = butterfly.union(padding)
+ }
+
+ var offset = nextPowerOfTwo >> 1
+ while (offset > 0) {
+ butterfly = new ButterflyReducedRDD[T](butterfly, f, offset).cache()
--- End diff --
IMHO it's a little risky to cache every iteration of this loop in terms of
memory usage. The right thing to do is probably to hold references to the
intermediate RDDs and unpersist them all once the loop finishes?
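For context, the loop under discussion implements a butterfly all-reduce: each round, every partition combines its value with the partner partition at distance `offset`, and `offset` presumably halves each round (the diff is cut off at the commented line), so after log2(n) rounds every partition holds the full reduction. A minimal plain-Scala sketch of that pattern on a `Vector` (no Spark involved; names are illustrative, not from the PR, and it assumes a power-of-two slot count):

```scala
// Plain-Scala sketch of the butterfly pattern (no Spark). Each slot stands in
// for one partition's partial value; every round, slot i is combined with its
// partner at index (i ^ offset), and offset halves. After log2(n) rounds every
// slot holds the full reduction, which is the property allReduce relies on.
def butterflyAllReduce[T](parts: Vector[T], f: (T, T) => T): Vector[T] = {
  require(parts.nonEmpty && (parts.size & (parts.size - 1)) == 0,
    "sketch assumes a power-of-two slot count (hence the padding in the diff)")
  var current = parts
  var offset = parts.size >> 1
  while (offset > 0) {
    val prev = current
    // In the RDD version this step is one ButterflyReducedRDD per round;
    // caching each round and unpersisting after the loop would bound memory.
    current = prev.indices.map(i => f(prev(i), prev(i ^ offset))).toVector
    offset >>= 1
  }
  current
}
```

In the RDD version, collecting each round's `butterfly` into a list and calling `unpersist()` on all of them after the final result is materialized would keep only the last RDD cached.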