tanelk commented on a change in pull request #29871:
URL: https://github.com/apache/spark/pull/29871#discussion_r495116397



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
##########
@@ -40,30 +40,35 @@ object CostBasedJoinReorder extends Rule[LogicalPlan] with 
PredicateHelper {
     if (!conf.cboEnabled || !conf.joinReorderEnabled) {
       plan
     } else {
-      val result = plan transformDown {
-        // Start reordering with a joinable item, which is an InnerLike join 
with conditions.
-        // Avoid reordering if a join hint is present.
-        case j @ Join(_, _, _: InnerLike, Some(cond), JoinHint.NONE) =>
-          reorder(j, j.output)
-        case p @ Project(projectList, Join(_, _, _: InnerLike, Some(cond), 
JoinHint.NONE))
-          if projectList.forall(_.isInstanceOf[Attribute]) =>
-          reorder(p, p.output)
-      }
+      val result = plan transformDown applyLocally
       // After reordering is finished, convert OrderedJoin back to Join.
       result transform {
         case OrderedJoin(left, right, jt, cond) => Join(left, right, jt, cond, 
JoinHint.NONE)
       }
     }
   }
 
+  private val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = {
+    // Start reordering with a joinable item, which is an InnerLike join with 
conditions.
+    // Avoid reordering if a join hint is present.
+    case j @ Join(_, _, _: InnerLike, Some(cond), JoinHint.NONE) =>
+      reorder(j, j.output)
+    case p @ Project(projectList, Join(_, _, _: InnerLike, Some(cond), 
JoinHint.NONE))
+      if projectList.forall(_.isInstanceOf[Attribute]) =>
+      reorder(p, p.output)
+
+  }
+
   private def reorder(plan: LogicalPlan, output: Seq[Attribute]): LogicalPlan 
= {
     val (items, conditions) = extractInnerJoins(plan)
     val result =
       // Do reordering if the number of items is appropriate and join 
conditions exist.
       // We also need to check if costs of all items can be evaluated.
       if (items.size > 2 && items.size <= conf.joinReorderDPThreshold && 
conditions.nonEmpty &&
           items.forall(_.stats.rowCount.isDefined)) {
-        JoinReorderDP.search(conf, items, conditions, output)
+        // Transform and sort all the items to guarantee idempotence
+        val transformedItems = items.map(_ transformDown 
applyLocally).sortBy(_.semanticHash())
+        JoinReorderDP.search(conf, transformedItems, conditions, output)

Review comment:
       This is the most relevant change: previously we reordered the joins
from top to bottom. Now we reorder the joins in the lower groups first and then
reorder the ones at the top.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to