This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 2fece076a30 [SPARK-42779][SQL][FOLLOWUP] Allow V2 writes to indicate advisory shuffle partition size 2fece076a30 is described below commit 2fece076a30566bc152152fef587f5c1b4fca980 Author: aokolnychyi <aokolnyc...@apple.com> AuthorDate: Mon Mar 20 19:27:56 2023 +0800 [SPARK-42779][SQL][FOLLOWUP] Allow V2 writes to indicate advisory shuffle partition size ### What changes were proposed in this pull request? This PR addresses non-blocking review comments on PR #40421. `CoalesceShufflePartitions` now uses an advisory partition size requested by a shuffle stage only when exactly one shuffle stage is being coalesced; otherwise it falls back to `spark.sql.adaptive.advisoryPartitionSizeInBytes`. In addition, `ShuffleQueryStageExec.advisoryPartitionSize` is now a `def` rather than a `@transient val`. ### Why are the changes needed? These changes ensure the new logic applies only in the expected case: the final write stage, where a data source may request an advisory partition size and exactly one shuffle stage is expected. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests. Closes #40478 from aokolnychyi/spark-42779-followup. Authored-by: aokolnychyi <aokolnyc...@apple.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../sql/execution/adaptive/CoalesceShufflePartitions.scala | 14 +++++++++----- .../spark/sql/execution/adaptive/QueryStageExec.scala | 2 +- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala index 6cca562b6ab..34399001c72 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala @@ -122,12 +122,16 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe } } + // data sources may request a particular advisory partition size for the final write stage + // if it happens, the advisory partition size will be set in ShuffleQueryStageExec + // only one shuffle stage is expected in such cases private def advisoryPartitionSize(shuffleStages: Seq[ShuffleStageInfo]): Long = { - val advisorySizes = 
shuffleStages.flatMap(_.shuffleStage.advisoryPartitionSize).toSet - if (advisorySizes.size == 1) { - advisorySizes.head - } else { - conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) + val defaultAdvisorySize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) + shuffleStages match { + case Seq(stage) => + stage.shuffleStage.advisoryPartitionSize.getOrElse(defaultAdvisorySize) + case _ => + defaultAdvisorySize } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala index 97a4bd617e9..a27f783215e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala @@ -180,7 +180,7 @@ case class ShuffleQueryStageExec( throw new IllegalStateException(s"wrong plan for shuffle stage:\n ${plan.treeString}") } - @transient val advisoryPartitionSize: Option[Long] = shuffle.advisoryPartitionSize + def advisoryPartitionSize: Option[Long] = shuffle.advisoryPartitionSize @transient private lazy val shuffleFuture = shuffle.submitShuffleJob --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org