[GitHub] spark pull request #21291: [SPARK-24242][SQL] RangeExec should have correct ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21291 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21291#discussion_r189154765 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala --- @@ -55,7 +55,9 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext { val plan = df.queryExecution.executedPlan assert(plan.find(p => p.isInstanceOf[WholeStageCodegenExec] && - p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[HashAggregateExec]).isDefined) +p.asInstanceOf[WholeStageCodegenExec].child.collect { --- End diff -- ok.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21291#discussion_r189151725 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala --- @@ -55,7 +55,9 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext { val plan = df.queryExecution.executedPlan assert(plan.find(p => p.isInstanceOf[WholeStageCodegenExec] && - p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[HashAggregateExec]).isDefined) +p.asInstanceOf[WholeStageCodegenExec].child.collect { --- End diff -- same here, can we change the `groupBy` instead of the test?
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21291#discussion_r189149095 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala --- @@ -39,7 +39,9 @@ class ConfigBehaviorSuite extends QueryTest with SharedSQLContext { def computeChiSquareTest(): Double = { val n = 1 // Trigger a sort - val data = spark.range(0, n, 1, 1).sort('id.desc) + // Range has range partitioning in its output now. To have a range shuffle, we + // need to run a repartition first. + val data = spark.range(0, n, 1, 1).repartition(10).sort('id.desc) --- End diff -- With `spark.range(0, n, 1, 10).sort('id.desc)`, there is no 3x relation between `a` and `b`. As shown above, the data is also evenly distributed, and the chi-sq value is also under `100`. Here we need to redistribute the data to make sampling difficult. Previously, a repartition was added automatically before `sort`. Now that `range` has correct output partitioning info, the repartition must be added manually.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21291#discussion_r188994080 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala --- @@ -39,7 +39,9 @@ class ConfigBehaviorSuite extends QueryTest with SharedSQLContext { def computeChiSquareTest(): Double = { val n = 1 // Trigger a sort - val data = spark.range(0, n, 1, 1).sort('id.desc) + // Range has range partitioning in its output now. To have a range shuffle, we + // need to run a repartition first. + val data = spark.range(0, n, 1, 1).repartition(10).sort('id.desc) --- End diff -- i see, so the `100` and `300` in this test are coupled with the physical execution. I feel the right way to test this is: instead of hardcoding `100` and `300`, we should have `a` and `b` and check whether `b > 3 * a`, or something similar.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21291#discussion_r188979880 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala --- @@ -39,7 +39,9 @@ class ConfigBehaviorSuite extends QueryTest with SharedSQLContext { def computeChiSquareTest(): Double = { val n = 1 // Trigger a sort - val data = spark.range(0, n, 1, 1).sort('id.desc) + // Range has range partitioning in its output now. To have a range shuffle, we + // need to run a repartition first. + val data = spark.range(0, n, 1, 1).repartition(10).sort('id.desc) --- End diff -- This is a good point. This is the query plan and the partition sizes for `spark.range(0, n, 1, 1).repartition(10).sort('id.desc)` when we set `SQLConf.RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION` to 1:
```
== Physical Plan ==
*(2) Sort [id#15L DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(id#15L DESC NULLS LAST, 4)
   +- Exchange RoundRobinPartitioning(10)
      +- *(1) Range (0, 1, step=1, splits=1)

(1666, 3766, 2003, 2565)
```
`spark.range(0, n, 1, 10).sort('id.desc)`:
```
== Physical Plan ==
*(2) Sort [id#13L DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(id#13L DESC NULLS LAST, 4)
   +- *(1) Range (0, 1, step=1, splits=10)

(2835, 2469, 2362, 2334)
```
Because `repartition` shuffles data with `RoundRobinPartitioning`, I guess it gives the range exchange a worse sample. Without `repartition`, `Range`'s output is already range-partitioned, so the sampling leads to better range boundaries.
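The two partition-size lists above can be sanity-checked with a quick chi-square computation. The sketch below is plain Python (not the suite's `computeChiSquareTest`) and assumes a uniform expected distribution across the 4 partitions:

```python
# Pearson chi-square statistic of a list of partition sizes against a
# uniform expectation. Standalone sketch, not Spark code.
def chi_square(observed):
    total = sum(observed)
    expected = total / len(observed)
    return sum((o - expected) ** 2 / expected for o in observed)

# Sizes after repartition(10).sort: round-robin input gives the range
# exchange a poor one-point sample, so the boundaries are skewed.
skewed = [1666, 3766, 2003, 2565]
# Sizes for range(0, n, 1, 10).sort: the already-range-partitioned
# input samples well, so partitions stay close to uniform.
even = [2835, 2469, 2362, 2334]

print(round(chi_square(skewed), 1))  # 1019.8 -- well above 100
print(round(chi_square(even), 1))    # 63.9 -- under 100, as noted above
```

This matches the discussion: the round-robin input yields a statistic more than an order of magnitude higher than the directly range-partitioned input.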
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21291#discussion_r188972061 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala --- @@ -39,7 +39,9 @@ class ConfigBehaviorSuite extends QueryTest with SharedSQLContext { def computeChiSquareTest(): Double = { val n = 1 // Trigger a sort - val data = spark.range(0, n, 1, 1).sort('id.desc) + // Range has range partitioning in its output now. To have a range shuffle, we + // need to run a repartition first. + val data = spark.range(0, n, 1, 1).repartition(10).sort('id.desc) --- End diff -- hmm, isn't `spark.range(0, n, 1, 10)` almost the same as `spark.range(0, n, 1, 1).repartition(10)`?
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21291#discussion_r188953639 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala --- @@ -39,7 +39,9 @@ class ConfigBehaviorSuite extends QueryTest with SharedSQLContext { def computeChiSquareTest(): Double = { val n = 1 // Trigger a sort - val data = spark.range(0, n, 1, 1).sort('id.desc) + // Range has range partitioning in its output now. To have a range shuffle, we + // need to run a repartition first. + val data = spark.range(0, n, 1, 1).repartition(10).sort('id.desc) --- End diff -- This test uses `SQLConf.RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION` to change the sample size per partition and checks the chi-sq value. It samples just 1 point, so the chi-sq value is expected to be high. If we change from 1 to 10 partitions, the chi-sq value will change too. Should we do this?
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21291#discussion_r188914786 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala --- @@ -39,7 +39,9 @@ class ConfigBehaviorSuite extends QueryTest with SharedSQLContext { def computeChiSquareTest(): Double = { val n = 1 // Trigger a sort - val data = spark.range(0, n, 1, 1).sort('id.desc) + // Range has range partitioning in its output now. To have a range shuffle, we + // need to run a repartition first. + val data = spark.range(0, n, 1, 1).repartition(10).sort('id.desc) --- End diff -- then can we change the code to `spark.range(0, n, 1, 10)`?
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/21291#discussion_r10937 --- Diff: python/pyspark/sql/tests.py --- @@ -5239,8 +5239,8 @@ def test_complex_groupby(self): expected2 = df.groupby().agg(sum(df.v)) # groupby one column and one sql expression -result3 = df.groupby(df.id, df.v % 2).agg(sum_udf(df.v)) -expected3 = df.groupby(df.id, df.v % 2).agg(sum(df.v)) +result3 = df.groupby(df.id, df.v % 2).agg(sum_udf(df.v)).orderBy(df.id, df.v % 2) --- End diff -- oh I see now, sorry, thanks.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21291#discussion_r188878956 --- Diff: python/pyspark/sql/tests.py --- @@ -5239,8 +5239,8 @@ def test_complex_groupby(self): expected2 = df.groupby().agg(sum(df.v)) # groupby one column and one sql expression -result3 = df.groupby(df.id, df.v % 2).agg(sum_udf(df.v)) -expected3 = df.groupby(df.id, df.v % 2).agg(sum(df.v)) +result3 = df.groupby(df.id, df.v % 2).agg(sum_udf(df.v)).orderBy(df.id, df.v % 2) --- End diff -- They are already ordered by `df.id`. This is part of the data:
```
Expected:
   id  (v % 2)  sum(v)
0   0      0.0   120.0
1   0      1.0   125.0
2   1      1.0   125.0
3   1      0.0   130.0
4   2      0.0   130.0
5   2      1.0   135.0
```
```
Result:
   id  (v % 2)  sum(v)
0   0      0.0   120.0
1   0      1.0   125.0
2   1      0.0   130.0
3   1      1.0   125.0
4   2      0.0   130.0
5   2      1.0   135.0
```
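The two listings above differ only in the relative order of ties on `id`, which is why ordering by `df.id` alone would not make the comparison deterministic. A plain-Python sketch with a hypothetical four-row subset of the data:

```python
# Hypothetical rows (id, v % 2, sum(v)) mirroring the partial data above.
expected = [(0, 0.0, 120.0), (0, 1.0, 125.0), (1, 1.0, 125.0), (1, 0.0, 130.0)]
result   = [(0, 0.0, 120.0), (0, 1.0, 125.0), (1, 0.0, 130.0), (1, 1.0, 125.0)]

# Sorting by id alone is not enough: Python's sort is stable, so the
# rows with id == 1 keep their differing relative orders.
by_id = lambda rows: sorted(rows, key=lambda r: r[0])
print(by_id(expected) == by_id(result))    # False

# Sorting by the full grouping key (id, v % 2) is a total order over
# the group keys, so both sides line up.
by_key = lambda rows: sorted(rows, key=lambda r: (r[0], r[1]))
print(by_key(expected) == by_key(result))  # True
```

This is the same reason the test orders by `(df.id, df.v % 2)` rather than `df.id` alone.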
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/21291#discussion_r188872101 --- Diff: python/pyspark/sql/tests.py --- @@ -5239,8 +5239,8 @@ def test_complex_groupby(self): expected2 = df.groupby().agg(sum(df.v)) # groupby one column and one sql expression -result3 = df.groupby(df.id, df.v % 2).agg(sum_udf(df.v)) -expected3 = df.groupby(df.id, df.v % 2).agg(sum(df.v)) +result3 = df.groupby(df.id, df.v % 2).agg(sum_udf(df.v)).orderBy(df.id, df.v % 2) --- End diff -- thanks for your detailed explanation. Anyway, can we just use `orderBy(df.id)` instead of `orderBy(df.id, df.v % 2)`?
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21291#discussion_r188871199 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala --- @@ -34,14 +34,13 @@ class DebuggingSuite extends SparkFunSuite with SharedSQLContext { test("debugCodegen") { val res = codegenString(spark.range(10).groupBy("id").count().queryExecution.executedPlan) --- End diff -- Ok.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21291#discussion_r188870491 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala --- @@ -39,7 +39,9 @@ class ConfigBehaviorSuite extends QueryTest with SharedSQLContext { def computeChiSquareTest(): Double = { val n = 1 // Trigger a sort - val data = spark.range(0, n, 1, 1).sort('id.desc) + // Range has range partitioning in its output now. To have a range shuffle, we + // need to run a repartition first. + val data = spark.range(0, n, 1, 1).repartition(10).sort('id.desc) --- End diff -- Because it is just one partition now.
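A toy model of why a single partition avoids the shuffle (illustrative Python with made-up names standing in for Spark's `Partitioning.satisfies` machinery, not the real API): a single partition trivially satisfies any required distribution, while a range partitioning with a mismatched sort order does not.

```python
# Toy model of "does the child's partitioning satisfy the required
# distribution?" -- names are illustrative, not Spark's classes.
class SinglePartition:
    def satisfies(self, required):
        # With one partition there is nothing to redistribute, so
        # every distribution requirement is trivially met.
        return True

class RangePartitioning:
    def __init__(self, ordering, num_partitions):
        self.ordering = ordering
        self.num_partitions = num_partitions
    def satisfies(self, required):
        # An ordered distribution is only satisfied when the sort
        # orders match (simplified here to string equality).
        return required == self.ordering

def needs_shuffle(child_partitioning, required):
    return not child_partitioning.satisfies(required)

# range(0, n, 1, 1): one partition, so even 'id DESC' needs no shuffle.
print(needs_shuffle(SinglePartition(), "id DESC"))                # False
# range(0, n, 1, 10): ascending range partitioning vs a DESC sort.
print(needs_shuffle(RangePartitioning("id ASC", 10), "id DESC"))  # True
```

Under this model, `repartition(10)` breaks the single-partition case back into a state where the sort must insert its own range exchange, which is exactly what the test needs.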
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21291#discussion_r188868610 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala --- @@ -34,14 +34,13 @@ class DebuggingSuite extends SparkFunSuite with SharedSQLContext { test("debugCodegen") { val res = codegenString(spark.range(10).groupBy("id").count().queryExecution.executedPlan) --- End diff -- can we change it to `groupBy('id * 2)`? We should try our best to keep what the test is meant to cover, and keep the shuffle (and thus the 2 codegen subtrees) in this query.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21291#discussion_r188868678 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala --- @@ -34,14 +34,13 @@ class DebuggingSuite extends SparkFunSuite with SharedSQLContext { test("debugCodegen") { val res = codegenString(spark.range(10).groupBy("id").count().queryExecution.executedPlan) -assert(res.contains("Subtree 1 / 2")) -assert(res.contains("Subtree 2 / 2")) +assert(res.contains("Subtree 1 / 1")) assert(res.contains("Object[]")) } test("debugCodegenStringSeq") { val res = codegenStringSeq(spark.range(10).groupBy("id").count().queryExecution.executedPlan) -assert(res.length == 2) +assert(res.length == 1) --- End diff -- ditto
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21291#discussion_r188868165 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala --- @@ -39,7 +39,9 @@ class ConfigBehaviorSuite extends QueryTest with SharedSQLContext { def computeChiSquareTest(): Double = { val n = 1 // Trigger a sort - val data = spark.range(0, n, 1, 1).sort('id.desc) + // Range has range partitioning in its output now. To have a range shuffle, we + // need to run a repartition first. + val data = spark.range(0, n, 1, 1).repartition(10).sort('id.desc) --- End diff -- I'm also confused here: the range output ordering is `'id.asc`, which doesn't match `'id.desc`, so how can we avoid the shuffle?
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21291#discussion_r188659776 --- Diff: python/pyspark/sql/tests.py --- @@ -5239,8 +5239,8 @@ def test_complex_groupby(self): expected2 = df.groupby().agg(sum(df.v)) # groupby one column and one sql expression -result3 = df.groupby(df.id, df.v % 2).agg(sum_udf(df.v)) -expected3 = df.groupby(df.id, df.v % 2).agg(sum(df.v)) +result3 = df.groupby(df.id, df.v % 2).agg(sum_udf(df.v)).orderBy(df.id, df.v % 2) --- End diff -- Simply put, the data ordering of `result3` and `expected3` differs now. Previous query plans for the two queries:
```
== Physical Plan ==
!AggregateInPandas [id#0L, (v#8 % 2.0) AS (v#8 % 2.0)#40], [sum(v#8)], [id#0L, (v#8 % 2.0)#40 AS (v % 2)#22, sum(v)#21 AS sum(v)#23]
+- *(2) Sort [id#0L ASC NULLS FIRST, (v#8 % 2.0) AS (v#8 % 2.0)#40 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#0L, (v#8 % 2.0) AS (v#8 % 2.0)#40, 200)
      +- Generate explode(vs#4), [id#0L], false, [v#8]
         +- *(1) Project [id#0L, array((20.0 + cast(id#0L as double)), (21.0 + cast(id#0L as double)), (22.0 + cast(id#0L as double)), (23.0 + cast(id#0L as double)), (24.0 + cast(id#0L as double)), (25.0 + cast(id#0L as double)), (26.0 + cast(id#0L as double)), (27.0 + cast(id#0L as double)), (28.0 + cast(id#0L as double)), (29.0 + cast(id#0L as double))) AS vs#4]
            +- *(1) Range (0, 10, step=1, splits=8)
```
```
== Physical Plan ==
*(3) HashAggregate(keys=[id#0L, (v#8 % 2.0)#36], functions=[sum(v#8)], output=[id#0L, (v % 2)#31, sum(v)#32])
+- Exchange hashpartitioning(id#0L, (v#8 % 2.0)#36, 200)
   +- *(2) HashAggregate(keys=[id#0L, (v#8 % 2.0) AS (v#8 % 2.0)#36], functions=[partial_sum(v#8)], output=[id#0L, (v#8 % 2.0)#36, sum#38])
      +- Generate explode(vs#4), [id#0L], false, [v#8]
         +- *(1) Project [id#0L, array((20.0 + cast(id#0L as double)), (21.0 + cast(id#0L as double)), (22.0 + cast(id#0L as double)), (23.0 + cast(id#0L as double)), (24.0 + cast(id#0L as double)), (25.0 + cast(id#0L as double)), (26.0 + cast(id#0L as double)), (27.0 + cast(id#0L as double)), (28.0 + cast(id#0L as double)), (29.0 + cast(id#0L as double))) AS vs#4]
            +- *(1) Range (0, 10, step=1, splits=8)
```
Both have `Exchange hashpartitioning`, which previously produced the same data distribution. Notice that `Sort` doesn't change the data ordering, because 200 partitions make the distribution sparse. Current query plans:
```
!AggregateInPandas [id#388L, (v#396 % 2.0) AS (v#396 % 2.0)#453], [sum(v#396)], [id#388L, (v#396 % 2.0)#453 AS (v % 2)#438, sum(v)#437 AS sum(v)#439]
+- *(2) Sort [id#388L ASC NULLS FIRST, (v#396 % 2.0) AS (v#396 % 2.0)#453 ASC NULLS FIRST], false, 0
   +- Generate explode(vs#392), [id#388L], false, [v#396]
      +- *(1) Project [id#388L, array((20.0 + cast(id#388L as double)), (21.0 + cast(id#388L as double)), (22.0 + cast(id#388L as double)), (23.0 + cast(id#388L as double)), (24.0 + cast(id#388L as double)), (25.0 + cast(id#388L as double)), (26.0 + cast(id#388L as double)), (27.0 + cast(id#388L as double)), (28.0 + cast(id#388L as double)), (29.0 + cast(id#388L as double))) AS vs#392]
         +- *(1) Range (0, 10, step=1, splits=4)
```
```
== Physical Plan ==
*(2) HashAggregate(keys=[id#388L, (v#396 % 2.0)#454], functions=[sum(v#396)], output=[id#388L, (v % 2)#447, sum(v)#448])
+- *(2) HashAggregate(keys=[id#388L, (v#396 % 2.0) AS (v#396 % 2.0)#454], functions=[partial_sum(v#396)], output=[id#388L, (v#396 % 2.0)#454, sum#456])
   +- Generate explode(vs#392), [id#388L], false, [v#396]
      +- *(1) Project [id#388L, array((20.0 + cast(id#388L as double)), (21.0 + cast(id#388L as double)), (22.0 + cast(id#388L as double)), (23.0 + cast(id#388L as double)), (24.0 + cast(id#388L as double)), (25.0 + cast(id#388L as double)), (26.0 + cast(id#388L as double)), (27.0 + cast(id#388L as double)), (28.0 + cast(id#388L as double)), (29.0 + cast(id#388L as double))) AS vs#392]
         +- *(1) Range (0, 10, step=1, splits=4)
```
`Exchange` is not there anymore. They still have the same data distribution. But now `Sort` changes the data ordering.
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/21291#discussion_r188629669 --- Diff: python/pyspark/sql/tests.py --- @@ -5239,8 +5239,8 @@ def test_complex_groupby(self): expected2 = df.groupby().agg(sum(df.v)) # groupby one column and one sql expression -result3 = df.groupby(df.id, df.v % 2).agg(sum_udf(df.v)) -expected3 = df.groupby(df.id, df.v % 2).agg(sum(df.v)) +result3 = df.groupby(df.id, df.v % 2).agg(sum_udf(df.v)).orderBy(df.id, df.v % 2) --- End diff -- why not just `orderBy(df.id)`? and why was this not failing before this fix?
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21291#discussion_r188131133 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala --- @@ -39,7 +39,9 @@ class ConfigBehaviorSuite extends QueryTest with SharedSQLContext { def computeChiSquareTest(): Double = { val n = 1 // Trigger a sort - val data = spark.range(0, n, 1, 1).sort('id.desc) + // Range has range partitioning in its output now. To have a range shuffle, we + // need to run a repartition first. + val data = spark.range(0, n, 1, 1).repartition(10).sort('id.desc) --- End diff -- This test requires a range shuffle. Previously `range` had unknown output partitioning/ordering, so a range shuffle was inserted before `sort`. Now `range` has an ordered output, so the planner doesn't insert the shuffle we need here.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21291#discussion_r188130563 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala --- @@ -621,6 +621,25 @@ class PlannerSuite extends SharedSQLContext { requiredOrdering = Seq(orderingA, orderingB), shouldHaveSort = true) } + + test("SPARK-24242: RangeExec should have correct output ordering and partitioning") { +val df = spark.range(10) +val rangeExec = df.queryExecution.executedPlan.collect { + case r: RangeExec => r +} +val range = df.queryExecution.optimizedPlan.collect { + case r: Range => r +} +assert(rangeExec.head.outputOrdering == range.head.outputOrdering) +assert(rangeExec.head.outputPartitioning == + RangePartitioning(rangeExec.head.outputOrdering, df.rdd.getNumPartitions)) + +val rangeInOnePartition = spark.range(1, 10, 1, 1) +val rangeExecInOnePartition = rangeInOnePartition.queryExecution.executedPlan.collect { + case r: RangeExec => r +} +assert(rangeExecInOnePartition.head.outputPartitioning == SinglePartition) --- End diff -- Ok.
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/21291#discussion_r188082738 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala --- @@ -39,7 +39,9 @@ class ConfigBehaviorSuite extends QueryTest with SharedSQLContext { def computeChiSquareTest(): Double = { val n = 1 // Trigger a sort - val data = spark.range(0, n, 1, 1).sort('id.desc) + // Range has range partitioning in its output now. To have a range shuffle, we + // need to run a repartition first. + val data = spark.range(0, n, 1, 1).repartition(10).sort('id.desc) --- End diff -- sorry, I am just curious, why is `sort('id.desc)` not causing a shuffle? Shouldn't it be ordered by `'id.asc` without the sort?
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/21291#discussion_r188081824 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala --- @@ -621,6 +621,25 @@ class PlannerSuite extends SharedSQLContext { requiredOrdering = Seq(orderingA, orderingB), shouldHaveSort = true) } + + test("SPARK-24242: RangeExec should have correct output ordering and partitioning") { +val df = spark.range(10) +val rangeExec = df.queryExecution.executedPlan.collect { + case r: RangeExec => r +} +val range = df.queryExecution.optimizedPlan.collect { + case r: Range => r +} +assert(rangeExec.head.outputOrdering == range.head.outputOrdering) +assert(rangeExec.head.outputPartitioning == + RangePartitioning(rangeExec.head.outputOrdering, df.rdd.getNumPartitions)) + +val rangeInOnePartition = spark.range(1, 10, 1, 1) +val rangeExecInOnePartition = rangeInOnePartition.queryExecution.executedPlan.collect { + case r: RangeExec => r +} +assert(rangeExecInOnePartition.head.outputPartitioning == SinglePartition) --- End diff -- should we also add a test case for the 0 partition case?
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21291#discussion_r187951662 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala --- @@ -621,6 +621,25 @@ class PlannerSuite extends SharedSQLContext { requiredOrdering = Seq(orderingA, orderingB), shouldHaveSort = true) } + + test("SPARK-24242: RangeExec should have correct output ordering") { +val df = spark.range(10).orderBy("id") --- End diff -- I used it to check Sort elimination. Removed now.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21291#discussion_r187900810 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala --- @@ -621,6 +621,25 @@ class PlannerSuite extends SharedSQLContext { requiredOrdering = Seq(orderingA, orderingB), shouldHaveSort = true) } + + test("SPARK-24242: RangeExec should have correct output ordering") { +val df = spark.range(10).orderBy("id") --- End diff -- why do we put an `orderBy` in the query?
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21291#discussion_r187900467 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala --- @@ -621,6 +621,25 @@ class PlannerSuite extends SharedSQLContext { requiredOrdering = Seq(orderingA, orderingB), shouldHaveSort = true) } + + test("SPARK-24242: RangeExec should have correct output ordering") { --- End diff -- ordering and partitioning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21291#discussion_r187569307 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -2767,7 +2767,12 @@ class Dataset[T] private[sql]( * @since 1.6.0 */ def count(): Long = withAction("count", groupBy().count().queryExecution) { plan => -plan.executeCollect().head.getLong(0) +val collected = plan.executeCollect() +if (collected.isEmpty) { + 0 +} else { + collected.head.getLong(0) +} --- End diff -- Right, makes sense.
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/21291#discussion_r187565401 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -2767,7 +2767,12 @@ class Dataset[T] private[sql]( * @since 1.6.0 */ def count(): Long = withAction("count", groupBy().count().queryExecution) { plan => -plan.executeCollect().head.getLong(0) +val collected = plan.executeCollect() +if (collected.isEmpty) { + 0 +} else { + collected.head.getLong(0) +} --- End diff -- I think it is caused by returning `SinglePartition` when there is no data (and therefore no partition). So I think we should fix it there and not here.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21291#discussion_r187529583 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -2767,7 +2767,12 @@ class Dataset[T] private[sql]( * @since 1.6.0 */ def count(): Long = withAction("count", groupBy().count().queryExecution) { plan => -plan.executeCollect().head.getLong(0) +val collected = plan.executeCollect() +if (collected.isEmpty) { + 0 +} else { + collected.head.getLong(0) +} --- End diff -- `spark.range(-10, -9, -20, 1).select("id").count` in `DataFrameRangeSuite` causes an exception here: `plan.executeCollect().head` calls `next` on an empty iterator.
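The failure is the classic head-of-an-empty-collection problem. A minimal Python analogue of the guarded fix (a hypothetical helper, not the Scala code):

```python
# Analogue of the count() fix: return a default instead of letting
# next() raise on an empty result. Hypothetical helper for illustration.
def head_or_default(rows, default=0):
    it = iter(rows)
    try:
        return next(it)          # like plan.executeCollect().head
    except StopIteration:
        return default           # like the added isEmpty branch

print(head_or_default([]))       # 0  (the empty-range case)
print(head_or_default([42, 7]))  # 42
```

Whether the guard belongs here or in the partitioning logic is exactly the question raised in the previous comment.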
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21291#discussion_r187502412 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala --- @@ -345,6 +345,16 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range) override val output: Seq[Attribute] = range.output + override def outputOrdering: Seq[SortOrder] = range.outputOrdering + + override def outputPartitioning: Partitioning = { +if (numSlices == 1) { + SinglePartition --- End diff -- `SinglePartition` is better
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21291#discussion_r187489196 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala --- @@ -345,6 +345,16 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range) override val output: Seq[Attribute] = range.output + override def outputOrdering: Seq[SortOrder] = range.outputOrdering + + override def outputPartitioning: Partitioning = { +if (numSlices == 1) { + SinglePartition --- End diff -- Which one is better? `SinglePartition` or `RangePartitioning(outputOrdering, 1)`?
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21291#discussion_r187480496 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala --- @@ -345,6 +345,8 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range) override val output: Seq[Attribute] = range.output + override def outputOrdering: Seq[SortOrder] = range.outputOrdering --- End diff -- ok.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21291#discussion_r187370831 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala --- @@ -345,6 +345,8 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range) override val output: Seq[Attribute] = range.output + override def outputOrdering: Seq[SortOrder] = range.outputOrdering --- End diff -- since we are here, shall we also implement `outputPartitioning`?
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21291#discussion_r187353657 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala --- @@ -621,6 +621,17 @@ class PlannerSuite extends SharedSQLContext { requiredOrdering = Seq(orderingA, orderingB), shouldHaveSort = true) } + + test("RangeExec should have correct output ordering") { --- End diff -- Ok.
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21291#discussion_r187350669 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala --- @@ -621,6 +621,17 @@ class PlannerSuite extends SharedSQLContext { requiredOrdering = Seq(orderingA, orderingB), shouldHaveSort = true) } + + test("RangeExec should have correct output ordering") { --- End diff -- nit: start with `SPARK-24242: ...`
GitHub user viirya opened a pull request: https://github.com/apache/spark/pull/21291 [SPARK-24242][SQL] RangeExec should have correct outputOrdering ## What changes were proposed in this pull request? The logical `Range` node recently gained an `outputOrdering`, which is used to eliminate redundant `Sort` operators during optimization. However, this `outputOrdering` doesn't propagate to the physical `RangeExec` node. ## How was this patch tested? Added test. You can merge this pull request into a Git repository by running: $ git pull https://github.com/viirya/spark-1 SPARK-24242 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21291.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21291 commit 30b42d542b76c7e2e3b755515efbb530bb06509a Author: Liang-Chi Hsieh Date: 2018-05-10T10:30:48Z RangeExec should have correct outputOrdering.
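The core of the fix described above is a one-line delegation: the physical node forwards the ordering the logical node already knows. A minimal sketch with toy classes (not Spark's planner; `LogicalRange` and `RangeExecSketch` are hypothetical stand-ins) illustrates the propagation and the sign-of-step rule that determines the direction:

```scala
// Whether the single "id" column is sorted ascending or descending.
final case class SortOrder(column: String, ascending: Boolean)

// Stand-in for the logical Range node, which already reports its
// ordering: ascending for a positive step, descending for a negative one.
final case class LogicalRange(start: Long, end: Long, step: Long) {
  def outputOrdering: Seq[SortOrder] =
    Seq(SortOrder("id", ascending = step > 0))
}

// Stand-in for the patched RangeExec: instead of falling back to the
// default (no ordering), it propagates the logical node's ordering, so
// the planner can skip a redundant Sort over the range output.
final case class RangeExecSketch(range: LogicalRange) {
  def outputOrdering: Seq[SortOrder] = range.outputOrdering
}
```

Before the patch, the planner saw an unordered `RangeExec` and could insert a `Sort` even when the optimizer had already proven the range output sorted.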