GitHub user yucai opened a pull request:
https://github.com/apache/spark/pull/21564
[SPARK-24556][SQL] ReusedExchange should rewrite output partitioning also
when child's partitioning is RangePartitioning
## What changes were proposed in this pull request?
Currently, ReusedExchange rewrites its output partitioning when the child's
partitioning is HashPartitioning, but it does not do the same when the child's
partitioning is RangePartitioning. This can introduce an extra shuffle, for example:
```
// Run in spark-shell (or any context with a SparkSession named `spark`).
import spark.implicits._

val df = Seq(1 -> "a", 3 -> "c", 2 -> "b").toDF("i", "j")
val df1 = df.as("t1")
val df2 = df.as("t2")
val t = df1.orderBy("j").join(df2.orderBy("j"), $"t1.i" === $"t2.i", "right")
t.cache.orderBy($"t2.j").explain
```
Before:
```
== Physical Plan ==
*(1) Sort [j#14 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(j#14 ASC NULLS FIRST, 200)
   +- InMemoryTableScan [i#5, j#6, i#13, j#14]
         +- InMemoryRelation [i#5, j#6, i#13, j#14], CachedRDDBuilder...
               +- *(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft
                  :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as...
                  :  +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0
                  :     +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
                  :        +- LocalTableScan [i#5, j#6]
                  +- *(2) Sort [j#14 ASC NULLS FIRST], true, 0
                     +- ReusedExchange [i#13, j#14], Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
```
A better plan would avoid the extra `Exchange rangepartitioning(j#14 ASC NULLS FIRST, 200)`, like:
```
== Physical Plan ==
*(1) Sort [j#14 ASC NULLS FIRST], true, 0
+- InMemoryTableScan [i#5, j#6, i#13, j#14]
      +- InMemoryRelation [i#5, j#6, i#13, j#14], CachedRDDBuilder...
            +- *(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft
               :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
               :  +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0
               :     +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
               :        +- LocalTableScan [i#5, j#6]
               +- *(2) Sort [j#14 ASC NULLS FIRST], true, 0
                  +- ReusedExchange [i#13, j#14], Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
```
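For context, the core idea can be sketched as follows. This is only a minimal illustration, not the exact code in the patch; the helper name `updatePartitioning` and the `attrMap` parameter are made up for the example. The point is that when a reused exchange deduplicates its output attributes, the attributes inside a child RangePartitioning's sort ordering should be remapped the same way HashPartitioning expressions already are:

```
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Expression, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning}

// Illustrative helper: remap old output attributes to the reused exchange's
// new output attributes inside the partitioning, for both hash and range cases.
def updatePartitioning(
    partitioning: Partitioning,
    attrMap: AttributeMap[Attribute]): Partitioning = {
  def updateAttr(e: Expression): Expression = e.transform {
    case a: AttributeReference => attrMap.getOrElse(a, a)
  }
  partitioning match {
    case h: HashPartitioning =>
      h.copy(expressions = h.expressions.map(updateAttr))
    case r: RangePartitioning =>
      // The case the current code misses: rewrite attributes in the sort ordering too.
      r.copy(ordering = r.ordering.map(o => updateAttr(o).asInstanceOf[SortOrder]))
    case other => other
  }
}
```

With the range case covered, the ReusedExchange in the example reports a range partitioning on j#14 instead of the stale j#6 attribute, so the outer Sort's required ordering is already satisfied and no extra Exchange is inserted.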
## How was this patch tested?
Add new tests.
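The new tests are not shown in this message; a rough sketch of the kind of check involved could look like the following (hypothetical suite and names, assuming a `QueryTest` with `SharedSQLContext` style harness, not the actual tests from the patch):

```
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.test.SharedSQLContext

class ReuseRangePartitioningSuite extends QueryTest with SharedSQLContext {
  import testImplicits._

  test("ReusedExchange should rewrite RangePartitioning output partitioning") {
    val df = Seq(1 -> "a", 3 -> "c", 2 -> "b").toDF("i", "j")
    val t = df.as("t1").orderBy("j")
      .join(df.as("t2").orderBy("j"), $"t1.i" === $"t2.i", "right")
    val plan = t.cache().orderBy($"t2.j").queryExecution.executedPlan
    // With the rewrite in place, the sort over the cached result should reuse the
    // range partitioning reported by the InMemoryTableScan, so the outer plan
    // should contain no additional shuffle.
    assert(plan.collect { case e: ShuffleExchangeExec => e }.isEmpty)
  }
}
```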
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/yucai/spark SPARK-24556
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/21564.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #21564