GitHub user shivaram commented on a diff in the pull request:

    https://github.com/apache/spark/pull/506#discussion_r11922137
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala ---
    @@ -44,6 +44,54 @@ class RDDFunctions[T: ClassTag](self: RDD[T]) {
           new SlidingRDD[T](self, windowSize)
         }
       }
    +
    +  /**
    +   * Returns an RDD with the specified slice of partitions.
    +   */
    +  def slicePartitions(slice: Seq[Int]): RDD[T] = {
    +    new PartitionSlicingRDD(self, slice)
    +  }
    +
    +  /**
    +   * Computes the all-reduced RDD of the parent RDD, which has the same number of partitions and
    +   * locality information as its parent RDD. Each partition contains only one record, which is the
    +   * same as calling `RDD#reduce` on its parent RDD.
    +   *
    +   * @param f reducer
    +   * @return all-reduced RDD
    +   */
    +  def allReduce(f: (T, T) => T): RDD[T] = {
    +    val numPartitions = self.partitions.size
    +    require(numPartitions > 0, "Parent RDD does not have any partitions.")
    +    val nextPowerOfTwo = {
    +      var i = 0
    +      while ((numPartitions >> i) > 0) {
    +        i += 1
    +      }
    +      1 << i
    +    }
    +    var butterfly = self.mapPartitions( (iter) =>
    +      Iterator(iter.reduce(f)),
    +      preservesPartitioning = true
    +    ).cache()
    +
    +    if (nextPowerOfTwo > numPartitions) {
    +      val padding = self.context.parallelize(Seq.empty[T], nextPowerOfTwo - numPartitions)
    +      butterfly = butterfly.union(padding)
    +    }
    +
    +    var offset = nextPowerOfTwo >> 1
    +    while (offset > 0) {
    +      butterfly = new ButterflyReducedRDD[T](butterfly, f, offset).cache()
    --- End diff --
    
    When we create a new RDD at each step, we store the RDD reference in, say, an ArrayBuffer. After the loop exits, we call unpersist on all the older RDDs. This doesn't work very well with lazy transformations, though allReduce doesn't need to be lazy, does it?
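The suggestion above can be sketched without a cluster as a plain-Scala simulation, under stated assumptions. `Stage` is a hypothetical stand-in for a cached RDD (its `unpersist()` only flips a flag), and each partition holds at most one partial result. The exchange pattern, in which partition `i` merges with partition `i ^ offset`, is an assumption, because the body of `ButterflyReducedRDD` is not shown in the diff. As with `RDD#reduce`, `f` is assumed to be associative and commutative. The sketch pads to the smallest power of two that is at least the partition count. Unlike `iter.reduce(f)` in the diff, it uses `reduceOption` so that empty partitions do not throw.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a cached RDD (not a Spark class): one optional
// partial result per partition, plus a flag modelling persist/unpersist.
final class Stage[T](val parts: Vector[Option[T]]) {
  var persisted: Boolean = true
  def unpersist(): Unit = { persisted = false }
}

object ButterflyAllReduceSketch {
  // Merge two partial results; padding partitions hold None.
  private def combine[T](f: (T, T) => T)(a: Option[T], b: Option[T]): Option[T] =
    (a, b) match {
      case (Some(x), Some(y)) => Some(f(x, y))
      case (x, None)          => x
      case (None, y)          => y
    }

  /** Returns the all-reduced partitions and every stage created along the way. */
  def allReduce[T](partitions: Seq[Seq[T]])(f: (T, T) => T): (Vector[Option[T]], List[Stage[T]]) = {
    val n = partitions.size
    require(n > 0, "Parent does not have any partitions.")

    // Smallest power of two that is >= n.
    var size = 1
    while (size < n) size <<= 1

    // Local reduce per partition (reduceOption tolerates empty partitions),
    // then pad with empty partitions up to `size`.
    val local = partitions.map(_.reduceOption(f)).toVector ++ Vector.fill[Option[T]](size - n)(None)

    // The reviewer's suggestion: remember every intermediate stage.
    val created = ArrayBuffer[Stage[T]]()
    var current = new Stage(local)
    created += current

    var offset = size >> 1
    while (offset > 0) {
      val prev = current.parts
      val step = offset
      // Assumed exchange pattern: partition i merges with partner i ^ step.
      // The lower index goes first so both partners compute the same value.
      val next = Vector.tabulate(size) { i =>
        val j = i ^ step
        combine(f)(prev(math.min(i, j)), prev(math.max(i, j)))
      }
      current = new Stage(next)
      created += current
      offset >>= 1
    }

    // Only after the loop: release every stage except the final one.
    created.init.foreach(_.unpersist())
    (current.parts.take(n), created.toList)
  }
}
```

With the input `Seq(Seq(1, 2), Seq(3), Seq(4, 5, 6))` and `_ + _`, each of the three output partitions holds 21, and only the last of the three stages stays persisted. In Spark, the final RDD would also have to be materialized (for example with an action such as `count()`) before its ancestors are unpersisted. Otherwise, computing it later would recompute the released parents. This is why the pattern fits an eager `allReduce` better than a lazy transformation.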

