wangyum opened a new pull request, #38222:
URL: https://github.com/apache/spark/pull/38222
### What changes were proposed in this pull request?
This PR extracts partitionings from the children's output expressions, so that a partitioning established below a `Project` can be recognized through an alias. For example:
```sql
CREATE TABLE t1(value string) using parquet;
CREATE TABLE t2(value string) using parquet;
set spark.sql.autoBroadcastJoinThreshold=-1;
SELECT upper(tmp.value),
       max(tmp.value)
FROM   (SELECT value,
               upper(value) AS upper_value
        FROM   t1) tmp
       JOIN t2
         ON tmp.upper_value = t2.value
GROUP  BY upper(tmp.value);
```
Before this PR:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortAggregate(key=[_groupingexpression#238], functions=[max(value#229)], output=[upper(value)#233, max(value)#234])
   +- Sort [_groupingexpression#238 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(_groupingexpression#238, 5), ENSURE_REQUIREMENTS, [plan_id=127]
         +- SortAggregate(key=[_groupingexpression#238], functions=[partial_max(value#229)], output=[_groupingexpression#238, max#240])
            +- Sort [_groupingexpression#238 ASC NULLS FIRST], false, 0
               +- Project [value#229, upper(value#229) AS _groupingexpression#238]
                  +- SortMergeJoin [upper_value#228], [value#230], Inner
                     :- Sort [upper_value#228 ASC NULLS FIRST], false, 0
                     :  +- Exchange hashpartitioning(upper_value#228, 5), ENSURE_REQUIREMENTS, [plan_id=117]
                     :     +- Project [value#229, upper(value#229) AS upper_value#228]
                     :        +- Filter isnotnull(upper(value#229))
                     :           +- FileScan parquet spark_catalog.default.t1[value#229]
                     +- Sort [value#230 ASC NULLS FIRST], false, 0
                        +- Exchange hashpartitioning(value#230, 5), ENSURE_REQUIREMENTS, [plan_id=118]
                           +- Filter isnotnull(value#230)
                              +- FileScan parquet spark_catalog.default.t2[value#230]
```
After this PR:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortAggregate(key=[_groupingexpression#238], functions=[max(value#229)], output=[upper(value)#233, max(value)#234])
   +- SortAggregate(key=[_groupingexpression#238], functions=[partial_max(value#229)], output=[_groupingexpression#238, max#240])
      +- Project [value#229, upper(value#229) AS _groupingexpression#238]
         +- SortMergeJoin [upper_value#228], [value#230], Inner
            :- Sort [upper_value#228 ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(upper_value#228, 5), ENSURE_REQUIREMENTS, [plan_id=117]
            :     +- Project [value#229, upper(value#229) AS upper_value#228]
            :        +- Filter isnotnull(upper(value#229))
            :           +- FileScan parquet spark_catalog.default.t1[value#229]
            +- Sort [value#230 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(value#230, 5), ENSURE_REQUIREMENTS, [plan_id=118]
                  +- Filter isnotnull(value#230)
                     +- FileScan parquet spark_catalog.default.t2[value#230]
```
### Why are the changes needed?
Extracting partitionings from children's output expressions removes an unnecessary shuffle exchange: in the example above, the join output is already hash-partitioned on `upper(value#229)`, which is exactly the aggregate's grouping expression, so the extra `Exchange` (and its `Sort`) before the final aggregate can be dropped.
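The idea can be illustrated with a small sketch (a hypothetical Python model, not Spark's actual Scala API): a `Project` that aliases an expression can rewrite its child's output partitioning in terms of the alias, so a downstream operator's required hash partitioning on that alias is already satisfied and no exchange is inserted.

```python
# Hypothetical sketch of alias-aware partitioning propagation.
# Names like substitute_aliases and the string-based expression encoding
# are illustrative only; Spark represents these as Catalyst expression trees.

def substitute_aliases(partitioning_exprs, alias_map):
    """Rewrite partitioning expressions using the project list's aliases.

    alias_map maps an expression emitted by the project (e.g. the join key
    expression) to the attribute name it is aliased to.
    """
    return [alias_map.get(expr, expr) for expr in partitioning_exprs]

# The join's output is hash-partitioned on its join key, which for the
# left side resolves to upper(value#229) (aliased as upper_value#228).
join_partitioning = ["upper(value#229)"]

# Project list: value#229, upper(value#229) AS _groupingexpression#238
aliases = {"upper(value#229)": "_groupingexpression#238"}

# With the alias substituted, the project's output partitioning matches
# the aggregate's required distribution, so the planner can skip the
# Exchange (and the Sort that followed it).
project_partitioning = substitute_aliases(join_partitioning, aliases)
required_distribution = ["_groupingexpression#238"]
assert project_partitioning == required_distribution
```

Without the substitution, the partitioning is expressed over `upper(value#229)` while the requirement is expressed over `_groupingexpression#238`, the two fail to match syntactically, and an extra shuffle is planned even though the data is already correctly distributed.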
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit test.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]