Github user mgaido91 commented on a diff in the pull request:
https://github.com/apache/spark/pull/21083#discussion_r182034339
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
---
@@ -661,21 +661,51 @@ object InferFiltersFromConstraints extends
Rule[LogicalPlan] with PredicateHelpe
}
case join @ Join(left, right, joinType, conditionOpt) =>
- // Only consider constraints that can be pushed down completely to
either the left or the
- // right child
- val constraints = join.allConstraints.filter { c =>
- c.references.subsetOf(left.outputSet) ||
c.references.subsetOf(right.outputSet)
- }
- // Remove those constraints that are already enforced by either the
left or the right child
- val additionalConstraints = constraints -- (left.constraints ++
right.constraints)
- val newConditionOpt = conditionOpt match {
- case Some(condition) =>
- val newFilters = additionalConstraints --
splitConjunctivePredicates(condition)
- if (newFilters.nonEmpty) Option(And(newFilters.reduce(And),
condition)) else None
- case None =>
- additionalConstraints.reduceOption(And)
+ joinType match {
+ // For inner join, we can infer additional filters for both sides.
LeftSemi is kind of an
+ // inner join, it just drops the right side in the final output.
+ case _: InnerLike | LeftSemi =>
+ val allConstraints = getAllConstraints(left, right, conditionOpt)
+ val newLeft = inferNewFilter(left, allConstraints)
+ val newRight = inferNewFilter(right, allConstraints)
+ join.copy(left = newLeft, right = newRight)
+
+ // For right outer join, we can only infer additional filters for
left side.
+ case RightOuter =>
+ val allConstraints = getAllConstraints(left, right, conditionOpt)
+ val newLeft = inferNewFilter(left, allConstraints)
+ join.copy(left = newLeft)
+
+ // For left join, we can only infer additional filters for right
side.
+ case LeftOuter | LeftAnti =>
+ val allConstraints = getAllConstraints(left, right, conditionOpt)
+ val newRight = inferNewFilter(right, allConstraints)
+ join.copy(right = newRight)
+
+ case _ => join
}
- if (newConditionOpt.isDefined) Join(left, right, joinType,
newConditionOpt) else join
+ }
+
+ private def getAllConstraints(
+ left: LogicalPlan,
+ right: LogicalPlan,
+ conditionOpt: Option[Expression]): Set[Expression] = {
+ val baseConstraints = left.constraints.union(right.constraints)
+
.union(conditionOpt.map(splitConjunctivePredicates).getOrElse(Nil).toSet)
+
baseConstraints.union(ConstraintsUtils.inferAdditionalConstraints(baseConstraints))
+ }
+
+ private def inferNewFilter(plan: LogicalPlan, constraints:
Set[Expression]): LogicalPlan = {
--- End diff --
Could we return the additional constraints here instead of the new plan? Then at L669 and in the similar places we could copy the join only when there are actually new filters to add.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]