wankunde opened a new pull request, #39923:
URL: https://github.com/apache/spark/pull/39923

   ### What changes were proposed in this pull request?
   This PR improves join stats estimation when one side of the join keeps 
uniqueness, i.e. the distinct keys of one join child are a subset of the join 
keys. A common case is:
   
   ```sql
   SELECT i_item_sk ss_item_sk
   FROM   item,
          (SELECT DISTINCT iss.i_brand_id    brand_id,
                           iss.i_class_id    class_id,
                           iss.i_category_id category_id
           FROM   item iss) x
   WHERE  i_brand_id = brand_id
          AND i_class_id = class_id
          AND i_category_id = category_id 
   ```
   
   In this case, each row from `item` matches at most one row of the distinct 
subquery, so the row count of the join will definitely not expand.
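   The idea can be sketched as follows. This is a hedged illustration of the estimation rule, not Spark's actual implementation; the function name and parameters (`estimate_join_rows`, `right_distinct_key_sets`) are hypothetical, and the row counts are taken from the plans below:

   ```python
   # Sketch (not Spark's actual code): estimate the row count of an inner
   # equi-join when the right child "keeps uniqueness", i.e. one of its
   # known distinct-key sets is a subset of the join keys.

   def estimate_join_rows(left_rows, default_estimate, join_keys,
                          right_distinct_key_sets):
       # If the right child is distinct on a subset of the join keys,
       # each left row matches at most one right row, so the join output
       # cannot exceed the left child's row count.
       right_keeps_uniqueness = any(keys <= join_keys
                                    for keys in right_distinct_key_sets)
       if right_keeps_uniqueness:
           return min(default_estimate, left_rows)
       return default_estimate

   # Numbers from the example: the left side has 2.02E+5 rows and the
   # naive estimate was 3.24E+7; the capped estimate is 2.02E+5.
   join_keys = {"brand_id", "class_id", "category_id"}
   print(estimate_join_rows(202_000, 32_400_000, join_keys, [join_keys]))
   ```

   When no distinct-key set of the right child is covered by the join keys, the sketch falls back to the default estimate, which matches the pre-PR behavior.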
   
   Before this PR:
   
   ```
   == Optimized Logical Plan ==
   Project [i_item_sk#4 AS ss_item_sk#54], Statistics(sizeInBytes=370.8 MiB, 
rowCount=3.24E+7)
   +- Join Inner, (((i_brand_id#11 = brand_id#51) AND (i_class_id#13 = 
class_id#52)) AND (i_category_id#15 = category_id#53)), 
Statistics(sizeInBytes=1112.3 MiB, rowCount=3.24E+7)
      :- Project [i_item_sk#4, i_brand_id#11, i_class_id#13, i_category_id#15], 
Statistics(sizeInBytes=4.6 MiB, rowCount=2.02E+5)
      :  +- Filter ((isnotnull(i_brand_id#11) AND isnotnull(i_class_id#13)) AND 
isnotnull(i_category_id#15)), Statistics(sizeInBytes=84.6 MiB, rowCount=2.02E+5)
      :     +- Relation 
spark_catalog.default.item[i_item_sk#4,i_item_id#5,i_rec_start_date#6,i_rec_end_date#7,i_item_desc#8,i_current_price#9,i_wholesale_cost#10,i_brand_id#11,i_brand#12,i_class_id#13,i_class#14,i_category_id#15,i_category#16,i_manufact_id#17,i_manufact#18,i_size#19,i_formulation#20,i_color#21,i_units#22,i_container#23,i_manager_id#24,i_product_name#25]
 parquet, Statistics(sizeInBytes=85.2 MiB, rowCount=2.04E+5)
      +- Aggregate [brand_id#51, class_id#52, category_id#53], [brand_id#51, 
class_id#52, category_id#53], Statistics(sizeInBytes=2.6 MiB, rowCount=1.37E+5)
         +- Project [i_brand_id#62 AS brand_id#51, i_class_id#64 AS 
class_id#52, i_category_id#66 AS category_id#53], Statistics(sizeInBytes=3.9 
MiB, rowCount=2.02E+5)
            +- Filter ((isnotnull(i_brand_id#62) AND isnotnull(i_class_id#64)) 
AND isnotnull(i_category_id#66)), Statistics(sizeInBytes=84.6 MiB, 
rowCount=2.02E+5)
               +- Relation 
spark_catalog.default.item[i_item_sk#55,i_item_id#56,i_rec_start_date#57,i_rec_end_date#58,i_item_desc#59,i_current_price#60,i_wholesale_cost#61,i_brand_id#62,i_brand#63,i_class_id#64,i_class#65,i_category_id#66,i_category#67,i_manufact_id#68,i_manufact#69,i_size#70,i_formulation#71,i_color#72,i_units#73,i_container#74,i_manager_id#75,i_product_name#76]
 parquet, Statistics(sizeInBytes=85.2 MiB, rowCount=2.04E+5)
   ```
   
   After this PR:
   
   ```
   == Optimized Logical Plan ==
   Project [i_item_sk#4 AS ss_item_sk#54], Statistics(sizeInBytes=2.3 MiB, 
rowCount=2.02E+5)
   +- Join Inner, (((i_brand_id#11 = brand_id#51) AND (i_class_id#13 = 
class_id#52)) AND (i_category_id#15 = category_id#53)), 
Statistics(sizeInBytes=7.0 MiB, rowCount=2.02E+5)
      :- Project [i_item_sk#4, i_brand_id#11, i_class_id#13, i_category_id#15], 
Statistics(sizeInBytes=4.6 MiB, rowCount=2.02E+5)
      :  +- Filter ((isnotnull(i_brand_id#11) AND isnotnull(i_class_id#13)) AND 
isnotnull(i_category_id#15)), Statistics(sizeInBytes=84.6 MiB, rowCount=2.02E+5)
      :     +- Relation 
spark_catalog.default.item[i_item_sk#4,i_item_id#5,i_rec_start_date#6,i_rec_end_date#7,i_item_desc#8,i_current_price#9,i_wholesale_cost#10,i_brand_id#11,i_brand#12,i_class_id#13,i_class#14,i_category_id#15,i_category#16,i_manufact_id#17,i_manufact#18,i_size#19,i_formulation#20,i_color#21,i_units#22,i_container#23,i_manager_id#24,i_product_name#25]
 parquet, Statistics(sizeInBytes=85.2 MiB, rowCount=2.04E+5)
      +- Aggregate [brand_id#51, class_id#52, category_id#53], [brand_id#51, 
class_id#52, category_id#53], Statistics(sizeInBytes=2.6 MiB, rowCount=1.37E+5)
         +- Project [i_brand_id#62 AS brand_id#51, i_class_id#64 AS 
class_id#52, i_category_id#66 AS category_id#53], Statistics(sizeInBytes=3.9 
MiB, rowCount=2.02E+5)
            +- Filter ((isnotnull(i_brand_id#62) AND isnotnull(i_class_id#64)) 
AND isnotnull(i_category_id#66)), Statistics(sizeInBytes=84.6 MiB, 
rowCount=2.02E+5)
               +- Relation 
spark_catalog.default.item[i_item_sk#55,i_item_id#56,i_rec_start_date#57,i_rec_end_date#58,i_item_desc#59,i_current_price#60,i_wholesale_cost#61,i_brand_id#62,i_brand#63,i_class_id#64,i_class#65,i_category_id#66,i_category#67,i_manufact_id#68,i_manufact#69,i_size#70,i_formulation#71,i_color#72,i_units#73,i_container#74,i_manager_id#75,i_product_name#76]
 parquet, Statistics(sizeInBytes=85.2 MiB, rowCount=2.04E+5)
   ```
   
   ### Why are the changes needed?
   More accurate join row-count estimates allow the planner to choose broadcast 
joins more often, improving query performance.
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   ### How was this patch tested?
   Unit test and TPC-DS benchmark test.
   
   | SQL  | Before this PR (seconds) | After this PR (seconds) |
   |------|--------------------------|-------------------------|
   | q14a | 187                      | 164                     |
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

