Github user mridulm commented on a diff in the pull request:
https://github.com/apache/spark/pull/20091#discussion_r162548620
--- Diff: core/src/main/scala/org/apache/spark/Partitioner.scala ---
@@ -67,31 +69,32 @@ object Partitioner {
       None
     }
-    if (isEligiblePartitioner(hasMaxPartitioner, rdds)) {
+    val defaultNumPartitions = if (rdd.context.conf.contains("spark.default.parallelism")) {
+      rdd.context.defaultParallelism
+    } else {
+      rdds.map(_.partitions.length).max
+    }
+
+    // If the existing max partitioner is an eligible one, or its partitions number is larger
+    // than the default number of partitions, use the existing partitioner.
+    if (hasMaxPartitioner.nonEmpty && (isEligiblePartitioner(hasMaxPartitioner.get, rdds) ||
+        defaultNumPartitions < hasMaxPartitioner.get.getNumPartitions)) {
--- End diff --
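Before going through the cases, here is my reading of the `isEligiblePartitioner` check as a minimal standalone sketch: the existing partitioner counts as eligible when its partition count is within one order of magnitude of the largest partition count among the input RDDs. The object/function names and the exact threshold are illustrative assumptions, not the literal Spark source.

```scala
import scala.math.log10

object EligibilitySketch {
  // "Eligible" here means the existing partitioner's partition count is within
  // one order of magnitude of the largest partition count among the input RDDs.
  def isEligible(partitionerNumPartitions: Int, maxRddPartitions: Int): Boolean =
    log10(maxRddPartitions) - log10(partitionerNumPartitions) < 1

  def main(args: Array[String]): Unit = {
    println(isEligible(20, 100))   // true:  within one order of magnitude
    println(isEligible(20, 1000))  // false: off by more than an order
  }
}
```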
There are multiple cases here.
a) spark.default.parallelism is not set by the user.
For this case, the PR is a no-op.
b) maxPartitions (the largest partition count among the input RDDs) is at least an order of magnitude higher than the existing max partitioner's partition count.
b.1) If spark.default.parallelism is not set, the PR is again a no-op.
b.2) spark.default.parallelism is explicitly set by the user.
This is the change in behavior the PR introduces: rely on the user-specified value instead of trying to infer one when the inferred value is off by at least an order of magnitude (see the sketch below).
If users were setting suboptimal values for "spark.default.parallelism", then there will be a change in behavior, though I would argue this is the *expected* behavior given the documentation of 'spark.default.parallelism'.
---