wangyum opened a new pull request, #39512:
URL: https://github.com/apache/spark/pull/39512
### What changes were proposed in this pull request?
This PR introduces a shuffle on `SinglePartition` if its physical size is greater
than `spark.sql.adaptive.advisoryPartitionSizeInBytes`.
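In spirit, the gating condition looks like the following minimal Scala sketch. The helper names are hypothetical and this is not the actual AQE rule; it only illustrates the size comparison described above:
```scala
// Minimal sketch of the idea, NOT the actual Spark rule. A hypothetical
// check deciding whether a SinglePartition child is worth re-shuffling.
// `planSizeInBytes` would come from runtime statistics under AQE;
// `advisorySizeInBytes` corresponds to
// spark.sql.adaptive.advisoryPartitionSizeInBytes.
def shouldIntroduceShuffle(planSizeInBytes: Long, advisorySizeInBytes: Long): Boolean =
  planSizeInBytes > advisorySizeInBytes

// A plausible (assumed, not confirmed by this PR) way to size the new
// shuffle: enough partitions that each stays near the advisory size.
def assumedNumPartitions(planSizeInBytes: Long, advisorySizeInBytes: Long): Int =
  math.max(1, math.ceil(planSizeInBytes.toDouble / advisorySizeInBytes).toInt)
```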
### Why are the changes needed?
To improve parallelism. For example:
```scala
spark.range(100000000L).selectExpr("id as a", "id as b").write.saveAsTable("t1")

sql(
  """
    |WITH base
    |  AS (select *, ROW_NUMBER() OVER(ORDER BY a) AS new_a FROM t1)
    |SELECT * FROM base t1 JOIN base t2 ON t1.a = t2.b
    |""".stripMargin).explain()
```
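For context, the threshold that gates the new behavior is an existing AQE setting. A hedged way to pin it down when reproducing the example (values illustrative; `64MB` is the documented default):
```scala
// Existing Spark configs (values illustrative). AQE must be enabled for
// adaptive rules to apply; the advisory size is the threshold this PR
// compares the SinglePartition plan size against.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")
```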
Before this PR:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [a#10L], [b#26L], Inner
   :- Filter isnotnull(a#10L)
   :  +- Window [row_number() windowspecdefinition(a#10L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS new_a#8], [a#10L ASC NULLS FIRST]
   :     +- Sort [a#10L ASC NULLS FIRST], false, 0
   :        +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=50]
   :           +- FileScan parquet spark_catalog.default.t1[a#10L,b#11L]
   +- Sort [b#26L ASC NULLS FIRST], false, 0
      +- Filter isnotnull(b#26L)
         +- Window [row_number() windowspecdefinition(a#25L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS new_a#27], [a#25L ASC NULLS FIRST]
            +- Sort [a#25L ASC NULLS FIRST], false, 0
               +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=54]
                  +- FileScan parquet spark_catalog.default.t1[a#25L,b#26L]
```
After this PR:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [a#10L], [b#26L], Inner
   :- Sort [a#10L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(a#10L, 5), ENSURE_REQUIREMENTS, [plan_id=60]
   :     +- Filter isnotnull(a#10L)
   :        +- Window [row_number() windowspecdefinition(a#10L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS new_a#8], [a#10L ASC NULLS FIRST]
   :           +- Sort [a#10L ASC NULLS FIRST], false, 0
   :              +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=50]
   :                 +- FileScan parquet spark_catalog.default.t1[a#10L,b#11L]
   +- Sort [b#26L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(b#26L, 5), ENSURE_REQUIREMENTS, [plan_id=61]
         +- Filter isnotnull(b#26L)
            +- Window [row_number() windowspecdefinition(a#25L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS new_a#27], [a#25L ASC NULLS FIRST]
               +- Sort [a#25L ASC NULLS FIRST], false, 0
                  +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=54]
                     +- FileScan parquet spark_catalog.default.t1[a#25L,b#26L]
```
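One hedged way to observe the added parallelism at runtime, using the same session and table as above. The exact count depends on data size and the advisory setting, and AQE may coalesce it, so treat the value as indicative:
```scala
// Runs the same query and inspects the number of output partitions.
// Per the plans above, before this PR the SinglePartition windows force
// one partition through the join; after it, the hash exchanges raise
// the count (5 in this run).
val joined = sql(
  """
    |WITH base
    |  AS (select *, ROW_NUMBER() OVER(ORDER BY a) AS new_a FROM t1)
    |SELECT * FROM base t1 JOIN base t2 ON t1.a = t2.b
    |""".stripMargin)
println(joined.rdd.getNumPartitions)
```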
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit test.