tanelk commented on a change in pull request #29871:
URL: https://github.com/apache/spark/pull/29871#discussion_r495116397
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
##########
@@ -40,30 +40,35 @@ object CostBasedJoinReorder extends Rule[LogicalPlan] with
PredicateHelper {
if (!conf.cboEnabled || !conf.joinReorderEnabled) {
plan
} else {
- val result = plan transformDown {
- // Start reordering with a joinable item, which is an InnerLike join
with conditions.
- // Avoid reordering if a join hint is present.
- case j @ Join(_, _, _: InnerLike, Some(cond), JoinHint.NONE) =>
- reorder(j, j.output)
- case p @ Project(projectList, Join(_, _, _: InnerLike, Some(cond),
JoinHint.NONE))
- if projectList.forall(_.isInstanceOf[Attribute]) =>
- reorder(p, p.output)
- }
+ val result = plan transformDown applyLocally
// After reordering is finished, convert OrderedJoin back to Join.
result transform {
case OrderedJoin(left, right, jt, cond) => Join(left, right, jt, cond,
JoinHint.NONE)
}
}
}
+ private val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = {
+ // Start reordering with a joinable item, which is an InnerLike join with
conditions.
+ // Avoid reordering if a join hint is present.
+ case j @ Join(_, _, _: InnerLike, Some(cond), JoinHint.NONE) =>
+ reorder(j, j.output)
+ case p @ Project(projectList, Join(_, _, _: InnerLike, Some(cond),
JoinHint.NONE))
+ if projectList.forall(_.isInstanceOf[Attribute]) =>
+ reorder(p, p.output)
+
+ }
+
private def reorder(plan: LogicalPlan, output: Seq[Attribute]): LogicalPlan
= {
val (items, conditions) = extractInnerJoins(plan)
val result =
// Do reordering if the number of items is appropriate and join
conditions exist.
// We also need to check if costs of all items can be evaluated.
if (items.size > 2 && items.size <= conf.joinReorderDPThreshold &&
conditions.nonEmpty &&
items.forall(_.stats.rowCount.isDefined)) {
- JoinReorderDP.search(conf, items, conditions, output)
+ // Transform and sort all the items to guarantee idempotence
+ val transformedItems = items.map(_ transformDown
applyLocally).sortBy(_.semanticHash())
+ JoinReorderDP.search(conf, transformedItems, conditions, output)
Review comment:
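This is the most relevant change: previously, the joins were reordered from
top to bottom. Now the joins in the lower groups are reordered first (each item
is transformed with `applyLocally` and the items are sorted by `semanticHash()`
before the search), and only then are the joins at the top reordered.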
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]