peter-toth opened a new pull request #26044: [SPARK-29375][SQL][WIP] Exchange reuse across all subquery levels
URL: https://github.com/apache/spark/pull/26044

### What changes were proposed in this pull request?

This PR:
- enables exchange reuse across all subquery levels,
- simplifies the `ReuseExchange` code.

Example query:
```
SELECT
  (SELECT max(a.key) FROM testData AS a JOIN testData AS b ON b.key = a.key),
  a.key
FROM testData AS a
JOIN testData AS b ON b.key = a.key
```
Plan before this PR:
```
*(5) Project [Subquery scalar-subquery#240, [id=#193] AS scalarsubquery()#247, key#13]
:  +- Subquery scalar-subquery#240, [id=#193]
:     +- *(6) HashAggregate(keys=[], functions=[max(key#13)], output=[max(key)#246])
:        +- Exchange SinglePartition, true, [id=#189]
:           +- *(5) HashAggregate(keys=[], functions=[partial_max(key#13)], output=[max#251])
:              +- *(5) Project [key#13]
:                 +- *(5) SortMergeJoin [key#13], [key#243], Inner
:                    :- *(2) Sort [key#13 ASC NULLS FIRST], false, 0
:                    :  +- Exchange hashpartitioning(key#13, 5), true, [id=#145]
:                    :     +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13]
:                    :        +- Scan[obj#12]
:                    +- *(4) Sort [key#243 ASC NULLS FIRST], false, 0
:                       +- ReusedExchange [key#243], Exchange hashpartitioning(key#13, 5), true, [id=#145]
+- *(5) SortMergeJoin [key#13], [key#241], Inner
   :- *(2) Sort [key#13 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(key#13, 5), true, [id=#205]
   :     +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13]
   :        +- Scan[obj#12]
   +- *(4) Sort [key#241 ASC NULLS FIRST], false, 0
      +- ReusedExchange [key#241], Exchange hashpartitioning(key#13, 5), true, [id=#205]
```
Plan after this PR:
```
*(5) Project [Subquery scalar-subquery#240, [id=#211] AS scalarsubquery()#247, key#13]
:  +- Subquery scalar-subquery#240, [id=#211]
:     +- *(6) HashAggregate(keys=[], functions=[max(key#13)], output=[max(key)#246])
:        +- Exchange SinglePartition, true, [id=#207]
:           +- *(5) HashAggregate(keys=[], functions=[partial_max(key#13)], output=[max#251])
:              +- *(5) Project [key#13]
:                 +- *(5) SortMergeJoin [key#13], [key#243], Inner
:                    :- *(2) Sort [key#13 ASC NULLS FIRST], false, 0
:                    :  +- Exchange hashpartitioning(key#13, 5), true, [id=#145]
:                    :     +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13]
:                    :        +- Scan[obj#12]
:                    +- *(4) Sort [key#243 ASC NULLS FIRST], false, 0
:                       +- ReusedExchange [key#243], Exchange hashpartitioning(key#13, 5), true, [id=#145]
+- *(5) SortMergeJoin [key#13], [key#241], Inner
   :- *(2) Sort [key#13 ASC NULLS FIRST], false, 0
   :  +- ReusedExchange [key#13], Exchange hashpartitioning(key#13, 5), true, [id=#145]
   +- *(4) Sort [key#241 ASC NULLS FIRST], false, 0
      +- ReusedExchange [key#241], Exchange hashpartitioning(key#13, 5), true, [id=#145]
```

### Why are the changes needed?

Performance improvement: in the example above, the exchange with id `#205` and its whole subtree disappear from the main query, because the equivalent exchange built in the subquery (`#145`) can now be reused across subquery levels.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Added new UTs.
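The reuse idea behind the plans above can be sketched as follows: key a map on each exchange's canonicalized form (which ignores runtime ids such as `#145`/`#205`) and replace later equivalent exchanges with a `ReusedExchange` pointing at the first one. This is a minimal, hypothetical model for illustration only: `Plan`, `Exchange(id, child)`, and `reuseExchanges` are made-up stand-ins, not Spark's actual `SparkPlan` classes or the real `ReuseExchange` rule.

```scala
// Hypothetical, simplified model of exchange reuse. Real Spark keys the map
// on the canonicalized SparkPlan; here a string stands in for that form.
sealed trait Plan { def canonicalized: String }

case class Exchange(id: Int, child: String) extends Plan {
  // The canonical form drops the runtime id, so equivalent exchanges
  // (e.g. #145 and #205 over the same child) compare equal.
  def canonicalized: String = s"Exchange($child)"
}

case class ReusedExchange(target: Exchange) extends Plan {
  def canonicalized: String = target.canonicalized
}

object ReuseDemo {
  // Walk a flat list of exchanges (standing in for a traversal over the whole
  // plan, including all subquery levels) and replace each duplicate with a
  // reference to the first equivalent exchange seen.
  def reuseExchanges(exchanges: Seq[Exchange]): Seq[Plan] = {
    val seen = scala.collection.mutable.Map.empty[String, Exchange]
    exchanges.map { e =>
      seen.get(e.canonicalized) match {
        case Some(first) => ReusedExchange(first)
        case None =>
          seen(e.canonicalized) = e
          e
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Mirrors the example: #205 has the same canonical form as #145,
    // so it collapses into a ReusedExchange of #145.
    val plans = reuseExchanges(Seq(
      Exchange(145, "hashpartitioning(key#13, 5)"),
      Exchange(205, "hashpartitioning(key#13, 5)")))
    println(plans)
  }
}
```

Because the map lives outside any single subquery scope in this sketch, an exchange first planned inside a subquery can be reused from the main query, which is the behavioral change the plans above demonstrate.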
