wangyum commented on a change in pull request #35214:
URL: https://github.com/apache/spark/pull/35214#discussion_r786435360
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -1322,6 +1322,12 @@ object CombineUnions extends Rule[LogicalPlan] {
case Union(children, byName, allowMissingCol)
if byName == topByName && allowMissingCol == topAllowMissingCol =>
stack.pushAll(children.reverse)
+ case Project(projectList, Distinct(u @ Union(children, byName, allowMissingCol)))
Review comment:
It already supports `UNION ALL`, but the rule has to be applied several times before all the nested unions are combined (see the plan-change log below). We can use the same logic so that a single application is enough.
```
13:51:52.309 ERROR org.apache.spark.sql.catalyst.rules.PlanChangeLogger:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.CombineUnions ===
Union false, false
Union false, false
!:- Union false, false
:- Project [cast(id#34 as decimal(22,5)) AS id#36]
!: :- Project [cast(id#34 as decimal(22,5)) AS id#36]
: +- Union false, false
!: : +- Union false, false
: :- Project [cast(id#32 as decimal(21,4)) AS id#34]
!: : :- Project [cast(id#32 as decimal(21,4)) AS id#34]
: : +- Union false, false
!: : : +- Union false, false
: : :- Project [cast(cast(id#25 as decimal(19,2)) as
decimal(20,3)) AS id#32]
!: : : :- Project [cast(cast(id#25 as decimal(19,2)) as
decimal(20,3)) AS id#32] : : : +- Relation default.t1[id#25] parquet
!: : : : +- Relation default.t1[id#25] parquet
: : +- Project [cast(cast(id#26 as decimal(19,2)) as
decimal(20,3)) AS id#46]
!: : : +- Project [cast(cast(id#26 as decimal(19,2)) as
decimal(20,3)) AS id#46] : : +- Relation default.t2[id#26] parquet
!: : : +- Relation default.t2[id#26] parquet
: +- Project [cast(cast(id#27 as decimal(20,3)) as
decimal(21,4)) AS id#45]
!: : +- Project [cast(cast(id#27 as decimal(20,3)) as decimal(21,4)) AS
id#45] : +- Relation default.t3[id#27] parquet
!: : +- Relation default.t3[id#27] parquet
:- Project [cast(cast(id#28 as decimal(21,4)) as decimal(22,5))
AS id#44]
!: +- Project [cast(cast(id#28 as decimal(21,4)) as decimal(22,5)) AS
id#44] : +- Relation default.t4[id#28] parquet
!: +- Relation default.t4[id#28] parquet
+- Project [cast(id#29 as decimal(22,5)) AS id#37]
!+- Project [cast(id#29 as decimal(22,5)) AS id#37]
+- Relation default.t5[id#29] parquet
! +- Relation default.t5[id#29] parquet
13:51:52.319 ERROR org.apache.spark.sql.catalyst.rules.PlanChangeLogger:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.SimplifyCasts ===
Union false, false
Union false, false
:- Project [cast(id#34 as decimal(22,5)) AS id#36]
:- Project [cast(id#34 as decimal(22,5)) AS id#36]
: +- Union false, false
: +- Union false, false
: :- Project [cast(id#32 as decimal(21,4)) AS id#34]
: :- Project [cast(id#32 as decimal(21,4)) AS id#34]
: : +- Union false, false
: : +- Union false, false
!: : :- Project [cast(cast(id#25 as decimal(19,2)) as decimal(20,3))
AS id#32] : : :- Project [cast(id#25 as decimal(20,3)) AS id#32]
: : : +- Relation default.t1[id#25] parquet
: : : +- Relation default.t1[id#25] parquet
!: : +- Project [cast(cast(id#26 as decimal(19,2)) as decimal(20,3))
AS id#46] : : +- Project [cast(id#26 as decimal(20,3)) AS id#46]
: : +- Relation default.t2[id#26] parquet
: : +- Relation default.t2[id#26] parquet
!: +- Project [cast(cast(id#27 as decimal(20,3)) as decimal(21,4)) AS
id#45] : +- Project [cast(id#27 as decimal(21,4)) AS id#45]
: +- Relation default.t3[id#27] parquet
: +- Relation default.t3[id#27] parquet
!:- Project [cast(cast(id#28 as decimal(21,4)) as decimal(22,5)) AS id#44]
:- Project [cast(id#28 as decimal(22,5)) AS id#44]
: +- Relation default.t4[id#28] parquet
: +- Relation default.t4[id#28] parquet
+- Project [cast(id#29 as decimal(22,5)) AS id#37]
+- Project [cast(id#29 as decimal(22,5)) AS id#37]
+- Relation default.t5[id#29] parquet
+- Relation default.t5[id#29] parquet
13:51:52.328 ERROR org.apache.spark.sql.catalyst.rules.PlanChangeLogger:
=== Applying Rule
org.apache.spark.sql.catalyst.optimizer.PushProjectionThroughUnion ===
Union false, false Union
false, false
!:- Project [cast(id#34 as decimal(22,5)) AS id#36] :- Union
false, false
!: +- Union false, false : :-
Project [cast(id#34 as decimal(22,5)) AS id#36]
!: :- Project [cast(id#32 as decimal(21,4)) AS id#34] : : +-
Union false, false
!: : +- Union false, false : :
:- Project [cast(id#32 as decimal(21,4)) AS id#34]
!: : :- Project [cast(id#25 as decimal(20,3)) AS id#32] : : :
+- Project [cast(id#25 as decimal(20,3)) AS id#32]
!: : : +- Relation default.t1[id#25] parquet : : :
+- Relation default.t1[id#25] parquet
!: : +- Project [cast(id#26 as decimal(20,3)) AS id#46] : :
+- Project [cast(id#46 as decimal(21,4)) AS id#48]
!: : +- Relation default.t2[id#26] parquet : :
+- Project [cast(id#26 as decimal(20,3)) AS id#46]
!: +- Project [cast(id#27 as decimal(21,4)) AS id#45] : :
+- Relation default.t2[id#26] parquet
!: +- Relation default.t3[id#27] parquet : +-
Project [cast(id#45 as decimal(22,5)) AS id#47]
!:- Project [cast(id#28 as decimal(22,5)) AS id#44] : +-
Project [cast(id#27 as decimal(21,4)) AS id#45]
!: +- Relation default.t4[id#28] parquet :
+- Relation default.t3[id#27] parquet
!+- Project [cast(id#29 as decimal(22,5)) AS id#37] :- Project
[cast(id#28 as decimal(22,5)) AS id#44]
! +- Relation default.t5[id#29] parquet : +-
Relation default.t4[id#28] parquet
! +- Project
[cast(id#29 as decimal(22,5)) AS id#37]
! +-
Relation default.t5[id#29] parquet
13:51:52.341 ERROR org.apache.spark.sql.catalyst.rules.PlanChangeLogger:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.CollapseProject ===
Union false, false Union
false, false
:- Union false, false :- Union
false, false
: :- Project [cast(id#34 as decimal(22,5)) AS id#36] : :-
Project [cast(id#34 as decimal(22,5)) AS id#36]
: : +- Union false, false : : +-
Union false, false
!: : :- Project [cast(id#32 as decimal(21,4)) AS id#34] : :
:- Project [cast(cast(id#25 as decimal(20,3)) as decimal(21,4)) AS id#34]
!: : : +- Project [cast(id#25 as decimal(20,3)) AS id#32] : : :
+- Relation default.t1[id#25] parquet
!: : : +- Relation default.t1[id#25] parquet : :
+- Project [cast(cast(id#26 as decimal(20,3)) as decimal(21,4)) AS id#48]
!: : +- Project [cast(id#46 as decimal(21,4)) AS id#48] : :
+- Relation default.t2[id#26] parquet
!: : +- Project [cast(id#26 as decimal(20,3)) AS id#46] : +-
Project [cast(cast(id#27 as decimal(21,4)) as decimal(22,5)) AS id#47]
!: : +- Relation default.t2[id#26] parquet : +-
Relation default.t3[id#27] parquet
!: +- Project [cast(id#45 as decimal(22,5)) AS id#47] :- Project
[cast(id#28 as decimal(22,5)) AS id#44]
!: +- Project [cast(id#27 as decimal(21,4)) AS id#45] : +-
Relation default.t4[id#28] parquet
!: +- Relation default.t3[id#27] parquet +- Project
[cast(id#29 as decimal(22,5)) AS id#37]
!:- Project [cast(id#28 as decimal(22,5)) AS id#44] +-
Relation default.t5[id#29] parquet
!: +- Relation default.t4[id#28] parquet
!+- Project [cast(id#29 as decimal(22,5)) AS id#37]
! +- Relation default.t5[id#29] parquet
13:51:52.348 ERROR org.apache.spark.sql.catalyst.rules.PlanChangeLogger:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.CombineUnions ===
Union false, false
Union false, false
!:- Union false, false
:- Project [cast(id#34 as decimal(22,5)) AS id#36]
!: :- Project [cast(id#34 as decimal(22,5)) AS id#36]
: +- Union false, false
!: : +- Union false, false
: :- Project [cast(cast(id#25 as decimal(20,3)) as decimal(21,4))
AS id#34]
!: : :- Project [cast(cast(id#25 as decimal(20,3)) as decimal(21,4)) AS
id#34] : : +- Relation default.t1[id#25] parquet
!: : : +- Relation default.t1[id#25] parquet
: +- Project [cast(cast(id#26 as decimal(20,3)) as decimal(21,4))
AS id#48]
!: : +- Project [cast(cast(id#26 as decimal(20,3)) as decimal(21,4)) AS
id#48] : +- Relation default.t2[id#26] parquet
!: : +- Relation default.t2[id#26] parquet
:- Project [cast(cast(id#27 as decimal(21,4)) as decimal(22,5)) AS
id#47]
!: +- Project [cast(cast(id#27 as decimal(21,4)) as decimal(22,5)) AS
id#47] : +- Relation default.t3[id#27] parquet
!: +- Relation default.t3[id#27] parquet
:- Project [cast(id#28 as decimal(22,5)) AS id#44]
!:- Project [cast(id#28 as decimal(22,5)) AS id#44]
: +- Relation default.t4[id#28] parquet
!: +- Relation default.t4[id#28] parquet
+- Project [cast(id#29 as decimal(22,5)) AS id#37]
!+- Project [cast(id#29 as decimal(22,5)) AS id#37]
+- Relation default.t5[id#29] parquet
! +- Relation default.t5[id#29] parquet
13:51:52.354 ERROR org.apache.spark.sql.catalyst.rules.PlanChangeLogger:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.SimplifyCasts ===
Union false, false
Union false, false
:- Project [cast(id#34 as decimal(22,5)) AS id#36]
:- Project [cast(id#34 as decimal(22,5)) AS id#36]
: +- Union false, false
: +- Union false, false
!: :- Project [cast(cast(id#25 as decimal(20,3)) as decimal(21,4)) AS
id#34] : :- Project [cast(id#25 as decimal(21,4)) AS id#34]
: : +- Relation default.t1[id#25] parquet
: : +- Relation default.t1[id#25] parquet
!: +- Project [cast(cast(id#26 as decimal(20,3)) as decimal(21,4)) AS
id#48] : +- Project [cast(id#26 as decimal(21,4)) AS id#48]
: +- Relation default.t2[id#26] parquet
: +- Relation default.t2[id#26] parquet
!:- Project [cast(cast(id#27 as decimal(21,4)) as decimal(22,5)) AS id#47]
:- Project [cast(id#27 as decimal(22,5)) AS id#47]
: +- Relation default.t3[id#27] parquet
: +- Relation default.t3[id#27] parquet
:- Project [cast(id#28 as decimal(22,5)) AS id#44]
:- Project [cast(id#28 as decimal(22,5)) AS id#44]
: +- Relation default.t4[id#28] parquet
: +- Relation default.t4[id#28] parquet
+- Project [cast(id#29 as decimal(22,5)) AS id#37]
+- Project [cast(id#29 as decimal(22,5)) AS id#37]
+- Relation default.t5[id#29] parquet
+- Relation default.t5[id#29] parquet
13:51:52.360 ERROR org.apache.spark.sql.catalyst.rules.PlanChangeLogger:
=== Applying Rule
org.apache.spark.sql.catalyst.optimizer.PushProjectionThroughUnion ===
Union false, false Union false,
false
!:- Project [cast(id#34 as decimal(22,5)) AS id#36] :- Union false,
false
!: +- Union false, false : :- Project
[cast(id#34 as decimal(22,5)) AS id#36]
!: :- Project [cast(id#25 as decimal(21,4)) AS id#34] : : +- Project
[cast(id#25 as decimal(21,4)) AS id#34]
!: : +- Relation default.t1[id#25] parquet : : +-
Relation default.t1[id#25] parquet
!: +- Project [cast(id#26 as decimal(21,4)) AS id#48] : +- Project
[cast(id#48 as decimal(22,5)) AS id#49]
!: +- Relation default.t2[id#26] parquet : +- Project
[cast(id#26 as decimal(21,4)) AS id#48]
!:- Project [cast(id#27 as decimal(22,5)) AS id#47] : +-
Relation default.t2[id#26] parquet
!: +- Relation default.t3[id#27] parquet :- Project
[cast(id#27 as decimal(22,5)) AS id#47]
!:- Project [cast(id#28 as decimal(22,5)) AS id#44] : +- Relation
default.t3[id#27] parquet
!: +- Relation default.t4[id#28] parquet :- Project
[cast(id#28 as decimal(22,5)) AS id#44]
!+- Project [cast(id#29 as decimal(22,5)) AS id#37] : +- Relation
default.t4[id#28] parquet
! +- Relation default.t5[id#29] parquet +- Project
[cast(id#29 as decimal(22,5)) AS id#37]
! +- Relation
default.t5[id#29] parquet
13:51:52.378 ERROR org.apache.spark.sql.catalyst.rules.PlanChangeLogger:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.CollapseProject ===
Union false, false Union false,
false
:- Union false, false :- Union false,
false
!: :- Project [cast(id#34 as decimal(22,5)) AS id#36] : :- Project
[cast(cast(id#25 as decimal(21,4)) as decimal(22,5)) AS id#36]
!: : +- Project [cast(id#25 as decimal(21,4)) AS id#34] : : +-
Relation default.t1[id#25] parquet
!: : +- Relation default.t1[id#25] parquet : +- Project
[cast(cast(id#26 as decimal(21,4)) as decimal(22,5)) AS id#49]
!: +- Project [cast(id#48 as decimal(22,5)) AS id#49] : +-
Relation default.t2[id#26] parquet
!: +- Project [cast(id#26 as decimal(21,4)) AS id#48] :- Project
[cast(id#27 as decimal(22,5)) AS id#47]
!: +- Relation default.t2[id#26] parquet : +- Relation
default.t3[id#27] parquet
!:- Project [cast(id#27 as decimal(22,5)) AS id#47] :- Project
[cast(id#28 as decimal(22,5)) AS id#44]
!: +- Relation default.t3[id#27] parquet : +- Relation
default.t4[id#28] parquet
!:- Project [cast(id#28 as decimal(22,5)) AS id#44] +- Project
[cast(id#29 as decimal(22,5)) AS id#37]
!: +- Relation default.t4[id#28] parquet +- Relation
default.t5[id#29] parquet
!+- Project [cast(id#29 as decimal(22,5)) AS id#37]
! +- Relation default.t5[id#29] parquet
13:51:52.391 ERROR org.apache.spark.sql.catalyst.rules.PlanChangeLogger:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.CombineUnions ===
Union false, false
Union false, false
!:- Union false, false
:- Project [cast(cast(id#25 as decimal(21,4)) as decimal(22,5)) AS id#36]
!: :- Project [cast(cast(id#25 as decimal(21,4)) as decimal(22,5)) AS
id#36] : +- Relation default.t1[id#25] parquet
!: : +- Relation default.t1[id#25] parquet
:- Project [cast(cast(id#26 as decimal(21,4)) as decimal(22,5)) AS id#49]
!: +- Project [cast(cast(id#26 as decimal(21,4)) as decimal(22,5)) AS
id#49] : +- Relation default.t2[id#26] parquet
!: +- Relation default.t2[id#26] parquet
:- Project [cast(id#27 as decimal(22,5)) AS id#47]
!:- Project [cast(id#27 as decimal(22,5)) AS id#47]
: +- Relation default.t3[id#27] parquet
!: +- Relation default.t3[id#27] parquet
:- Project [cast(id#28 as decimal(22,5)) AS id#44]
!:- Project [cast(id#28 as decimal(22,5)) AS id#44]
: +- Relation default.t4[id#28] parquet
!: +- Relation default.t4[id#28] parquet
+- Project [cast(id#29 as decimal(22,5)) AS id#37]
!+- Project [cast(id#29 as decimal(22,5)) AS id#37]
+- Relation default.t5[id#29] parquet
! +- Relation default.t5[id#29] parquet
```
After this PR:
```
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.CombineUnions ===
 Union false, false                                                     Union false, false
!:- Project [cast(id#34 as decimal(22,5)) AS id#36]                     :- Project [cast(cast(cast(cast(id#25 as decimal(19,2)) as decimal(20,3)) as decimal(21,4)) as decimal(22,5)) AS id#36]
!:  +- Union false, false                                               :  +- Relation default.t1[id#25] parquet
!:     :- Project [cast(id#32 as decimal(21,4)) AS id#34]               :- Project [cast(cast(cast(cast(id#26 as decimal(19,2)) as decimal(20,3)) as decimal(21,4)) as decimal(22,5)) AS id#46]
!:     :  +- Union false, false                                         :  +- Relation default.t2[id#26] parquet
!:     :     :- Project [cast(id#30 as decimal(20,3)) AS id#32]         :- Project [cast(cast(cast(id#27 as decimal(20,3)) as decimal(21,4)) as decimal(22,5)) AS id#45]
!:     :     :  +- Union false, false                                   :  +- Relation default.t3[id#27] parquet
!:     :     :     :- Project [cast(id#25 as decimal(19,2)) AS id#30]   :- Project [cast(cast(id#28 as decimal(21,4)) as decimal(22,5)) AS id#44]
!:     :     :     :  +- Relation default.t1[id#25] parquet             :  +- Relation default.t4[id#28] parquet
!:     :     :     +- Project [cast(id#26 as decimal(19,2)) AS id#31]   +- Project [cast(id#29 as decimal(22,5)) AS id#37]
!:     :     :        +- Relation default.t2[id#26] parquet                +- Relation default.t5[id#29] parquet
!:     :     +- Project [cast(id#27 as decimal(20,3)) AS id#33]
!:     :        +- Relation default.t3[id#27] parquet
!:     +- Project [cast(id#28 as decimal(21,4)) AS id#35]
!:        +- Relation default.t4[id#28] parquet
!+- Project [cast(id#29 as decimal(22,5)) AS id#37]
!   +- Relation default.t5[id#29] parquet
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]