allisonwang-db commented on code in PR #36216:
URL: https://github.com/apache/spark/pull/36216#discussion_r858055802
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -60,6 +60,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
override protected val excludedOnceBatches: Set[String] =
Set(
"PartitionPruning",
+ "RewriteSubquery",
Review Comment:
I discovered that this Once batch is not idempotent. `ColumnPruning` and
`CollapseProject` can be applied multiple times after correlated IN/EXISTS
subqueries are rewritten. Happy to discuss other ways to fix/improve this
batch. cc @cloud-fan
Attached is the plan change log for the test case:
```
=== Applying Rule
org.apache.spark.sql.catalyst.optimizer.RewritePredicateSubquery ===
Aggregate [count(1) AS count(1)#164L]
Aggregate
[count(1) AS count(1)#164L]
+- Project
+- Project
! +- Filter NOT exists#157 [name#117 && employer#122 && (name#152.first =
name#117.first) && (employer#153.name = employer#122.company.name)] +-
Join LeftAnti, ((name#152.first = name#117.first) AND (employer#153.name =
employer#122.company.name))
! : +- Project [null AS NULL#163, name#152, employer#153]
:-
Relation
[id#116,name#117,address#118,pets#119,friends#120,relatives#121,employer#122,relations#123,p#124]
parquet
! : +- Relation [id#151,name#152,employer#153] parquet
+-
Project [null AS NULL#163, name#152, employer#153]
! +- Relation
[id#116,name#117,address#118,pets#119,friends#120,relatives#121,employer#122,relations#123,p#124]
parquet +- Relation
[id#151,name#152,employer#153] parquet
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.ColumnPruning ===
Aggregate [count(1) AS count(1)#164L]
Aggregate [count(1) AS
count(1)#164L]
+- Project
+- Project
! +- Join LeftAnti, ((name#152.first = name#117.first) AND
(employer#153.name = employer#122.company.name)) +- Join
LeftAnti, ((_extract_first#166 = _extract_first#167) AND (_extract_name#168 =
_extract_name#169))
! :- Relation
[id#116,name#117,address#118,pets#119,friends#120,relatives#121,employer#122,relations#123,p#124]
parquet :- Project [id#116, name#117.first AS _extract_first#167,
address#118, pets#119, friends#120, relatives#121, employer#122.company.name AS
_extract_name#169, relations#123, p#124]
! +- Project [null AS NULL#163, name#152, employer#153]
: +- Relation
[id#116,name#117,address#118,pets#119,friends#120,relatives#121,employer#122,relations#123,p#124]
parquet
! +- Relation [id#151,name#152,employer#153] parquet
+- Project
[_extract_first#166, _extract_name#168]
!
+- Project
[name#152.first AS _extract_first#166, employer#153.name AS _extract_name#168]
!
+- Project
[name#152, employer#153]
!
+- Relation
[id#151,name#152,employer#153] parquet
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.CollapseProject ===
Aggregate [count(1) AS count(1)#164L]
Aggregate [count(1) AS count(1)#164L]
+- Project
+- Project
+- Join LeftAnti, ((_extract_first#166 = _extract_first#167) AND
(_extract_name#168 = _extract_name#169))
+- Join LeftAnti,
((_extract_first#166 = _extract_first#167) AND (_extract_name#168 =
_extract_name#169))
:- Project [id#116, name#117.first AS _extract_first#167,
address#118, pets#119, friends#120, relatives#121, employer#122.company.name AS
_extract_name#169, relations#123, p#124] :- Project [id#116,
name#117.first AS _extract_first#167, address#118, pets#119, friends#120,
relatives#121, employer#122.company.name AS _extract_name#169, relations#123,
p#124]
: +- Relation
[id#116,name#117,address#118,pets#119,friends#120,relatives#121,employer#122,relations#123,p#124]
parquet :
+- Relation
[id#116,name#117,address#118,pets#119,friends#120,relatives#121,employer#122,relations#123,p#124]
parquet
! +- Project [_extract_first#166, _extract_name#168]
+- Project [name#152.first AS
_extract_first#166, employer#153.name AS _extract_name#168]
! +- Project [name#152.first AS _extract_first#166,
employer#153.name AS _extract_name#168]
+- Relation
[id#151,name#152,employer#153] parquet
! +- Project [name#152, employer#153]
! +- Relation [id#151,name#152,employer#153] parquet
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.ColumnPruning ===
Aggregate [count(1) AS count(1)#164L]
Aggregate [count(1) AS count(1)#164L]
+- Project
+- Project
+- Join LeftAnti, ((_extract_first#166 = _extract_first#167) AND
(_extract_name#168 = _extract_name#169))
+- Join LeftAnti,
((_extract_first#166 = _extract_first#167) AND (_extract_name#168 =
_extract_name#169))
! :- Project [id#116, name#117.first AS _extract_first#167,
address#118, pets#119, friends#120, relatives#121, employer#122.company.name AS
_extract_name#169, relations#123, p#124] :- Project
[_extract_first#167, _extract_name#169]
! : +- Relation
[id#116,name#117,address#118,pets#119,friends#120,relatives#121,employer#122,relations#123,p#124]
parquet :
+- Project [name#117.first AS _extract_first#167, employer#122.company.name AS
_extract_name#169]
! +- Project [name#152.first AS _extract_first#166, employer#153.name
AS _extract_name#168]
: +- Relation
[id#116,name#117,address#118,pets#119,friends#120,relatives#121,employer#122,relations#123,p#124]
parquet
! +- Relation [id#151,name#152,employer#153] parquet
+- Project [name#152.first AS
_extract_first#166, employer#153.name AS _extract_name#168]
!
+- Relation
[id#151,name#152,employer#153] parquet
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.CollapseProject ===
Aggregate [count(1) AS count(1)#164L]
Aggregate [count(1) AS
count(1)#164L]
+- Project
+- Project
+- Join LeftAnti, ((_extract_first#166 = _extract_first#167) AND
(_extract_name#168 = _extract_name#169)) +- Join
LeftAnti, ((_extract_first#166 = _extract_first#167) AND (_extract_name#168 =
_extract_name#169))
! :- Project [_extract_first#167, _extract_name#169]
:- Project
[name#117.first AS _extract_first#167, employer#122.company.name AS
_extract_name#169]
! : +- Project [name#117.first AS _extract_first#167,
employer#122.company.name AS _extract_name#169]
: +- Relation
[id#116,name#117,address#118,pets#119,friends#120,relatives#121,employer#122,relations#123,p#124]
parquet
! : +- Relation
[id#116,name#117,address#118,pets#119,friends#120,relatives#121,employer#122,relations#123,p#124]
parquet +- Project [name#152.first AS _extract_first#166,
employer#153.name AS _extract_name#168]
! +- Project [name#152.first AS _extract_first#166, employer#153.name
AS _extract_name#168] +- Relation
[id#151,name#152,employer#153] parquet
! +- Relation [id#151,name#152,employer#153] parquet
=== Result of Batch RewriteSubquery ===
Aggregate [count(1) AS count(1)#164L]
Aggregate
[count(1) AS count(1)#164L]
+- Project
+- Project
! +- Filter NOT exists#157 [name#117 && employer#122 && (name#152.first =
name#117.first) && (employer#153.name = employer#122.company.name)] +-
Join LeftAnti, ((_extract_first#166 = _extract_first#167) AND
(_extract_name#168 = _extract_name#169))
! : +- Project [null AS NULL#163, name#152, employer#153]
:-
Project [name#117.first AS _extract_first#167, employer#122.company.name AS
_extract_name#169]
! : +- Relation [id#151,name#152,employer#153] parquet
:
+- Relation
[id#116,name#117,address#118,pets#119,friends#120,relatives#121,employer#122,relations#123,p#124]
parquet
! +- Relation
[id#116,name#117,address#118,pets#119,friends#120,relatives#121,employer#122,relations#123,p#124]
parquet +- Project [name#152.first AS
_extract_first#166, employer#153.name AS _extract_name#168]
!
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]