[jira] [Assigned] (SPARK-24556) ReusedExchange should rewrite output partitioning also when child's partitioning is RangePartitioning
[ https://issues.apache.org/jira/browse/SPARK-24556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan reassigned SPARK-24556: --- Assignee: yucai > ReusedExchange should rewrite output partitioning also when child's > partitioning is RangePartitioning > - > > Key: SPARK-24556 > URL: https://issues.apache.org/jira/browse/SPARK-24556 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.2 >Reporter: yucai >Assignee: yucai >Priority: Major > Fix For: 2.4.0 > > > Currently, ReusedExchange would rewrite output partitioning if child's > partitioning is HashPartitioning, but it does not do the same when child's > partitioning is RangePartitioning, sometimes, it could introduce extra > shuffle, see: > {code:java} > val df = Seq(1 -> "a", 3 -> "c", 2 -> "b").toDF("i", "j") > val df1 = df.as("t1") > val df2 = df.as("t2") > val t = df1.orderBy("j").join(df2.orderBy("j"), $"t1.i" === $"t2.i", "right") > t.cache.orderBy($"t2.j").explain > {code} > Before fix: > {code:sql} > == Physical Plan == > *(1) Sort [j#14 ASC NULLS FIRST], true, 0 > +- Exchange rangepartitioning(j#14 ASC NULLS FIRST, 200) >+- InMemoryTableScan [i#5, j#6, i#13, j#14] > +- InMemoryRelation [i#5, j#6, i#13, j#14], CachedRDDBuilder... >+- *(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft > :- BroadcastExchange > HashedRelationBroadcastMode(List(cast(input[0, int, false] as... > : +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0 > : +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, > 200) > :+- LocalTableScan [i#5, j#6] > +- *(2) Sort [j#14 ASC NULLS FIRST], true, 0 > +- ReusedExchange [i#13, j#14], Exchange > rangepartitioning(j#6 ASC NULLS FIRST, 200) > {code} > Better plan should avoid "Exchange rangepartitioning(j#14 ASC NULLS FIRST, > 200)", like: > {code:sql} > == Physical Plan == > *(1) Sort [j#14 ASC NULLS FIRST], true, 0 > +- InMemoryTableScan [i#5, j#6, i#13, j#14] > +- InMemoryRelation [i#5, j#6, i#13, j#14], CachedRDDBuilder... > +- *(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft >:- BroadcastExchange > HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))) >: +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0 >: +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200) >:+- LocalTableScan [i#5, j#6] >+- *(2) Sort [j#14 ASC NULLS FIRST], true, 0 > +- ReusedExchange [i#13, j#14], Exchange > rangepartitioning(j#6 ASC NULLS FIRST, 200) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-24556) ReusedExchange should rewrite output partitioning also when child's partitioning is RangePartitioning
[ https://issues.apache.org/jira/browse/SPARK-24556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-24556: Assignee: Apache Spark > ReusedExchange should rewrite output partitioning also when child's > partitioning is RangePartitioning > - > > Key: SPARK-24556 > URL: https://issues.apache.org/jira/browse/SPARK-24556 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.2 >Reporter: yucai >Assignee: Apache Spark >Priority: Major > > Currently, ReusedExchange would rewrite output partitioning if child's > partitioning is HashPartitioning, but it does not do the same when child's > partitioning is RangePartitioning, sometimes, it could introduce extra > shuffle, see: > {code:java} > val df = Seq(1 -> "a", 3 -> "c", 2 -> "b").toDF("i", "j") > val df1 = df.as("t1") > val df2 = df.as("t2") > val t = df1.orderBy("j").join(df2.orderBy("j"), $"t1.i" === $"t2.i", "right") > t.cache.orderBy($"t2.j").explain > {code} > Before fix: > {code:sql} > == Physical Plan == > *(1) Sort [j#14 ASC NULLS FIRST], true, 0 > +- Exchange rangepartitioning(j#14 ASC NULLS FIRST, 200) >+- InMemoryTableScan [i#5, j#6, i#13, j#14] > +- InMemoryRelation [i#5, j#6, i#13, j#14], CachedRDDBuilder... >+- *(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft > :- BroadcastExchange > HashedRelationBroadcastMode(List(cast(input[0, int, false] as... > : +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0 > : +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, > 200) > :+- LocalTableScan [i#5, j#6] > +- *(2) Sort [j#14 ASC NULLS FIRST], true, 0 > +- ReusedExchange [i#13, j#14], Exchange > rangepartitioning(j#6 ASC NULLS FIRST, 200) > {code} > Better plan should avoid "Exchange rangepartitioning(j#14 ASC NULLS FIRST, > 200)", like: > {code:sql} > == Physical Plan == > *(1) Sort [j#14 ASC NULLS FIRST], true, 0 > +- InMemoryTableScan [i#5, j#6, i#13, j#14] > +- InMemoryRelation [i#5, j#6, i#13, j#14], CachedRDDBuilder... > +- *(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft >:- BroadcastExchange > HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))) >: +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0 >: +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200) >:+- LocalTableScan [i#5, j#6] >+- *(2) Sort [j#14 ASC NULLS FIRST], true, 0 > +- ReusedExchange [i#13, j#14], Exchange > rangepartitioning(j#6 ASC NULLS FIRST, 200) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-24556) ReusedExchange should rewrite output partitioning also when child's partitioning is RangePartitioning
[ https://issues.apache.org/jira/browse/SPARK-24556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-24556: Assignee: (was: Apache Spark) > ReusedExchange should rewrite output partitioning also when child's > partitioning is RangePartitioning > - > > Key: SPARK-24556 > URL: https://issues.apache.org/jira/browse/SPARK-24556 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.2 >Reporter: yucai >Priority: Major > > Currently, ReusedExchange would rewrite output partitioning if child's > partitioning is HashPartitioning, but it does not do the same when child's > partitioning is RangePartitioning, sometimes, it could introduce extra > shuffle, see: > {code:java} > val df = Seq(1 -> "a", 3 -> "c", 2 -> "b").toDF("i", "j") > val df1 = df.as("t1") > val df2 = df.as("t2") > val t = df1.orderBy("j").join(df2.orderBy("j"), $"t1.i" === $"t2.i", "right") > t.cache.orderBy($"t2.j").explain > {code} > Before fix: > {code:sql} > == Physical Plan == > *(1) Sort [j#14 ASC NULLS FIRST], true, 0 > +- Exchange rangepartitioning(j#14 ASC NULLS FIRST, 200) >+- InMemoryTableScan [i#5, j#6, i#13, j#14] > +- InMemoryRelation [i#5, j#6, i#13, j#14], CachedRDDBuilder... >+- *(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft > :- BroadcastExchange > HashedRelationBroadcastMode(List(cast(input[0, int, false] as... > : +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0 > : +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, > 200) > :+- LocalTableScan [i#5, j#6] > +- *(2) Sort [j#14 ASC NULLS FIRST], true, 0 > +- ReusedExchange [i#13, j#14], Exchange > rangepartitioning(j#6 ASC NULLS FIRST, 200) > {code} > Better plan should avoid "Exchange rangepartitioning(j#14 ASC NULLS FIRST, > 200)", like: > {code:sql} > == Physical Plan == > *(1) Sort [j#14 ASC NULLS FIRST], true, 0 > +- InMemoryTableScan [i#5, j#6, i#13, j#14] > +- InMemoryRelation [i#5, j#6, i#13, j#14], CachedRDDBuilder... > +- *(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft >:- BroadcastExchange > HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))) >: +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0 >: +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200) >:+- LocalTableScan [i#5, j#6] >+- *(2) Sort [j#14 ASC NULLS FIRST], true, 0 > +- ReusedExchange [i#13, j#14], Exchange > rangepartitioning(j#6 ASC NULLS FIRST, 200) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org