[GitHub] spark pull request #21564: [SPARK-24556][SQL] ReusedExchange should rewrite ...
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r195886078

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
    @@ -70,7 +70,7 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
       }

       override def outputPartitioning: Partitioning = child.outputPartitioning match {
    -    case h: HashPartitioning => h.copy(expressions = h.expressions.map(updateAttr))
    +    case e: Expression => updateAttr(e).asInstanceOf[Partitioning]
         case other => other
    --- End diff --

    LGTM

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21564: [SPARK-24556][SQL] ReusedExchange should rewrite ...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r195883736

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---
    @@ -2270,4 +2270,15 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
         val mapWithBinaryKey = map(lit(Array[Byte](1.toByte)), lit(1))
         checkAnswer(spark.range(1).select(mapWithBinaryKey.getItem(Array[Byte](1.toByte))), Row(1))
       }
    +
    +  test("SPARK-24556: ReusedExchange should rewrite output partitioning for RangePartitioning") {
    --- End diff --

    Please also mention the cached table in the PR title.
[GitHub] spark pull request #21564: [SPARK-24556][SQL] ReusedExchange should rewrite ...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r195883713

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---
    @@ -2270,4 +2270,15 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
         val mapWithBinaryKey = map(lit(Array[Byte](1.toByte)), lit(1))
         checkAnswer(spark.range(1).select(mapWithBinaryKey.getItem(Array[Byte](1.toByte))), Row(1))
       }
    +
    +  test("SPARK-24556: ReusedExchange should rewrite output partitioning for RangePartitioning") {
    --- End diff --

    This is not an end-to-end test; let's put it in `PlannerSuite` and also test the cached table.
[GitHub] spark pull request #21564: [SPARK-24556][SQL] ReusedExchange should rewrite ...
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r195654141

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
    @@ -170,6 +170,8 @@ case class InMemoryTableScanExec(
       override def outputPartitioning: Partitioning = {
         relation.cachedPlan.outputPartitioning match {
           case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
    +      case r: RangePartitioning =>
    +        r.copy(ordering = r.ordering.map(updateAttribute(_).asInstanceOf[SortOrder]))
    --- End diff --

    Looks correct.
[GitHub] spark pull request #21564: [SPARK-24556][SQL] ReusedExchange should rewrite ...
Github user yucai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r195652026

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
    @@ -170,6 +170,8 @@ case class InMemoryTableScanExec(
       override def outputPartitioning: Partitioning = {
         relation.cachedPlan.outputPartitioning match {
           case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
    +      case r: RangePartitioning =>
    +        r.copy(ordering = r.ordering.map(updateAttribute(_).asInstanceOf[SortOrder]))
    --- End diff --

    @viirya Judging from `updateAttribute`, `relation.cachedPlan.output` and `relation.output` correspond one to one, by position:
    ```
    private def updateAttribute(expr: Expression): Expression = {
      // Pair each attribute of the cached plan with the relation attribute at the same position.
      val attrMap = AttributeMap(relation.cachedPlan.output.zip(relation.output))
      // Substitute mapped attributes; anything not in the map is left unchanged.
      expr.transform {
        case attr: Attribute => attrMap.getOrElse(attr, attr)
      }
    }
    ```
    This means "[i#54, j#55, m#58, n#59]" corresponds to "[i#5, j#6, m#15, n#16]", so we can always replace `HashPartitioning(i#5)` with `HashPartitioning(i#54)`.

    Any ideas?
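The positional mapping described above can be sketched outside Spark. This is a minimal toy model, not Spark's classes: `Attr`, `buildAttrMap`, and `remap` are hypothetical names introduced only for illustration, and a plain `Map` stands in for `AttributeMap`.

```scala
object AttrRemapSketch {
  // Toy stand-in for an attribute reference: a name plus an expression ID (e.g. i#5).
  final case class Attr(name: String, id: Int) {
    override def toString: String = s"$name#$id"
  }

  // Pair the cached plan's output with the relation's output by position.
  def buildAttrMap(cachedOutput: Seq[Attr], relationOutput: Seq[Attr]): Map[Attr, Attr] = {
    require(cachedOutput.length == relationOutput.length, "outputs must line up one to one")
    cachedOutput.zip(relationOutput).toMap
  }

  // Rewrite each partitioning key; attributes missing from the map are left alone.
  def remap(keys: Seq[Attr], attrMap: Map[Attr, Attr]): Seq[Attr] =
    keys.map(a => attrMap.getOrElse(a, a))
}
```

With the outputs from the example, the key `i#5` maps to `i#54` because both sit at position 0; nothing depends on the attribute names matching.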
[GitHub] spark pull request #21564: [SPARK-24556][SQL] ReusedExchange should rewrite ...
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r195630214

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
    @@ -170,6 +170,8 @@ case class InMemoryTableScanExec(
       override def outputPartitioning: Partitioning = {
         relation.cachedPlan.outputPartitioning match {
           case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
    +      case r: RangePartitioning =>
    +        r.copy(ordering = r.ordering.map(updateAttribute(_).asInstanceOf[SortOrder]))
    --- End diff --

    For `PartitioningCollection`, I think it is harder to treat it like `HashPartitioning` and `RangePartitioning` when replacing attributes. In the above example, `PartitioningCollection` contains `HashPartitioning(i#5)` and `HashPartitioning(m#15)`, while the output of `InMemoryRelation` is `[i#54, j#55, m#58, n#59]`. Can we still replace attributes based on the position of each attribute in the output?
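One way to picture attribute replacement inside a collection of partitionings is a recursive rewrite over a small algebraic data type. The sketch below is a toy model under stated assumptions: `HashP`, `RangeP`, `CollectionP`, and `UnknownP` are hypothetical stand-ins, not Spark's `Partitioning` hierarchy, and the attribute map is assumed to have been built by position as discussed in this thread.

```scala
object PartitioningCollectionSketch {
  final case class Attr(name: String, id: Int)

  // Hypothetical, simplified partitioning hierarchy.
  sealed trait Partitioning
  final case class HashP(keys: Seq[Attr]) extends Partitioning
  final case class RangeP(ordering: Seq[Attr]) extends Partitioning
  final case class CollectionP(parts: Seq[Partitioning]) extends Partitioning
  case object UnknownP extends Partitioning

  // Replace attributes in every nested partitioning, recursing into collections.
  def rewrite(p: Partitioning, attrMap: Map[Attr, Attr]): Partitioning = {
    def sub(a: Attr): Attr = attrMap.getOrElse(a, a)
    p match {
      case HashP(keys)        => HashP(keys.map(sub))
      case RangeP(ordering)   => RangeP(ordering.map(sub))
      case CollectionP(parts) => CollectionP(parts.map(rewrite(_, attrMap)))
      case UnknownP           => UnknownP
    }
  }
}
```

In this model a collection holding hash partitionings on `i#5` and `m#15` becomes one on `i#54` and `m#58`, since each member is rewritten with the same positional map.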
[GitHub] spark pull request #21564: [SPARK-24556][SQL] ReusedExchange should rewrite ...
Github user yucai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r195463301

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
    @@ -170,6 +170,8 @@ case class InMemoryTableScanExec(
       override def outputPartitioning: Partitioning = {
         relation.cachedPlan.outputPartitioning match {
           case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
    +      case r: RangePartitioning =>
    +        r.copy(ordering = r.ordering.map(updateAttribute(_).asInstanceOf[SortOrder]))
    --- End diff --

    `PartitioningCollection` should be considered as well. Take the case below:
    ```
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    spark.conf.set("spark.sql.codegen.wholeStage", false)
    val df1 = Seq(1 -> "a", 3 -> "c", 2 -> "b").toDF("i", "j").as("t1")
    val df2 = Seq(1 -> "a", 3 -> "c", 2 -> "b").toDF("m", "n").as("t2")
    val d = df1.join(df2, $"t1.i" === $"t2.m")
    d.cache
    val d1 = d.as("t3")
    val d2 = d.as("t4")
    d1.join(d2, $"t3.i" === $"t4.i").explain
    ```
    ```
    SortMergeJoin [i#5], [i#54], Inner
    :- InMemoryTableScan [i#5, j#6, m#15, n#16]
    :     +- InMemoryRelation [i#5, j#6, m#15, n#16], CachedRDDBuilder
    :           +- SortMergeJoin [i#5], [m#15], Inner
    :              :- Sort [i#5 ASC NULLS FIRST], false, 0
    :              :  +- Exchange hashpartitioning(i#5, 10)
    :              :     +- LocalTableScan [i#5, j#6]
    :              +- Sort [m#15 ASC NULLS FIRST], false, 0
    :                 +- Exchange hashpartitioning(m#15, 10)
    :                    +- LocalTableScan [m#15, n#16]
    +- Sort [i#54 ASC NULLS FIRST], false, 0
       +- Exchange hashpartitioning(i#54, 10)
          +- InMemoryTableScan [i#54, j#55, m#58, n#59]
                +- InMemoryRelation [i#54, j#55, m#58, n#59], CachedRDDBuilder
                      +- SortMergeJoin [i#5], [m#15], Inner
                         :- Sort [i#5 ASC NULLS FIRST], false, 0
                         :  +- Exchange hashpartitioning(i#5, 10)
                         :     +- LocalTableScan [i#5, j#6]
                         +- Sort [m#15 ASC NULLS FIRST], false, 0
                            +- Exchange hashpartitioning(m#15, 10)
                               +- LocalTableScan [m#15, n#16]
    ```
    `Exchange hashpartitioning(i#54, 10)` is an extra shuffle.

    What do you think?
[GitHub] spark pull request #21564: [SPARK-24556][SQL] ReusedExchange should rewrite ...
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r195420136

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
    @@ -170,6 +170,8 @@ case class InMemoryTableScanExec(
       override def outputPartitioning: Partitioning = {
         relation.cachedPlan.outputPartitioning match {
           case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
    +      case r: RangePartitioning =>
    +        r.copy(ordering = r.ordering.map(updateAttribute(_).asInstanceOf[SortOrder]))
    --- End diff --

    Hmm, `HashPartitioning` and `RangePartitioning` can affect later sorting and shuffling. For `BroadcastPartitioning`, though, there seems to me to be no such benefit.
[GitHub] spark pull request #21564: [SPARK-24556][SQL] ReusedExchange should rewrite ...
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r19535

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
    @@ -170,6 +170,8 @@ case class InMemoryTableScanExec(
       override def outputPartitioning: Partitioning = {
         relation.cachedPlan.outputPartitioning match {
           case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
    +      case r: RangePartitioning =>
    +        r.copy(ordering = r.ordering.map(updateAttribute(_).asInstanceOf[SortOrder]))
    --- End diff --

    Oh, like `HashedRelationBroadcastMode`.
[GitHub] spark pull request #21564: [SPARK-24556][SQL] ReusedExchange should rewrite ...
Github user yucai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r195361725

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
    @@ -170,6 +170,8 @@ case class InMemoryTableScanExec(
       override def outputPartitioning: Partitioning = {
         relation.cachedPlan.outputPartitioning match {
           case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
    +      case r: RangePartitioning =>
    +        r.copy(ordering = r.ordering.map(updateAttribute(_).asInstanceOf[SortOrder]))
    --- End diff --

    Doesn't `BroadcastPartitioning`'s `BroadcastMode` contain an `Expression`?
[GitHub] spark pull request #21564: [SPARK-24556][SQL] ReusedExchange should rewrite ...
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r195356161

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
    @@ -170,6 +170,8 @@ case class InMemoryTableScanExec(
       override def outputPartitioning: Partitioning = {
         relation.cachedPlan.outputPartitioning match {
           case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
    +      case r: RangePartitioning =>
    +        r.copy(ordering = r.ordering.map(updateAttribute(_).asInstanceOf[SortOrder]))
    --- End diff --

    I think `PartitioningCollection` is for an operator that has multiple children. `BroadcastPartitioning` is not an `Expression`.
[GitHub] spark pull request #21564: [SPARK-24556][SQL] ReusedExchange should rewrite ...
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r195355721

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
    @@ -170,6 +170,8 @@ case class InMemoryTableScanExec(
       override def outputPartitioning: Partitioning = {
         relation.cachedPlan.outputPartitioning match {
           case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
    +      case r: RangePartitioning =>
    +        r.copy(ordering = r.ordering.map(updateAttribute(_).asInstanceOf[SortOrder]))
    --- End diff --

    Yes, you're right @viirya, thanks. Then I'd propose something like:
    ```
    relation.cachedPlan.outputPartitioning match {
      case e: Expression => updateAttribute(e)
      case other => other
    }
    ```
    What do you think?
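The proposal above relies on the fact that only some partitionings are also expressions, so a type test can separate the ones whose attributes can be rewritten from the rest. The following is a minimal sketch of that dispatch in a toy model: `Expr`, `HashP`, `RangeP`, `BroadcastP`, and `mapAttrs` are hypothetical names, not Spark's API, and the cast back to `Partitioning` is safe here only because each `mapAttrs` returns an instance of the same class.

```scala
object PartitioningDispatchSketch {
  final case class Attr(name: String, id: Int)

  // Stand-in for an expression tree node that can rewrite the attributes it holds.
  trait Expr {
    def mapAttrs(f: Attr => Attr): Expr
  }

  trait Partitioning

  // Expression-backed partitionings carry attribute references.
  final case class HashP(keys: Seq[Attr], numPartitions: Int) extends Expr with Partitioning {
    def mapAttrs(f: Attr => Attr): Expr = copy(keys = keys.map(f))
  }
  final case class RangeP(ordering: Seq[Attr], numPartitions: Int) extends Expr with Partitioning {
    def mapAttrs(f: Attr => Attr): Expr = copy(ordering = ordering.map(f))
  }

  // Not an expression in this model, so the match below leaves it untouched.
  final case class BroadcastP(mode: String) extends Partitioning

  // Rewrite any partitioning that is also an expression; pass everything else through.
  def rewrite(p: Partitioning, attrMap: Map[Attr, Attr]): Partitioning = p match {
    case e: Expr => e.mapAttrs(a => attrMap.getOrElse(a, a)).asInstanceOf[Partitioning]
    case other   => other
  }
}
```

The design trade-off is that a new expression-backed partitioning is handled without adding a case, at the cost of a runtime cast that the compiler cannot check.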
[GitHub] spark pull request #21564: [SPARK-24556][SQL] ReusedExchange should rewrite ...
Github user yucai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r195354829

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
    @@ -170,6 +170,8 @@ case class InMemoryTableScanExec(
       override def outputPartitioning: Partitioning = {
         relation.cachedPlan.outputPartitioning match {
           case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
    +      case r: RangePartitioning =>
    +        r.copy(ordering = r.ordering.map(updateAttribute(_).asInstanceOf[SortOrder]))
    --- End diff --

    Good suggestion, thanks @mgaido91.

    @viirya Do we need to consider the following?
    - `PartitioningCollection` in `InMemoryTableScanExec.outputPartitioning`, which is also an `Expression`?
    - `PartitioningCollection` and `BroadcastPartitioning` in `ReusedExchangeExec.outputPartitioning`?
[GitHub] spark pull request #21564: [SPARK-24556][SQL] ReusedExchange should rewrite ...
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r195352300

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
    @@ -170,6 +170,8 @@ case class InMemoryTableScanExec(
       override def outputPartitioning: Partitioning = {
         relation.cachedPlan.outputPartitioning match {
           case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
    +      case r: RangePartitioning =>
    +        r.copy(ordering = r.ordering.map(updateAttribute(_).asInstanceOf[SortOrder]))
    --- End diff --

    Not every `Partitioning` is an `Expression`. Only `HashPartitioning` and `RangePartitioning` are.
[GitHub] spark pull request #21564: [SPARK-24556][SQL] ReusedExchange should rewrite ...
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r195349283

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
    @@ -170,6 +170,8 @@ case class InMemoryTableScanExec(
       override def outputPartitioning: Partitioning = {
         relation.cachedPlan.outputPartitioning match {
           case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
    +      case r: RangePartitioning =>
    +        r.copy(ordering = r.ordering.map(updateAttribute(_).asInstanceOf[SortOrder]))
    --- End diff --

    Why not just `updateAttribute(r)`? Moreover, to avoid the same issue with other cases in the future, have you considered doing something like:
    ```
    updateAttribute(relation.cachedPlan.outputPartitioning)
    ```
    ?
[GitHub] spark pull request #21564: [SPARK-24556][SQL] ReusedExchange should rewrite ...
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21564#discussion_r195348233

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
    @@ -170,6 +170,8 @@ case class InMemoryTableScanExec(
       override def outputPartitioning: Partitioning = {
         relation.cachedPlan.outputPartitioning match {
           case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
    +      case r: RangePartitioning =>
    +        r.copy(ordering = r.ordering.map(updateAttribute(_).asInstanceOf[SortOrder]))
    --- End diff --

    Not sure why `RangePartitioning` wasn't included in the first place.
[GitHub] spark pull request #21564: [SPARK-24556][SQL] ReusedExchange should rewrite ...
GitHub user yucai opened a pull request:

    https://github.com/apache/spark/pull/21564

    [SPARK-24556][SQL] ReusedExchange should rewrite output partitioning also when child's partitioning is RangePartitioning

    ## What changes were proposed in this pull request?

    Currently, `ReusedExchange` rewrites its output partitioning if the child's partitioning is `HashPartitioning`, but it does not do the same when the child's partitioning is `RangePartitioning`. This can sometimes introduce an extra shuffle, for example:

    ```
    val df = Seq(1 -> "a", 3 -> "c", 2 -> "b").toDF("i", "j")
    val df1 = df.as("t1")
    val df2 = df.as("t2")
    val t = df1.orderBy("j").join(df2.orderBy("j"), $"t1.i" === $"t2.i", "right")
    t.cache.orderBy($"t2.j").explain
    ```
    Before:
    ```
    == Physical Plan ==
    *(1) Sort [j#14 ASC NULLS FIRST], true, 0
    +- Exchange rangepartitioning(j#14 ASC NULLS FIRST, 200)
       +- InMemoryTableScan [i#5, j#6, i#13, j#14]
             +- InMemoryRelation [i#5, j#6, i#13, j#14], CachedRDDBuilder...
                   +- *(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft
                      :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as...
                      :  +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0
                      :     +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
                      :        +- LocalTableScan [i#5, j#6]
                      +- *(2) Sort [j#14 ASC NULLS FIRST], true, 0
                         +- ReusedExchange [i#13, j#14], Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
    ```
    A better plan should avoid `Exchange rangepartitioning(j#14 ASC NULLS FIRST, 200)`, like:
    ```
    == Physical Plan ==
    *(1) Sort [j#14 ASC NULLS FIRST], true, 0
    +- InMemoryTableScan [i#5, j#6, i#13, j#14]
          +- InMemoryRelation [i#5, j#6, i#13, j#14], CachedRDDBuilder...
                +- *(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft
                   :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
                   :  +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0
                   :     +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
                   :        +- LocalTableScan [i#5, j#6]
                   +- *(2) Sort [j#14 ASC NULLS FIRST], true, 0
                      +- ReusedExchange [i#13, j#14], Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
    ```

    ## How was this patch tested?

    Add new tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/yucai/spark SPARK-24556

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21564.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21564
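Why a stale attribute ID leads to an extra shuffle can be illustrated with a deliberately simplified check. This sketch is an assumption-laden toy: `needsExchange` and `rewrite` are hypothetical helpers, and plain equality of ordering keys stands in for the planner's real (more nuanced) test of whether a reported partitioning satisfies a required distribution.

```scala
object ExtraShuffleSketch {
  final case class Attr(name: String, id: Int)
  final case class RangeP(ordering: Seq[Attr])

  // Simplification: the requirement is met only when the reported keys are the same attributes.
  def needsExchange(required: Seq[Attr], reported: RangeP): Boolean =
    reported.ordering != required

  // Rewrite the child's partitioning in terms of the reusing node's output, by position.
  def rewrite(p: RangeP, childOutput: Seq[Attr], reusedOutput: Seq[Attr]): RangeP = {
    val attrMap = childOutput.zip(reusedOutput).toMap
    RangeP(p.ordering.map(a => attrMap.getOrElse(a, a)))
  }
}
```

With the IDs from the plans above, a partitioning reported on `j#6` does not match a requirement on `j#14`, so an exchange is planned; once it is rewritten to `j#14`, the requirement is already met.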