EnricoMi opened a new pull request, #39431:
URL: https://github.com/apache/spark/pull/39431

   ### What changes were proposed in this pull request?
   The `FileFormatWriter` now materializes an `AdaptiveQueryPlan` before accessing the plan's `outputOrdering`. This is required when planned write is disabled (`spark.sql.optimizer.plannedWrite.enabled` is `true` by default). With planned write enabled, `FileFormatWriter` already receives the final plan.
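   The fix can be sketched as a toy model in plain Python (all class and attribute names here are illustrative, not Spark's actual API): an adaptive wrapper reports no ordering until it is materialized, so the writer must unwrap it first.

```python
# Toy model of the fix, not Spark code: materialize an adaptive plan
# before asking for its output ordering, so the writer sees the final
# plan's ordering instead of an empty one.

class Plan:
    def __init__(self, output_ordering):
        self.output_ordering = output_ordering

class AdaptivePlan:
    """Wraps a plan whose final shape is only known after execution."""
    def __init__(self, final_plan):
        self._final_plan = final_plan
        # Before materialization the adaptive wrapper reports no ordering.
        self.output_ordering = []

    def materialize(self):
        return self._final_plan

def ordering_for_write(plan):
    # The fix: unwrap the adaptive plan first, then read its ordering.
    if isinstance(plan, AdaptivePlan):
        plan = plan.materialize()
    return plan.output_ordering

adaptive = AdaptivePlan(Plan(["a", "b"]))
print(ordering_for_write(adaptive))  # ['a', 'b'], not []
```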
   
   ### Why are the changes needed?
   `FileFormatWriter` enforces an ordering by adding a sort if the written plan does not already provide that ordering. An `AdaptiveQueryPlan` does not know its final ordering before execution, so `FileFormatWriter` enforces its required ordering (e.g. by column `"a"`) even when the final plan already provides a compatible ordering (e.g. by columns `"a", "b"`). If that redundant sort spills, the finer-grained order (by columns `"a", "b"`) gets broken (see SPARK-40588).
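   The compatibility in question is (simplified) a prefix relation: a required ordering by `"a"` is already satisfied by a child ordering by `"a", "b"`, so no extra sort should be added. A minimal illustration in plain Python, not Spark's actual `SortOrder` semantics:

```python
def ordering_satisfied(required, provided):
    """True if `provided` starts with `required`, i.e. the child's sort
    order already guarantees the ordering the writer needs.
    Simplified: real SortOrder matching also compares direction and
    null ordering, not just column names."""
    return provided[:len(required)] == required

# Ordering by ("a", "b") satisfies a requirement of ("a",):
print(ordering_satisfied(["a"], ["a", "b"]))  # True: no extra sort needed
print(ordering_satisfied(["a", "b"], ["a"]))  # False: writer must sort
```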
   
   ### Does this PR introduce _any_ user-facing change?
   This fixes SPARK-40588 for Spark 3.4. The issue was introduced in Spark 3.0; this restores the behaviour of Spark 2.4.
   
   ### How was this patch tested?
   The final plan that is written to files is now stored in `FileFormatWriter.executedPlan` (similar to the existing `FileFormatWriter.outputOrderingMatched`). Unit tests assert the outermost sort order written to files.
   
   The plan that is actually written into the files changed from the following (taken from the test `"SPARK-41914: v1 write with AQE and in-partition sorted - non-string partition column"`):
   
   ```
   Sort [input[2, int, false] ASC NULLS FIRST], false, 0
   +- *(3) Sort [key#13 ASC NULLS FIRST, value#14 ASC NULLS FIRST], false, 0
      +- *(3) Project [b#24, value#14, key#13]
         +- *(3) BroadcastHashJoin [key#13], [a#23], Inner, BuildLeft, false
            :- BroadcastQueryStage 2
            :  +- BroadcastExchange 
HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), 
[plan_id=376]
            :     +- AQEShuffleRead local
            :        +- ShuffleQueryStage 0
            :           +- Exchange hashpartitioning(key#13, 5), 
ENSURE_REQUIREMENTS, [plan_id=328]
            :              +- *(1) SerializeFromObject 
[knownnotnull(assertnotnull(input[0, 
org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, knownnotnull(assertnotnull(input[0, 
org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false, 
true) AS value#14]
            :                 +- Scan[obj#12]
            +- AQEShuffleRead local
               +- ShuffleQueryStage 1
                  +- Exchange hashpartitioning(a#23, 5), ENSURE_REQUIREMENTS, 
[plan_id=345]
                     +- *(2) SerializeFromObject 
[knownnotnull(assertnotnull(input[0, 
org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23, 
knownnotnull(assertnotnull(input[0, 
org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#24]
                        +- Scan[obj#22]
   ```
   
   where `FileFormatWriter` enforces the ordering with the redundant `Sort [input[2, int, false] ASC NULLS FIRST], false, 0`, to
   
   ```
   *(3) Sort [key#13 ASC NULLS FIRST, value#14 ASC NULLS FIRST], false, 0
   +- *(3) Project [b#24, value#14, key#13]
      +- *(3) BroadcastHashJoin [key#13], [a#23], Inner, BuildLeft, false
         :- BroadcastQueryStage 2
         :  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, 
int, false] as bigint)),false), [plan_id=375]
         :     +- AQEShuffleRead local
         :        +- ShuffleQueryStage 0
         :           +- Exchange hashpartitioning(key#13, 5), 
ENSURE_REQUIREMENTS, [plan_id=327]
         :              +- *(1) SerializeFromObject 
[knownnotnull(assertnotnull(input[0, 
org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, knownnotnull(assertnotnull(input[0, 
org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false, 
true) AS value#14]
         :                 +- Scan[obj#12]
         +- AQEShuffleRead local
            +- ShuffleQueryStage 1
               +- Exchange hashpartitioning(a#23, 5), ENSURE_REQUIREMENTS, 
[plan_id=344]
                  +- *(2) SerializeFromObject 
[knownnotnull(assertnotnull(input[0, 
org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23, 
knownnotnull(assertnotnull(input[0, 
org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#24]
                     +- Scan[obj#22]
   ```
   
   where the sort given by the user is now the outermost sort.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

