LuciferYang commented on PR #35149:
URL: https://github.com/apache/spark/pull/35149#issuecomment-1231688980

   @cloud-fan @ulysses-you Sorry to bring this up on a PR that has already been merged, 
but I found that this PR may have some negative effects.
   
   I used [databricks 
spark-sql-perf](https://github.com/databricks/spark-sql-perf) + Spark 3.3 in 
spark-shell to run 3TB TPCDS q24a or q24b. The test code is as follows:
   
   ```scala
   val rootDir = "hdfs://${clusterName}/tpcds-data/POCGenData3T"
   val databaseName = "tpcds_database"
   val scaleFactor = "3072"
   val format = "parquet"
   import com.databricks.spark.sql.perf.tpcds.TPCDSTables
   // used only to load the tables
   val tables = new TPCDSTables(
         spark.sqlContext, dsdgenDir = "./tpcds-kit/tools",
         scaleFactor = scaleFactor,
         useDoubleForDecimal = false, useStringForDate = false)
   spark.sql(s"create database $databaseName")
   tables.createTemporaryTables(rootDir, format)
   spark.sql(s"use $databaseName")
   // TPCDS 24a
   val result = spark.sql(""" with ssales as
    (select c_last_name, c_first_name, s_store_name, ca_state, s_state, i_color,
           i_current_price, i_manager_id, i_units, i_size, sum(ss_net_paid) netpaid
    from store_sales, store_returns, store, item, customer, customer_address
    where ss_ticket_number = sr_ticket_number
      and ss_item_sk = sr_item_sk
      and ss_customer_sk = c_customer_sk
      and ss_item_sk = i_item_sk
      and ss_store_sk = s_store_sk
      and c_birth_country = upper(ca_country)
      and s_zip = ca_zip
    and s_market_id = 8
    group by c_last_name, c_first_name, s_store_name, ca_state, s_state, i_color,
             i_current_price, i_manager_id, i_units, i_size)
    select c_last_name, c_first_name, s_store_name, sum(netpaid) paid
    from ssales
    where i_color = 'pale'
    group by c_last_name, c_first_name, s_store_name
    having sum(netpaid) > (select 0.05*avg(netpaid) from ssales)""").collect()
    sc.stop() 
   ```
   
   
   When AQE is enabled, the above test may fail in stage 31 and stage 36 with 
`Stage cancelled because SparkContext was shut down`, as shown below:
   
   <img width="1427" alt="image" 
src="https://user-images.githubusercontent.com/1475305/187451487-148e01b0-3a2c-46d7-a075-ba7d7393bc18.png">
   
   <img width="1430" alt="image" 
src="https://user-images.githubusercontent.com/1475305/187451605-e9c1819b-4af8-47df-ab06-36c63eaa8bae.png">
   
   <img width="1427" alt="image" 
src="https://user-images.githubusercontent.com/1475305/187451708-b9a0ea91-35e8-4c22-bb1e-8c4123a3f2ef.png">
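   
   To check that the failure really depends on AQE, the same query can be run with AQE switched off and then on in one spark-shell session. The following is only a minimal sketch: `spark.sql.adaptive.enabled` is the standard Spark SQL setting, while `runWithAqe` is a hypothetical helper introduced here for illustration and is not part of spark-sql-perf.
   
   ```scala
   import org.apache.spark.sql.Row

   // Sketch for spark-shell, where `spark` is already defined.
   // runWithAqe is a hypothetical helper: it sets the AQE flag for the
   // current session and then runs the given query to completion.
   def runWithAqe(enabled: Boolean, query: String): Array[Row] = {
     spark.conf.set("spark.sql.adaptive.enabled", enabled.toString)
     spark.sql(query).collect()
   }
   ```
   
   Calling `runWithAqe(enabled = false, queryText)` and then `runWithAqe(enabled = true, queryText)`, where `queryText` holds the q24a text from the test code above, allows the stage status of the two runs to be compared in the UI. Both calls should be made before `sc.stop()`.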
   
   
   The DAG corresponding to the SQL is as follows:
   
   <img width="421" alt="image" 
src="https://user-images.githubusercontent.com/1475305/187452035-4bb08940-9374-4336-b353-14fc9975a0fd.png">
   
   
   The details of the physical plan are as follows:
   
   ```
   == Physical Plan ==
   AdaptiveSparkPlan (42)
   +- == Final Plan ==
      LocalTableScan (1)
   +- == Initial Plan ==
      Filter (41)
      +- HashAggregate (40)
         +- Exchange (39)
            +- HashAggregate (38)
               +- HashAggregate (37)
                  +- Exchange (36)
                     +- HashAggregate (35)
                        +- Project (34)
                           +- BroadcastHashJoin Inner BuildRight (33)
                              :- Project (29)
                              :  +- BroadcastHashJoin Inner BuildRight (28)
                              :     :- Project (24)
                              :     :  +- BroadcastHashJoin Inner BuildRight (23)
                              :     :     :- Project (19)
                              :     :     :  +- BroadcastHashJoin Inner BuildRight (18)
                              :     :     :     :- Project (13)
                              :     :     :     :  +- SortMergeJoin Inner (12)
                              :     :     :     :     :- Sort (6)
                              :     :     :     :     :  +- Exchange (5)
                              :     :     :     :     :     +- Project (4)
                              :     :     :     :     :        +- Filter (3)
                              :     :     :     :     :           +- Scan parquet  (2)
                              :     :     :     :     +- Sort (11)
                              :     :     :     :        +- Exchange (10)
                              :     :     :     :           +- Project (9)
                              :     :     :     :              +- Filter (8)
                              :     :     :     :                 +- Scan parquet  (7)
                              :     :     :     +- BroadcastExchange (17)
                              :     :     :        +- Project (16)
                              :     :     :           +- Filter (15)
                              :     :     :              +- Scan parquet  (14)
                              :     :     +- BroadcastExchange (22)
                              :     :        +- Filter (21)
                              :     :           +- Scan parquet  (20)
                              :     +- BroadcastExchange (27)
                              :        +- Filter (26)
                              :           +- Scan parquet  (25)
                              +- BroadcastExchange (32)
                                 +- Filter (31)
                                    +- Scan parquet  (30)
   
   
   (1) LocalTableScan
   Output [4]: [c_last_name#421, c_first_name#420, s_store_name#669, paid#850]
   Arguments: <empty>, [c_last_name#421, c_first_name#420, s_store_name#669, 
paid#850]
   
   (2) Scan parquet 
   Output [6]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, 
ss_ticket_number#138L, ss_net_paid#149, ss_sold_date_sk#152]
   Batched: true
   Location: InMemoryFileIndex 
[afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/store_sales]
   PushedFilters: [IsNotNull(ss_ticket_number), IsNotNull(ss_item_sk), 
IsNotNull(ss_store_sk), IsNotNull(ss_customer_sk)]
   ReadSchema: 
struct<ss_item_sk:int,ss_customer_sk:int,ss_store_sk:int,ss_ticket_number:bigint,ss_net_paid:decimal(7,2)>
   
   (3) Filter
   Input [6]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, 
ss_ticket_number#138L, ss_net_paid#149, ss_sold_date_sk#152]
   Condition : (((isnotnull(ss_ticket_number#138L) AND 
isnotnull(ss_item_sk#131)) AND isnotnull(ss_store_sk#136)) AND 
isnotnull(ss_customer_sk#132))
   
   (4) Project
   Output [5]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, 
ss_ticket_number#138L, ss_net_paid#149]
   Input [6]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, 
ss_ticket_number#138L, ss_net_paid#149, ss_sold_date_sk#152]
   
   (5) Exchange
   Input [5]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, 
ss_ticket_number#138L, ss_net_paid#149]
   Arguments: hashpartitioning(ss_ticket_number#138L, ss_item_sk#131, 300), 
ENSURE_REQUIREMENTS, [id=#309]
   
   (6) Sort
   Input [5]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, 
ss_ticket_number#138L, ss_net_paid#149]
   Arguments: [ss_ticket_number#138L ASC NULLS FIRST, ss_item_sk#131 ASC NULLS 
FIRST], false, 0
   
   (7) Scan parquet 
   Output [3]: [sr_item_sk#177, sr_ticket_number#184L, sr_returned_date_sk#195]
   Batched: true
   Location: InMemoryFileIndex 
[afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/store_returns]
   PushedFilters: [IsNotNull(sr_ticket_number), IsNotNull(sr_item_sk)]
   ReadSchema: struct<sr_item_sk:int,sr_ticket_number:bigint>
   
   (8) Filter
   Input [3]: [sr_item_sk#177, sr_ticket_number#184L, sr_returned_date_sk#195]
   Condition : (isnotnull(sr_ticket_number#184L) AND isnotnull(sr_item_sk#177))
   
   (9) Project
   Output [2]: [sr_item_sk#177, sr_ticket_number#184L]
   Input [3]: [sr_item_sk#177, sr_ticket_number#184L, sr_returned_date_sk#195]
   
   (10) Exchange
   Input [2]: [sr_item_sk#177, sr_ticket_number#184L]
   Arguments: hashpartitioning(sr_ticket_number#184L, sr_item_sk#177, 300), 
ENSURE_REQUIREMENTS, [id=#310]
   
   (11) Sort
   Input [2]: [sr_item_sk#177, sr_ticket_number#184L]
   Arguments: [sr_ticket_number#184L ASC NULLS FIRST, sr_item_sk#177 ASC NULLS 
FIRST], false, 0
   
   (12) SortMergeJoin
   Left keys [2]: [ss_ticket_number#138L, ss_item_sk#131]
   Right keys [2]: [sr_ticket_number#184L, sr_item_sk#177]
   Join condition: None
   
   (13) Project
   Output [4]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, 
ss_net_paid#149]
   Input [7]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, 
ss_ticket_number#138L, ss_net_paid#149, sr_item_sk#177, sr_ticket_number#184L]
   
   (14) Scan parquet 
   Output [5]: [s_store_sk#664, s_store_name#669, s_market_id#674, s_state#688, 
s_zip#689]
   Batched: true
   Location: InMemoryFileIndex 
[afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/store]
   PushedFilters: [IsNotNull(s_market_id), EqualTo(s_market_id,8), 
IsNotNull(s_store_sk), IsNotNull(s_zip)]
   ReadSchema: 
struct<s_store_sk:int,s_store_name:string,s_market_id:int,s_state:string,s_zip:string>
   
   (15) Filter
   Input [5]: [s_store_sk#664, s_store_name#669, s_market_id#674, s_state#688, 
s_zip#689]
   Condition : (((isnotnull(s_market_id#674) AND (s_market_id#674 = 8)) AND 
isnotnull(s_store_sk#664)) AND isnotnull(s_zip#689))
   
   (16) Project
   Output [4]: [s_store_sk#664, s_store_name#669, s_state#688, s_zip#689]
   Input [5]: [s_store_sk#664, s_store_name#669, s_market_id#674, s_state#688, 
s_zip#689]
   
   (17) BroadcastExchange
   Input [4]: [s_store_sk#664, s_store_name#669, s_state#688, s_zip#689]
   Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as 
bigint)),false), [id=#316]
   
   (18) BroadcastHashJoin
   Left keys [1]: [ss_store_sk#136]
   Right keys [1]: [s_store_sk#664]
   Join condition: None
   
   (19) Project
   Output [6]: [ss_item_sk#131, ss_customer_sk#132, ss_net_paid#149, 
s_store_name#669, s_state#688, s_zip#689]
   Input [8]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, 
ss_net_paid#149, s_store_sk#664, s_store_name#669, s_state#688, s_zip#689]
   
   (20) Scan parquet 
   Output [6]: [i_item_sk#564, i_current_price#569, i_size#579, i_color#581, 
i_units#582, i_manager_id#584]
   Batched: true
   Location: InMemoryFileIndex 
[afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/item]
   PushedFilters: [IsNotNull(i_color), EqualTo(i_color,pale), 
IsNotNull(i_item_sk)]
   ReadSchema: 
struct<i_item_sk:int,i_current_price:decimal(7,2),i_size:string,i_color:string,i_units:string,i_manager_id:int>
   
   (21) Filter
   Input [6]: [i_item_sk#564, i_current_price#569, i_size#579, i_color#581, 
i_units#582, i_manager_id#584]
   Condition : ((isnotnull(i_color#581) AND (i_color#581 = pale)) AND 
isnotnull(i_item_sk#564))
   
   (22) BroadcastExchange
   Input [6]: [i_item_sk#564, i_current_price#569, i_size#579, i_color#581, 
i_units#582, i_manager_id#584]
   Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as 
bigint)),false), [id=#320]
   
   (23) BroadcastHashJoin
   Left keys [1]: [ss_item_sk#131]
   Right keys [1]: [i_item_sk#564]
   Join condition: None
   
   (24) Project
   Output [10]: [ss_customer_sk#132, ss_net_paid#149, s_store_name#669, 
s_state#688, s_zip#689, i_current_price#569, i_size#579, i_color#581, 
i_units#582, i_manager_id#584]
   Input [12]: [ss_item_sk#131, ss_customer_sk#132, ss_net_paid#149, 
s_store_name#669, s_state#688, s_zip#689, i_item_sk#564, i_current_price#569, 
i_size#579, i_color#581, i_units#582, i_manager_id#584]
   
   (25) Scan parquet 
   Output [4]: [c_customer_sk#412, c_first_name#420, c_last_name#421, 
c_birth_country#426]
   Batched: true
   Location: InMemoryFileIndex 
[afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/customer]
   PushedFilters: [IsNotNull(c_customer_sk), IsNotNull(c_birth_country)]
   ReadSchema: 
struct<c_customer_sk:int,c_first_name:string,c_last_name:string,c_birth_country:string>
   
   (26) Filter
   Input [4]: [c_customer_sk#412, c_first_name#420, c_last_name#421, 
c_birth_country#426]
   Condition : (isnotnull(c_customer_sk#412) AND isnotnull(c_birth_country#426))
   
   (27) BroadcastExchange
   Input [4]: [c_customer_sk#412, c_first_name#420, c_last_name#421, 
c_birth_country#426]
   Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as 
bigint)),false), [id=#324]
   
   (28) BroadcastHashJoin
   Left keys [1]: [ss_customer_sk#132]
   Right keys [1]: [c_customer_sk#412]
   Join condition: None
   
   (29) Project
   Output [12]: [ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, 
i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, 
c_first_name#420, c_last_name#421, c_birth_country#426]
   Input [14]: [ss_customer_sk#132, ss_net_paid#149, s_store_name#669, 
s_state#688, s_zip#689, i_current_price#569, i_size#579, i_color#581, 
i_units#582, i_manager_id#584, c_customer_sk#412, c_first_name#420, 
c_last_name#421, c_birth_country#426]
   
   (30) Scan parquet 
   Output [3]: [ca_state#456, ca_zip#457, ca_country#458]
   Batched: true
   Location: InMemoryFileIndex 
[afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/customer_address]
   PushedFilters: [IsNotNull(ca_country), IsNotNull(ca_zip)]
   ReadSchema: struct<ca_state:string,ca_zip:string,ca_country:string>
   
   (31) Filter
   Input [3]: [ca_state#456, ca_zip#457, ca_country#458]
   Condition : (isnotnull(ca_country#458) AND isnotnull(ca_zip#457))
   
   (32) BroadcastExchange
   Input [3]: [ca_state#456, ca_zip#457, ca_country#458]
   Arguments: HashedRelationBroadcastMode(List(upper(input[2, string, false]), 
input[1, string, false]),false), [id=#328]
   
   (33) BroadcastHashJoin
   Left keys [2]: [c_birth_country#426, s_zip#689]
   Right keys [2]: [upper(ca_country#458), ca_zip#457]
   Join condition: None
   
   (34) Project
   Output [11]: [ss_net_paid#149, s_store_name#669, s_state#688, 
i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, 
c_first_name#420, c_last_name#421, ca_state#456]
   Input [15]: [ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, 
i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, 
c_first_name#420, c_last_name#421, c_birth_country#426, ca_state#456, 
ca_zip#457, ca_country#458]
   
   (35) HashAggregate
   Input [11]: [ss_net_paid#149, s_store_name#669, s_state#688, 
i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, 
c_first_name#420, c_last_name#421, ca_state#456]
   Keys [10]: [c_last_name#421, c_first_name#420, s_store_name#669, 
ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, 
i_units#582, i_size#579]
   Functions [1]: [partial_sum(UnscaledValue(ss_net_paid#149))]
   Aggregate Attributes [1]: [sum#870L]
   Results [11]: [c_last_name#421, c_first_name#420, s_store_name#669, 
ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, 
i_units#582, i_size#579, sum#871L]
   
   (36) Exchange
   Input [11]: [c_last_name#421, c_first_name#420, s_store_name#669, 
ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, 
i_units#582, i_size#579, sum#871L]
   Arguments: hashpartitioning(c_last_name#421, c_first_name#420, 
s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, 
i_manager_id#584, i_units#582, i_size#579, 300), ENSURE_REQUIREMENTS, [id=#333]
   
   (37) HashAggregate
   Input [11]: [c_last_name#421, c_first_name#420, s_store_name#669, 
ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, 
i_units#582, i_size#579, sum#871L]
   Keys [10]: [c_last_name#421, c_first_name#420, s_store_name#669, 
ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, 
i_units#582, i_size#579]
   Functions [1]: [sum(UnscaledValue(ss_net_paid#149))]
   Aggregate Attributes [1]: [sum(UnscaledValue(ss_net_paid#149))#853L]
   Results [4]: [c_last_name#421, c_first_name#420, s_store_name#669, 
MakeDecimal(sum(UnscaledValue(ss_net_paid#149))#853L,17,2) AS netpaid#852]
   
   (38) HashAggregate
   Input [4]: [c_last_name#421, c_first_name#420, s_store_name#669, netpaid#852]
   Keys [3]: [c_last_name#421, c_first_name#420, s_store_name#669]
   Functions [1]: [partial_sum(netpaid#852)]
   Aggregate Attributes [2]: [sum#866, isEmpty#867]
   Results [5]: [c_last_name#421, c_first_name#420, s_store_name#669, sum#868, 
isEmpty#869]
   
   (39) Exchange
   Input [5]: [c_last_name#421, c_first_name#420, s_store_name#669, sum#868, 
isEmpty#869]
   Arguments: hashpartitioning(c_last_name#421, c_first_name#420, 
s_store_name#669, 300), ENSURE_REQUIREMENTS, [id=#337]
   
   (40) HashAggregate
   Input [5]: [c_last_name#421, c_first_name#420, s_store_name#669, sum#868, 
isEmpty#869]
   Keys [3]: [c_last_name#421, c_first_name#420, s_store_name#669]
   Functions [1]: [sum(netpaid#852)]
   Aggregate Attributes [1]: [sum(netpaid#852)#854]
   Results [4]: [c_last_name#421, c_first_name#420, s_store_name#669, 
sum(netpaid#852)#854 AS paid#850]
   
   (41) Filter
   Input [4]: [c_last_name#421, c_first_name#420, s_store_name#669, paid#850]
   Condition : (isnotnull(paid#850) AND (cast(paid#850 as decimal(33,8)) > 
cast(Subquery subquery#851, [id=#294] as decimal(33,8))))
   
   (42) AdaptiveSparkPlan
   Output [4]: [c_last_name#421, c_first_name#420, s_store_name#669, paid#850]
   Arguments: isFinalPlan=true
   ```
   
   **Because I saw `PlanChangeLogger` entries for `Applying Rule 
org.apache.spark.sql.execution.adaptive.AQEPropagateEmptyRelation` and `Result 
of Batch Propagate Empty Relations` in the driver log, I reverted 
this PR and tested again. The problem no longer occurred.**
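   
   As a lighter-weight check than reverting the PR, the plan-change log can be raised to a visible level and the rule named in the log can be excluded from the AQE optimizer. This is a sketch under assumptions: both settings are Spark SQL configurations available since Spark 3.1, but whether excluding the rule avoids the cancelled stages has not been verified here.
   
   ```scala
   // Sketch for spark-shell, where `spark` is already defined.

   // Emit PlanChangeLogger output at WARN so it shows up in the driver log
   // without changing the log4j configuration.
   spark.conf.set("spark.sql.planChangeLog.level", "WARN")

   // Exclude the rule seen in the driver log from the AQE optimizer.
   // Assumption: this only narrows down the cause; it is not a confirmed fix.
   spark.conf.set(
     "spark.sql.adaptive.optimizer.excludedRules",
     "org.apache.spark.sql.execution.adaptive.AQEPropagateEmptyRelation")
   ```
   
   If the query then completes with AQE still enabled, that would be consistent with the observation above, although it would not by itself show which change introduced the behaviour.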
   
   
   I have also filed a JIRA ticket: https://issues.apache.org/jira/browse/SPARK-40278
   
   



