[GitHub] spark pull request #21291: [SPARK-24242][SQL] RangeExec should have correct ...

2018-05-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21291


---




[GitHub] spark pull request #21291: [SPARK-24242][SQL] RangeExec should have correct ...

2018-05-17 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21291#discussion_r189154765
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
 ---
@@ -55,7 +55,9 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext {
 val plan = df.queryExecution.executedPlan
 assert(plan.find(p =>
   p.isInstanceOf[WholeStageCodegenExec] &&
-  p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[HashAggregateExec]).isDefined)
+  p.asInstanceOf[WholeStageCodegenExec].child.collect {
--- End diff --

ok.


---




[GitHub] spark pull request #21291: [SPARK-24242][SQL] RangeExec should have correct ...

2018-05-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21291#discussion_r189151725
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
 ---
@@ -55,7 +55,9 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext {
 val plan = df.queryExecution.executedPlan
 assert(plan.find(p =>
   p.isInstanceOf[WholeStageCodegenExec] &&
-  p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[HashAggregateExec]).isDefined)
+  p.asInstanceOf[WholeStageCodegenExec].child.collect {
--- End diff --

same here, can we change the `groupBy` instead of the test?


---




[GitHub] spark pull request #21291: [SPARK-24242][SQL] RangeExec should have correct ...

2018-05-17 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21291#discussion_r189149095
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala ---
@@ -39,7 +39,9 @@ class ConfigBehaviorSuite extends QueryTest with SharedSQLContext {
 def computeChiSquareTest(): Double = {
   val n = 10000
   // Trigger a sort
-  val data = spark.range(0, n, 1, 1).sort('id.desc)
+  // Range has range partitioning in its output now. To have a range shuffle, we
+  // need to run a repartition first.
+  val data = spark.range(0, n, 1, 1).repartition(10).sort('id.desc)
--- End diff --

With `spark.range(0, n, 1, 10).sort('id.desc)`, there is no 3x linear relation between `a` and `b`. As shown above, the data is also evenly distributed, and the chi-sq value is also under `100`.

Here we need to redistribute the data to make sampling difficult. Previously, a repartition was added automatically before `sort`. Now `range` has correct output partitioning info, so the repartition must be added manually.
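
For intuition, a back-of-the-envelope check (plain Scala, not part of this PR; it assumes n = 10000 spread over 4 range partitions, i.e. an expected 2500 rows each):

```
// Chi-square statistic for the partition sizes quoted later in this thread,
// against a uniform expectation of n / numPartitions = 10000 / 4 = 2500.
val expected = 2500.0
val withRepartition = Seq(1666.0, 3766.0, 2003.0, 2565.0)    // repartition(10) case
val withoutRepartition = Seq(2835.0, 2469.0, 2362.0, 2334.0) // splits = 10 case

def chiSq(observed: Seq[Double]): Double =
  observed.map(o => (o - expected) * (o - expected) / expected).sum

println(chiSq(withRepartition))    // ~1020: far above the 300 threshold
println(chiSq(withoutRepartition)) // ~64: comfortably under 100
```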




---




[GitHub] spark pull request #21291: [SPARK-24242][SQL] RangeExec should have correct ...

2018-05-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21291#discussion_r188994080
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala ---
@@ -39,7 +39,9 @@ class ConfigBehaviorSuite extends QueryTest with SharedSQLContext {
 def computeChiSquareTest(): Double = {
   val n = 10000
   // Trigger a sort
-  val data = spark.range(0, n, 1, 1).sort('id.desc)
+  // Range has range partitioning in its output now. To have a range shuffle, we
+  // need to run a repartition first.
+  val data = spark.range(0, n, 1, 1).repartition(10).sort('id.desc)
--- End diff --

I see, so the `100` and `300` in this test are coupled with the physical execution. I feel the right way to test this is: instead of hardcoding `100` and `300`, we should have `a` and `b` and check that `b > 3 * a` or something.
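
A sketch of that idea (hypothetical, not the merged code; `computeChiSquareTest` is the helper already defined in this suite, `withSQLConf` comes from `SQLTestUtils`, and `SQLConf` is already imported here):

```
// Compare the two statistics relatively instead of against magic numbers.
val a = computeChiSquareTest() // default sampleSizePerPartition

var b = 0.0
withSQLConf(SQLConf.RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION.key -> "1") {
  b = computeChiSquareTest() // sampling degraded to 1 point per partition
}

// Poor sampling should visibly skew the partition-size histogram.
assert(b > 3 * a)
```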


---




[GitHub] spark pull request #21291: [SPARK-24242][SQL] RangeExec should have correct ...

2018-05-17 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21291#discussion_r188979880
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala ---
@@ -39,7 +39,9 @@ class ConfigBehaviorSuite extends QueryTest with SharedSQLContext {
 def computeChiSquareTest(): Double = {
   val n = 10000
   // Trigger a sort
-  val data = spark.range(0, n, 1, 1).sort('id.desc)
+  // Range has range partitioning in its output now. To have a range shuffle, we
+  // need to run a repartition first.
+  val data = spark.range(0, n, 1, 1).repartition(10).sort('id.desc)
--- End diff --

This is a good point.

This is the query plan and the partition sizes for `spark.range(0, n, 1, 1).repartition(10).sort('id.desc)`, when we set `SQLConf.RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION` to 1:

```
== Physical Plan ==
*(2) Sort [id#15L DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(id#15L DESC NULLS LAST, 4)
   +- Exchange RoundRobinPartitioning(10)
      +- *(1) Range (0, 10000, step=1, splits=1)

(1666, 3766, 2003, 2565)
```

And for `spark.range(0, n, 1, 10).sort('id.desc)`:

```
== Physical Plan ==
*(2) Sort [id#13L DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(id#13L DESC NULLS LAST, 4)
   +- *(1) Range (0, 10000, step=1, splits=10)

(2835, 2469, 2362, 2334)
```

Because `repartition` shuffles data with `RoundRobinPartitioning`, I guess it makes sampling worse for the range exchange. Without `repartition`, `Range`'s output is already range partitioned, so sampling yields better range boundaries.
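
A hypothetical spark-shell session reproducing the two cases (the conf key spelled out is `spark.sql.execution.rangeExchange.sampleSizePerPartition`; the printed plans should resemble the ones quoted above):

```
// Degrade range-exchange sampling to one point per partition, then compare.
spark.conf.set("spark.sql.execution.rangeExchange.sampleSizePerPartition", "1")

// Round-robin shuffle before the sort: the sampler sees shuffled data.
spark.range(0, 10000, 1, 1).repartition(10).sort($"id".desc).explain()

// Range output is already evenly split: the sampler sees ordered slices.
spark.range(0, 10000, 1, 10).sort($"id".desc).explain()
```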


---




[GitHub] spark pull request #21291: [SPARK-24242][SQL] RangeExec should have correct ...

2018-05-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21291#discussion_r188972061
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala ---
@@ -39,7 +39,9 @@ class ConfigBehaviorSuite extends QueryTest with SharedSQLContext {
 def computeChiSquareTest(): Double = {
   val n = 10000
   // Trigger a sort
-  val data = spark.range(0, n, 1, 1).sort('id.desc)
+  // Range has range partitioning in its output now. To have a range shuffle, we
+  // need to run a repartition first.
+  val data = spark.range(0, n, 1, 1).repartition(10).sort('id.desc)
--- End diff --

hmm, isn't `spark.range(0, n, 1, 10)` almost the same as `spark.range(0, n, 1, 1).repartition(10)`?


---




[GitHub] spark pull request #21291: [SPARK-24242][SQL] RangeExec should have correct ...

2018-05-17 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21291#discussion_r188953639
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala ---
@@ -39,7 +39,9 @@ class ConfigBehaviorSuite extends QueryTest with SharedSQLContext {
 def computeChiSquareTest(): Double = {
   val n = 10000
   // Trigger a sort
-  val data = spark.range(0, n, 1, 1).sort('id.desc)
+  // Range has range partitioning in its output now. To have a range shuffle, we
+  // need to run a repartition first.
+  val data = spark.range(0, n, 1, 1).repartition(10).sort('id.desc)
--- End diff --

This test uses `SQLConf.RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION` to change the sample size per partition and then checks the chi-sq value. It samples just 1 point per partition, so the chi-sq value is expected to be high.

If we change the input from 1 partition to 10, the chi-sq value will change too. Should we do this?


---




[GitHub] spark pull request #21291: [SPARK-24242][SQL] RangeExec should have correct ...

2018-05-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21291#discussion_r188914786
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala ---
@@ -39,7 +39,9 @@ class ConfigBehaviorSuite extends QueryTest with SharedSQLContext {
 def computeChiSquareTest(): Double = {
   val n = 10000
   // Trigger a sort
-  val data = spark.range(0, n, 1, 1).sort('id.desc)
+  // Range has range partitioning in its output now. To have a range shuffle, we
+  // need to run a repartition first.
+  val data = spark.range(0, n, 1, 1).repartition(10).sort('id.desc)
--- End diff --

then can we change the code to `spark.range(0, n, 1, 10)`?


---




[GitHub] spark pull request #21291: [SPARK-24242][SQL] RangeExec should have correct ...

2018-05-17 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21291#discussion_r10937
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -5239,8 +5239,8 @@ def test_complex_groupby(self):
 expected2 = df.groupby().agg(sum(df.v))
 
 # groupby one column and one sql expression
-result3 = df.groupby(df.id, df.v % 2).agg(sum_udf(df.v))
-expected3 = df.groupby(df.id, df.v % 2).agg(sum(df.v))
+result3 = df.groupby(df.id, df.v % 2).agg(sum_udf(df.v)).orderBy(df.id, df.v % 2)
--- End diff --

oh I see now, sorry, thanks.


---




[GitHub] spark pull request #21291: [SPARK-24242][SQL] RangeExec should have correct ...

2018-05-17 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21291#discussion_r188878956
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -5239,8 +5239,8 @@ def test_complex_groupby(self):
 expected2 = df.groupby().agg(sum(df.v))
 
 # groupby one column and one sql expression
-result3 = df.groupby(df.id, df.v % 2).agg(sum_udf(df.v))
-expected3 = df.groupby(df.id, df.v % 2).agg(sum(df.v))
+result3 = df.groupby(df.id, df.v % 2).agg(sum_udf(df.v)).orderBy(df.id, df.v % 2)
--- End diff --

They are already ordered by `df.id`. This is the partial data:

```
Expected:
   id  (v % 2)  sum(v)
0   0      0.0   120.0
1   0      1.0   125.0
2   1      1.0   125.0
3   1      0.0   130.0
4   2      0.0   130.0
5   2      1.0   135.0
```

```
Result:
   id  (v % 2)  sum(v)
0   0      0.0   120.0
1   0      1.0   125.0
2   1      0.0   130.0
3   1      1.0   125.0
4   2      0.0   130.0
5   2      1.0   135.0
```


---




[GitHub] spark pull request #21291: [SPARK-24242][SQL] RangeExec should have correct ...

2018-05-17 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21291#discussion_r188872101
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -5239,8 +5239,8 @@ def test_complex_groupby(self):
 expected2 = df.groupby().agg(sum(df.v))
 
 # groupby one column and one sql expression
-result3 = df.groupby(df.id, df.v % 2).agg(sum_udf(df.v))
-expected3 = df.groupby(df.id, df.v % 2).agg(sum(df.v))
+result3 = df.groupby(df.id, df.v % 2).agg(sum_udf(df.v)).orderBy(df.id, df.v % 2)
--- End diff --

thanks for your detailed explanation. Anyway, can we just use `orderBy(df.id)` instead of `orderBy(df.id, df.v % 2)`?


---




[GitHub] spark pull request #21291: [SPARK-24242][SQL] RangeExec should have correct ...

2018-05-17 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21291#discussion_r188871199
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
 ---
@@ -34,14 +34,13 @@ class DebuggingSuite extends SparkFunSuite with 
SharedSQLContext {
 
   test("debugCodegen") {
val res = codegenString(spark.range(10).groupBy("id").count().queryExecution.executedPlan)
--- End diff --

Ok.


---




[GitHub] spark pull request #21291: [SPARK-24242][SQL] RangeExec should have correct ...

2018-05-17 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21291#discussion_r188870491
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala ---
@@ -39,7 +39,9 @@ class ConfigBehaviorSuite extends QueryTest with 
SharedSQLContext {
 def computeChiSquareTest(): Double = {
   val n = 1
   // Trigger a sort
-  val data = spark.range(0, n, 1, 1).sort('id.desc)
+  // Range has range partitioning in its output now. To have a range 
shuffle, we
+  // need to run a repartition first.
+  val data = spark.range(0, n, 1, 1).repartition(10).sort('id.desc)
--- End diff --

Because it is just one partition now.


---




[GitHub] spark pull request #21291: [SPARK-24242][SQL] RangeExec should have correct ...

2018-05-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21291#discussion_r188868610
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
 ---
@@ -34,14 +34,13 @@ class DebuggingSuite extends SparkFunSuite with 
SharedSQLContext {
 
   test("debugCodegen") {
val res = codegenString(spark.range(10).groupBy("id").count().queryExecution.executedPlan)
--- End diff --

can we change it to `groupBy('id * 2)`? We should try our best to keep what we test, and keep the shuffle in this query (so it still splits into 2 codegen subtrees).
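
A sketch of that suggestion (hypothetical; it assumes the suite's `testImplicits` and the `execution.debug` package's `codegenString` are in scope, as they are in `DebuggingSuite`):

```
// Grouping by a derived expression can't reuse Range's output partitioning,
// so the hash exchange stays in the plan and the query still splits into
// two whole-stage-codegen subtrees.
val res = codegenString(
  spark.range(10).groupBy('id * 2).count().queryExecution.executedPlan)
assert(res.contains("Subtree 1 / 2"))
assert(res.contains("Subtree 2 / 2"))
```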


---




[GitHub] spark pull request #21291: [SPARK-24242][SQL] RangeExec should have correct ...

2018-05-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21291#discussion_r188868678
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
 ---
@@ -34,14 +34,13 @@ class DebuggingSuite extends SparkFunSuite with 
SharedSQLContext {
 
   test("debugCodegen") {
val res = codegenString(spark.range(10).groupBy("id").count().queryExecution.executedPlan)
-assert(res.contains("Subtree 1 / 2"))
-assert(res.contains("Subtree 2 / 2"))
+assert(res.contains("Subtree 1 / 1"))
 assert(res.contains("Object[]"))
   }
 
   test("debugCodegenStringSeq") {
 val res = codegenStringSeq(spark.range(10).groupBy("id").count().queryExecution.executedPlan)
-assert(res.length == 2)
+assert(res.length == 1)
--- End diff --

ditto


---




[GitHub] spark pull request #21291: [SPARK-24242][SQL] RangeExec should have correct ...

2018-05-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21291#discussion_r188868165
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala ---
@@ -39,7 +39,9 @@ class ConfigBehaviorSuite extends QueryTest with SharedSQLContext {
 def computeChiSquareTest(): Double = {
   val n = 10000
   // Trigger a sort
-  val data = spark.range(0, n, 1, 1).sort('id.desc)
+  // Range has range partitioning in its output now. To have a range shuffle, we
+  // need to run a repartition first.
+  val data = spark.range(0, n, 1, 1).repartition(10).sort('id.desc)
--- End diff --

I'm also confused here: the range output ordering is `'id.asc`, which doesn't match `'id.desc`, so how can we avoid the shuffle?


---




[GitHub] spark pull request #21291: [SPARK-24242][SQL] RangeExec should have correct ...

2018-05-16 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21291#discussion_r188659776
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -5239,8 +5239,8 @@ def test_complex_groupby(self):
 expected2 = df.groupby().agg(sum(df.v))
 
 # groupby one column and one sql expression
-result3 = df.groupby(df.id, df.v % 2).agg(sum_udf(df.v))
-expected3 = df.groupby(df.id, df.v % 2).agg(sum(df.v))
+result3 = df.groupby(df.id, df.v % 2).agg(sum_udf(df.v)).orderBy(df.id, df.v % 2)
--- End diff --

Simply put, the data ordering of `result3` and `expected3` is different now.

Previous query plans for the two queries:
```
== Physical Plan ==
!AggregateInPandas [id#0L, (v#8 % 2.0) AS (v#8 % 2.0)#40], [sum(v#8)], [id#0L, (v#8 % 2.0)#40 AS (v % 2)#22, sum(v)#21 AS sum(v)#23]
+- *(2) Sort [id#0L ASC NULLS FIRST, (v#8 % 2.0) AS (v#8 % 2.0)#40 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#0L, (v#8 % 2.0) AS (v#8 % 2.0)#40, 200)
      +- Generate explode(vs#4), [id#0L], false, [v#8]
         +- *(1) Project [id#0L, array((20.0 + cast(id#0L as double)), (21.0 + cast(id#0L as double)), (22.0 + cast(id#0L as double)), (23.0 + cast(id#0L as double)), (24.0 + cast(id#0L as double)), (25.0 + cast(id#0L as double)), (26.0 + cast(id#0L as double)), (27.0 + cast(id#0L as double)), (28.0 + cast(id#0L as double)), (29.0 + cast(id#0L as double))) AS vs#4]
            +- *(1) Range (0, 10, step=1, splits=8)
```

```
== Physical Plan ==
*(3) HashAggregate(keys=[id#0L, (v#8 % 2.0)#36], functions=[sum(v#8)], output=[id#0L, (v % 2)#31, sum(v)#32])
+- Exchange hashpartitioning(id#0L, (v#8 % 2.0)#36, 200)
   +- *(2) HashAggregate(keys=[id#0L, (v#8 % 2.0) AS (v#8 % 2.0)#36], functions=[partial_sum(v#8)], output=[id#0L, (v#8 % 2.0)#36, sum#38])
      +- Generate explode(vs#4), [id#0L], false, [v#8]
         +- *(1) Project [id#0L, array((20.0 + cast(id#0L as double)), (21.0 + cast(id#0L as double)), (22.0 + cast(id#0L as double)), (23.0 + cast(id#0L as double)), (24.0 + cast(id#0L as double)), (25.0 + cast(id#0L as double)), (26.0 + cast(id#0L as double)), (27.0 + cast(id#0L as double)), (28.0 + cast(id#0L as double)), (29.0 + cast(id#0L as double))) AS vs#4]
            +- *(1) Range (0, 10, step=1, splits=8)
```

Both have an `Exchange hashpartitioning`, which previously produced the same data distribution for the two queries. Notice the `Sort` doesn't change the observed data ordering, because 200 partitions make the distribution sparse.

Current query plans:
```
== Physical Plan ==
!AggregateInPandas [id#388L, (v#396 % 2.0) AS (v#396 % 2.0)#453], [sum(v#396)], [id#388L, (v#396 % 2.0)#453 AS (v % 2)#438, sum(v)#437 AS sum(v)#439]
+- *(2) Sort [id#388L ASC NULLS FIRST, (v#396 % 2.0) AS (v#396 % 2.0)#453 ASC NULLS FIRST], false, 0
   +- Generate explode(vs#392), [id#388L], false, [v#396]
      +- *(1) Project [id#388L, array((20.0 + cast(id#388L as double)), (21.0 + cast(id#388L as double)), (22.0 + cast(id#388L as double)), (23.0 + cast(id#388L as double)), (24.0 + cast(id#388L as double)), (25.0 + cast(id#388L as double)), (26.0 + cast(id#388L as double)), (27.0 + cast(id#388L as double)), (28.0 + cast(id#388L as double)), (29.0 + cast(id#388L as double))) AS vs#392]
         +- *(1) Range (0, 10, step=1, splits=4)
```

```
== Physical Plan ==
*(2) HashAggregate(keys=[id#388L, (v#396 % 2.0)#454], functions=[sum(v#396)], output=[id#388L, (v % 2)#447, sum(v)#448])
+- *(2) HashAggregate(keys=[id#388L, (v#396 % 2.0) AS (v#396 % 2.0)#454], functions=[partial_sum(v#396)], output=[id#388L, (v#396 % 2.0)#454, sum#456])
   +- Generate explode(vs#392), [id#388L], false, [v#396]
      +- *(1) Project [id#388L, array((20.0 + cast(id#388L as double)), (21.0 + cast(id#388L as double)), (22.0 + cast(id#388L as double)), (23.0 + cast(id#388L as double)), (24.0 + cast(id#388L as double)), (25.0 + cast(id#388L as double)), (26.0 + cast(id#388L as double)), (27.0 + cast(id#388L as double)), (28.0 + cast(id#388L as double)), (29.0 + cast(id#388L as double))) AS vs#392]
         +- *(1) Range (0, 10, step=1, splits=4)
```

The `Exchange` is not there anymore, and the two plans still have the same data distribution. But now the `Sort` changes the data ordering.


---




[GitHub] spark pull request #21291: [SPARK-24242][SQL] RangeExec should have correct ...

2018-05-16 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21291#discussion_r188629669
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -5239,8 +5239,8 @@ def test_complex_groupby(self):
 expected2 = df.groupby().agg(sum(df.v))
 
 # groupby one column and one sql expression
-result3 = df.groupby(df.id, df.v % 2).agg(sum_udf(df.v))
-expected3 = df.groupby(df.id, df.v % 2).agg(sum(df.v))
+result3 = df.groupby(df.id, df.v % 
2).agg(sum_udf(df.v)).orderBy(df.id, df.v % 2)
--- End diff --

why not just `orderBy(df.id)`? And why was this not failing before this fix?


---




[GitHub] spark pull request #21291: [SPARK-24242][SQL] RangeExec should have correct ...

2018-05-14 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21291#discussion_r188131133
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala ---
@@ -39,7 +39,9 @@ class ConfigBehaviorSuite extends QueryTest with SharedSQLContext {
 def computeChiSquareTest(): Double = {
   val n = 10000
   // Trigger a sort
-  val data = spark.range(0, n, 1, 1).sort('id.desc)
+  // Range has range partitioning in its output now. To have a range shuffle, we
+  // need to run a repartition first.
+  val data = spark.range(0, n, 1, 1).repartition(10).sort('id.desc)
--- End diff --

This test requires a range shuffle. Previously `range` had unknown output partitioning/ordering, so a range shuffle was inserted before `sort`.

Now `range` has an ordered output, so the planner doesn't insert the shuffle we need here.
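
A minimal sketch of why the planner now skips the exchange in the single-partition case (it assumes Spark's `Partitioning.satisfies` API; this is not the planner's actual code):

```
import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.catalyst.plans.physical._

// EnsureRequirements only inserts an exchange when the child's partitioning
// does not satisfy the operator's required distribution.
def needsExchange(child: Partitioning, required: Distribution): Boolean =
  !child.satisfies(required)

// RangeExec with numSlices == 1 now reports SinglePartition, and a single
// partition trivially satisfies the sort's OrderedDistribution, so the
// range shuffle this test used to rely on is no longer inserted.
def sortNeedsExchange(order: Seq[SortOrder]): Boolean =
  needsExchange(SinglePartition, OrderedDistribution(order)) // always false
```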



---




[GitHub] spark pull request #21291: [SPARK-24242][SQL] RangeExec should have correct ...

2018-05-14 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21291#discussion_r188130563
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---
@@ -621,6 +621,25 @@ class PlannerSuite extends SharedSQLContext {
   requiredOrdering = Seq(orderingA, orderingB),
   shouldHaveSort = true)
   }
+
+  test("SPARK-24242: RangeExec should have correct output ordering and 
partitioning") {
+val df = spark.range(10)
+val rangeExec = df.queryExecution.executedPlan.collect {
+  case r: RangeExec => r
+}
+val range = df.queryExecution.optimizedPlan.collect {
+  case r: Range => r
+}
+assert(rangeExec.head.outputOrdering == range.head.outputOrdering)
+assert(rangeExec.head.outputPartitioning ==
+  RangePartitioning(rangeExec.head.outputOrdering, 
df.rdd.getNumPartitions))
+
+val rangeInOnePartition = spark.range(1, 10, 1, 1)
+val rangeExecInOnePartition = 
rangeInOnePartition.queryExecution.executedPlan.collect {
+  case r: RangeExec => r
+}
+assert(rangeExecInOnePartition.head.outputPartitioning == 
SinglePartition)
--- End diff --

Ok.


---




[GitHub] spark pull request #21291: [SPARK-24242][SQL] RangeExec should have correct ...

2018-05-14 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21291#discussion_r188082738
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala ---
@@ -39,7 +39,9 @@ class ConfigBehaviorSuite extends QueryTest with SharedSQLContext {
 def computeChiSquareTest(): Double = {
   val n = 10000
   // Trigger a sort
-  val data = spark.range(0, n, 1, 1).sort('id.desc)
+  // Range has range partitioning in its output now. To have a range shuffle, we
+  // need to run a repartition first.
+  val data = spark.range(0, n, 1, 1).repartition(10).sort('id.desc)
--- End diff --

sorry, I am just curious, why is `sort('id.desc)` not causing a shuffle? 
Shouldn't it be ordered by `'id.asc` without the sort?


---




[GitHub] spark pull request #21291: [SPARK-24242][SQL] RangeExec should have correct ...

2018-05-14 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21291#discussion_r188081824
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---
@@ -621,6 +621,25 @@ class PlannerSuite extends SharedSQLContext {
   requiredOrdering = Seq(orderingA, orderingB),
   shouldHaveSort = true)
   }
+
+  test("SPARK-24242: RangeExec should have correct output ordering and 
partitioning") {
+val df = spark.range(10)
+val rangeExec = df.queryExecution.executedPlan.collect {
+  case r: RangeExec => r
+}
+val range = df.queryExecution.optimizedPlan.collect {
+  case r: Range => r
+}
+assert(rangeExec.head.outputOrdering == range.head.outputOrdering)
+assert(rangeExec.head.outputPartitioning ==
+  RangePartitioning(rangeExec.head.outputOrdering, 
df.rdd.getNumPartitions))
+
+val rangeInOnePartition = spark.range(1, 10, 1, 1)
+val rangeExecInOnePartition = 
rangeInOnePartition.queryExecution.executedPlan.collect {
+  case r: RangeExec => r
+}
+assert(rangeExecInOnePartition.head.outputPartitioning == 
SinglePartition)
--- End diff --

should we also add a test case for the 0 partition case?


---




[GitHub] spark pull request #21291: [SPARK-24242][SQL] RangeExec should have correct ...

2018-05-14 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21291#discussion_r187951662
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---
@@ -621,6 +621,25 @@ class PlannerSuite extends SharedSQLContext {
   requiredOrdering = Seq(orderingA, orderingB),
   shouldHaveSort = true)
   }
+
+  test("SPARK-24242: RangeExec should have correct output ordering") {
+val df = spark.range(10).orderBy("id")
--- End diff --

I used it to check Sort elimination. Removed now.


---




[GitHub] spark pull request #21291: [SPARK-24242][SQL] RangeExec should have correct ...

2018-05-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21291#discussion_r187900810
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---
@@ -621,6 +621,25 @@ class PlannerSuite extends SharedSQLContext {
   requiredOrdering = Seq(orderingA, orderingB),
   shouldHaveSort = true)
   }
+
+  test("SPARK-24242: RangeExec should have correct output ordering") {
+val df = spark.range(10).orderBy("id")
--- End diff --

why do we put an `orderBy` in the query?


---




[GitHub] spark pull request #21291: [SPARK-24242][SQL] RangeExec should have correct ...

2018-05-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21291#discussion_r187900467
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---
@@ -621,6 +621,25 @@ class PlannerSuite extends SharedSQLContext {
   requiredOrdering = Seq(orderingA, orderingB),
   shouldHaveSort = true)
   }
+
+  test("SPARK-24242: RangeExec should have correct output ordering") {
--- End diff --

ordering and partitioning


---




[GitHub] spark pull request #21291: [SPARK-24242][SQL] RangeExec should have correct ...

2018-05-11 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21291#discussion_r187569307
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -2767,7 +2767,12 @@ class Dataset[T] private[sql](
* @since 1.6.0
*/
def count(): Long = withAction("count", groupBy().count().queryExecution) { plan =>
-plan.executeCollect().head.getLong(0)
+val collected = plan.executeCollect()
+if (collected.isEmpty) {
+  0
+} else {
+  collected.head.getLong(0)
+}
--- End diff --

Right, that makes sense.


---




[GitHub] spark pull request #21291: [SPARK-24242][SQL] RangeExec should have correct ...

2018-05-11 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21291#discussion_r187565401
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -2767,7 +2767,12 @@ class Dataset[T] private[sql](
* @since 1.6.0
*/
def count(): Long = withAction("count", groupBy().count().queryExecution) { plan =>
-plan.executeCollect().head.getLong(0)
+val collected = plan.executeCollect()
+if (collected.isEmpty) {
+  0
+} else {
+  collected.head.getLong(0)
+}
--- End diff --

I think it is caused by returning `SinglePartition` when there is no data 
(and therefore no partition). So I think we should fix it there and not here.


---




[GitHub] spark pull request #21291: [SPARK-24242][SQL] RangeExec should have correct ...

2018-05-11 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21291#discussion_r187529583
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -2767,7 +2767,12 @@ class Dataset[T] private[sql](
* @since 1.6.0
*/
def count(): Long = withAction("count", groupBy().count().queryExecution) { plan =>
-plan.executeCollect().head.getLong(0)
+val collected = plan.executeCollect()
+if (collected.isEmpty) {
+  0
+} else {
+  collected.head.getLong(0)
+}
--- End diff --

`spark.range(-10, -9, -20, 1).select("id").count` in `DataFrameRangeSuite` causes an exception here: `plan.executeCollect().head` calls `next` on an empty iterator.
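
A hypothetical minimal reproduction of the failure mode (the zero-partition behaviour follows the discussion in this thread):

```
// This range is empty: a negative step starting at -10 can never reach -9,
// so the Range produces zero rows (and, per this thread, zero partitions).
val empty = spark.range(-10, -9, -20, 1).select("id")

// groupBy().count() then collects zero rows, so the old
// plan.executeCollect().head.getLong(0) threw on the empty result.
// With the guard in the diff above, count() returns 0 instead.
assert(empty.count() == 0)
```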


---




[GitHub] spark pull request #21291: [SPARK-24242][SQL] RangeExec should have correct ...

2018-05-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21291#discussion_r187502412
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
 ---
@@ -345,6 +345,16 @@ case class RangeExec(range: 
org.apache.spark.sql.catalyst.plans.logical.Range)
 
   override val output: Seq[Attribute] = range.output
 
+  override def outputOrdering: Seq[SortOrder] = range.outputOrdering
+
+  override def outputPartitioning: Partitioning = {
+if (numSlices == 1) {
+  SinglePartition
--- End diff --

`SinglePartition` is better


---




[GitHub] spark pull request #21291: [SPARK-24242][SQL] RangeExec should have correct ...

2018-05-10 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21291#discussion_r187489196
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
 ---
@@ -345,6 +345,16 @@ case class RangeExec(range: 
org.apache.spark.sql.catalyst.plans.logical.Range)
 
   override val output: Seq[Attribute] = range.output
 
+  override def outputOrdering: Seq[SortOrder] = range.outputOrdering
+
+  override def outputPartitioning: Partitioning = {
+if (numSlices == 1) {
+  SinglePartition
--- End diff --

Which one is better? `SinglePartition` or 
`RangePartitioning(outputOrdering, 1)`?


---




[GitHub] spark pull request #21291: [SPARK-24242][SQL] RangeExec should have correct ...

2018-05-10 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21291#discussion_r187480496
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
 ---
@@ -345,6 +345,8 @@ case class RangeExec(range: 
org.apache.spark.sql.catalyst.plans.logical.Range)
 
   override val output: Seq[Attribute] = range.output
 
+  override def outputOrdering: Seq[SortOrder] = range.outputOrdering
--- End diff --

ok.


---




[GitHub] spark pull request #21291: [SPARK-24242][SQL] RangeExec should have correct ...

2018-05-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21291#discussion_r187370831
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
 ---
@@ -345,6 +345,8 @@ case class RangeExec(range: 
org.apache.spark.sql.catalyst.plans.logical.Range)
 
   override val output: Seq[Attribute] = range.output
 
+  override def outputOrdering: Seq[SortOrder] = range.outputOrdering
--- End diff --

since we are here, shall we also implement `outputPartitioning`?


---




[GitHub] spark pull request #21291: [SPARK-24242][SQL] RangeExec should have correct ...

2018-05-10 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21291#discussion_r187353657
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---
@@ -621,6 +621,17 @@ class PlannerSuite extends SharedSQLContext {
   requiredOrdering = Seq(orderingA, orderingB),
   shouldHaveSort = true)
   }
+
+  test("RangeExec should have correct output ordering") {
--- End diff --

Ok.


---




[GitHub] spark pull request #21291: [SPARK-24242][SQL] RangeExec should have correct ...

2018-05-10 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21291#discussion_r187350669
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---
@@ -621,6 +621,17 @@ class PlannerSuite extends SharedSQLContext {
   requiredOrdering = Seq(orderingA, orderingB),
   shouldHaveSort = true)
   }
+
+  test("RangeExec should have correct output ordering") {
--- End diff --

nit: start with `SPARK-24242: ...`


---




[GitHub] spark pull request #21291: [SPARK-24242][SQL] RangeExec should have correct ...

2018-05-10 Thread viirya
GitHub user viirya opened a pull request:

https://github.com/apache/spark/pull/21291

[SPARK-24242][SQL] RangeExec should have correct outputOrdering

## What changes were proposed in this pull request?

The logical `Range` node recently gained an `outputOrdering`, which is used to eliminate redundant `Sort`s during optimization. However, this `outputOrdering` doesn't propagate to the physical `RangeExec` node.

## How was this patch tested?

Added test.
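
For reference, the shape of the change, pieced together from the review diffs above (a sketch, not necessarily the exact merged code):

```
// In RangeExec (basicPhysicalOperators.scala): propagate the logical node's
// ordering and report a partitioning the planner can exploit.
override def outputOrdering: Seq[SortOrder] = range.outputOrdering

override def outputPartitioning: Partitioning = {
  if (numSlices == 1) {
    // One slice is exactly one partition, and SinglePartition satisfies any
    // required distribution, so downstream exchanges can be elided.
    SinglePartition
  } else {
    RangePartitioning(outputOrdering, numSlices)
  }
}
```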

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/viirya/spark-1 SPARK-24242

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21291.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21291


commit 30b42d542b76c7e2e3b755515efbb530bb06509a
Author: Liang-Chi Hsieh 
Date:   2018-05-10T10:30:48Z

RangeExec should have correct outputOrdering.




---
