cloud-fan commented on code in PR #39431:
URL: https://github.com/apache/spark/pull/39431#discussion_r1064261228
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala:
##########
@@ -138,9 +144,21 @@ object FileFormatWriter extends Logging {
val requiredOrdering = partitionColumns.drop(numStaticPartitionCols) ++
writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns
val writeFilesOpt = V1WritesUtils.getWriteFilesOpt(plan)
+
+ // SPARK-40588: when planned writing is disabled and AQE is enabled,
+ // plan contains an AdaptiveSparkPlanExec, which does not know
+ // its final plan's ordering, so we have to materialize that plan first.
+ // It is fine to use plan further down, as the final plan is cached in that plan.
+ def materializeAdaptiveSparkPlan(plan: SparkPlan): SparkPlan = plan match {
+ case a: AdaptiveSparkPlanExec => a.finalPhysicalPlan
+ case p: SparkPlan =>
p.withNewChildren(p.children.map(materializeAdaptiveSparkPlan))
+ }
+
// the sort order doesn't matter
// Use the output ordering from the original plan before adding the empty2null projection.
- val actualOrdering =
writeFilesOpt.map(_.child).getOrElse(plan).outputOrdering.map(_.child)
+ val actualOrdering = writeFilesOpt.map(_.child)
+ .getOrElse(materializeAdaptiveSparkPlan(plan))
Review Comment:
`.getOrElse` already does what you said, doesn't it?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]