[jira] [Assigned] (SPARK-24556) ReusedExchange should rewrite output partitioning also when child's partitioning is RangePartitioning

2018-06-19 Thread Wenchen Fan (JIRA)


 [ https://issues.apache.org/jira/browse/SPARK-24556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wenchen Fan reassigned SPARK-24556:
---

Assignee: yucai

> ReusedExchange should rewrite output partitioning also when child's 
> partitioning is RangePartitioning
> -
>
> Key: SPARK-24556
> URL: https://issues.apache.org/jira/browse/SPARK-24556
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.3.2
> Reporter: yucai
> Assignee: yucai
> Priority: Major
> Fix For: 2.4.0
>
>
> Currently, ReusedExchange rewrites its output partitioning if the child's 
> partitioning is HashPartitioning, but it does not do the same when the child's 
> partitioning is RangePartitioning. This can sometimes introduce an extra 
> shuffle, for example:
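> {code:scala}
> // Assumes `spark` is the active SparkSession (spark-shell imports these already).
> import spark.implicits._
>
> val df = Seq(1 -> "a", 3 -> "c", 2 -> "b").toDF("i", "j")
> val df1 = df.as("t1")
> val df2 = df.as("t2")
> // Both join inputs are range-partitioned on j, so the second exchange is reused.
> val t = df1.orderBy("j").join(df2.orderBy("j"), $"t1.i" === $"t2.i", "right")
> t.cache.orderBy($"t2.j").explain
> {code}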
> Before fix:
> {code:sql}
> == Physical Plan ==
> *(1) Sort [j#14 ASC NULLS FIRST], true, 0
> +- Exchange rangepartitioning(j#14 ASC NULLS FIRST, 200)
>    +- InMemoryTableScan [i#5, j#6, i#13, j#14]
>          +- InMemoryRelation [i#5, j#6, i#13, j#14], CachedRDDBuilder...
>                +- *(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft
>                   :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as...
>                   :  +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0
>                   :     +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
>                   :        +- LocalTableScan [i#5, j#6]
>                   +- *(2) Sort [j#14 ASC NULLS FIRST], true, 0
>                      +- ReusedExchange [i#13, j#14], Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
> {code}
> A better plan would avoid the extra "Exchange rangepartitioning(j#14 ASC NULLS FIRST, 
> 200)", like:
> {code:sql}
> == Physical Plan ==
> *(1) Sort [j#14 ASC NULLS FIRST], true, 0
> +- InMemoryTableScan [i#5, j#6, i#13, j#14]
>       +- InMemoryRelation [i#5, j#6, i#13, j#14], CachedRDDBuilder...
>             +- *(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft
>                :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
>                :  +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0
>                :     +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
>                :        +- LocalTableScan [i#5, j#6]
>                +- *(2) Sort [j#14 ASC NULLS FIRST], true, 0
>                   +- ReusedExchange [i#13, j#14], Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
> {code}
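
The rewrite described above can be sketched roughly as follows. This is a minimal, self-contained illustration and not Spark's actual code: `Attr`, `SortOrder`, the `Partitioning` hierarchy, and `ReusedExchangeSketch.rewrite` are simplified stand-ins invented for this example, and the positional mapping between the child's output and the reused node's output is an assumption. The idea is that a reused exchange exposes new attribute ids (here `j#14` instead of `j#6`), so its reported partitioning has to be re-expressed in those ids for both the hash and the range case.

```scala
// Simplified stand-ins for Catalyst concepts; these are NOT Spark's real classes.
final case class Attr(name: String, exprId: Int)
final case class SortOrder(child: Attr, ascending: Boolean = true)

sealed trait Partitioning
final case class HashPartitioning(keys: Seq[Attr], numPartitions: Int) extends Partitioning
final case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int) extends Partitioning
case object UnknownPartitioning extends Partitioning

object ReusedExchangeSketch {
  /** Re-express the child's partitioning in terms of the reused node's output attributes. */
  def rewrite(
      childOutput: Seq[Attr],
      reusedOutput: Seq[Attr],
      childPartitioning: Partitioning): Partitioning = {
    // Assumption: the i-th child attribute corresponds to the i-th reused attribute.
    val mapping: Map[Int, Attr] = childOutput.map(_.exprId).zip(reusedOutput).toMap
    def remap(a: Attr): Attr = mapping.getOrElse(a.exprId, a)

    childPartitioning match {
      // The case the issue says is already handled.
      case HashPartitioning(keys, n) =>
        HashPartitioning(keys.map(remap), n)
      // The case the issue asks for: remap the attributes inside each sort order.
      case RangePartitioning(ordering, n) =>
        RangePartitioning(ordering.map(o => o.copy(child = remap(o.child))), n)
      case other => other
    }
  }
}
```

With the attribute ids from the plans above, calling `ReusedExchangeSketch.rewrite(Seq(Attr("i", 5), Attr("j", 6)), Seq(Attr("i", 13), Attr("j", 14)), RangePartitioning(Seq(SortOrder(Attr("j", 6))), 200))` yields a range partitioning on `j#14`, which is what would let the final sort on `j#14` skip the extra exchange.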



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24556) ReusedExchange should rewrite output partitioning also when child's partitioning is RangePartitioning

2018-06-14 Thread Apache Spark (JIRA)


 [ https://issues.apache.org/jira/browse/SPARK-24556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-24556:


Assignee: Apache Spark

> ReusedExchange should rewrite output partitioning also when child's 
> partitioning is RangePartitioning
> -
>
> Key: SPARK-24556
> URL: https://issues.apache.org/jira/browse/SPARK-24556
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.3.2
> Reporter: yucai
> Assignee: Apache Spark
> Priority: Major
>






[jira] [Assigned] (SPARK-24556) ReusedExchange should rewrite output partitioning also when child's partitioning is RangePartitioning

2018-06-14 Thread Apache Spark (JIRA)


 [ https://issues.apache.org/jira/browse/SPARK-24556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-24556:


Assignee: (was: Apache Spark)

> ReusedExchange should rewrite output partitioning also when child's 
> partitioning is RangePartitioning
> -
>
> Key: SPARK-24556
> URL: https://issues.apache.org/jira/browse/SPARK-24556
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.3.2
> Reporter: yucai
> Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org