GitHub user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/20091#discussion_r162556966
--- Diff: core/src/main/scala/org/apache/spark/Partitioner.scala ---
@@ -67,31 +69,32 @@ object Partitioner {
None
}
- if (isEligiblePartitioner(hasMaxPartitioner, rdds)) {
+ val defaultNumPartitions = if (rdd.context.conf.contains("spark.default.parallelism")) {
+ rdd.context.defaultParallelism
+ } else {
+ rdds.map(_.partitions.length).max
+ }
+
+ // If the existing max partitioner is an eligible one, or its partitions number is larger
+ // than the default number of partitions, use the existing partitioner.
+ if (hasMaxPartitioner.nonEmpty && (isEligiblePartitioner(hasMaxPartitioner.get, rdds) ||
+ defaultNumPartitions < hasMaxPartitioner.get.getNumPartitions)) {
--- End diff --
I think we all agree that reusing a partitioner is existing behavior and
we should not stick to `spark.default.parallelism` here.
#20002 is good because it fixes a bad case where reusing the partitioner slows down
the query. This PR surgically fixes one regression introduced by #20002:
even if the existing partitioner is not eligible (it has very few
partitions), reusing it is still better than falling back to the default parallelism.
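
To make the decision in the diff concrete, here is a minimal, Spark-free sketch of the selection rule. Everything in it is a stand-in for illustration: `RddInfo`, `Choice`, `PartitionerChoice`, and `configuredParallelism` are hypothetical names, and the body of `isEligible` is an assumption (within one order of magnitude of the largest upstream partition count), since the diff above does not show how `isEligiblePartitioner` is implemented.

```scala
// Simplified stand-in for an RDD: only the fields this decision needs.
final case class RddInfo(numPartitions: Int, hasPartitioner: Boolean)

// Outcome of the decision: reuse the existing partitioner or build a new one.
sealed trait Choice
final case class ReuseExisting(numPartitions: Int) extends Choice
final case class NewHashPartitioner(numPartitions: Int) extends Choice

object PartitionerChoice {
  // ASSUMPTION: eligibility means the candidate's partition count is within
  // one order of magnitude of the largest partition count among the inputs.
  // The diff does not show the real definition.
  def isEligible(candidate: RddInfo, rdds: Seq[RddInfo]): Boolean = {
    val maxPartitions = rdds.map(_.numPartitions).max
    math.log10(maxPartitions) - math.log10(candidate.numPartitions) < 1
  }

  // `configuredParallelism` models "spark.default.parallelism is set" (Some)
  // versus "not set" (None). `rdds` is assumed to be non-empty.
  def choose(rdds: Seq[RddInfo], configuredParallelism: Option[Int]): Choice = {
    val withPartitioner = rdds.filter(r => r.hasPartitioner && r.numPartitions > 0)
    val hasMaxPartitioner =
      if (withPartitioner.nonEmpty) Some(withPartitioner.maxBy(_.numPartitions)) else None

    val defaultNumPartitions =
      configuredParallelism.getOrElse(rdds.map(_.numPartitions).max)

    hasMaxPartitioner match {
      // Reuse if eligible, or if it still has more partitions than the default.
      case Some(c) if isEligible(c, rdds) || defaultNumPartitions < c.numPartitions =>
        ReuseExisting(c.numPartitions)
      case _ =>
        NewHashPartitioner(defaultNumPartitions)
    }
  }
}
```

Under these assumptions, an existing 20-partition partitioner next to a 1000-partition input is not eligible, yet with the default parallelism configured as 10 it is still reused, which is the regression case discussed above. With no configured parallelism the default becomes 1000 and a new partitioner is created instead.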