zhengruifeng commented on a change in pull request #27947: [SPARK-31182][CORE][ML] PairRDD support aggregateByKeyWithinPartitions
URL: https://github.com/apache/spark/pull/27947#discussion_r394202748
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
 ##########
 @@ -99,6 +99,35 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     }
   }
 
+  /**
+   * Generic function to combine the elements for each key using a custom set of aggregation
+   * functions within each partition. Turns an RDD[(K, V)] into a result of type RDD[(K, C)],
+   * for a "combined type" C
+   *
+   * Users provide three functions:
+   *
+   *  - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
+   *  - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
+   *  - `mergeCombiners`, to combine two C's into a single one.
+   *
+   * @note V and C can be different -- for example, one might group an RDD of type
+   * (Int, Int) into an RDD of type (Int, Seq[Int]).
+   */
+  def combineByKeyWithClassTagWithinPartitions[C](
 
 Review comment:
   This implementation follows `combineByKeyWithClassTag`, treating the RDD as if `self.partitioner == Some(partitioner)`, so only the `mapPartitions` branch applies and no shuffle is performed:
   ```scala
   def combineByKeyWithClassTag[C](
         createCombiner: V => C,
         mergeValue: (C, V) => C,
         mergeCombiners: (C, C) => C,
         partitioner: Partitioner,
         mapSideCombine: Boolean = true,
         serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
       require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
       if (keyClass.isArray) {
         if (mapSideCombine) {
           throw new SparkException("Cannot use map-side combining with array keys.")
         }
         if (partitioner.isInstanceOf[HashPartitioner]) {
           throw new SparkException("HashPartitioner cannot partition array keys.")
         }
       }
       val aggregator = new Aggregator[K, V, C](
         self.context.clean(createCombiner),
         self.context.clean(mergeValue),
         self.context.clean(mergeCombiners))
       if (self.partitioner == Some(partitioner)) {
         self.mapPartitions(iter => {
           val context = TaskContext.get()
           new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
         }, preservesPartitioning = true)
       } else {
         new ShuffledRDD[K, V, C](self, partitioner)
           .setSerializer(serializer)
           .setAggregator(aggregator)
           .setMapSideCombine(mapSideCombine)
       }
     }
   ```
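
To illustrate what combining "within each partition" means, here is a minimal sketch that runs without Spark. The object `WithinPartitionCombineSketch` and the helper `combinePartition` are hypothetical names, not part of the PR; the helper is only a simplified in-memory stand-in for what `aggregator.combineValuesByKey` does on one partition's iterator. It leaves out `mergeCombiners` on the assumption that all partial results fit in a single map.

```scala
import scala.collection.mutable

object WithinPartitionCombineSketch {
  // Hypothetical helper: combines the values of a single partition by key.
  // Uses an insertion-ordered map so the output order is deterministic.
  def combinePartition[K, V, C](
      iter: Iterator[(K, V)],
      createCombiner: V => C,
      mergeValue: (C, V) => C): Iterator[(K, C)] = {
    val combiners = mutable.LinkedHashMap.empty[K, C]
    iter.foreach { case (k, v) =>
      val updated = combiners.get(k) match {
        case Some(c) => mergeValue(c, v)   // key already seen in this partition
        case None    => createCombiner(v)  // first value for this key here
      }
      combiners.update(k, updated)
    }
    combiners.iterator
  }

  // Two "partitions" modelled as plain sequences (illustrative data only).
  val partitions: Seq[Seq[(Int, Int)]] = Seq(
    Seq(1 -> 10, 2 -> 20, 1 -> 11),
    Seq(1 -> 12, 3 -> 30))

  // Each partition is combined on its own; nothing moves between partitions.
  val combined: Seq[List[(Int, Seq[Int])]] = partitions.map { part =>
    combinePartition[Int, Int, Seq[Int]](
      part.iterator, v => Seq(v), (c, v) => c :+ v).toList
  }
}
```

In this sketch key `1` appears in the output of both partitions (`Seq(10, 11)` in the first, `Seq(12)` in the second), because no shuffle brings equal keys together. This is the visible difference from the `ShuffledRDD` branch of `combineByKeyWithClassTag`.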
