viirya commented on pull request #28846:
URL: https://github.com/apache/spark/pull/28846#issuecomment-645634571


   Let me elaborate with an example. The query `SELECT * FROM testData join 
testData2 ON key = a where value = '1'` is one of the test cases in 
`AdaptiveQueryExecSuite`.
   
   The adaptivePlan in current master:
   ```
   *(3) BroadcastHashJoin [key#13], [a#23], Inner, BuildLeft
   :- BroadcastQueryStage 2
   :  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))), [id=#144]
   :     +- CustomShuffleReader local
   :        +- ShuffleQueryStage 0
   :           +- Exchange hashpartitioning(key#13, 5), true, [id=#110]
   :              +- *(1) Filter (isnotnull(value#14) AND (value#14 = 1))
   :                 +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false) AS value#14]
   :                    +- Scan[obj#12]
   +- CustomShuffleReader local
      +- ShuffleQueryStage 1
         +- Exchange hashpartitioning(a#23, 5), true, [id=#121]
            +- *(2) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#24]
               +- Scan[obj#22]
   ```
   
   In the plan above, AQE produces two `ShuffleQueryStage`s because both 
exchanges were materialized in one batch before AQE re-optimized the query. 
Although AQE can then convert the SortMergeJoin to a BroadcastHashJoin, the 
exchanges have already been materialized, so the only thing AQE can do is read 
them with a local shuffle reader.
   
   The adaptivePlan in this change:
   ```
   *(2) BroadcastHashJoin [key#13], [a#23], Inner, BuildLeft
   :- BroadcastQueryStage 1
   :  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))), [id=#137]
   :     +- CustomShuffleReader local
   :        +- ShuffleQueryStage 0
   :           +- Exchange hashpartitioning(key#13, 5), true, [id=#110]
   :              +- *(1) Filter (isnotnull(value#14) AND (value#14 = 1))
   :                 +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false) AS value#14]
   :                    +- Scan[obj#12]
   +- *(2) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#24]
      +- Scan[obj#22]
   ```
   
   With this change, AQE materializes only one exchange and then converts the 
SortMergeJoin to a BroadcastHashJoin. After that, there is no need to produce 
the other exchange.
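
To make the difference between the two plans concrete, here is a toy Python model of the two materialization strategies. It is only a sketch and not the code in this PR or in Spark: the names `Side`, `batch_strategy` and `incremental_strategy`, the byte sizes, the threshold of 80, and the choice of which side to materialize first are all assumptions made for illustration.

```python
from dataclasses import dataclass


@dataclass
class Side:
    """One join input; size_bytes is only known once its shuffle has run (hypothetical model)."""
    name: str
    size_bytes: int


def batch_strategy(left, right, threshold):
    """Materialize both exchanges in one batch, then re-optimize the join."""
    materialized = [left.name, right.name]  # both shuffles have already run
    smallest = min(left.size_bytes, right.size_bytes)
    join = "BroadcastHashJoin" if smallest <= threshold else "SortMergeJoin"
    return join, materialized


def incremental_strategy(first, second, threshold):
    """Materialize one exchange, re-optimize, and shuffle the other side only if still needed."""
    materialized = [first.name]
    if first.size_bytes <= threshold:
        # The first side fits under the threshold, so the second shuffle is skipped.
        return "BroadcastHashJoin", materialized
    materialized.append(second.name)
    join = "BroadcastHashJoin" if second.size_bytes <= threshold else "SortMergeJoin"
    return join, materialized


# Made-up sizes: the filtered side is small, the other side is larger.
left = Side("testData (filtered)", 50)
right = Side("testData2", 500)

print(batch_strategy(left, right, threshold=80))
# ('BroadcastHashJoin', ['testData (filtered)', 'testData2'])
print(incremental_strategy(left, right, threshold=80))
# ('BroadcastHashJoin', ['testData (filtered)'])
```

Both strategies end up with a BroadcastHashJoin, but only the batch one has already paid for the second shuffle, which corresponds to the extra `ShuffleQueryStage 1` in the first plan.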

