karuppayya opened a new pull request #23696: [SPARK-26781][SQL] Avoid additional exchanges
URL: https://github.com/apache/spark/pull/23696
 
 
   ## What changes were proposed in this pull request?
   
   Avoid additional exchanges.
   Consider three tables: a(id bigint, name string), b(id bigint), c(id bigint, name string).
   
   query:
   `select * from (select a.id as newid, a.name from a join b where a.id = b.id) temp join c on temp.newid = c.id`
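
   For reference, a minimal spark-shell reproduction sketch. The parquet paths and the extra `name` columns are taken from the FileScan/ReadSchema entries in the plans below; the row contents are assumptions made purely for illustration:
   ```
   // Hypothetical setup: schemas and paths inferred from the FileScan nodes
   // in the plans below; the actual data does not matter for the plan shape.
   spark.range(1000).selectExpr("id", "cast(id as string) as name")
     .write.mode("overwrite").parquet("/tmp/spark/a")
   spark.range(1000).toDF("id")
     .write.mode("overwrite").parquet("/tmp/spark/b")
   spark.range(1000).selectExpr("id", "cast(id as string) as name")
     .write.mode("overwrite").parquet("/tmp/spark/c")

   spark.read.parquet("/tmp/spark/a").createOrReplaceTempView("a")
   spark.read.parquet("/tmp/spark/b").createOrReplaceTempView("b")
   spark.read.parquet("/tmp/spark/c").createOrReplaceTempView("c")

   spark.sql("""select * from
                  (select a.id as newid, a.name from a join b where a.id = b.id) temp
                join c on temp.newid = c.id""").explain()
   ```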
   
   Physical plan (via org.apache.spark.sql.execution.QueryExecution#executedPlan):
   Before fix:
   ```
   *(9) SortMergeJoin [newid#1L], [id#6L], Inner
   :- *(6) Sort [newid#1L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(newid#1L, 200)
   :     +- *(5) Project [id#2L AS newid#1L, name#3]
   :        +- *(5) SortMergeJoin [id#2L], [id#4L], Inner
   :           :- *(2) Sort [id#2L ASC NULLS FIRST], false, 0
   :           :  +- Exchange hashpartitioning(id#2L, 200)
   :           :     +- *(1) Project [id#2L, name#3]
   :           :        +- *(1) Filter isnotnull(id#2L)
   :           :           +- *(1) FileScan parquet a[id#2L,name#3] Batched: true, DataFilters: [isnotnull(id#2L)], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/spark/a], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint,name:string>
   :           +- *(4) Sort [id#4L ASC NULLS FIRST], false, 0
   :              +- Exchange hashpartitioning(id#4L, 200)
   :                 +- *(3) Project [id#4L]
   :                    +- *(3) Filter isnotnull(id#4L)
   :                       +- *(3) FileScan parquet b[id#4L] Batched: true, DataFilters: [isnotnull(id#4L)], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/spark/b], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
   +- *(8) Sort [id#6L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#6L, 200)
         +- *(7) Project [id#6L, name#7]
            +- *(7) Filter isnotnull(id#6L)
               +- *(7) FileScan parquet c[id#6L,name#7] Batched: true, DataFilters: [isnotnull(id#6L)], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/spark/c], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint,name:string>
   ```
   The Exchange operator below the Sort in stage 6 is not required, since the output of Project (5) is already partitioned on id.
   
   The exchange gets added because the outputPartitioning of Project (5) is HashPartitioning on id#2L, whereas the partitioning required by Sort (stage 6) is HashPartitioning on newid#1L, which is nothing but an alias of id#2L.
   
   The Exchange operator is not required in this case if, when comparing partitionings, we can resolve the alias newid#1L to the attribute id#2L that it references.
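
   A minimal sketch of the idea (not the exact change in this PR): collect the aliases exposed by a Project's output and rewrite the required partitioning expressions through that map before comparing them with the child's outputPartitioning:
   ```
   import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, Expression, NamedExpression}

   // Map each output attribute produced by an Alias (e.g. newid#1L) back to
   // the expression it aliases (e.g. id#2L).
   def aliasMap(projectList: Seq[NamedExpression]): AttributeMap[Expression] =
     AttributeMap(projectList.collect {
       case a @ Alias(child, _) => a.toAttribute -> child
     })

   // Rewrite an expression so aliases are replaced by what they reference.
   // After this, HashPartitioning(newid#1L, 200) and HashPartitioning(id#2L, 200)
   // compare as equal, so no extra Exchange is needed.
   def dealias(expr: Expression, aliases: AttributeMap[Expression]): Expression =
     expr.transform { case attr: Attribute if aliases.contains(attr) => aliases(attr) }
   ```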
   
   After fix:
   ```
   *(8) Project [newid#1L, name#3, id#6L, name#7]
   +- *(8) SortMergeJoin [newid#1L], [id#6L], Inner
      :- *(5) Sort [newid#1L ASC NULLS FIRST], false, 0
      :  +- *(5) Project [id#2L AS newid#1L, name#3]
      :     +- *(5) SortMergeJoin [id#2L], [id#4L], Inner
      :        :- *(2) Sort [id#2L ASC NULLS FIRST], false, 0
      :        :  +- Exchange hashpartitioning(id#2L, 200)
      :        :     +- *(1) Project [id#2L, name#3]
      :        :        +- *(1) Filter isnotnull(id#2L)
      :        :           +- *(1) FileScan parquet a[id#2L,name#3] Batched: true, DataFilters: [isnotnull(id#2L)], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/spark/a], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint,name:string>
      :        +- *(4) Sort [id#4L ASC NULLS FIRST], false, 0
      :           +- Exchange hashpartitioning(id#4L, 200)
      :              +- *(3) Project [id#4L]
      :                 +- *(3) Filter isnotnull(id#4L)
      :                    +- *(3) FileScan parquet b[id#4L] Batched: true, DataFilters: [isnotnull(id#4L)], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/spark/b], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
      +- *(7) Sort [id#6L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(id#6L, 200)
            +- *(6) Project [id#6L, name#7]
               +- *(6) Filter isnotnull(id#6L)
               +- *(6) FileScan parquet c[id#6L,name#7] Batched: true, DataFilters: [isnotnull(id#6L)], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/spark/c], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint,name:string>
   
   ```
   
   
   ## How was this patch tested?
   
   Unit tests; manual testing (verified against TPC-DS query 2, where this issue occurs).
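
   As a rough illustration (a hypothetical assertion, not the PR's actual test code), the regression can be guarded by counting Exchange nodes in the executed plan:
   ```
   import org.apache.spark.sql.execution.exchange.Exchange

   val plan = spark.sql(
     """select * from
          (select a.id as newid, a.name from a join b where a.id = b.id) temp
        join c on temp.newid = c.id""").queryExecution.executedPlan

   // Only the three scan-side shuffles should remain (there were 4 before the fix).
   val numExchanges = plan.collect { case e: Exchange => e }.length
   assert(numExchanges == 3)
   ```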
   
