wangyum opened a new pull request, #38047:
URL: https://github.com/apache/spark/pull/38047

   ### What changes were proposed in this pull request?
   
   It will invalidate the bucket scan if add a cast to bucket column. In fact, 
sometimes we can avoid adding this cast if it is an equality expression and 
both sides are integral types.
   
   This PR adds a new type coercion rule(`EqualityTypeCasts`) to support 
casting types according to bucket info for Equality expressions. For example:
   ```sql
   CREATE TABLE t1(l bigint) USING parquet CLUSTERED BY (l) INTO 200 buckets;
   CREATE TABLE t2(dec decimal(18, 0)) USING parquet CLUSTERED BY (dec) INTO 
200 buckets;
   
   SET spark.sql.autoBroadcastJoinThreshold=-1;
   SELECT * FROM t1 JOIN t2 ON t1.l = t2.dec;
   ```
   
   Before this PR:
   ```
   == Physical Plan ==
   AdaptiveSparkPlan isFinalPlan=false
   +- SortMergeJoin [cast(l#10L as decimal(20,0))], [cast(dec#11 as 
decimal(20,0))], Inner
      :- Sort [cast(l#10L as decimal(20,0)) ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(cast(l#10L as decimal(20,0)), 5), 
ENSURE_REQUIREMENTS, [plan_id=38]
      :     +- Filter isnotnull(l#10L)
      :        +- FileScan parquet spark_catalog.default.t1[l#10L]
      +- Sort [cast(dec#11 as decimal(20,0)) ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(cast(dec#11 as decimal(20,0)), 5), 
ENSURE_REQUIREMENTS, [plan_id=42]
            +- Filter isnotnull(dec#11)
               +- FileScan parquet spark_catalog.default.t2[dec#11]
   ```
   After this PR:
   ```
   == Physical Plan ==
   AdaptiveSparkPlan isFinalPlan=false
   +- SortMergeJoin [l#10L], [try_cast(dec#11 as bigint)], Inner
      :- Sort [l#10L ASC NULLS FIRST], false, 0
      :  +- Filter isnotnull(l#10L)
      :     +- FileScan parquet spark_catalog.default.t1[l#10L]
      +- Sort [try_cast(dec#11 as bigint) ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(try_cast(dec#11 as bigint), 200), 
ENSURE_REQUIREMENTS, [plan_id=38]
            +- Filter isnotnull(dec#11)
               +- FileScan parquet spark_catalog.default.t2[dec#11]
   ```
   
   
   ### Why are the changes needed?
   
   Reduce shuffle to improve query performance.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   Unit test.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to