cloud-fan commented on a change in pull request #27833: [SPARK-31070][SQL] make skew join split skewed partitions more evenly
URL: https://github.com/apache/spark/pull/27833#discussion_r389784781
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsCoalescer.scala
##########
@@ -114,4 +114,45 @@ object ShufflePartitionsCoalescer extends Logging {
partitionSpecs.toArray
}
+
+  /**
+   * Given a list of sizes, return an array of indices to split the list into multiple
+   * partitions, so that the size sum of each partition is close to the target size. Each
+   * index indicates the start of a partition.
+   */
+  def splitSizeListByTargetSize(sizes: Seq[Long], targetSize: Long): Array[Int] = {
+    val partitionStartIndices = ArrayBuffer[Int]()
+    partitionStartIndices += 0
+    var i = 0
+    var currentSizeSum = 0L
+    var lastPartitionSize = -1L
+
+    def tryMergePartitions() = {
+      // When we are going to start a new partition, it's possible that the current partition
+      // or the previous partition is very small, and it's better to merge the current
+      // partition into the previous partition.
+      val shouldMergePartitions = lastPartitionSize > -1 &&
+        ((currentSizeSum + lastPartitionSize) < targetSize * 1.3 ||
+        (currentSizeSum < targetSize * 0.3 || lastPartitionSize < targetSize * 0.3))
Review comment:
Why pick 1.3? The worst case is that we merge two partitions of size 0.65 and 0.65 (relative to the target size) into a single partition of size 1.3, which is acceptable.
Why pick 0.3? Personal preference :)
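For context, here is a minimal, self-contained sketch of how the splitting loop around this check might behave. Only the lines quoted in the diff above come from the PR; the rest of the loop, the `SplitSizeListSketch` wrapper, the `main` driver, and the example sizes are illustrative assumptions, not the exact PR code:

```scala
import scala.collection.mutable.ArrayBuffer

object SplitSizeListSketch {
  // Sketch: walk the size list, start a new partition once adding the next size would
  // exceed the target, and fold a just-closed partition back into the previous one when
  // either side is tiny (< 0.3 * target) or the combined size is still modest (< 1.3 * target).
  def splitSizeListByTargetSize(sizes: Seq[Long], targetSize: Long): Array[Int] = {
    val partitionStartIndices = ArrayBuffer[Int]()
    partitionStartIndices += 0
    var i = 0
    var currentSizeSum = 0L
    var lastPartitionSize = -1L

    def tryMergePartitions(): Unit = {
      // Same check as in the diff above: merge the just-closed partition into the previous
      // one if either is very small or their combined size is below 1.3 * target.
      val shouldMergePartitions = lastPartitionSize > -1 &&
        ((currentSizeSum + lastPartitionSize) < targetSize * 1.3 ||
        (currentSizeSum < targetSize * 0.3 || lastPartitionSize < targetSize * 0.3))
      if (shouldMergePartitions) {
        // Drop the start index of the current partition so it folds into the previous one.
        partitionStartIndices.remove(partitionStartIndices.length - 1)
        lastPartitionSize += currentSizeSum
      } else {
        lastPartitionSize = currentSizeSum
      }
    }

    while (i < sizes.length) {
      if (i > 0 && currentSizeSum + sizes(i) > targetSize) {
        // Including sizes(i) would overflow the target: close the current partition
        // (possibly merging it backwards) and start a new one at index i.
        tryMergePartitions()
        partitionStartIndices += i
        currentSizeSum = sizes(i)
      } else {
        currentSizeSum += sizes(i)
      }
      i += 1
    }
    // The trailing partition also gets a chance to merge backwards.
    tryMergePartitions()
    partitionStartIndices.toArray
  }

  def main(args: Array[String]): Unit = {
    // With target 100: [90, 20, 15, 80] first closes [90], then the [20, 15] piece (sum 35)
    // is merged back into it because 35 + 90 = 125 < 1.3 * 100, leaving splits at 0 and 3.
    println(splitSizeListByTargetSize(Seq(90L, 20L, 15L, 80L), 100L).mkString(", ")) // 0, 3
  }
}
```

With these two factors, the worst merged partition is about 1.3x the target (two 0.65x pieces), while the 0.3 factor keeps very small fragments from surviving as standalone partitions.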