This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ca1a1d0710d [SPARK-40628][SQL] Do not push complex left semi/anti join condition through project
ca1a1d0710d is described below

commit ca1a1d0710d7fdeac3ed6afa075b214e342fae08
Author: Yuming Wang <yumw...@ebay.com>
AuthorDate: Thu Oct 6 22:50:31 2022 -0700

    [SPARK-40628][SQL] Do not push complex left semi/anti join condition through project
    
    ### What changes were proposed in this pull request?
    
    This PR changes `PushDownLeftSemiAntiJoin` so that it no longer pushes complex left semi/anti join conditions through a project.
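
    A minimal illustrative sketch (not part of the patch) of the distinction the rule now draws, using the Catalyst test DSL that the new unit test below also uses; only the guard `aliasMap.forall(_._2.child.children.isEmpty)` is the actual check added by this change:
    ```scala
    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.expressions.Alias

    // An alias is "simple" when the aliased expression has no child
    // expressions, e.g. a bare attribute reference; anything else, such
    // as `a + 1` or `to_date(dt, 'yyyyMMdd')`, counts as complex.
    val simple  = Alias($"a", "new_a")()      // alias over an attribute
    val complex = Alias($"a" + 1, "new_a")()  // alias over Add(a, 1)

    // The rule now pushes the join through the project only when every
    // alias in the project list is simple:
    assert(simple.child.children.isEmpty)    // still pushed through
    assert(complex.child.children.nonEmpty)  // pushdown is skipped
    ```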
    
    ### Why are the changes needed?
    
    Pushing such a condition hurts performance because the complex expression appears, and is re-evaluated, in three places when the join is planned as a SortMergeJoin: the Exchange's hash partitioning, the Sort keys, and the SortMergeJoin keys. For example:
    ```sql
    CREATE TABLE t1(item_id BIGINT, event_type STRING, dt STRING) USING parquet PARTITIONED BY (dt);
    CREATE TABLE t2(item_id BIGINT, cal_dt DATE) USING parquet;
    set spark.sql.autoBroadcastJoinThreshold=-1;
    
    SELECT item_id,
           event_type
    FROM   (
                  SELECT *,
                         To_date(t1.dt, 'yyyyMMdd') AS new_dt
                  FROM   t1) tmp LEFT SEMI
    JOIN   t2
    ON     tmp.item_id = t2.item_id
    AND    tmp.new_dt = t2.cal_dt;
    ```
    Before this PR, `cast(gettimestamp(dt#30, yyyyMMdd, TimestampType, Some(America/Los_Angeles), false) as date)` is evaluated three times:
    ```
    AdaptiveSparkPlan isFinalPlan=false
    +- Project [item_id#28L, event_type#29]
       +- SortMergeJoin [item_id#28L, cast(gettimestamp(dt#30, yyyyMMdd, TimestampType, Some(America/Los_Angeles), false) as date)], [item_id#31L, cal_dt#32], LeftSemi
          :- Sort [item_id#28L ASC NULLS FIRST, cast(gettimestamp(dt#30, yyyyMMdd, TimestampType, Some(America/Los_Angeles), false) as date) ASC NULLS FIRST], false, 0
          :  +- Exchange hashpartitioning(item_id#28L, cast(gettimestamp(dt#30, yyyyMMdd, TimestampType, Some(America/Los_Angeles), false) as date), 5), ENSURE_REQUIREMENTS, [plan_id=110]
          :     +- Filter isnotnull(item_id#28L)
          :        +- FileScan parquet spark_catalog.default.t1[item_id#28L,event_type#29,dt#30]
          +- Sort [item_id#31L ASC NULLS FIRST, cal_dt#32 ASC NULLS FIRST], false, 0
             +- Exchange hashpartitioning(item_id#31L, cal_dt#32, 5), ENSURE_REQUIREMENTS, [plan_id=111]
                +- Filter (isnotnull(item_id#31L) AND isnotnull(cal_dt#32))
                   +- FileScan parquet spark_catalog.default.t2[item_id#31L,cal_dt#32]
    ```
    The task stack trace shows the expression being re-evaluated inside the generated ordering:
    ```
    java.base@17.0.4.1/java.text.DecimalFormat.parse(DecimalFormat.java:2149)
    java.base@17.0.4.1/java.text.SimpleDateFormat.subParse(SimpleDateFormat.java:1935)
    java.base@17.0.4.1/java.text.SimpleDateFormat.parse(SimpleDateFormat.java:1545)
    java.base@17.0.4.1/java.text.DateFormat.parse(DateFormat.java:397)
    app//org.apache.spark.sql.catalyst.util.LegacySimpleTimestampFormatter.parse(TimestampFormatter.scala:237)
    org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificOrdering.Cast_0$(Unknown Source)
    org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificOrdering.compare(Unknown Source)
    app//org.apache.spark.sql.catalyst.expressions.BaseOrdering.compare(ordering.scala:29)
    ...
    ```
    
    After this PR:
    ```
    == Physical Plan ==
    AdaptiveSparkPlan isFinalPlan=false
    +- Project [item_id#28L, event_type#29]
       +- SortMergeJoin [item_id#28L, new_dt#37], [item_id#31L, cal_dt#32], LeftSemi
          :- Sort [item_id#28L ASC NULLS FIRST, new_dt#37 ASC NULLS FIRST], false, 0
          :  +- Exchange hashpartitioning(item_id#28L, new_dt#37, 5), ENSURE_REQUIREMENTS, [plan_id=110]
          :     +- Project [item_id#28L, event_type#29, cast(gettimestamp(dt#30, yyyyMMdd, TimestampType, Some(America/Los_Angeles), false) as date) AS new_dt#37]
          :        +- Filter isnotnull(item_id#28L)
          :           +- FileScan parquet spark_catalog.default.t1[item_id#28L,event_type#29,dt#30]
          +- Sort [item_id#31L ASC NULLS FIRST, cal_dt#32 ASC NULLS FIRST], false, 0
             +- Exchange hashpartitioning(item_id#31L, cal_dt#32, 5), ENSURE_REQUIREMENTS, [plan_id=111]
                +- Filter (isnotnull(item_id#31L) AND isnotnull(cal_dt#32))
                   +- FileScan parquet spark_catalog.default.t2[item_id#31L,cal_dt#32]
    ```
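
    A hypothetical spark-shell snippet (not part of this patch) that reproduces the plans above, assuming the tables and the config from the SQL example:
    ```scala
    spark.sql("set spark.sql.autoBroadcastJoinThreshold=-1")
    spark.sql(
      """SELECT item_id, event_type
        |FROM (SELECT *, to_date(t1.dt, 'yyyyMMdd') AS new_dt FROM t1) tmp
        |LEFT SEMI JOIN t2
        |ON tmp.item_id = t2.item_id AND tmp.new_dt = t2.cal_dt
        |""".stripMargin).explain()
    ```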
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Unit test.
    
    Closes #38069 from wangyum/SPARK-40628.
    
    Authored-by: Yuming Wang <yumw...@ebay.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../optimizer/PushDownLeftSemiAntiJoin.scala       |  15 +-
 .../optimizer/LeftSemiAntiJoinPushDownSuite.scala  |   8 +
 .../approved-plans-v1_4/q8.sf100/explain.txt       | 202 ++++++++++-----------
 .../approved-plans-v1_4/q8.sf100/simplified.txt    |  66 +++----
 .../approved-plans-v1_4/q8/explain.txt             | 170 ++++++++---------
 .../approved-plans-v1_4/q8/simplified.txt          |  52 +++---
 6 files changed, 263 insertions(+), 250 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala
index 31b9d604060..9f3c7ef9c28 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala
@@ -38,7 +38,7 @@ object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan]
   def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
     _.containsPattern(LEFT_SEMI_OR_ANTI_JOIN), ruleId) {
     // LeftSemi/LeftAnti over Project
-    case Join(p @ Project(pList, gChild), rightOp, LeftSemiOrAnti(joinType), joinCond, hint)
+    case j @ Join(p @ Project(pList, gChild), rightOp, LeftSemiOrAnti(joinType), joinCond, hint)
         if pList.forall(_.deterministic) &&
         !pList.exists(ScalarSubquery.hasCorrelatedScalarSubquery) &&
         canPushThroughCondition(Seq(gChild), joinCond, rightOp) =>
@@ -47,12 +47,17 @@ object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan]
         p.copy(child = Join(gChild, rightOp, joinType, joinCond, hint))
       } else {
         val aliasMap = getAliasMap(p)
-        val newJoinCond = if (aliasMap.nonEmpty) {
-          Option(replaceAlias(joinCond.get, aliasMap))
+        // Do not push complex join condition
+        if (aliasMap.forall(_._2.child.children.isEmpty)) {
+          val newJoinCond = if (aliasMap.nonEmpty) {
+            Option(replaceAlias(joinCond.get, aliasMap))
+          } else {
+            joinCond
+          }
+          p.copy(child = Join(gChild, rightOp, joinType, newJoinCond, hint))
         } else {
-          joinCond
+          j
         }
-        p.copy(child = Join(gChild, rightOp, joinType, newJoinCond, hint))
       }
 
     // LeftSemi/LeftAnti over Aggregate, only push down if join can be planned as broadcast join.
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala
index 77f58746dfc..9f77f448d23 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala
@@ -483,4 +483,12 @@ class LeftSemiPushdownSuite extends PlanTest {
     }
   }
 
+  test("SPARK-40628: Do not push complex left semi/anti join condition through 
project") {
+    val originalQuery = testRelation
+      .select(($"a" + 1).as("new_a"))
+      .join(testRelation1, joinType = LeftSemi, condition = Some($"new_a" === $"d"))
+      .analyze
+
+    comparePlans(Optimize.execute(originalQuery), originalQuery)
+  }
 }
diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q8.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q8.sf100/explain.txt
index ca6a5cc1ffb..d443723b063 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q8.sf100/explain.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q8.sf100/explain.txt
@@ -24,30 +24,30 @@ TakeOrderedAndProject (49)
                      +- * HashAggregate (41)
                         +- Exchange (40)
                            +- * HashAggregate (39)
-                              +- * Project (38)
-                                 +- * BroadcastHashJoin LeftSemi BuildRight (37)
-                                    :- * Filter (17)
-                                    :  +- * ColumnarToRow (16)
-                                    :     +- Scan parquet spark_catalog.default.customer_address (15)
-                                    +- BroadcastExchange (36)
-                                       +- * Project (35)
-                                          +- * Filter (34)
-                                             +- * HashAggregate (33)
-                                                +- Exchange (32)
-                                                   +- * HashAggregate (31)
-                                                      +- * Project (30)
-                                                         +- * SortMergeJoin Inner (29)
-                                                            :- * Sort (22)
-                                                            :  +- Exchange (21)
-                                                            :     +- * Filter (20)
-                                                            :        +- * ColumnarToRow (19)
-                                                            :           +- Scan parquet spark_catalog.default.customer_address (18)
-                                                            +- * Sort (28)
-                                                               +- Exchange (27)
-                                                                  +- * Project (26)
-                                                                     +- * Filter (25)
-                                                                        +- * ColumnarToRow (24)
-                                                                           +- Scan parquet spark_catalog.default.customer (23)
+                              +- * BroadcastHashJoin LeftSemi BuildRight (38)
+                                 :- * Project (18)
+                                 :  +- * Filter (17)
+                                 :     +- * ColumnarToRow (16)
+                                 :        +- Scan parquet spark_catalog.default.customer_address (15)
+                                 +- BroadcastExchange (37)
+                                    +- * Project (36)
+                                       +- * Filter (35)
+                                          +- * HashAggregate (34)
+                                             +- Exchange (33)
+                                                +- * HashAggregate (32)
+                                                   +- * Project (31)
+                                                      +- * SortMergeJoin Inner (30)
+                                                         :- * Sort (23)
+                                                         :  +- Exchange (22)
+                                                         :     +- * Filter (21)
+                                                         :        +- * ColumnarToRow (20)
+                                                         :           +- Scan parquet spark_catalog.default.customer_address (19)
+                                                         +- * Sort (29)
+                                                            +- Exchange (28)
+                                                               +- * Project (27)
+                                                                  +- * Filter (26)
+                                                                     +- * ColumnarToRow (25)
+                                                                        +- Scan parquet spark_catalog.default.customer (24)
 
 
 (1) Scan parquet spark_catalog.default.store_sales
@@ -127,139 +127,139 @@ Input [1]: [ca_zip#9]
 Input [1]: [ca_zip#9]
 Condition : (substr(ca_zip#9, 1, 5) INSET 10144, 10336, 10390, 10445, 10516, 10567, 11101, 11356, 11376, 11489, 11634, 11928, 12305, 13354, 13375, 13376, 13394, 13595, 13695, 13955, 14060, 14089, 14171, 14328, 14663, 14867, 14922, 15126, 15146, 15371, 15455, 15559, 15723, 15734, 15765, 15798, 15882, 16021, 16725, 16807, 17043, 17183, 17871, 17879, 17920, 18119, 18270, 18376, 18383, 18426, 18652, 18767, 18799, 18840, 18842, 18845, 18906, 19430, 19505, 19512, 19515, 19736, 19769, 19849, 20 [...]
 
-(18) Scan parquet spark_catalog.default.customer_address
-Output [2]: [ca_address_sk#10, ca_zip#11]
+(18) Project [codegen id : 11]
+Output [1]: [substr(ca_zip#9, 1, 5) AS ca_zip#10]
+Input [1]: [ca_zip#9]
+
+(19) Scan parquet spark_catalog.default.customer_address
+Output [2]: [ca_address_sk#11, ca_zip#12]
 Batched: true
 Location [not included in comparison]/{warehouse_dir}/customer_address]
 PushedFilters: [IsNotNull(ca_address_sk)]
 ReadSchema: struct<ca_address_sk:int,ca_zip:string>
 
-(19) ColumnarToRow [codegen id : 5]
-Input [2]: [ca_address_sk#10, ca_zip#11]
+(20) ColumnarToRow [codegen id : 5]
+Input [2]: [ca_address_sk#11, ca_zip#12]
 
-(20) Filter [codegen id : 5]
-Input [2]: [ca_address_sk#10, ca_zip#11]
-Condition : isnotnull(ca_address_sk#10)
+(21) Filter [codegen id : 5]
+Input [2]: [ca_address_sk#11, ca_zip#12]
+Condition : isnotnull(ca_address_sk#11)
 
-(21) Exchange
-Input [2]: [ca_address_sk#10, ca_zip#11]
-Arguments: hashpartitioning(ca_address_sk#10, 5), ENSURE_REQUIREMENTS, [plan_id=3]
+(22) Exchange
+Input [2]: [ca_address_sk#11, ca_zip#12]
+Arguments: hashpartitioning(ca_address_sk#11, 5), ENSURE_REQUIREMENTS, [plan_id=3]
 
-(22) Sort [codegen id : 6]
-Input [2]: [ca_address_sk#10, ca_zip#11]
-Arguments: [ca_address_sk#10 ASC NULLS FIRST], false, 0
+(23) Sort [codegen id : 6]
+Input [2]: [ca_address_sk#11, ca_zip#12]
+Arguments: [ca_address_sk#11 ASC NULLS FIRST], false, 0
 
-(23) Scan parquet spark_catalog.default.customer
-Output [2]: [c_current_addr_sk#12, c_preferred_cust_flag#13]
+(24) Scan parquet spark_catalog.default.customer
+Output [2]: [c_current_addr_sk#13, c_preferred_cust_flag#14]
 Batched: true
 Location [not included in comparison]/{warehouse_dir}/customer]
 PushedFilters: [IsNotNull(c_preferred_cust_flag), EqualTo(c_preferred_cust_flag,Y), IsNotNull(c_current_addr_sk)]
 ReadSchema: struct<c_current_addr_sk:int,c_preferred_cust_flag:string>
 
-(24) ColumnarToRow [codegen id : 7]
-Input [2]: [c_current_addr_sk#12, c_preferred_cust_flag#13]
+(25) ColumnarToRow [codegen id : 7]
+Input [2]: [c_current_addr_sk#13, c_preferred_cust_flag#14]
 
-(25) Filter [codegen id : 7]
-Input [2]: [c_current_addr_sk#12, c_preferred_cust_flag#13]
-Condition : ((isnotnull(c_preferred_cust_flag#13) AND (c_preferred_cust_flag#13 = Y)) AND isnotnull(c_current_addr_sk#12))
+(26) Filter [codegen id : 7]
+Input [2]: [c_current_addr_sk#13, c_preferred_cust_flag#14]
+Condition : ((isnotnull(c_preferred_cust_flag#14) AND (c_preferred_cust_flag#14 = Y)) AND isnotnull(c_current_addr_sk#13))
 
-(26) Project [codegen id : 7]
-Output [1]: [c_current_addr_sk#12]
-Input [2]: [c_current_addr_sk#12, c_preferred_cust_flag#13]
+(27) Project [codegen id : 7]
+Output [1]: [c_current_addr_sk#13]
+Input [2]: [c_current_addr_sk#13, c_preferred_cust_flag#14]
 
-(27) Exchange
-Input [1]: [c_current_addr_sk#12]
-Arguments: hashpartitioning(c_current_addr_sk#12, 5), ENSURE_REQUIREMENTS, [plan_id=4]
+(28) Exchange
+Input [1]: [c_current_addr_sk#13]
+Arguments: hashpartitioning(c_current_addr_sk#13, 5), ENSURE_REQUIREMENTS, [plan_id=4]
 
-(28) Sort [codegen id : 8]
-Input [1]: [c_current_addr_sk#12]
-Arguments: [c_current_addr_sk#12 ASC NULLS FIRST], false, 0
+(29) Sort [codegen id : 8]
+Input [1]: [c_current_addr_sk#13]
+Arguments: [c_current_addr_sk#13 ASC NULLS FIRST], false, 0
 
-(29) SortMergeJoin [codegen id : 9]
-Left keys [1]: [ca_address_sk#10]
-Right keys [1]: [c_current_addr_sk#12]
+(30) SortMergeJoin [codegen id : 9]
+Left keys [1]: [ca_address_sk#11]
+Right keys [1]: [c_current_addr_sk#13]
 Join type: Inner
 Join condition: None
 
-(30) Project [codegen id : 9]
-Output [1]: [ca_zip#11]
-Input [3]: [ca_address_sk#10, ca_zip#11, c_current_addr_sk#12]
+(31) Project [codegen id : 9]
+Output [1]: [ca_zip#12]
+Input [3]: [ca_address_sk#11, ca_zip#12, c_current_addr_sk#13]
 
-(31) HashAggregate [codegen id : 9]
-Input [1]: [ca_zip#11]
-Keys [1]: [ca_zip#11]
+(32) HashAggregate [codegen id : 9]
+Input [1]: [ca_zip#12]
+Keys [1]: [ca_zip#12]
 Functions [1]: [partial_count(1)]
-Aggregate Attributes [1]: [count#14]
-Results [2]: [ca_zip#11, count#15]
+Aggregate Attributes [1]: [count#15]
+Results [2]: [ca_zip#12, count#16]
 
-(32) Exchange
-Input [2]: [ca_zip#11, count#15]
-Arguments: hashpartitioning(ca_zip#11, 5), ENSURE_REQUIREMENTS, [plan_id=5]
+(33) Exchange
+Input [2]: [ca_zip#12, count#16]
+Arguments: hashpartitioning(ca_zip#12, 5), ENSURE_REQUIREMENTS, [plan_id=5]
 
-(33) HashAggregate [codegen id : 10]
-Input [2]: [ca_zip#11, count#15]
-Keys [1]: [ca_zip#11]
+(34) HashAggregate [codegen id : 10]
+Input [2]: [ca_zip#12, count#16]
+Keys [1]: [ca_zip#12]
 Functions [1]: [count(1)]
-Aggregate Attributes [1]: [count(1)#16]
-Results [2]: [substr(ca_zip#11, 1, 5) AS ca_zip#17, count(1)#16 AS cnt#18]
+Aggregate Attributes [1]: [count(1)#17]
+Results [2]: [substr(ca_zip#12, 1, 5) AS ca_zip#18, count(1)#17 AS cnt#19]
 
-(34) Filter [codegen id : 10]
-Input [2]: [ca_zip#17, cnt#18]
-Condition : (cnt#18 > 10)
+(35) Filter [codegen id : 10]
+Input [2]: [ca_zip#18, cnt#19]
+Condition : (cnt#19 > 10)
 
-(35) Project [codegen id : 10]
-Output [1]: [ca_zip#17]
-Input [2]: [ca_zip#17, cnt#18]
+(36) Project [codegen id : 10]
+Output [1]: [ca_zip#18]
+Input [2]: [ca_zip#18, cnt#19]
 
-(36) BroadcastExchange
-Input [1]: [ca_zip#17]
+(37) BroadcastExchange
+Input [1]: [ca_zip#18]
 Arguments: HashedRelationBroadcastMode(List(coalesce(input[0, string, true], ), isnull(input[0, string, true])),false), [plan_id=6]
 
-(37) BroadcastHashJoin [codegen id : 11]
-Left keys [2]: [coalesce(substr(ca_zip#9, 1, 5), ), isnull(substr(ca_zip#9, 1, 5))]
-Right keys [2]: [coalesce(ca_zip#17, ), isnull(ca_zip#17)]
+(38) BroadcastHashJoin [codegen id : 11]
+Left keys [2]: [coalesce(ca_zip#10, ), isnull(ca_zip#10)]
+Right keys [2]: [coalesce(ca_zip#18, ), isnull(ca_zip#18)]
 Join type: LeftSemi
 Join condition: None
 
-(38) Project [codegen id : 11]
-Output [1]: [substr(ca_zip#9, 1, 5) AS ca_zip#19]
-Input [1]: [ca_zip#9]
-
 (39) HashAggregate [codegen id : 11]
-Input [1]: [ca_zip#19]
-Keys [1]: [ca_zip#19]
+Input [1]: [ca_zip#10]
+Keys [1]: [ca_zip#10]
 Functions: []
 Aggregate Attributes: []
-Results [1]: [ca_zip#19]
+Results [1]: [ca_zip#10]
 
 (40) Exchange
-Input [1]: [ca_zip#19]
-Arguments: hashpartitioning(ca_zip#19, 5), ENSURE_REQUIREMENTS, [plan_id=7]
+Input [1]: [ca_zip#10]
+Arguments: hashpartitioning(ca_zip#10, 5), ENSURE_REQUIREMENTS, [plan_id=7]
 
 (41) HashAggregate [codegen id : 12]
-Input [1]: [ca_zip#19]
-Keys [1]: [ca_zip#19]
+Input [1]: [ca_zip#10]
+Keys [1]: [ca_zip#10]
 Functions: []
 Aggregate Attributes: []
-Results [1]: [ca_zip#19]
+Results [1]: [ca_zip#10]
 
 (42) Exchange
-Input [1]: [ca_zip#19]
-Arguments: hashpartitioning(substr(ca_zip#19, 1, 2), 5), ENSURE_REQUIREMENTS, [plan_id=8]
+Input [1]: [ca_zip#10]
+Arguments: hashpartitioning(substr(ca_zip#10, 1, 2), 5), ENSURE_REQUIREMENTS, [plan_id=8]
 
 (43) Sort [codegen id : 13]
-Input [1]: [ca_zip#19]
-Arguments: [substr(ca_zip#19, 1, 2) ASC NULLS FIRST], false, 0
+Input [1]: [ca_zip#10]
+Arguments: [substr(ca_zip#10, 1, 2) ASC NULLS FIRST], false, 0
 
 (44) SortMergeJoin [codegen id : 14]
 Left keys [1]: [substr(s_zip#8, 1, 2)]
-Right keys [1]: [substr(ca_zip#19, 1, 2)]
+Right keys [1]: [substr(ca_zip#10, 1, 2)]
 Join type: Inner
 Join condition: None
 
 (45) Project [codegen id : 14]
 Output [2]: [ss_net_profit#2, s_store_name#7]
-Input [4]: [ss_net_profit#2, s_store_name#7, s_zip#8, ca_zip#19]
+Input [4]: [ss_net_profit#2, s_store_name#7, s_zip#8, ca_zip#10]
 
 (46) HashAggregate [codegen id : 14]
 Input [2]: [ss_net_profit#2, s_store_name#7]
diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q8.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q8.sf100/simplified.txt
index 912904d5643..86dd6134fc2 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q8.sf100/simplified.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q8.sf100/simplified.txt
@@ -49,42 +49,42 @@ TakeOrderedAndProject [s_store_name,sum(ss_net_profit)]
                                   Exchange [ca_zip] #6
                                     WholeStageCodegen (11)
                                       HashAggregate [ca_zip]
-                                        Project [ca_zip]
-                                          BroadcastHashJoin [ca_zip,ca_zip]
+                                        BroadcastHashJoin [ca_zip,ca_zip]
+                                          Project [ca_zip]
                                             Filter [ca_zip]
                                               ColumnarToRow
                                                 InputAdapter
                                                   Scan parquet spark_catalog.default.customer_address [ca_zip]
-                                            InputAdapter
-                                              BroadcastExchange #7
-                                                WholeStageCodegen (10)
-                                                  Project [ca_zip]
-                                                    Filter [cnt]
-                                                      HashAggregate [ca_zip,count] [count(1),ca_zip,cnt,count]
-                                                        InputAdapter
-                                                          Exchange [ca_zip] #8
-                                                            WholeStageCodegen (9)
-                                                              HashAggregate [ca_zip] [count,count]
-                                                                Project [ca_zip]
-                                                                  SortMergeJoin [ca_address_sk,c_current_addr_sk]
-                                                                    InputAdapter
-                                                                      WholeStageCodegen (6)
-                                                                        Sort [ca_address_sk]
-                                                                          InputAdapter
-                                                                            Exchange [ca_address_sk] #9
-                                                                              WholeStageCodegen (5)
-                                                                                Filter [ca_address_sk]
+                                          InputAdapter
+                                            BroadcastExchange #7
+                                              WholeStageCodegen (10)
+                                                Project [ca_zip]
+                                                  Filter [cnt]
+                                                    HashAggregate [ca_zip,count] [count(1),ca_zip,cnt,count]
+                                                      InputAdapter
+                                                        Exchange [ca_zip] #8
+                                                          WholeStageCodegen (9)
+                                                            HashAggregate [ca_zip] [count,count]
+                                                              Project [ca_zip]
+                                                                SortMergeJoin [ca_address_sk,c_current_addr_sk]
+                                                                  InputAdapter
+                                                                    WholeStageCodegen (6)
+                                                                      Sort [ca_address_sk]
+                                                                        InputAdapter
+                                                                          Exchange [ca_address_sk] #9
+                                                                            WholeStageCodegen (5)
+                                                                              Filter [ca_address_sk]
+                                                                                ColumnarToRow
+                                                                                  InputAdapter
+                                                                                    Scan parquet spark_catalog.default.customer_address [ca_address_sk,ca_zip]
+                                                                  InputAdapter
+                                                                    WholeStageCodegen (8)
+                                                                      Sort [c_current_addr_sk]
+                                                                        InputAdapter
+                                                                          Exchange [c_current_addr_sk] #10
+                                                                            WholeStageCodegen (7)
+                                                                              Project [c_current_addr_sk]
+                                                                                Filter [c_preferred_cust_flag,c_current_addr_sk]
                                                                                   ColumnarToRow
                                                                                     InputAdapter
-                                                                                      Scan parquet spark_catalog.default.customer_address [ca_address_sk,ca_zip]
-                                                                    InputAdapter
-                                                                      WholeStageCodegen (8)
-                                                                        Sort [c_current_addr_sk]
-                                                                          InputAdapter
-                                                                            Exchange [c_current_addr_sk] #10
-                                                                              WholeStageCodegen (7)
-                                                                                Project [c_current_addr_sk]
-                                                                                  Filter [c_preferred_cust_flag,c_current_addr_sk]
-                                                                                    ColumnarToRow
-                                                                                      InputAdapter
-                                                                                        Scan parquet spark_catalog.default.customer [c_current_addr_sk,c_preferred_cust_flag]
+                                                                                      Scan parquet spark_catalog.default.customer [c_current_addr_sk,c_preferred_cust_flag]
diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q8/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q8/explain.txt
index bc6b4781839..24c3a657ddd 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q8/explain.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q8/explain.txt
@@ -21,27 +21,27 @@ TakeOrderedAndProject (43)
                   +- * HashAggregate (36)
                      +- Exchange (35)
                         +- * HashAggregate (34)
-                           +- * Project (33)
-                              +- * BroadcastHashJoin LeftSemi BuildRight (32)
-                                 :- * Filter (15)
-                                 :  +- * ColumnarToRow (14)
-                                 :     +- Scan parquet spark_catalog.default.customer_address (13)
-                                 +- BroadcastExchange (31)
-                                    +- * Project (30)
-                                       +- * Filter (29)
-                                          +- * HashAggregate (28)
-                                             +- Exchange (27)
-                                                +- * HashAggregate (26)
-                                                   +- * Project (25)
-                                                      +- * BroadcastHashJoin Inner BuildRight (24)
-                                                         :- * Filter (18)
-                                                         :  +- * ColumnarToRow (17)
-                                                         :     +- Scan parquet spark_catalog.default.customer_address (16)
-                                                         +- BroadcastExchange (23)
-                                                            +- * Project (22)
-                                                               +- * Filter (21)
-                                                                  +- * ColumnarToRow (20)
-                                                                     +- Scan parquet spark_catalog.default.customer (19)
+                           +- * BroadcastHashJoin LeftSemi BuildRight (33)
+                              :- * Project (16)
+                              :  +- * Filter (15)
+                              :     +- * ColumnarToRow (14)
+                              :        +- Scan parquet spark_catalog.default.customer_address (13)
+                              +- BroadcastExchange (32)
+                                 +- * Project (31)
+                                    +- * Filter (30)
+                                       +- * HashAggregate (29)
+                                          +- Exchange (28)
+                                             +- * HashAggregate (27)
+                                                +- * Project (26)
+                                                   +- * BroadcastHashJoin Inner BuildRight (25)
+                                                      :- * Filter (19)
+                                                      :  +- * ColumnarToRow (18)
+                                                      :     +- Scan parquet spark_catalog.default.customer_address (17)
+                                                      +- BroadcastExchange (24)
+                                                         +- * Project (23)
+                                                            +- * Filter (22)
+                                                               +- * ColumnarToRow (21)
+                                                                  +- Scan parquet spark_catalog.default.customer (20)
 
 
 (1) Scan parquet spark_catalog.default.store_sales
@@ -113,123 +113,123 @@ Input [1]: [ca_zip#9]
 Input [1]: [ca_zip#9]
 Condition : (substr(ca_zip#9, 1, 5) INSET 10144, 10336, 10390, 10445, 10516, 10567, 11101, 11356, 11376, 11489, 11634, 11928, 12305, 13354, 13375, 13376, 13394, 13595, 13695, 13955, 14060, 14089, 14171, 14328, 14663, 14867, 14922, 15126, 15146, 15371, 15455, 15559, 15723, 15734, 15765, 15798, 15882, 16021, 16725, 16807, 17043, 17183, 17871, 17879, 17920, 18119, 18270, 18376, 18383, 18426, 18652, 18767, 18799, 18840, 18842, 18845, 18906, 19430, 19505, 19512, 19515, 19736, 19769, 19849, 20 [...]
 
-(16) Scan parquet spark_catalog.default.customer_address
-Output [2]: [ca_address_sk#10, ca_zip#11]
+(16) Project [codegen id : 6]
+Output [1]: [substr(ca_zip#9, 1, 5) AS ca_zip#10]
+Input [1]: [ca_zip#9]
+
+(17) Scan parquet spark_catalog.default.customer_address
+Output [2]: [ca_address_sk#11, ca_zip#12]
 Batched: true
 Location [not included in comparison]/{warehouse_dir}/customer_address]
 PushedFilters: [IsNotNull(ca_address_sk)]
 ReadSchema: struct<ca_address_sk:int,ca_zip:string>
 
-(17) ColumnarToRow [codegen id : 4]
-Input [2]: [ca_address_sk#10, ca_zip#11]
+(18) ColumnarToRow [codegen id : 4]
+Input [2]: [ca_address_sk#11, ca_zip#12]
 
-(18) Filter [codegen id : 4]
-Input [2]: [ca_address_sk#10, ca_zip#11]
-Condition : isnotnull(ca_address_sk#10)
+(19) Filter [codegen id : 4]
+Input [2]: [ca_address_sk#11, ca_zip#12]
+Condition : isnotnull(ca_address_sk#11)
 
-(19) Scan parquet spark_catalog.default.customer
-Output [2]: [c_current_addr_sk#12, c_preferred_cust_flag#13]
+(20) Scan parquet spark_catalog.default.customer
+Output [2]: [c_current_addr_sk#13, c_preferred_cust_flag#14]
 Batched: true
 Location [not included in comparison]/{warehouse_dir}/customer]
 PushedFilters: [IsNotNull(c_preferred_cust_flag), EqualTo(c_preferred_cust_flag,Y), IsNotNull(c_current_addr_sk)]
 ReadSchema: struct<c_current_addr_sk:int,c_preferred_cust_flag:string>
 
-(20) ColumnarToRow [codegen id : 3]
-Input [2]: [c_current_addr_sk#12, c_preferred_cust_flag#13]
+(21) ColumnarToRow [codegen id : 3]
+Input [2]: [c_current_addr_sk#13, c_preferred_cust_flag#14]
 
-(21) Filter [codegen id : 3]
-Input [2]: [c_current_addr_sk#12, c_preferred_cust_flag#13]
-Condition : ((isnotnull(c_preferred_cust_flag#13) AND (c_preferred_cust_flag#13 = Y)) AND isnotnull(c_current_addr_sk#12))
+(22) Filter [codegen id : 3]
+Input [2]: [c_current_addr_sk#13, c_preferred_cust_flag#14]
+Condition : ((isnotnull(c_preferred_cust_flag#14) AND (c_preferred_cust_flag#14 = Y)) AND isnotnull(c_current_addr_sk#13))
 
-(22) Project [codegen id : 3]
-Output [1]: [c_current_addr_sk#12]
-Input [2]: [c_current_addr_sk#12, c_preferred_cust_flag#13]
+(23) Project [codegen id : 3]
+Output [1]: [c_current_addr_sk#13]
+Input [2]: [c_current_addr_sk#13, c_preferred_cust_flag#14]
 
-(23) BroadcastExchange
-Input [1]: [c_current_addr_sk#12]
+(24) BroadcastExchange
+Input [1]: [c_current_addr_sk#13]
 Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=2]
 
-(24) BroadcastHashJoin [codegen id : 4]
-Left keys [1]: [ca_address_sk#10]
-Right keys [1]: [c_current_addr_sk#12]
+(25) BroadcastHashJoin [codegen id : 4]
+Left keys [1]: [ca_address_sk#11]
+Right keys [1]: [c_current_addr_sk#13]
 Join type: Inner
 Join condition: None
 
-(25) Project [codegen id : 4]
-Output [1]: [ca_zip#11]
-Input [3]: [ca_address_sk#10, ca_zip#11, c_current_addr_sk#12]
+(26) Project [codegen id : 4]
+Output [1]: [ca_zip#12]
+Input [3]: [ca_address_sk#11, ca_zip#12, c_current_addr_sk#13]
 
-(26) HashAggregate [codegen id : 4]
-Input [1]: [ca_zip#11]
-Keys [1]: [ca_zip#11]
+(27) HashAggregate [codegen id : 4]
+Input [1]: [ca_zip#12]
+Keys [1]: [ca_zip#12]
 Functions [1]: [partial_count(1)]
-Aggregate Attributes [1]: [count#14]
-Results [2]: [ca_zip#11, count#15]
+Aggregate Attributes [1]: [count#15]
+Results [2]: [ca_zip#12, count#16]
 
-(27) Exchange
-Input [2]: [ca_zip#11, count#15]
-Arguments: hashpartitioning(ca_zip#11, 5), ENSURE_REQUIREMENTS, [plan_id=3]
+(28) Exchange
+Input [2]: [ca_zip#12, count#16]
+Arguments: hashpartitioning(ca_zip#12, 5), ENSURE_REQUIREMENTS, [plan_id=3]
 
-(28) HashAggregate [codegen id : 5]
-Input [2]: [ca_zip#11, count#15]
-Keys [1]: [ca_zip#11]
+(29) HashAggregate [codegen id : 5]
+Input [2]: [ca_zip#12, count#16]
+Keys [1]: [ca_zip#12]
 Functions [1]: [count(1)]
-Aggregate Attributes [1]: [count(1)#16]
-Results [2]: [substr(ca_zip#11, 1, 5) AS ca_zip#17, count(1)#16 AS cnt#18]
+Aggregate Attributes [1]: [count(1)#17]
+Results [2]: [substr(ca_zip#12, 1, 5) AS ca_zip#18, count(1)#17 AS cnt#19]
 
-(29) Filter [codegen id : 5]
-Input [2]: [ca_zip#17, cnt#18]
-Condition : (cnt#18 > 10)
+(30) Filter [codegen id : 5]
+Input [2]: [ca_zip#18, cnt#19]
+Condition : (cnt#19 > 10)
 
-(30) Project [codegen id : 5]
-Output [1]: [ca_zip#17]
-Input [2]: [ca_zip#17, cnt#18]
+(31) Project [codegen id : 5]
+Output [1]: [ca_zip#18]
+Input [2]: [ca_zip#18, cnt#19]
 
-(31) BroadcastExchange
-Input [1]: [ca_zip#17]
+(32) BroadcastExchange
+Input [1]: [ca_zip#18]
 Arguments: HashedRelationBroadcastMode(List(coalesce(input[0, string, true], ), isnull(input[0, string, true])),false), [plan_id=4]
 
-(32) BroadcastHashJoin [codegen id : 6]
-Left keys [2]: [coalesce(substr(ca_zip#9, 1, 5), ), isnull(substr(ca_zip#9, 1, 5))]
-Right keys [2]: [coalesce(ca_zip#17, ), isnull(ca_zip#17)]
+(33) BroadcastHashJoin [codegen id : 6]
+Left keys [2]: [coalesce(ca_zip#10, ), isnull(ca_zip#10)]
+Right keys [2]: [coalesce(ca_zip#18, ), isnull(ca_zip#18)]
 Join type: LeftSemi
 Join condition: None
 
-(33) Project [codegen id : 6]
-Output [1]: [substr(ca_zip#9, 1, 5) AS ca_zip#19]
-Input [1]: [ca_zip#9]
-
 (34) HashAggregate [codegen id : 6]
-Input [1]: [ca_zip#19]
-Keys [1]: [ca_zip#19]
+Input [1]: [ca_zip#10]
+Keys [1]: [ca_zip#10]
 Functions: []
 Aggregate Attributes: []
-Results [1]: [ca_zip#19]
+Results [1]: [ca_zip#10]
 
 (35) Exchange
-Input [1]: [ca_zip#19]
-Arguments: hashpartitioning(ca_zip#19, 5), ENSURE_REQUIREMENTS, [plan_id=5]
+Input [1]: [ca_zip#10]
+Arguments: hashpartitioning(ca_zip#10, 5), ENSURE_REQUIREMENTS, [plan_id=5]
 
 (36) HashAggregate [codegen id : 7]
-Input [1]: [ca_zip#19]
-Keys [1]: [ca_zip#19]
+Input [1]: [ca_zip#10]
+Keys [1]: [ca_zip#10]
 Functions: []
 Aggregate Attributes: []
-Results [1]: [ca_zip#19]
+Results [1]: [ca_zip#10]
 
 (37) BroadcastExchange
-Input [1]: [ca_zip#19]
+Input [1]: [ca_zip#10]
 Arguments: HashedRelationBroadcastMode(List(substr(input[0, string, true], 1, 2)),false), [plan_id=6]
 
 (38) BroadcastHashJoin [codegen id : 8]
 Left keys [1]: [substr(s_zip#8, 1, 2)]
-Right keys [1]: [substr(ca_zip#19, 1, 2)]
+Right keys [1]: [substr(ca_zip#10, 1, 2)]
 Join type: Inner
 Join condition: None
 
 (39) Project [codegen id : 8]
 Output [2]: [ss_net_profit#2, s_store_name#7]
-Input [4]: [ss_net_profit#2, s_store_name#7, s_zip#8, ca_zip#19]
+Input [4]: [ss_net_profit#2, s_store_name#7, s_zip#8, ca_zip#10]
 
 (40) HashAggregate [codegen id : 8]
 Input [2]: [ss_net_profit#2, s_store_name#7]
diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q8/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q8/simplified.txt
index 4be906d4f50..6ea5a786125 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q8/simplified.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q8/simplified.txt
@@ -40,33 +40,33 @@ TakeOrderedAndProject [s_store_name,sum(ss_net_profit)]
                             Exchange [ca_zip] #5
                               WholeStageCodegen (6)
                                 HashAggregate [ca_zip]
-                                  Project [ca_zip]
-                                    BroadcastHashJoin [ca_zip,ca_zip]
+                                  BroadcastHashJoin [ca_zip,ca_zip]
+                                    Project [ca_zip]
                                       Filter [ca_zip]
                                         ColumnarToRow
                                           InputAdapter
                                             Scan parquet spark_catalog.default.customer_address [ca_zip]
-                                      InputAdapter
-                                        BroadcastExchange #6
-                                          WholeStageCodegen (5)
-                                            Project [ca_zip]
-                                              Filter [cnt]
-                                                HashAggregate [ca_zip,count] [count(1),ca_zip,cnt,count]
-                                                  InputAdapter
-                                                    Exchange [ca_zip] #7
-                                                      WholeStageCodegen (4)
-                                                        HashAggregate [ca_zip] [count,count]
-                                                          Project [ca_zip]
-                                                            BroadcastHashJoin [ca_address_sk,c_current_addr_sk]
-                                                              Filter [ca_address_sk]
-                                                                ColumnarToRow
-                                                                  InputAdapter
-                                                                    Scan parquet spark_catalog.default.customer_address [ca_address_sk,ca_zip]
-                                                              InputAdapter
-                                                                BroadcastExchange #8
-                                                                  WholeStageCodegen (3)
-                                                                    Project [c_current_addr_sk]
-                                                                      Filter [c_preferred_cust_flag,c_current_addr_sk]
-                                                                        ColumnarToRow
-                                                                          InputAdapter
-                                                                            Scan parquet spark_catalog.default.customer [c_current_addr_sk,c_preferred_cust_flag]
+                                    InputAdapter
+                                      BroadcastExchange #6
+                                        WholeStageCodegen (5)
+                                          Project [ca_zip]
+                                            Filter [cnt]
+                                              HashAggregate [ca_zip,count] [count(1),ca_zip,cnt,count]
+                                                InputAdapter
+                                                  Exchange [ca_zip] #7
+                                                    WholeStageCodegen (4)
+                                                      HashAggregate [ca_zip] [count,count]
+                                                        Project [ca_zip]
+                                                          BroadcastHashJoin [ca_address_sk,c_current_addr_sk]
+                                                            Filter [ca_address_sk]
+                                                              ColumnarToRow
+                                                                InputAdapter
+                                                                  Scan parquet spark_catalog.default.customer_address [ca_address_sk,ca_zip]
+                                                            InputAdapter
+                                                              BroadcastExchange #8
+                                                                WholeStageCodegen (3)
+                                                                  Project [c_current_addr_sk]
+                                                                    Filter [c_preferred_cust_flag,c_current_addr_sk]
+                                                                      ColumnarToRow
+                                                                        InputAdapter
+                                                                          Scan parquet spark_catalog.default.customer [c_current_addr_sk,c_preferred_cust_flag]


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

