cloud-fan commented on pull request #34034:
URL: https://github.com/apache/spark/pull/34034#issuecomment-939862907


   Unfortunately, this breaks broadcast reuse which causes perf regression. To 
reproduce
   ```
   scala> val df1 = spark.range(1000)
   df1: org.apache.spark.sql.Dataset[Long] = [id: bigint]
   
   scala> val df2 = spark.range(100)
   df2: org.apache.spark.sql.Dataset[Long] = [id: bigint]
   
   scala> val j1 = df1.join(df2, Seq("id"), "inner")
   j1: org.apache.spark.sql.DataFrame = [id: bigint]
   
   scala> val j2 = df1.join(df2, Seq("id"), "left_semi")
   j2: org.apache.spark.sql.DataFrame = [id: bigint]
   
   scala> val res = j1.union(j2)
   res: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint]
   
   scala> res.collect()
   res0: Array[org.apache.spark.sql.Row] = Array([0], ...
   
   scala> res.explain
   ```
   
   Before this PR, the query plan was
   ```
   AdaptiveSparkPlan isFinalPlan=true
   +- == Final Plan ==
      Union
      :- *(3) Project [id#0L]
      :  +- *(3) BroadcastHashJoin [id#0L], [id#2L], Inner, BuildRight, false
      :     :- *(3) Range (0, 1000, step=1, splits=1)
      :     +- BroadcastQueryStage 0
      :        +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, 
bigint, false]),false,false), [id=#79]
      :           +- *(1) Range (0, 100, step=1, splits=1)
      +- *(4) BroadcastHashJoin [id#12L], [id#13L], LeftSemi, BuildRight, false
         :- *(4) Range (0, 1000, step=1, splits=1)
         +- BroadcastQueryStage 2
            +- ReusedExchange [id#13L], BroadcastExchange 
HashedRelationBroadcastMode(List(input[0, bigint, false]),false,false), [id=#79]
   ```
   
   Now it's
   ```
   == Physical Plan ==
   +- == Final Plan ==
      Union
      :- *(3) Project [id#0L]
      :  +- *(3) BroadcastHashJoin [id#0L], [id#2L], Inner, BuildRight, false
      :     :- *(3) Range (0, 1000, step=1, splits=1)
      :     +- BroadcastQueryStage 0
      :        +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, 
bigint, false]),false,false), [id=#41]
      :           +- *(1) Range (0, 100, step=1, splits=1)
      +- *(4) BroadcastHashJoin [id#6L], [id#7L], LeftSemi, BuildRight, false
         :- *(4) Range (0, 1000, step=1, splits=1)
         +- BroadcastQueryStage 1
            +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, 
bigint, false]),false,true), [id=#50]
               +- *(2) Range (0, 100, step=1, splits=1)
   ```
   
   Ignore duplicated key is a small improvement and broadcast reuse is 
definitely more important to the query performance. I'm reverting this first. 
Please re-propose this optimization without breaking broadcast reuse.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to