karuppayya opened a new pull request #23696: [SPARK-26781][SQL]Avoid additional exchanges URL: https://github.com/apache/spark/pull/23696 ## What changes were proposed in this pull request? Avoid additonal exchanges Consider three tables: a(id int), b(id int), c(id, int) query: `select * from (select a.id as newid from a join b where a.id = b.id) temp join c on temp.newid = c.id` Plan(physical plan: org.apache.spark.sql.execution.QueryExecution#executedPlan): Before fix: ``` *(9) SortMergeJoin [newid#1L], [id#6L], Inner :- *(6) Sort [newid#1L ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(newid#1L, 200) : +- *(5) Project [id#2L AS newid#1L, name#3] : +- *(5) SortMergeJoin [id#2L], [id#4L], Inner : :- *(2) Sort [id#2L ASC NULLS FIRST], false, 0 : : +- Exchange hashpartitioning(id#2L, 200) : : +- *(1) Project [id#2L, name#3] : : +- *(1) Filter isnotnull(id#2L) : : +- *(1) FileScan parquet a[id#2L,name#3] Batched: true, DataFilters: [isnotnull(id#2L)], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/spark/a], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint,name:string> : +- *(4) Sort [id#4L ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(id#4L, 200) : +- *(3) Project [id#4L] : +- *(3) Filter isnotnull(id#4L) : +- *(3) FileScan parquet b[id#4L] Batched: true, DataFilters: [isnotnull(id#4L)], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/spark/b], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint> +- *(8) Sort [id#6L ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(id#6L, 200) +- *(7) Project [id#6L, name#7] +- *(7) Filter isnotnull(id#6L) +- *(7) FileScan parquet \c[id#6L,name#7] Batched: true, DataFilters: [isnotnull(id#6L)], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/spark/c], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint,name:string> ``` The exchange operator below stage 6 is not required since the data from project is already partitioned based on id. An exchange gets added since the outputPartitioning of Project(5) is HashPartitioning on id#2L whereas the requiredPartitioning of Sort(Stage 6) is HashPartitioning on newid#1L which is nothing but alias of id#2L. The exchange operator is not required in this case if we are able to compare the attribute id#2L referenced by alias newid#1L 0 After fix: ``` *(8) Project [newid#1L, name#3, id#6L, name#7] +- *(8) SortMergeJoin [newid#1L], [id#6L], Inner :- *(5) Sort [newid#1L ASC NULLS FIRST], false, 0 : +- *(5) Project [id#2L AS newid#1L, name#3] : +- *(5) SortMergeJoin [id#2L], [id#4L], Inner : :- *(2) Sort [id#2L ASC NULLS FIRST], false, 0 : : +- Exchange hashpartitioning(id#2L, 200) : : +- *(1) Project [id#2L, name#3] : : +- *(1) Filter isnotnull(id#2L) : : +- *(1) FileScan parquet a[id#2L,name#3] Batched: true, DataFilters: [isnotnull(id#2L)], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/spark/a], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint,name:string> : +- *(4) Sort [id#4L ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(id#4L, 200) : +- *(3) Project [id#4L] : +- *(3) Filter isnotnull(id#4L) : +- *(3) FileScan parquet b[id#4L] Batched: true, DataFilters: [isnotnull(id#4L)], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/spark/b], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint> +- *(7) Sort [id#6L ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(id#6L, 200) +- *(6) Project [id#6L, name#7] +- *(6) Filter isnotnull(id#6L) +- *(6) FileScan parquet c[id#6L,name#7] Batched: true, DataFilters: [isnotnull(id#6L)], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/spark/c], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint,name:string> ``` ## How was this patch tested? Unit tests, manual test(checked against TPC-DS query2 where this issue occurs)
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
