Repository: spark
Updated Branches:
  refs/heads/master 4e107fdb7 -> fe65361b0


[SPARK-22042][FOLLOW-UP][SQL] ReorderJoinPredicates can break when child's 
partitioning is not decided

## What changes were proposed in this pull request?

This is a followup PR of https://github.com/apache/spark/pull/19257 where 
gatorsmile had left couple comments wrt code style.

## How was this patch tested?

Doesn't change any functionality. Will depend on build to see if no checkstyle 
rules are violated.

Author: Tejas Patil <tej...@fb.com>

Closes #20041 from tejasapatil/followup_19257.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fe65361b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fe65361b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fe65361b

Branch: refs/heads/master
Commit: fe65361b0579777c360dee1d7f633f28df0c6aeb
Parents: 4e107fd
Author: Tejas Patil <tej...@fb.com>
Authored: Thu Dec 21 09:22:08 2017 -0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Thu Dec 21 09:22:08 2017 -0800

----------------------------------------------------------------------
 .../execution/exchange/EnsureRequirements.scala | 82 ++++++++++----------
 .../spark/sql/sources/BucketedReadSuite.scala   |  4 +-
 2 files changed, 44 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fe65361b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index 82f0b9f..c8e236b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -252,54 +252,56 @@ case class EnsureRequirements(conf: SQLConf) extends 
Rule[SparkPlan] {
     operator.withNewChildren(children)
   }
 
-  /**
-   * When the physical operators are created for JOIN, the ordering of join 
keys is based on order
-   * in which the join keys appear in the user query. That might not match 
with the output
-   * partitioning of the join node's children (thus leading to extra sort / 
shuffle being
-   * introduced). This rule will change the ordering of the join keys to match 
with the
-   * partitioning of the join nodes' children.
-   */
-  def reorderJoinPredicates(plan: SparkPlan): SparkPlan = {
-    def reorderJoinKeys(
-        leftKeys: Seq[Expression],
-        rightKeys: Seq[Expression],
-        leftPartitioning: Partitioning,
-        rightPartitioning: Partitioning): (Seq[Expression], Seq[Expression]) = 
{
-
-      def reorder(expectedOrderOfKeys: Seq[Expression],
-                  currentOrderOfKeys: Seq[Expression]): (Seq[Expression], 
Seq[Expression]) = {
-        val leftKeysBuffer = ArrayBuffer[Expression]()
-        val rightKeysBuffer = ArrayBuffer[Expression]()
+  private def reorder(
+      leftKeys: Seq[Expression],
+      rightKeys: Seq[Expression],
+      expectedOrderOfKeys: Seq[Expression],
+      currentOrderOfKeys: Seq[Expression]): (Seq[Expression], Seq[Expression]) 
= {
+    val leftKeysBuffer = ArrayBuffer[Expression]()
+    val rightKeysBuffer = ArrayBuffer[Expression]()
 
-        expectedOrderOfKeys.foreach(expression => {
-          val index = currentOrderOfKeys.indexWhere(e => 
e.semanticEquals(expression))
-          leftKeysBuffer.append(leftKeys(index))
-          rightKeysBuffer.append(rightKeys(index))
-        })
-        (leftKeysBuffer, rightKeysBuffer)
-      }
+    expectedOrderOfKeys.foreach(expression => {
+      val index = currentOrderOfKeys.indexWhere(e => 
e.semanticEquals(expression))
+      leftKeysBuffer.append(leftKeys(index))
+      rightKeysBuffer.append(rightKeys(index))
+    })
+    (leftKeysBuffer, rightKeysBuffer)
+  }
 
-      if (leftKeys.forall(_.deterministic) && 
rightKeys.forall(_.deterministic)) {
-        leftPartitioning match {
-          case HashPartitioning(leftExpressions, _)
-            if leftExpressions.length == leftKeys.length &&
-              leftKeys.forall(x => 
leftExpressions.exists(_.semanticEquals(x))) =>
-            reorder(leftExpressions, leftKeys)
+  private def reorderJoinKeys(
+      leftKeys: Seq[Expression],
+      rightKeys: Seq[Expression],
+      leftPartitioning: Partitioning,
+      rightPartitioning: Partitioning): (Seq[Expression], Seq[Expression]) = {
+    if (leftKeys.forall(_.deterministic) && rightKeys.forall(_.deterministic)) 
{
+      leftPartitioning match {
+        case HashPartitioning(leftExpressions, _)
+          if leftExpressions.length == leftKeys.length &&
+            leftKeys.forall(x => leftExpressions.exists(_.semanticEquals(x))) 
=>
+          reorder(leftKeys, rightKeys, leftExpressions, leftKeys)
 
-          case _ => rightPartitioning match {
-            case HashPartitioning(rightExpressions, _)
-              if rightExpressions.length == rightKeys.length &&
-                rightKeys.forall(x => 
rightExpressions.exists(_.semanticEquals(x))) =>
-              reorder(rightExpressions, rightKeys)
+        case _ => rightPartitioning match {
+          case HashPartitioning(rightExpressions, _)
+            if rightExpressions.length == rightKeys.length &&
+              rightKeys.forall(x => 
rightExpressions.exists(_.semanticEquals(x))) =>
+            reorder(leftKeys, rightKeys, rightExpressions, rightKeys)
 
-            case _ => (leftKeys, rightKeys)
-          }
+          case _ => (leftKeys, rightKeys)
         }
-      } else {
-        (leftKeys, rightKeys)
       }
+    } else {
+      (leftKeys, rightKeys)
     }
+  }
 
+  /**
+   * When the physical operators are created for JOIN, the ordering of join 
keys is based on order
+   * in which the join keys appear in the user query. That might not match 
with the output
+   * partitioning of the join node's children (thus leading to extra sort / 
shuffle being
+   * introduced). This rule will change the ordering of the join keys to match 
with the
+   * partitioning of the join nodes' children.
+   */
+  private def reorderJoinPredicates(plan: SparkPlan): SparkPlan = {
     plan.transformUp {
       case BroadcastHashJoinExec(leftKeys, rightKeys, joinType, buildSide, 
condition, left,
         right) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/fe65361b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 9025859..fb61fa7 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -620,7 +620,7 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils {
                 |) ab
                 |JOIN table2 c
                 |ON ab.i = c.i
-                |""".stripMargin),
+              """.stripMargin),
           sql("""
                 |SELECT a.i, a.j, a.k, c.i, c.j, c.k
                 |FROM bucketed_table a
@@ -628,7 +628,7 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils {
                 |ON a.i = b.i
                 |JOIN table2 c
                 |ON a.i = c.i
-                |""".stripMargin))
+              """.stripMargin))
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to