Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/20091#discussion_r162531289
--- Diff: core/src/main/scala/org/apache/spark/Partitioner.scala ---
@@ -67,31 +69,32 @@ object Partitioner {
         None
       }
-    if (isEligiblePartitioner(hasMaxPartitioner, rdds)) {
+    val defaultNumPartitions = if (rdd.context.conf.contains("spark.default.parallelism")) {
+      rdd.context.defaultParallelism
+    } else {
+      rdds.map(_.partitions.length).max
+    }
+
+    // If the existing max partitioner is an eligible one, or its partitions number is larger
+    // than the default number of partitions, use the existing partitioner.
+    if (hasMaxPartitioner.nonEmpty && (isEligiblePartitioner(hasMaxPartitioner.get, rdds) ||
+        defaultNumPartitions < hasMaxPartitioner.get.getNumPartitions)) {
--- End diff --
This is the core change. I think it makes sense, as it fixes a regression introduced in
https://github.com/apache/spark/pull/20002
If the existing partitioner is not eligible, but its number of partitions is larger than the
default one, we should still pick this partitioner instead of creating a new one.
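For clarity, here is a minimal, self-contained sketch of the selection rule described above.
It is not the actual Spark internals: `ExistingPartitioner`, `isEligible`, and `shouldReuse`
are hypothetical stand-ins used only to illustrate the decision the diff makes.

object PartitionerChoiceSketch {
  // Hypothetical stand-in for an RDD's existing partitioner plus the result
  // of the eligibility check (isEligiblePartitioner in the real code).
  final case class ExistingPartitioner(numPartitions: Int, isEligible: Boolean)

  // Reuse the existing partitioner when it is eligible, OR when it already
  // has more partitions than the default number of partitions.
  def shouldReuse(existing: Option[ExistingPartitioner], defaultNumPartitions: Int): Boolean =
    existing.exists(p => p.isEligible || defaultNumPartitions < p.numPartitions)

  def main(args: Array[String]): Unit = {
    // Not eligible, but more partitions than the default: still reused,
    // which is the behavior this change restores.
    println(shouldReuse(Some(ExistingPartitioner(200, isEligible = false)), 100)) // true
    // Not eligible and fewer partitions than the default: a new partitioner is created.
    println(shouldReuse(Some(ExistingPartitioner(10, isEligible = false)), 100))  // false
  }
}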
---