LuciferYang commented on PR #35149:
URL: https://github.com/apache/spark/pull/35149#issuecomment-1231688980

@cloud-fan @ulysses-you Sorry to bother you on a PR that has already been merged, but I found that this PR may have some negative effects.

I used [databricks spark-sql-perf](https://github.com/databricks/spark-sql-perf) with Spark 3.3 in spark-shell to run 3TB TPCDS q24a or q24b. The test code is as follows:

```scala
val rootDir = "hdfs://${clusterName}/tpcds-data/POCGenData3T"
val databaseName = "tpcds_database"
val scaleFactor = "3072"
val format = "parquet"

import com.databricks.spark.sql.perf.tpcds.TPCDSTables

// Only used to load the tables
val tables = new TPCDSTables(
  spark.sqlContext,
  dsdgenDir = "./tpcds-kit/tools",
  scaleFactor = scaleFactor,
  useDoubleForDecimal = false,
  useStringForDate = false)

spark.sql(s"create database $databaseName")
tables.createTemporaryTables(rootDir, format)
spark.sql(s"use $databaseName")

// TPCDS 24a
val result = spark.sql("""
  with ssales as
    (select c_last_name, c_first_name, s_store_name, ca_state, s_state, i_color,
            i_current_price, i_manager_id, i_units, i_size, sum(ss_net_paid) netpaid
     from store_sales, store_returns, store, item, customer, customer_address
     where ss_ticket_number = sr_ticket_number
       and ss_item_sk = sr_item_sk
       and ss_customer_sk = c_customer_sk
       and ss_item_sk = i_item_sk
       and ss_store_sk = s_store_sk
       and c_birth_country = upper(ca_country)
       and s_zip = ca_zip
       and s_market_id = 8
     group by c_last_name, c_first_name, s_store_name, ca_state, s_state, i_color,
              i_current_price, i_manager_id, i_units, i_size)
  select c_last_name, c_first_name, s_store_name, sum(netpaid) paid
  from ssales
  where i_color = 'pale'
  group by c_last_name, c_first_name, s_store_name
  having sum(netpaid) > (select 0.05*avg(netpaid) from ssales)""").collect()

sc.stop()
```

With AQE enabled, the above test may fail with `Stage cancelled because SparkContext was shut down` for stage 31 and stage 36, as shown below:

<img width="1427" alt="image" src="https://user-images.githubusercontent.com/1475305/187451487-148e01b0-3a2c-46d7-a075-ba7d7393bc18.png">
<img width="1430" alt="image" src="https://user-images.githubusercontent.com/1475305/187451605-e9c1819b-4af8-47df-ab06-36c63eaa8bae.png">
<img width="1427" alt="image" src="https://user-images.githubusercontent.com/1475305/187451708-b9a0ea91-35e8-4c22-bb1e-8c4123a3f2ef.png">

The DAG corresponding to the SQL is as follows:

<img width="421" alt="image" src="https://user-images.githubusercontent.com/1475305/187452035-4bb08940-9374-4336-b353-14fc9975a0fd.png">

The plan details are as follows:

```
== Physical Plan ==
AdaptiveSparkPlan (42)
+- == Final Plan ==
   LocalTableScan (1)
+- == Initial Plan ==
   Filter (41)
   +- HashAggregate (40)
      +- Exchange (39)
         +- HashAggregate (38)
            +- HashAggregate (37)
               +- Exchange (36)
                  +- HashAggregate (35)
                     +- Project (34)
                        +- BroadcastHashJoin Inner BuildRight (33)
                           :- Project (29)
                           :  +- BroadcastHashJoin Inner BuildRight (28)
                           :     :- Project (24)
                           :     :  +- BroadcastHashJoin Inner BuildRight (23)
                           :     :     :- Project (19)
                           :     :     :  +- BroadcastHashJoin Inner BuildRight (18)
                           :     :     :     :- Project (13)
                           :     :     :     :  +- SortMergeJoin Inner (12)
                           :     :     :     :     :- Sort (6)
                           :     :     :     :     :  +- Exchange (5)
                           :     :     :     :     :     +- Project (4)
                           :     :     :     :     :        +- Filter (3)
                           :     :     :     :     :           +- Scan parquet (2)
                           :     :     :     :     +- Sort (11)
                           :     :     :     :        +- Exchange (10)
                           :     :     :     :           +- Project (9)
                           :     :     :     :              +- Filter (8)
                           :     :     :     :                 +- Scan parquet (7)
                           :     :     :     +- BroadcastExchange (17)
                           :     :     :        +- Project (16)
                           :     :     :           +- Filter (15)
                           :     :     :              +- Scan parquet (14)
                           :     :     +- BroadcastExchange (22)
                           :     :        +- Filter (21)
                           :     :           +- Scan parquet (20)
                           :     +- BroadcastExchange (27)
                           :        +- Filter (26)
                           :           +- Scan parquet (25)
                           +- BroadcastExchange (32)
                              +- Filter (31)
                                 +- Scan parquet (30)


(1) LocalTableScan
Output [4]: [c_last_name#421, c_first_name#420, s_store_name#669, paid#850]
Arguments: <empty>, [c_last_name#421, c_first_name#420, s_store_name#669, paid#850]

(2) Scan parquet
Output [6]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149, ss_sold_date_sk#152]
Batched: true
Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/store_sales]
PushedFilters: [IsNotNull(ss_ticket_number), IsNotNull(ss_item_sk), IsNotNull(ss_store_sk), IsNotNull(ss_customer_sk)]
ReadSchema: struct<ss_item_sk:int,ss_customer_sk:int,ss_store_sk:int,ss_ticket_number:bigint,ss_net_paid:decimal(7,2)>

(3) Filter
Input [6]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149, ss_sold_date_sk#152]
Condition : (((isnotnull(ss_ticket_number#138L) AND isnotnull(ss_item_sk#131)) AND isnotnull(ss_store_sk#136)) AND isnotnull(ss_customer_sk#132))

(4) Project
Output [5]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149]
Input [6]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149, ss_sold_date_sk#152]

(5) Exchange
Input [5]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149]
Arguments: hashpartitioning(ss_ticket_number#138L, ss_item_sk#131, 300), ENSURE_REQUIREMENTS, [id=#309]

(6) Sort
Input [5]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149]
Arguments: [ss_ticket_number#138L ASC NULLS FIRST, ss_item_sk#131 ASC NULLS FIRST], false, 0

(7) Scan parquet
Output [3]: [sr_item_sk#177, sr_ticket_number#184L, sr_returned_date_sk#195]
Batched: true
Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/store_returns]
PushedFilters: [IsNotNull(sr_ticket_number), IsNotNull(sr_item_sk)]
ReadSchema: struct<sr_item_sk:int,sr_ticket_number:bigint>

(8) Filter
Input [3]: [sr_item_sk#177, sr_ticket_number#184L, sr_returned_date_sk#195]
Condition : (isnotnull(sr_ticket_number#184L) AND isnotnull(sr_item_sk#177))

(9) Project
Output [2]: [sr_item_sk#177, sr_ticket_number#184L]
Input [3]: [sr_item_sk#177, sr_ticket_number#184L, sr_returned_date_sk#195]

(10) Exchange
Input [2]: [sr_item_sk#177, sr_ticket_number#184L]
Arguments: hashpartitioning(sr_ticket_number#184L, sr_item_sk#177, 300), ENSURE_REQUIREMENTS, [id=#310]

(11) Sort
Input [2]: [sr_item_sk#177, sr_ticket_number#184L]
Arguments: [sr_ticket_number#184L ASC NULLS FIRST, sr_item_sk#177 ASC NULLS FIRST], false, 0

(12) SortMergeJoin
Left keys [2]: [ss_ticket_number#138L, ss_item_sk#131]
Right keys [2]: [sr_ticket_number#184L, sr_item_sk#177]
Join condition: None

(13) Project
Output [4]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_net_paid#149]
Input [7]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149, sr_item_sk#177, sr_ticket_number#184L]

(14) Scan parquet
Output [5]: [s_store_sk#664, s_store_name#669, s_market_id#674, s_state#688, s_zip#689]
Batched: true
Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/store]
PushedFilters: [IsNotNull(s_market_id), EqualTo(s_market_id,8), IsNotNull(s_store_sk), IsNotNull(s_zip)]
ReadSchema: struct<s_store_sk:int,s_store_name:string,s_market_id:int,s_state:string,s_zip:string>

(15) Filter
Input [5]: [s_store_sk#664, s_store_name#669, s_market_id#674, s_state#688, s_zip#689]
Condition : (((isnotnull(s_market_id#674) AND (s_market_id#674 = 8)) AND isnotnull(s_store_sk#664)) AND isnotnull(s_zip#689))

(16) Project
Output [4]: [s_store_sk#664, s_store_name#669, s_state#688, s_zip#689]
Input [5]: [s_store_sk#664, s_store_name#669, s_market_id#674, s_state#688, s_zip#689]

(17) BroadcastExchange
Input [4]: [s_store_sk#664, s_store_name#669, s_state#688, s_zip#689]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#316]

(18) BroadcastHashJoin
Left keys [1]: [ss_store_sk#136]
Right keys [1]: [s_store_sk#664]
Join condition: None

(19) Project
Output [6]: [ss_item_sk#131, ss_customer_sk#132, ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689]
Input [8]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_net_paid#149, s_store_sk#664, s_store_name#669, s_state#688, s_zip#689]

(20) Scan parquet
Output [6]: [i_item_sk#564, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584]
Batched: true
Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/item]
PushedFilters: [IsNotNull(i_color), EqualTo(i_color,pale), IsNotNull(i_item_sk)]
ReadSchema: struct<i_item_sk:int,i_current_price:decimal(7,2),i_size:string,i_color:string,i_units:string,i_manager_id:int>

(21) Filter
Input [6]: [i_item_sk#564, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584]
Condition : ((isnotnull(i_color#581) AND (i_color#581 = pale)) AND isnotnull(i_item_sk#564))

(22) BroadcastExchange
Input [6]: [i_item_sk#564, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#320]

(23) BroadcastHashJoin
Left keys [1]: [ss_item_sk#131]
Right keys [1]: [i_item_sk#564]
Join condition: None

(24) Project
Output [10]: [ss_customer_sk#132, ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584]
Input [12]: [ss_item_sk#131, ss_customer_sk#132, ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, i_item_sk#564, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584]

(25) Scan parquet
Output [4]: [c_customer_sk#412, c_first_name#420, c_last_name#421, c_birth_country#426]
Batched: true
Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/customer]
PushedFilters: [IsNotNull(c_customer_sk), IsNotNull(c_birth_country)]
ReadSchema: struct<c_customer_sk:int,c_first_name:string,c_last_name:string,c_birth_country:string>

(26) Filter
Input [4]: [c_customer_sk#412, c_first_name#420, c_last_name#421, c_birth_country#426]
Condition : (isnotnull(c_customer_sk#412) AND isnotnull(c_birth_country#426))

(27) BroadcastExchange
Input [4]: [c_customer_sk#412, c_first_name#420, c_last_name#421, c_birth_country#426]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#324]

(28) BroadcastHashJoin
Left keys [1]: [ss_customer_sk#132]
Right keys [1]: [c_customer_sk#412]
Join condition: None

(29) Project
Output [12]: [ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, c_first_name#420, c_last_name#421, c_birth_country#426]
Input [14]: [ss_customer_sk#132, ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, c_customer_sk#412, c_first_name#420, c_last_name#421, c_birth_country#426]

(30) Scan parquet
Output [3]: [ca_state#456, ca_zip#457, ca_country#458]
Batched: true
Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/customer_address]
PushedFilters: [IsNotNull(ca_country), IsNotNull(ca_zip)]
ReadSchema: struct<ca_state:string,ca_zip:string,ca_country:string>

(31) Filter
Input [3]: [ca_state#456, ca_zip#457, ca_country#458]
Condition : (isnotnull(ca_country#458) AND isnotnull(ca_zip#457))

(32) BroadcastExchange
Input [3]: [ca_state#456, ca_zip#457, ca_country#458]
Arguments: HashedRelationBroadcastMode(List(upper(input[2, string, false]), input[1, string, false]),false), [id=#328]

(33) BroadcastHashJoin
Left keys [2]: [c_birth_country#426, s_zip#689]
Right keys [2]: [upper(ca_country#458), ca_zip#457]
Join condition: None

(34) Project
Output [11]: [ss_net_paid#149, s_store_name#669, s_state#688, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, c_first_name#420, c_last_name#421, ca_state#456]
Input [15]: [ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, c_first_name#420, c_last_name#421, c_birth_country#426, ca_state#456, ca_zip#457, ca_country#458]

(35) HashAggregate
Input [11]: [ss_net_paid#149, s_store_name#669, s_state#688, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, c_first_name#420, c_last_name#421, ca_state#456]
Keys [10]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579]
Functions [1]: [partial_sum(UnscaledValue(ss_net_paid#149))]
Aggregate Attributes [1]: [sum#870L]
Results [11]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579, sum#871L]

(36) Exchange
Input [11]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579, sum#871L]
Arguments: hashpartitioning(c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579, 300), ENSURE_REQUIREMENTS, [id=#333]

(37) HashAggregate
Input [11]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579, sum#871L]
Keys [10]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579]
Functions [1]: [sum(UnscaledValue(ss_net_paid#149))]
Aggregate Attributes [1]: [sum(UnscaledValue(ss_net_paid#149))#853L]
Results [4]: [c_last_name#421, c_first_name#420, s_store_name#669, MakeDecimal(sum(UnscaledValue(ss_net_paid#149))#853L,17,2) AS netpaid#852]

(38) HashAggregate
Input [4]: [c_last_name#421, c_first_name#420, s_store_name#669, netpaid#852]
Keys [3]: [c_last_name#421, c_first_name#420, s_store_name#669]
Functions [1]: [partial_sum(netpaid#852)]
Aggregate Attributes [2]: [sum#866, isEmpty#867]
Results [5]: [c_last_name#421, c_first_name#420, s_store_name#669, sum#868, isEmpty#869]

(39) Exchange
Input [5]: [c_last_name#421, c_first_name#420, s_store_name#669, sum#868, isEmpty#869]
Arguments: hashpartitioning(c_last_name#421, c_first_name#420, s_store_name#669, 300), ENSURE_REQUIREMENTS, [id=#337]

(40) HashAggregate
Input [5]: [c_last_name#421, c_first_name#420, s_store_name#669, sum#868, isEmpty#869]
Keys [3]: [c_last_name#421, c_first_name#420, s_store_name#669]
Functions [1]: [sum(netpaid#852)]
Aggregate Attributes [1]: [sum(netpaid#852)#854]
Results [4]: [c_last_name#421, c_first_name#420, s_store_name#669, sum(netpaid#852)#854 AS paid#850]

(41) Filter
Input [4]: [c_last_name#421, c_first_name#420, s_store_name#669, paid#850]
Condition : (isnotnull(paid#850) AND (cast(paid#850 as decimal(33,8)) > cast(Subquery subquery#851, [id=#294] as decimal(33,8))))

(42) AdaptiveSparkPlan
Output [4]: [c_last_name#421, c_first_name#420, s_store_name#669, paid#850]
Arguments: isFinalPlan=true
```

**Since I saw `PlanChangeLogger` output for `Applying Rule org.apache.spark.sql.execution.adaptive.AQEPropagateEmptyRelation` and `Result of Batch Propagate Empty Relations` in the driver log, I reverted this PR and tested again, and the problem no longer occurred.**

I have already filed a JIRA: https://issues.apache.org/jira/browse/SPARK-40278
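
One way to narrow this down without rebuilding Spark might be to leave AQE on and exclude only the rule named in the driver log. The snippet below is a minimal sketch for spark-shell, where the `spark` session is predefined. It was not part of the runs reported above. It also assumes that excluding the whole rule is an acceptable stand-in for reverting the PR, which it may not be, because exclusion also turns off whatever the rule did before this PR.

```scala
// Sketch only: run in spark-shell before executing q24a/q24b.

// Emit plan-change messages at WARN so the "Applying Rule ..." and
// "Result of Batch ..." lines are visible in the driver log.
spark.conf.set("spark.sql.planChangeLog.level", "WARN")

// Keep AQE enabled, but skip the suspected rule in the AQE optimizer.
// Assumption: this is coarser than reverting the PR, since the entire
// rule is disabled rather than only the behavior the PR added.
spark.conf.set(
  "spark.sql.adaptive.optimizer.excludedRules",
  "org.apache.spark.sql.execution.adaptive.AQEPropagateEmptyRelation")

// Optional control run with AQE switched off entirely:
// spark.conf.set("spark.sql.adaptive.enabled", "false")
```

If the stage cancellations disappear with the rule excluded and return when the exclusion is removed, that would be consistent with the revert result, though it would not by itself pinpoint which part of the rule is responsible.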
