wangyum commented on PR #48789:
URL: https://github.com/apache/spark/pull/48789#issuecomment-2475505591
Logical plan:
```
GlobalLimit 100
+- LocalLimit 100
+- Sort [year#263 ASC NULLS FIRST, course#262 ASC NULLS FIRST], true
+- Window [sum(earnings#264) windowspecdefinition(year#263 ASC NULLS
FIRST, course#262 ASC NULLS FIRST, specifiedwindowframe(RangeFrame,
unboundedpreceding$(), currentrow$())) AS balance#261], [year#263 ASC NULLS
FIRST, course#262 ASC NULLS FIRST]
+- Project [year#263, course#262, earnings#264]
+- Relation spark_catalog.default.t
```
Input physical plan in `AdaptiveSparkPlanExec`:
```
TakeOrderedAndProject(limit=100, orderBy=[year#263 ASC NULLS
FIRST,course#262 ASC NULLS FIRST],
output=[year#263,course#262,earnings#264,balance#261])
+- Window [sum(earnings#264) windowspecdefinition(year#263 ASC NULLS FIRST,
course#262 ASC NULLS FIRST, specifiedwindowframe(RangeFrame,
unboundedpreceding$(), currentrow$())) AS balance#261], [year#263 ASC NULLS
FIRST, course#262 ASC NULLS FIRST]
+- Project [year#263, course#262, earnings#264]
+- FileScan parquet spark_catalog.default.t
```
After `RemoveRedundantProjects` and `EnsureRequirements` in
`queryStagePreparationRules`:
```
TakeOrderedAndProject(limit=100, orderBy=[year#263 ASC NULLS
FIRST,course#262 ASC NULLS FIRST],
output=[year#263,course#262,earnings#264,balance#261])
+- Window [sum(earnings#264) windowspecdefinition(year#263 ASC NULLS FIRST,
course#262 ASC NULLS FIRST, specifiedwindowframe(RangeFrame,
unboundedpreceding$(), currentrow$())) AS balance#261], [year#263 ASC NULLS
FIRST, course#262 ASC NULLS FIRST]
+- FileScan parquet spark_catalog.default.t
TakeOrderedAndProject(limit=100, orderBy=[year#263 ASC NULLS
FIRST,course#262 ASC NULLS FIRST],
output=[year#263,course#262,earnings#264,balance#261])
+- Window [sum(earnings#264) windowspecdefinition(year#263 ASC NULLS FIRST,
course#262 ASC NULLS FIRST, specifiedwindowframe(RangeFrame,
unboundedpreceding$(), currentrow$())) AS balance#261], [year#263 ASC NULLS
FIRST, course#262 ASC NULLS FIRST]
+- Sort [year#263 ASC NULLS FIRST, course#262 ASC NULLS FIRST], false, 0
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=108]
+- FileScan parquet spark_catalog.default.t
```
So Initial physical plan in `AdaptiveSparkPlanExec`:
```
TakeOrderedAndProject(limit=100, orderBy=[year#263 ASC NULLS
FIRST,course#262 ASC NULLS FIRST],
output=[year#263,course#262,earnings#264,balance#261])
+- Window [sum(earnings#264) windowspecdefinition(year#263 ASC NULLS FIRST,
course#262 ASC NULLS FIRST, specifiedwindowframe(RangeFrame,
unboundedpreceding$(), currentrow$())) AS balance#261], [year#263 ASC NULLS
FIRST, course#262 ASC NULLS FIRST]
+- Sort [year#263 ASC NULLS FIRST, course#262 ASC NULLS FIRST], false, 0
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=108]
+- FileScan parquet spark_catalog.default.t
```
After first query stage finished,
Logical plan:
```
GlobalLimit 100
+- LocalLimit 100
+- Sort [year#263 ASC NULLS FIRST, course#262 ASC NULLS FIRST], true
+- Window [sum(earnings#264) windowspecdefinition(year#263 ASC NULLS
FIRST, course#262 ASC NULLS FIRST, specifiedwindowframe(RangeFrame,
unboundedpreceding$(), currentrow$())) AS balance#261], [year#263 ASC NULLS
FIRST, course#262 ASC NULLS FIRST]
+- LogicalQueryStage Project [year#263, course#262, earnings#264],
ShuffleQueryStage 0
```
Current physical plan:
```
TakeOrderedAndProject(limit=100, orderBy=[year#263 ASC NULLS
FIRST,course#262 ASC NULLS FIRST],
output=[year#263,course#262,earnings#264,balance#261])
+- Window [sum(earnings#264) windowspecdefinition(year#263 ASC NULLS FIRST,
course#262 ASC NULLS FIRST, specifiedwindowframe(RangeFrame,
unboundedpreceding$(), currentrow$())) AS balance#261], [year#263 ASC NULLS
FIRST, course#262 ASC NULLS FIRST]
+- Sort [year#263 ASC NULLS FIRST, course#262 ASC NULLS FIRST], false, 0
+- ShuffleQueryStage 0
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=119]
+- *(1) ColumnarToRow
+- FileScan parquet spark_catalog.default.t
```
After
[reOptimize](https://github.com/apache/spark/blob/c96a570b1b00c51f296ae2cbcfd45caa930fe995/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala#L382),`EliminateLimits`
will remove the limit, the logical plan:
```
Window [sum(earnings#264) windowspecdefinition(year#263 ASC NULLS FIRST,
course#262 ASC NULLS FIRST, specifiedwindowframe(RangeFrame,
unboundedpreceding$(), currentrow$())) AS balance#261], [year#263 ASC NULLS
FIRST, course#262 ASC NULLS FIRST]
+- Sort [year#263 ASC NULLS FIRST, course#262 ASC NULLS FIRST], false, 0
+- ShuffleQueryStage 0
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=119]
+- *(1) ColumnarToRow
+- FileScan parquet spark_catalog.default.t
```
The physical plan:
```
Window [sum(earnings#264) windowspecdefinition(year#263 ASC NULLS FIRST,
course#262 ASC NULLS FIRST, specifiedwindowframe(RangeFrame,
unboundedpreceding$(), currentrow$())) AS balance#261], [year#263 ASC NULLS
FIRST, course#262 ASC NULLS FIRST]
+- Sort [year#263 ASC NULLS FIRST, course#262 ASC NULLS FIRST], false, 0
+- ShuffleQueryStage 0
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=119]
+- *(1) ColumnarToRow
+- FileScan parquet spark_catalog.default.t
```
Then the output column changed,
Before:
```
0 = {AttributeReference@21872} year#263
1 = {AttributeReference@21873} course#262
2 = {AttributeReference@21874} earnings#264
3 = {AttributeReference@21875} balance#261
```
After:
```
0 = {AttributeReference@21865} course#262
1 = {AttributeReference@21866} year#263
2 = {AttributeReference@21867} earnings#264
3 = {AttributeReference@21868} balance#261
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]