cloud-fan commented on code in PR #46143:
URL: https://github.com/apache/spark/pull/46143#discussion_r2618463096
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -2053,17 +2053,124 @@ object PushDownPredicates extends Rule[LogicalPlan] {
object PushPredicateThroughNonJoin extends Rule[LogicalPlan] with PredicateHelper {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform applyLocally
+  // Match the column ordering. It's not something we guarantee, but it seems like it
+  // could be useful, especially for folks writing out to "raw" files.
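+  // For illustration (column names made up, not from the patch): if original = Seq(a, b, c)
+  // and updated = Seq(c, a, b), this returns Seq(a, b, c), matching entries by name.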
+  private def matchColumnOrdering(original: Seq[NamedExpression], updated: Seq[NamedExpression]):
+      Seq[NamedExpression] = {
+    val nameToIndex = original.map(_.name).zipWithIndex.toMap
+    val response = updated.toArray
+    updated.foreach { e =>
+      val idx = nameToIndex(e.name)
+      response(idx) = e
+    }
+    response.toSeq
+  }
+
  val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = {
    // SPARK-13473: We can't push the predicate down when the underlying projection outputs
    // non-deterministic field(s). Non-deterministic expressions are essentially stateful. This
    // implies that, for a given input row, the output is determined by the expression's initial
    // state and all the input rows processed before. In other words, the order of input rows
    // matters for non-deterministic expressions, while pushing down predicates changes the order.
    // This also applies to Aggregate.
-    case Filter(condition, project @ Project(fields, grandChild))
+    case f @ Filter(condition, project @ Project(fields, grandChild))
      if fields.forall(_.deterministic) && canPushThroughCondition(grandChild, condition) =>
      val aliasMap = getAliasMap(project)
-      project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))
+      // Projection aliases that the filter references
+      val expensiveFilterAliasesBuf = mutable.ArrayBuffer.empty[Alias]
+      // Projection aliases that are not used in the filter, so we don't need to push them down
+      var leftBehindAliases: Map[ExprId, Alias] = aliasMap.baseMap.values.map {
+        kv => (kv._2.exprId, kv._2)
+      }.toMap
+      val (cheap, expensive) = splitConjunctivePredicates(condition).partition { cond =>
+        if (!SQLConf.get.avoidDoubleFilterEvail) {
+          // If we always push through, short-circuit the check.
+          true
+        } else {
+          val (replaced, usedAliases) = replaceAliasWhileTracking(cond, aliasMap)
+          if (cond == replaced) {
+            // If nothing changed then the condition references no aliases, so it is cheap.
+            true
+          } else if (usedAliases.iterator.map(_._2.child.expensive).forall(_ == false)) {
+            // If it's cheap we can push it because it might eliminate more data quickly, and
+            // it may also be something which could be evaluated at the storage layer.
+            // We may wish to improve this heuristic in the future.
+            true
+          } else {
+            // This is an expensive replacement, so track the aliases it uses and keep the
+            // condition above the projection instead of pushing it down.
+            usedAliases.iterator.foreach { e: (Attribute, Alias) =>
+              if (leftBehindAliases.contains(e._2.exprId)) {
+                leftBehindAliases = leftBehindAliases.removed(e._2.exprId)
+                expensiveFilterAliasesBuf += e._2
+              }
+            }
+            false
+          }
+        }
+      }
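+      // Illustration (made-up names; assumes the gating config above is enabled and the alias's
+      // child is expensive): for `cheapCol > 0 AND expensiveAlias = 'x'` the partition above
+      // yields cheap = Seq(cheapCol > 0) and expensive = Seq(expensiveAlias = 'x'), records the
+      // alias in expensiveFilterAliasesBuf, and removes it from leftBehindAliases.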
+
+      val expensiveFilterAliases = expensiveFilterAliasesBuf.toArray.toSeq
+      val expensiveLeftBehindAliases: Map[ExprId, Alias] = leftBehindAliases.filter {
+        kv => kv._2.child.expensive
+      }
+
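+      // At this point the condition is split into `cheap` conjuncts (safe to push) and
+      // `expensive` conjuncts (those that reference expensive aliases). The branches below cover:
+      // no expensive aliases in the filter, no left-behind aliases, no expensive left-behind
+      // aliases, and the general case which introduces an extra projection.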
+      if (expensiveFilterAliases.isEmpty) {
+        // If the filter does not reference any expensive aliases then we
+        // just push the filter while resolving the non-expensive aliases.
+        project.copy(child = Filter(replaceAlias(condition, aliasMap), child = grandChild))
+      } else if (leftBehindAliases.isEmpty) {
+        // If there are no left-behind aliases then we've used all of the aliases in our filter.
+        // We don't need to introduce any more projections since we already have the "minimal"
+        // projection for our expensive conditions, but we might be able to move the "cheap"
+        // conditions.
+        if (cheap.nonEmpty) {
+          val expensiveFilterCond = expensive.reduce(And)
+          // Replace the aliases in the cheap filter since we are pushing them down.
+          val cheapFilterCond = replaceAlias(cheap.reduce(And), aliasMap)
+          Filter(expensiveFilterCond, project.copy(child = Filter(cheapFilterCond, grandChild)))
+        } else {
+          f
+        }
+      } else if (expensiveLeftBehindAliases.isEmpty) {
+        // If there are no expensive left-behind aliases then introducing
+        // a new projection might not be worth the overhead.
+        project.copy(child = Filter(replaceAlias(condition, aliasMap), child = grandChild))
+      } else {
+        // If we're here then the filter references at least one expensive alias, and there are
+        // also left-behind aliases which are expensive to compute. In this case we add a new
+        // projection which computes only the expensive aliases the filter needs, then the
+        // filter, and then the final top-level projection with the remaining aliases.
+        val projectionReferences = project.references.toSeq
+
+        val expensiveCondition = expensive.reduce(And)
+
+        // If we don't have any cheap conditions the plan becomes (top to bottom):
+        //   top Project -> Filter(expensive) -> Project(referenced attrs + expensive aliases
+        //   used by the filter) -> grandChild
+        // If we do have cheap conditions:
+        //   top Project -> Filter(expensive) -> Project(referenced attrs + expensive aliases
+        //   used by the filter) -> Filter(cheap) -> grandChild
+        val filterChild = if (cheap.isEmpty) {
+          project.copy(
+            projectList = (projectionReferences ++ expensiveFilterAliases))
+        } else {
+          val cheapCondition = replaceAlias(cheap.reduce(And), aliasMap)
+          project.copy(
+            projectList = (projectionReferences ++ expensiveFilterAliases),
+            child = Filter(cheapCondition, grandChild))
+        }
+
+        // The top-level projection keeps the left-behind aliases (still evaluated here) and
+        // references, by attribute, the output fields whose aliases we've pushed down.
+        val pushedAliasNames = project.output.filter(
+          a => !leftBehindAliases.contains(a.exprId)).toSeq
+        val remainingAliasesToEval = leftBehindAliases.iterator.map(_._2).toSeq
+        val topLevelProjection = project.copy(
+          projectList = matchColumnOrdering(fields, pushedAliasNames ++ remainingAliasesToEval),
+          child = Filter(expensiveCondition, child = filterChild)
+        )
+        topLevelProjection
+      }
Review Comment:
OK, I'm fine with a single rule, but we need better code comments as the algorithm will be more complicated.
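
For context, here is a minimal, self-contained sketch (not part of the patch) of the kind of query the last branch targets, together with the plan shape the new rule is intended to produce. The object name, the UDF, the column names, and the assumptions that a ScalaUDF counts as `expensive` and that the new gating config is enabled are all illustrative, not taken from the PR.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object ExpensiveFilterPushdownSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()
    import spark.implicits._

    // Stand-in for a costly expression; whether it is actually treated as `expensive`
    // depends on how this patch defines Expression.expensive.
    val expensiveUdf = udf((s: String) => s.reverse)

    val df = Seq(("x", 1), ("y", 2)).toDF("b", "a")

    // c is an expensive alias referenced by the filter, d is an expensive alias the
    // filter never touches (a "left-behind" alias), and a > 0 is a cheap conjunct.
    val q = df
      .select(col("a"), expensiveUdf(col("b")).as("c"), expensiveUdf(col("b")).as("d"))
      .filter(col("c") === "x" && col("a") > 0)

    // With the rule above (and the gating config enabled), the intended optimized shape is
    // roughly:
    //   Project [a, c, d]        <- left-behind alias d is evaluated after filtering
    //     Filter (c = 'x')       <- expensive conjunct, evaluated once via the c alias
    //       Project [a, b, c]    <- computes only the expensive alias the filter needs
    //         Filter (a > 0)     <- cheap conjunct pushed all the way down
    //           LocalRelation [b, a]
    q.explain(true)
    spark.stop()
  }
}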