Jeff Min created SPARK-42935:
--------------------------------

             Summary: Optimize shuffle for union Spark plan
                 Key: SPARK-42935
                 URL: https://issues.apache.org/jira/browse/SPARK-42935
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 3.5.0
            Reporter: Jeff Min
             Fix For: 3.5.0


The Union plan does not take full advantage of its children's output 
partitionings when a child's output partitioning cannot match the parent 
plan's required distribution. For example, table1 and table2 are both 
bucketed tables with bucket column id and bucket number 100. We apply a 
row_number window function after unioning the two tables.
create table table1 (id int, name string) using csv CLUSTERED BY (id) INTO 100 
BUCKETS;
insert into table1 values(1, "s1");
insert into table1 values(2, "s2");
create table table2 (id int, name string) using csv CLUSTERED BY (id) INTO 100 
BUCKETS;
insert into table2 values(1, "s3");
set spark.sql.shuffle.partitions=100;
explain select *, row_number() over(partition by id order by name desc) 
id_row_number from (select * from table1 union all select * from table2);
The physical plan is
AdaptiveSparkPlan isFinalPlan=false
+- Window [row_number() windowspecdefinition(id#35, name#36 DESC NULLS LAST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
id_row_number#28], [id#35], [name#36 DESC NULLS LAST]
   +- Sort [id#35 ASC NULLS FIRST, name#36 DESC NULLS LAST], false, 0
      +- Exchange hashpartitioning(id#35, 100), ENSURE_REQUIREMENTS, 
[plan_id=88]
         +- Union
            :- FileScan csv spark_catalog.default.table1[id#35,name#36]
            +- FileScan csv spark_catalog.default.table2[id#37,name#38]
Although the two tables are bucketed by the id column, there is still an 
exchange plan after the union. The reason is that the union plan's output 
partitioning is unknown (null).
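To see why the exchange is unavoidable today, here is a simplified, hypothetical model (plain Python, not the actual Spark planner code) of how a requirements-enforcement pass decides to insert a shuffle: each operator declares the distribution it requires from its child, and an Exchange is added whenever the child's output partitioning does not satisfy it. Because Union reports an unknown partitioning, the shuffle is always inserted above it even though each bucketed scan would satisfy the requirement on its own.

```python
def ensure_requirements(required_cols, child_partitioning):
    """Decide what goes above a child plan node (hypothetical model).

    required_cols: columns the parent needs the data clustered by,
                   e.g. ["id"] for the window's `partition by id`.
    child_partitioning: ("hash", cols) if the child's output is
                        hash-partitioned on cols, or None if the
                        partitioning is unknown (Union's case today).
    """
    if child_partitioning is not None and child_partitioning == ("hash", required_cols):
        return "reuse child partitioning"   # no shuffle needed
    return "Exchange hashpartitioning"      # shuffle inserted

# Each bucketed scan alone satisfies the window's requirement...
print(ensure_requirements(["id"], ("hash", ["id"])))  # reuse child partitioning
# ...but Union reports unknown partitioning, forcing a shuffle above it.
print(ensure_requirements(["id"], None))              # Exchange hashpartitioning
```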

We can introduce a new idea to optimize away the exchange plan:
 # First, introduce a new RDD that consists of parent RDDs having the same 
number of partitions. Its ith partition corresponds to the ith partition of 
each parent RDD.

 # Then, push the required distribution down to the union plan's children. 
If a child's output partitioning matches the required distribution, we can 
eliminate that child's shuffle operation.
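The partition-zipping idea in step 1 can be sketched at the partition level in plain Python (a hypothetical model, not Spark API): each "RDD" is a list of partitions, and the zipped union's ith output partition is the concatenation of the ith partitions of every parent. The partition count, and therefore a hash partitioning on id shared by all parents, is preserved, so no shuffle is needed before the window.

```python
def union_zip(*rdds):
    """Zip-union: concatenate the ith partition of every parent RDD
    (modelled as a list of partitions) into the ith output partition."""
    n = len(rdds[0])
    assert all(len(r) == n for r in rdds), "parents need equal partition counts"
    return [[row for r in rdds for row in r[i]] for i in range(n)]

# table1 and table2 both "bucketed" into 3 partitions by id (id % 3 here)
table1 = [[(3, "s3a")], [(1, "s1")], [(2, "s2")]]
table2 = [[],           [(1, "s3")], []]

zipped = union_zip(table1, table2)
print(len(zipped))   # 3: partition count unchanged, unlike a plain union
print(zipped[1])     # [(1, 's1'), (1, 's3')]: equal ids stay co-partitioned
```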

After doing these, the physical plan is
AdaptiveSparkPlan isFinalPlan=false
+- Window [row_number() windowspecdefinition(id#7, name#8 DESC NULLS LAST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
id_row_number#0], [id#7], [name#8 DESC NULLS LAST]
   +- Sort [id#7 ASC NULLS FIRST, name#8 DESC NULLS LAST], false, 0
      +- UnionZip [ClusteredDistribution(ArrayBuffer(id#7),false,None), 
ClusteredDistribution(ArrayBuffer(id#9),false,None)], hashpartitioning(id#7, 
200)
         :- FileScan csv spark_catalog.default.table1[id#7,name#8]
         +- FileScan csv spark_catalog.default.table2[id#9,name#10]
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
