cloud-fan commented on a change in pull request #26516: [SPARK-29893] improve the local shuffle reader performance by changing the reading task number from 1 to multi.
URL: https://github.com/apache/spark/pull/26516#discussion_r346760469
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala
##########
@@ -52,16 +51,48 @@ private final class LocalShuffledRowRDDPartition(
  */
 class LocalShuffledRowRDD(
     var dependency: ShuffleDependency[Int, InternalRow, InternalRow],
-    metrics: Map[String, SQLMetric])
+    metrics: Map[String, SQLMetric],
+    specifiedPartitionStartIndices: Option[Array[Int]] = None)
   extends RDD[InternalRow](dependency.rdd.context, Nil) {

   private[this] val numReducers = dependency.partitioner.numPartitions
   private[this] val numMappers = dependency.rdd.partitions.length

   override def getDependencies: Seq[Dependency[_]] = List(dependency)

-  override def getPartitions: Array[Partition] = {
+  /**
+   * To equally divide n elements into m buckets, basically each bucket should have n/m elements,
+   * for the remaining n%m elements, add one more element to the first n%m buckets each. Returns
+   * a sequence with length numBuckets and each value represents the start index of each bucket.
+   */
+  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Int] = {
+    val elementsPerBucket = numElements / numBuckets
+    val remaining = numElements % numBuckets
+    val splitPoint = (elementsPerBucket + 1) * remaining
+    (0 until remaining).map(_ * (elementsPerBucket + 1)) ++
+      (remaining until numBuckets).map(i => splitPoint + (i - remaining) * elementsPerBucket)
+  }
+
+  private[this] val partitionStartIndices: Array[Int] = specifiedPartitionStartIndices match {
+    case Some(indices) => indices
Review comment:
Let's say we have N mappers and M reducers. The coalesced reader reads the data of multiple reducers with one task, so its parallelism is some value K that is smaller than M.
Consequently, the `partitionStartIndices` from the coalesced reader is an int array of length K. If we apply it directly here, each of the N mappers gets split into K ranges, so our parallelism becomes N * K, which is unexpected.
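As a concrete example with made-up numbers: if N = 100 and the coalesced reader settles on K = 10, reusing its start indices per mapper launches 100 * 10 = 1000 local reader tasks, an order of magnitude more than the intended parallelism.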
I think `LocalShuffledRowRDD` should take a parameter `advisoryParallelism: Option[Int]`, and we should equally package the partitions to hit `advisoryParallelism`.
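To make the suggestion concrete, here is a minimal runnable sketch, not the actual patch: it reuses the `equallyDivide` logic from the diff above, while the object name, the `main` harness, and the exact way `advisoryParallelism` is turned into a per-mapper bucket count are illustrative assumptions.

```scala
// Sketch only: everything except `equallyDivide` (copied from the diff above)
// is an illustrative assumption, not the actual Spark change.
object AdvisoryParallelismSketch {

  // Start indices of `numBuckets` roughly equal buckets over `numElements`
  // elements; the first numElements % numBuckets buckets get one extra element.
  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Int] = {
    val elementsPerBucket = numElements / numBuckets
    val remaining = numElements % numBuckets
    val splitPoint = (elementsPerBucket + 1) * remaining
    (0 until remaining).map(_ * (elementsPerBucket + 1)) ++
      (remaining until numBuckets).map(i => splitPoint + (i - remaining) * elementsPerBucket)
  }

  def main(args: Array[String]): Unit = {
    // 10 reducers into 3 buckets: sizes 4, 3, 3.
    println(equallyDivide(10, 3)) // Vector(0, 4, 7)

    val numMappers = 100
    val numReducers = 10
    val advisoryParallelism: Option[Int] = Some(300)

    // Each mapper contributes one task per bucket, so the total task count is
    // numMappers * bucketsPerMapper; pick bucketsPerMapper to land near the
    // advisory target, clamped to [1, numReducers].
    val bucketsPerMapper = advisoryParallelism match {
      case Some(p) => math.min(numReducers, math.max(1, p / numMappers))
      case None    => numReducers
    }
    val partitionStartIndices: Array[Int] =
      equallyDivide(numReducers, bucketsPerMapper).toArray
    println(partitionStartIndices.mkString(", ")) // 0, 4, 7 -> 100 * 3 = 300 tasks
  }
}
```

The key point is that the local reader derives its own start indices from the advisory target rather than reusing the coalesced reader's K-length array, so the total task count stays near the target regardless of N.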