EnricoMi opened a new pull request, #39431:
URL: https://github.com/apache/spark/pull/39431
### What changes were proposed in this pull request?
The `FileFormatWriter` materializes an `AdaptiveQueryPlan` before accessing
the plan's `outputOrdering`. This is required when planned writing is disabled
(`spark.sql.optimizer.plannedWrite.enabled` set to `false`; it defaults to
`true`). With planned writing enabled, `FileFormatWriter` already receives the
final plan.
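The idea can be sketched as follows. This is a minimal sketch against Spark's internal API, not the exact patch: the call site in `FileFormatWriter` differs, and the `finalPhysicalPlan` accessor is assumed to be available here for illustration.

```scala
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec

// Sketch only: an AdaptiveSparkPlanExec does not report a meaningful
// outputOrdering before its query stages are planned, so unwrap it to the
// final physical plan first and read the ordering from there.
def outputOrderingOf(plan: SparkPlan) = plan match {
  case adaptive: AdaptiveSparkPlanExec => adaptive.finalPhysicalPlan.outputOrdering
  case other => other.outputOrdering
}
```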
### Why are the changes needed?
`FileFormatWriter` enforces an ordering if the written plan does not provide
that ordering. An `AdaptiveQueryPlan` does not know its final ordering before
its query stages are materialized, so `FileFormatWriter` enforces its own
ordering (e.g. by column `"a"`) even if the final plan already provides a
compatible, finer ordering (e.g. by columns `"a", "b"`). When that enforced
sort spills, the finer order (e.g. by columns `"a", "b"`) gets broken (see
SPARK-40588).
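The scenario can be reproduced along these lines. This is a hedged sketch: the table name, column names, and output path are made up, and it assumes a running `SparkSession` with AQE enabled.

```scala
// The user sorts within partitions by the partition column "a" plus a
// secondary column "b" -- an ordering compatible with, and finer than,
// what the writer requires.
spark.table("src")
  .sortWithinPartitions("a", "b")
  .write
  .partitionBy("a")
  .parquet("/tmp/out")
// Before this fix, FileFormatWriter saw an AdaptiveSparkPlanExec with an
// unknown outputOrdering and injected its own Sort by "a" on top, which
// broke the ("a", "b") in-partition order whenever that sort spilled.
```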
### Does this PR introduce _any_ user-facing change?
This fixes SPARK-40588, which was introduced in Spark 3.0, for the 3.4
release. This restores the behaviour of Spark 2.4.
### How was this patch tested?
The final plan that is written to files is now stored in
`FileFormatWriter.executedPlan` (similar to the existing
`FileFormatWriter.outputOrderingMatched`). Unit tests assert the outermost
sort order written to files.
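Such an assertion can be sketched as follows. The test suite wiring and assertion helpers in the actual patch may differ; only `FileFormatWriter.executedPlan` is taken from the description above, the rest is illustrative.

```scala
import org.apache.spark.sql.execution.SortExec
import org.apache.spark.sql.execution.datasources.FileFormatWriter

// After a write, inspect the captured plan and check that the outermost
// sort is the user's sort (by key and value), not one injected by the writer.
val written = FileFormatWriter.executedPlan.get
val outerSort = written.collectFirst { case s: SortExec => s }.get
assert(outerSort.sortOrder.map(_.child.sql) == Seq("key", "value"))
```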
The actual plan written into the files changed from (taken from
`"SPARK-41914: v1 write with AQE and in-partition sorted - non-string partition
column"`):
```
Sort [input[2, int, false] ASC NULLS FIRST], false, 0
+- *(3) Sort [key#13 ASC NULLS FIRST, value#14 ASC NULLS FIRST], false, 0
   +- *(3) Project [b#24, value#14, key#13]
      +- *(3) BroadcastHashJoin [key#13], [a#23], Inner, BuildLeft, false
         :- BroadcastQueryStage 2
         :  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=376]
         :     +- AQEShuffleRead local
         :        +- ShuffleQueryStage 0
         :           +- Exchange hashpartitioning(key#13, 5), ENSURE_REQUIREMENTS, [plan_id=328]
         :              +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false, true) AS value#14]
         :                 +- Scan[obj#12]
         +- AQEShuffleRead local
            +- ShuffleQueryStage 1
               +- Exchange hashpartitioning(a#23, 5), ENSURE_REQUIREMENTS, [plan_id=345]
                  +- *(2) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#24]
                     +- Scan[obj#22]
```
where `FileFormatWriter` enforces order with `Sort [input[2, int, false] ASC
NULLS FIRST], false, 0`, to
```
*(3) Sort [key#13 ASC NULLS FIRST, value#14 ASC NULLS FIRST], false, 0
+- *(3) Project [b#24, value#14, key#13]
   +- *(3) BroadcastHashJoin [key#13], [a#23], Inner, BuildLeft, false
      :- BroadcastQueryStage 2
      :  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=375]
      :     +- AQEShuffleRead local
      :        +- ShuffleQueryStage 0
      :           +- Exchange hashpartitioning(key#13, 5), ENSURE_REQUIREMENTS, [plan_id=327]
      :              +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false, true) AS value#14]
      :                 +- Scan[obj#12]
      +- AQEShuffleRead local
         +- ShuffleQueryStage 1
            +- Exchange hashpartitioning(a#23, 5), ENSURE_REQUIREMENTS, [plan_id=344]
               +- *(2) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#24]
                  +- Scan[obj#22]
```
where the sort given by the user is the outermost sort now.