[GitHub] spark pull request #21083: [SPARK-23564][SQL] infer additional filters from ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21083 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21083: [SPARK-23564][SQL] infer additional filters from ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21083#discussion_r183299130 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -664,53 +662,52 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe } case join @ Join(left, right, joinType, conditionOpt) => - // Only consider constraints that can be pushed down completely to either the left or the - // right child - val constraints = join.allConstraints.filter { c => -c.references.subsetOf(left.outputSet) || c.references.subsetOf(right.outputSet) - } - // Remove those constraints that are already enforced by either the left or the right child - val additionalConstraints = constraints -- (left.constraints ++ right.constraints) - val newConditionOpt = conditionOpt match { -case Some(condition) => - val newFilters = additionalConstraints -- splitConjunctivePredicates(condition) - if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else conditionOpt -case None => - additionalConstraints.reduceOption(And) - } - // Infer filter for left/right outer joins - val newLeftOpt = joinType match { -case RightOuter if newConditionOpt.isDefined => - val inferredConstraints = left.getRelevantConstraints( -left.constraints - .union(right.constraints) - .union(splitConjunctivePredicates(newConditionOpt.get).toSet)) - val newFilters = inferredConstraints -.filterNot(left.constraints.contains) -.reduceLeftOption(And) - newFilters.map(Filter(_, left)) -case _ => None - } - val newRightOpt = joinType match { -case LeftOuter if newConditionOpt.isDefined => - val inferredConstraints = right.getRelevantConstraints( -right.constraints - .union(left.constraints) - .union(splitConjunctivePredicates(newConditionOpt.get).toSet)) - val newFilters = inferredConstraints -.filterNot(right.constraints.contains) -.reduceLeftOption(And) - newFilters.map(Filter(_, right)) -case _ => None - } + joinType match { +// For inner join, we can infer additional filters for both sides. LeftSemi is kind of an +// inner join, it just drops the right side in the final output. +case _: InnerLike | LeftSemi => + val allConstraints = getAllConstraints(left, right, conditionOpt) + val newLeft = inferNewFilter(left, allConstraints) + val newRight = inferNewFilter(right, allConstraints) + join.copy(left = newLeft, right = newRight) - if ((newConditionOpt.isDefined && (newConditionOpt ne conditionOpt)) -|| newLeftOpt.isDefined || newRightOpt.isDefined) { -Join(newLeftOpt.getOrElse(left), newRightOpt.getOrElse(right), joinType, newConditionOpt) - } else { -join +// For right outer join, we can only infer additional filters for left side. +case RightOuter => + val allConstraints = getAllConstraints(left, right, conditionOpt) + val newLeft = inferNewFilter(left, allConstraints) + join.copy(left = newLeft) + +// For left join, we can only infer additional filters for right side. +case LeftOuter | LeftAnti => + val allConstraints = getAllConstraints(left, right, conditionOpt) + val newRight = inferNewFilter(right, allConstraints) + join.copy(right = newRight) + +case _ => join } } + + private def getAllConstraints( + left: LogicalPlan, + right: LogicalPlan, + conditionOpt: Option[Expression]): Set[Expression] = { +val baseConstraints = left.constraints.union(right.constraints) + .union(conditionOpt.map(splitConjunctivePredicates).getOrElse(Nil).toSet) +baseConstraints.union(inferAdditionalConstraints(baseConstraints)) + } + + private def inferNewFilter(plan: LogicalPlan, constraints: Set[Expression]): LogicalPlan = { +val newPredicates = constraints + .union(constructIsNotNullConstraints(constraints, plan.output)) --- End diff -- ``` case _: InnerLike | LeftSemi => val allConstraints = getAllConstraints(left, right, conditionOpt) val newLeft = inferNewFilter(left, allConstraints) val newRight = inferNewFilter(right, allConstraints) join.copy(left = newLeft, right
[GitHub] spark pull request #21083: [SPARK-23564][SQL] infer additional filters from ...
Github user gengliangwang commented on a diff in the pull request: https://github.com/apache/spark/pull/21083#discussion_r183295844 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -664,53 +662,52 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe } case join @ Join(left, right, joinType, conditionOpt) => - // Only consider constraints that can be pushed down completely to either the left or the - // right child - val constraints = join.allConstraints.filter { c => -c.references.subsetOf(left.outputSet) || c.references.subsetOf(right.outputSet) - } - // Remove those constraints that are already enforced by either the left or the right child - val additionalConstraints = constraints -- (left.constraints ++ right.constraints) - val newConditionOpt = conditionOpt match { -case Some(condition) => - val newFilters = additionalConstraints -- splitConjunctivePredicates(condition) - if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else conditionOpt -case None => - additionalConstraints.reduceOption(And) - } - // Infer filter for left/right outer joins - val newLeftOpt = joinType match { -case RightOuter if newConditionOpt.isDefined => - val inferredConstraints = left.getRelevantConstraints( -left.constraints - .union(right.constraints) - .union(splitConjunctivePredicates(newConditionOpt.get).toSet)) - val newFilters = inferredConstraints -.filterNot(left.constraints.contains) -.reduceLeftOption(And) - newFilters.map(Filter(_, left)) -case _ => None - } - val newRightOpt = joinType match { -case LeftOuter if newConditionOpt.isDefined => - val inferredConstraints = right.getRelevantConstraints( -right.constraints - .union(left.constraints) - .union(splitConjunctivePredicates(newConditionOpt.get).toSet)) - val newFilters = inferredConstraints -.filterNot(right.constraints.contains) -.reduceLeftOption(And) - newFilters.map(Filter(_, right)) -case _ => None - } + joinType match { +// For inner join, we can infer additional filters for both sides. LeftSemi is kind of an +// inner join, it just drops the right side in the final output. +case _: InnerLike | LeftSemi => + val allConstraints = getAllConstraints(left, right, conditionOpt) + val newLeft = inferNewFilter(left, allConstraints) + val newRight = inferNewFilter(right, allConstraints) + join.copy(left = newLeft, right = newRight) - if ((newConditionOpt.isDefined && (newConditionOpt ne conditionOpt)) -|| newLeftOpt.isDefined || newRightOpt.isDefined) { -Join(newLeftOpt.getOrElse(left), newRightOpt.getOrElse(right), joinType, newConditionOpt) - } else { -join +// For right outer join, we can only infer additional filters for left side. +case RightOuter => + val allConstraints = getAllConstraints(left, right, conditionOpt) + val newLeft = inferNewFilter(left, allConstraints) + join.copy(left = newLeft) + +// For left join, we can only infer additional filters for right side. +case LeftOuter | LeftAnti => + val allConstraints = getAllConstraints(left, right, conditionOpt) + val newRight = inferNewFilter(right, allConstraints) + join.copy(right = newRight) + +case _ => join } } + + private def getAllConstraints( + left: LogicalPlan, + right: LogicalPlan, + conditionOpt: Option[Expression]): Set[Expression] = { +val baseConstraints = left.constraints.union(right.constraints) + .union(conditionOpt.map(splitConjunctivePredicates).getOrElse(Nil).toSet) +baseConstraints.union(inferAdditionalConstraints(baseConstraints)) + } + + private def inferNewFilter(plan: LogicalPlan, constraints: Set[Expression]): LogicalPlan = { +val newPredicates = constraints + .union(constructIsNotNullConstraints(constraints, plan.output)) --- End diff -- Can we put the code ``` constraints .union(inferAdditionalConstraints(constraints)) .union(constructIsNotNullConstraints(constraints, output)) .filter { c => c.references.nonEmpty && c.references.subsetOf(outputSet) &&