[GitHub] spark pull request #21564: [SPARK-24556][SQL] ReusedExchange should rewrite ...

2018-06-15 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21564#discussion_r195886078
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
@@ -70,7 +70,7 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
   }
 
   override def outputPartitioning: Partitioning = child.outputPartitioning match {
-    case h: HashPartitioning => h.copy(expressions = h.expressions.map(updateAttr))
+    case e: Expression => updateAttr(e).asInstanceOf[Partitioning]
     case other => other
--- End diff --

LGTM
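The following is a minimal, self-contained sketch of why matching on `Expression` covers more cases than matching on `HashPartitioning` alone. It is plain Scala and does not use Spark. `Expr`, `Attr`, `SortOrd`, `HashPart`, `RangePart`, `SinglePart`, `transformUp` and `ReuseSketch.rewrite` are hypothetical stand-ins. The sketch assumes, as discussed in this thread, that hash and range partitionings are themselves expressions while some partitionings are not.

```scala
// Hypothetical, simplified model of Catalyst expressions and partitionings (not Spark's API).
sealed trait Partitioning

sealed trait Expr {
  def children: Seq[Expr]
  def withChildren(newChildren: Seq[Expr]): Expr

  // Bottom-up rewrite, loosely modeled on TreeNode.transformUp.
  def transformUp(rule: PartialFunction[Expr, Expr]): Expr = {
    val rewritten = withChildren(children.map(_.transformUp(rule)))
    rule.applyOrElse(rewritten, identity[Expr])
  }
}

final case class Attr(name: String, id: Int) extends Expr {
  def children: Seq[Expr] = Nil
  def withChildren(newChildren: Seq[Expr]): Expr = this
}

final case class SortOrd(child: Expr, ascending: Boolean) extends Expr {
  def children: Seq[Expr] = Seq(child)
  def withChildren(newChildren: Seq[Expr]): Expr = copy(child = newChildren.head)
}

// Hash and range partitionings are modeled as expressions over their keys.
final case class HashPart(keys: Seq[Expr], numPartitions: Int) extends Expr with Partitioning {
  def children: Seq[Expr] = keys
  def withChildren(newChildren: Seq[Expr]): Expr = copy(keys = newChildren)
}

final case class RangePart(ordering: Seq[Expr], numPartitions: Int) extends Expr with Partitioning {
  def children: Seq[Expr] = ordering
  def withChildren(newChildren: Seq[Expr]): Expr = copy(ordering = newChildren)
}

// A partitioning that is not an expression (stand-in for e.g. SinglePartition).
case object SinglePart extends Partitioning

object ReuseSketch {
  // Rewrites every attribute inside an expression-based partitioning; others pass through.
  def rewrite(p: Partitioning, attrMap: Map[Attr, Attr]): Partitioning = p match {
    case e: Expr =>
      // transformUp keeps the root's class, so the cast back to Partitioning is safe here.
      e.transformUp { case a: Attr => attrMap.getOrElse(a, a) }.asInstanceOf[Partitioning]
    case other => other
  }
}

val i5 = Attr("i", 5)
val i54 = Attr("i", 54)
val j6 = Attr("j", 6)
val j14 = Attr("j", 14)
val attrMap = Map(i5 -> i54, j6 -> j14)
val hashed = ReuseSketch.rewrite(HashPart(Seq(i5), 10), attrMap)
val ranged = ReuseSketch.rewrite(RangePart(Seq(SortOrd(j6, ascending = true)), 200), attrMap)
```

A partitioning that is not an expression falls through the `case other => other` branch unchanged. That is the same shape as the `Exchange.scala` change quoted above.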


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org




2018-06-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21564#discussion_r195883736
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---
@@ -2270,4 +2270,15 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     val mapWithBinaryKey = map(lit(Array[Byte](1.toByte)), lit(1))
 
     checkAnswer(spark.range(1).select(mapWithBinaryKey.getItem(Array[Byte](1.toByte))), Row(1))
   }
+
+  test("SPARK-24556: ReusedExchange should rewrite output partitioning for RangePartitioning") {
--- End diff --

Please also mention the cached table in the PR title.


---


2018-06-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21564#discussion_r195883713
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---
@@ -2270,4 +2270,15 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     val mapWithBinaryKey = map(lit(Array[Byte](1.toByte)), lit(1))
 
     checkAnswer(spark.range(1).select(mapWithBinaryKey.getItem(Array[Byte](1.toByte))), Row(1))
   }
+
+  test("SPARK-24556: ReusedExchange should rewrite output partitioning for RangePartitioning") {
--- End diff --

This is not an end-to-end test; let's put it in `PlannerSuite` and also test the cached table.


---


2018-06-15 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21564#discussion_r195654141
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
@@ -170,6 +170,8 @@ case class InMemoryTableScanExec(
   override def outputPartitioning: Partitioning = {
     relation.cachedPlan.outputPartitioning match {
       case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
+      case r: RangePartitioning =>
+        r.copy(ordering = r.ordering.map(updateAttribute(_).asInstanceOf[SortOrder]))
--- End diff --

Looks correct.


---


2018-06-15 Thread yucai
Github user yucai commented on a diff in the pull request:

https://github.com/apache/spark/pull/21564#discussion_r195652026
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
@@ -170,6 +170,8 @@ case class InMemoryTableScanExec(
   override def outputPartitioning: Partitioning = {
     relation.cachedPlan.outputPartitioning match {
       case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
+      case r: RangePartitioning =>
+        r.copy(ordering = r.ordering.map(updateAttribute(_).asInstanceOf[SortOrder]))
--- End diff --

@viirya In `updateAttribute`, `relation.cachedPlan.output` and `relation.output` map one to one:
```
  private def updateAttribute(expr: Expression): Expression = {
    val attrMap = AttributeMap(relation.cachedPlan.output.zip(relation.output))
    // ...
  }
```
It means `[i#54, j#55, m#58, n#59]` corresponds to `[i#5, j#6, m#15, n#16]`, so we can always replace `HashPartitioning(i#5)` with `HashPartitioning(i#54)`.
Any thoughts?
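Here is a minimal sketch of the positional mapping described above, using the attribute IDs from the example. `Attr`, `HashPart`, `PartColl` and `PositionalRemap` are hypothetical plain-Scala simplifications, not Spark's classes. `PartColl` stands in for `PartitioningCollection`. The map mirrors the idea of `AttributeMap(relation.cachedPlan.output.zip(relation.output))`, assuming both outputs have the same length and order.

```scala
// Hypothetical, simplified stand-ins (plain Scala, not Spark's classes).
final case class Attr(name: String, exprId: Int)
final case class HashPart(keys: Seq[Attr], numPartitions: Int)
// Stand-in for PartitioningCollection: equivalent partitionings of the same rows.
final case class PartColl(members: Seq[HashPart])

object PositionalRemap {
  // Pairs the cached plan's output with the scan's output by position,
  // mirroring the idea behind AttributeMap(cachedPlan.output.zip(output)).
  def attrMap(cachedOutput: Seq[Attr], scanOutput: Seq[Attr]): Map[Attr, Attr] = {
    require(cachedOutput.length == scanOutput.length, "outputs must line up one to one")
    cachedOutput.zip(scanOutput).toMap
  }

  // Rewrites the keys of every member; attributes missing from the map are kept as-is.
  def remap(pc: PartColl, m: Map[Attr, Attr]): PartColl =
    PartColl(pc.members.map(h => h.copy(keys = h.keys.map(a => m.getOrElse(a, a)))))
}

// Attribute IDs taken from the example plan in this thread.
val cachedOutput = Seq(Attr("i", 5), Attr("j", 6), Attr("m", 15), Attr("n", 16))
val scanOutput = Seq(Attr("i", 54), Attr("j", 55), Attr("m", 58), Attr("n", 59))
val before = PartColl(Seq(HashPart(Seq(Attr("i", 5)), 10), HashPart(Seq(Attr("m", 15)), 10)))
val after = PositionalRemap.remap(before, PositionalRemap.attrMap(cachedOutput, scanOutput))
```

Because the two outputs line up position by position, every member of the collection is rewritten, not just a single `HashPartitioning`.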


---


2018-06-14 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21564#discussion_r195630214
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
@@ -170,6 +170,8 @@ case class InMemoryTableScanExec(
   override def outputPartitioning: Partitioning = {
     relation.cachedPlan.outputPartitioning match {
       case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
+      case r: RangePartitioning =>
+        r.copy(ordering = r.ordering.map(updateAttribute(_).asInstanceOf[SortOrder]))
--- End diff --

For `PartitioningCollection`, I think it is harder to treat it like 
`HashPartitioning` and `RangePartitioning` when replacing attributes.

In the above example, `PartitioningCollection` contains `HashPartitioning(i#5)` and `HashPartitioning(m#15)`, and the output of `InMemoryRelation` is `[i#54, j#55, m#58, n#59]`. Can we still replace attributes based on their position in the output?


---


2018-06-14 Thread yucai
Github user yucai commented on a diff in the pull request:

https://github.com/apache/spark/pull/21564#discussion_r195463301
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
@@ -170,6 +170,8 @@ case class InMemoryTableScanExec(
   override def outputPartitioning: Partitioning = {
     relation.cachedPlan.outputPartitioning match {
       case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
+      case r: RangePartitioning =>
+        r.copy(ordering = r.ordering.map(updateAttribute(_).asInstanceOf[SortOrder]))
--- End diff --

`PartitioningCollection` should be considered too, as in the case below:
```
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.codegen.wholeStage", false)
val df1 = Seq(1 -> "a", 3 -> "c", 2 -> "b").toDF("i", "j").as("t1")
val df2 = Seq(1 -> "a", 3 -> "c", 2 -> "b").toDF("m", "n").as("t2")
val d = df1.join(df2, $"t1.i" === $"t2.m")
d.cache
val d1 = d.as("t3")
val d2 = d.as("t4")
d1.join(d2, $"t3.i" === $"t4.i").explain
```
```
SortMergeJoin [i#5], [i#54], Inner
:- InMemoryTableScan [i#5, j#6, m#15, n#16]
:     +- InMemoryRelation [i#5, j#6, m#15, n#16], CachedRDDBuilder
:           +- SortMergeJoin [i#5], [m#15], Inner
:              :- Sort [i#5 ASC NULLS FIRST], false, 0
:              :  +- Exchange hashpartitioning(i#5, 10)
:              :     +- LocalTableScan [i#5, j#6]
:              +- Sort [m#15 ASC NULLS FIRST], false, 0
:                 +- Exchange hashpartitioning(m#15, 10)
:                    +- LocalTableScan [m#15, n#16]
+- Sort [i#54 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(i#54, 10)
      +- InMemoryTableScan [i#54, j#55, m#58, n#59]
            +- InMemoryRelation [i#54, j#55, m#58, n#59], CachedRDDBuilder
                  +- SortMergeJoin [i#5], [m#15], Inner
                     :- Sort [i#5 ASC NULLS FIRST], false, 0
                     :  +- Exchange hashpartitioning(i#5, 10)
                     :     +- LocalTableScan [i#5, j#6]
                     +- Sort [m#15 ASC NULLS FIRST], false, 0
                        +- Exchange hashpartitioning(m#15, 10)
                           +- LocalTableScan [m#15, n#16]
```
`Exchange hashpartitioning(i#54, 10)` is an extra shuffle.

What do you think?
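To illustrate why stale attribute IDs lead to the extra `Exchange`, here is a hedged sketch of a partitioning-requirement check. The rules are a simplification assumed for illustration:

- A hash partitioning satisfies a clustering requirement when all of its keys are among the required keys.
- A collection satisfies the requirement when any member does.

Spark's real `satisfies` logic covers more distributions and also checks partition counts. `ShuffleCheck` and the case classes are hypothetical.

```scala
// Hypothetical, simplified stand-ins (plain Scala, not Spark's classes).
final case class Attr(name: String, exprId: Int)
final case class HashPart(keys: Seq[Attr], numPartitions: Int)
final case class PartColl(members: Seq[HashPart])

object ShuffleCheck {
  // Assumed rule: rows with equal required keys stay together when every
  // partitioning key is one of the required keys.
  def hashSatisfies(h: HashPart, required: Seq[Attr]): Boolean =
    h.keys.nonEmpty && h.keys.forall(k => required.contains(k))

  // A collection of equivalent partitionings satisfies the requirement if any member does.
  def collectionSatisfies(pc: PartColl, required: Seq[Attr]): Boolean =
    pc.members.exists(h => hashSatisfies(h, required))

  // Under this model, an Exchange would be added when the requirement is not satisfied.
  def needsExchange(pc: PartColl, required: Seq[Attr]): Boolean =
    !collectionSatisfies(pc, required)
}

val i5 = Attr("i", 5)
val m15 = Attr("m", 15)
val i54 = Attr("i", 54)
val m58 = Attr("m", 58)
// Partitioning reported with the cached plan's IDs vs. remapped to the scan's output IDs.
val stale = PartColl(Seq(HashPart(Seq(i5), 10), HashPart(Seq(m15), 10)))
val remapped = PartColl(Seq(HashPart(Seq(i54), 10), HashPart(Seq(m58), 10)))
```

In this model, a join that requires clustering on `i#54` cannot use `stale`, because its keys mention `i#5` and `m#15`. It can use `remapped`.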


---


2018-06-14 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21564#discussion_r195420136
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
@@ -170,6 +170,8 @@ case class InMemoryTableScanExec(
   override def outputPartitioning: Partitioning = {
     relation.cachedPlan.outputPartitioning match {
       case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
+      case r: RangePartitioning =>
+        r.copy(ordering = r.ordering.map(updateAttribute(_).asInstanceOf[SortOrder]))
--- End diff --

Hmm, `HashPartitioning` and `RangePartitioning` can affect later sorts and shuffles. But for `BroadcastPartitioning`, it seems to me there is no such benefit.


---


2018-06-14 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21564#discussion_r19535
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
@@ -170,6 +170,8 @@ case class InMemoryTableScanExec(
   override def outputPartitioning: Partitioning = {
     relation.cachedPlan.outputPartitioning match {
       case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
+      case r: RangePartitioning =>
+        r.copy(ordering = r.ordering.map(updateAttribute(_).asInstanceOf[SortOrder]))
--- End diff --

Oh, like `HashedRelationBroadcastMode`.


---


2018-06-14 Thread yucai
Github user yucai commented on a diff in the pull request:

https://github.com/apache/spark/pull/21564#discussion_r195361725
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
@@ -170,6 +170,8 @@ case class InMemoryTableScanExec(
   override def outputPartitioning: Partitioning = {
     relation.cachedPlan.outputPartitioning match {
       case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
+      case r: RangePartitioning =>
+        r.copy(ordering = r.ordering.map(updateAttribute(_).asInstanceOf[SortOrder]))
--- End diff --

Does `BroadcastPartitioning`'s `BroadcastMode` contain an `Expression`?


---


2018-06-14 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21564#discussion_r195356161
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
@@ -170,6 +170,8 @@ case class InMemoryTableScanExec(
   override def outputPartitioning: Partitioning = {
     relation.cachedPlan.outputPartitioning match {
       case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
+      case r: RangePartitioning =>
+        r.copy(ordering = r.ordering.map(updateAttribute(_).asInstanceOf[SortOrder]))
--- End diff --

I think `PartitioningCollection` is for an operator that has multiple children. `BroadcastPartitioning` is not an `Expression`.


---


2018-06-14 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21564#discussion_r195355721
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
@@ -170,6 +170,8 @@ case class InMemoryTableScanExec(
   override def outputPartitioning: Partitioning = {
     relation.cachedPlan.outputPartitioning match {
       case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
+      case r: RangePartitioning =>
+        r.copy(ordering = r.ordering.map(updateAttribute(_).asInstanceOf[SortOrder]))
--- End diff --

Yes, you're right @viirya, thanks. Then I'd propose something like:
```
relation.cachedPlan.outputPartitioning match {
  // updateAttribute returns an Expression, so cast back to Partitioning
  case e: Expression => updateAttribute(e).asInstanceOf[Partitioning]
  case other => other
}
```

What do you think?


---


2018-06-14 Thread yucai
Github user yucai commented on a diff in the pull request:

https://github.com/apache/spark/pull/21564#discussion_r195354829
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
@@ -170,6 +170,8 @@ case class InMemoryTableScanExec(
   override def outputPartitioning: Partitioning = {
     relation.cachedPlan.outputPartitioning match {
       case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
+      case r: RangePartitioning =>
+        r.copy(ordering = r.ordering.map(updateAttribute(_).asInstanceOf[SortOrder]))
--- End diff --

Good suggestion, thanks @mgaido91.

@viirya Do we need to consider the following?
- `PartitioningCollection` in `InMemoryTableScanExec.outputPartitioning`, which is also an `Expression`?
- `PartitioningCollection` and `BroadcastPartitioning` in `ReusedExchangeExec.outputPartitioning`?


---


2018-06-14 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21564#discussion_r195352300
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
@@ -170,6 +170,8 @@ case class InMemoryTableScanExec(
   override def outputPartitioning: Partitioning = {
     relation.cachedPlan.outputPartitioning match {
       case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
+      case r: RangePartitioning =>
+        r.copy(ordering = r.ordering.map(updateAttribute(_).asInstanceOf[SortOrder]))
--- End diff --

Not all `Partitioning` are `Expression`. Only `HashPartitioning` and 
`RangePartitioning` are.


---


2018-06-14 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21564#discussion_r195349283
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
@@ -170,6 +170,8 @@ case class InMemoryTableScanExec(
   override def outputPartitioning: Partitioning = {
     relation.cachedPlan.outputPartitioning match {
       case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
+      case r: RangePartitioning =>
+        r.copy(ordering = r.ordering.map(updateAttribute(_).asInstanceOf[SortOrder]))
--- End diff --

Why not just `updateAttribute(r)`?

Moreover, to avoid the same issue with other cases in the future, have you considered doing something like the following?
```
updateAttribute(relation.cachedPlan.outputPartitioning)
```


---


2018-06-14 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21564#discussion_r195348233
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
@@ -170,6 +170,8 @@ case class InMemoryTableScanExec(
   override def outputPartitioning: Partitioning = {
     relation.cachedPlan.outputPartitioning match {
       case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
+      case r: RangePartitioning =>
+        r.copy(ordering = r.ordering.map(updateAttribute(_).asInstanceOf[SortOrder]))
--- End diff --

Not sure why `RangePartitioning` wasn't included in the first place.


---


2018-06-14 Thread yucai
GitHub user yucai opened a pull request:

https://github.com/apache/spark/pull/21564

[SPARK-24556][SQL] ReusedExchange should rewrite output partitioning also 
when child's partitioning is RangePartitioning

## What changes were proposed in this pull request?

Currently, ReusedExchange rewrites its output partitioning when the child's partitioning is HashPartitioning, but it does not do the same when the child's partitioning is RangePartitioning. This can sometimes introduce an extra shuffle, for example:

```
val df = Seq(1 -> "a", 3 -> "c", 2 -> "b").toDF("i", "j")
val df1 = df.as("t1")
val df2 = df.as("t2")
val t = df1.orderBy("j").join(df2.orderBy("j"), $"t1.i" === $"t2.i", "right")
t.cache.orderBy($"t2.j").explain
```
Before:
```
== Physical Plan ==
*(1) Sort [j#14 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(j#14 ASC NULLS FIRST, 200)
   +- InMemoryTableScan [i#5, j#6, i#13, j#14]
         +- InMemoryRelation [i#5, j#6, i#13, j#14], CachedRDDBuilder...
               +- *(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft
                  :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as...
                  :  +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0
                  :     +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
                  :        +- LocalTableScan [i#5, j#6]
                  +- *(2) Sort [j#14 ASC NULLS FIRST], true, 0
                     +- ReusedExchange [i#13, j#14], Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
```
A better plan would avoid `Exchange rangepartitioning(j#14 ASC NULLS FIRST, 200)`, like:
```
== Physical Plan ==
*(1) Sort [j#14 ASC NULLS FIRST], true, 0
+- InMemoryTableScan [i#5, j#6, i#13, j#14]
      +- InMemoryRelation [i#5, j#6, i#13, j#14], CachedRDDBuilder...
            +- *(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft
               :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
               :  +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0
               :     +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
               :        +- LocalTableScan [i#5, j#6]
               +- *(2) Sort [j#14 ASC NULLS FIRST], true, 0
                  +- ReusedExchange [i#13, j#14], Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
```
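Here is a hedged, plain-Scala sketch of the `RangePartitioning` rewrite this PR proposes. `Attr`, `SortOrd`, `RangePart` and `RangeReuseSketch` are hypothetical simplifications; for instance, the real `SortOrder` also carries a null ordering. The ordering check is only loosely modeled on how a range partitioning can satisfy an ordered requirement: the orderings must agree on their common prefix. With the attribute IDs from the plans above, the reused exchange's ordering on `j#6` satisfies the final sort on `j#14` only after remapping.

```scala
// Hypothetical, simplified stand-ins (plain Scala, not Spark's classes).
final case class Attr(name: String, exprId: Int)
final case class SortOrd(child: Attr, ascending: Boolean)
final case class RangePart(ordering: Seq[SortOrd], numPartitions: Int)

object RangeReuseSketch {
  // Remaps the ordering from the original exchange's output to the
  // ReusedExchange's own output, pairing attributes by position.
  def rewrite(r: RangePart, exchangeOutput: Seq[Attr], reusedOutput: Seq[Attr]): RangePart = {
    val m = exchangeOutput.zip(reusedOutput).toMap
    r.copy(ordering = r.ordering.map(o => o.copy(child = m.getOrElse(o.child, o.child))))
  }

  // Assumed rule: the two orderings must agree on their common prefix.
  def satisfiesOrdering(r: RangePart, required: Seq[SortOrd]): Boolean = {
    val n = math.min(required.size, r.ordering.size)
    required.take(n) == r.ordering.take(n)
  }
}

// IDs from the plans above: Exchange over [i#5, j#6] is reused as ReusedExchange [i#13, j#14].
val exchangeOutput = Seq(Attr("i", 5), Attr("j", 6))
val reusedOutput = Seq(Attr("i", 13), Attr("j", 14))
val original = RangePart(Seq(SortOrd(Attr("j", 6), ascending = true)), 200)
val required = Seq(SortOrd(Attr("j", 14), ascending = true))
val rewritten = RangeReuseSketch.rewrite(original, exchangeOutput, reusedOutput)
```

Pairing by position works here because a `ReusedExchange` exposes the same columns as the exchange it reuses, only under new expression IDs.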

## How was this patch tested?

Added new tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yucai/spark SPARK-24556

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21564.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21564





