tanelk commented on a change in pull request #29871:
URL: https://github.com/apache/spark/pull/29871#discussion_r495614536



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
##########
@@ -40,30 +40,35 @@ object CostBasedJoinReorder extends Rule[LogicalPlan] with PredicateHelper {
     if (!conf.cboEnabled || !conf.joinReorderEnabled) {
       plan
     } else {
-      val result = plan transformDown {
-        // Start reordering with a joinable item, which is an InnerLike join 
with conditions.
-        // Avoid reordering if a join hint is present.
-        case j @ Join(_, _, _: InnerLike, Some(cond), JoinHint.NONE) =>
-          reorder(j, j.output)
-        case p @ Project(projectList, Join(_, _, _: InnerLike, Some(cond), 
JoinHint.NONE))
-          if projectList.forall(_.isInstanceOf[Attribute]) =>
-          reorder(p, p.output)
-      }
+      val result = plan transformDown applyLocally
       // After reordering is finished, convert OrderedJoin back to Join.
       result transform {
         case OrderedJoin(left, right, jt, cond) => Join(left, right, jt, cond, 
JoinHint.NONE)
       }
     }
   }
 
+  private val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = {
+    // Start reordering with a joinable item, which is an InnerLike join with 
conditions.
+    // Avoid reordering if a join hint is present.
+    case j @ Join(_, _, _: InnerLike, Some(cond), JoinHint.NONE) =>
+      reorder(j, j.output)
+    case p @ Project(projectList, Join(_, _, _: InnerLike, Some(cond), 
JoinHint.NONE))
+      if projectList.forall(_.isInstanceOf[Attribute]) =>
+      reorder(p, p.output)
+
+  }
+
   private def reorder(plan: LogicalPlan, output: Seq[Attribute]): LogicalPlan 
= {
     val (items, conditions) = extractInnerJoins(plan)
     val result =
       // Do reordering if the number of items is appropriate and join 
conditions exist.
       // We also need to check if costs of all items can be evaluated.
       if (items.size > 2 && items.size <= conf.joinReorderDPThreshold && 
conditions.nonEmpty &&
           items.forall(_.stats.rowCount.isDefined)) {
-        JoinReorderDP.search(conf, items, conditions, output)
+        // Transform and sort all the items to guarantee idempotence
+        val transformedItems = items.map(_ transformDown 
applyLocally).sortBy(_.semanticHash())
+        JoinReorderDP.search(conf, transformedItems, conditions, output)

Review comment:
       This is outdated

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
##########
@@ -342,7 +347,10 @@ object JoinReorderDP extends PredicateHelper with Logging {
     /** Get the cost of the root node of this plan tree. */
     def rootCost(conf: SQLConf): Cost = {
       if (itemIds.size > 1) {
-        val rootStats = plan.stats
+        val rootStats = (plan transform {
+          // The OrderedJoin does not propagate stats
+          case OrderedJoin(left, right, jt, cond) => Join(left, right, jt, 
cond, JoinHint.NONE)
+        }).stats

Review comment:
       This is outdated




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to