Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/9995#discussion_r45956686
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
---
@@ -675,34 +675,34 @@ object PushPredicateThroughGenerate extends
Rule[LogicalPlan] with PredicateHelp
}
/**
- * Push [[Filter]] operators through [[Aggregate]] operators. Parts of the
predicate that reference
- * attributes which are subset of group by attribute set of [[Aggregate]]
will be pushed beneath,
- * and the rest should remain above.
+ * Push [[Filter]] operators through [[Aggregate]] operators, iff the
filters reference only
+ * non-aggregate attributes (typically literals or grouping expressions).
*/
object PushPredicateThroughAggregate extends Rule[LogicalPlan] with
PredicateHelper {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
- case filter @ Filter(condition,
- aggregate @ Aggregate(groupingExpressions, aggregateExpressions,
grandChild)) =>
-
- def hasAggregate(expression: Expression): Boolean = expression match
{
- case agg: AggregateExpression => true
- case other => expression.children.exists(hasAggregate)
- }
- // Create a map of Alias for expressions that does not have
AggregateExpression
- val aliasMap = AttributeMap(aggregateExpressions.collect {
- case a: Alias if !hasAggregate(a.child) => (a.toAttribute, a.child)
+ case filter @ Filter(condition, aggregate: Aggregate) =>
+ // Find all the aliased expressions in the aggregate list that don't
include any actual
+ // AggregateExpression, and create a map from the alias to the
expression
+ val aliasMap = AttributeMap(aggregate.aggregateExpressions.collect {
+ case a: Alias if
a.child.find(_.isInstanceOf[AggregateExpression]).isEmpty =>
+ (a.toAttribute, a.child)
})
- val (pushDown, stayUp) =
splitConjunctivePredicates(condition).partition { conjunct =>
- val replaced = replaceAlias(conjunct, aliasMap)
- replaced.references.subsetOf(grandChild.outputSet) &&
replaced.deterministic
+ // For each filter, expand the alias and check if the filter can be
evaluated using
+ // attributes produced by the aggregate operator's child operator.
+ val (pushDown, stayUp) =
splitConjunctivePredicates(condition).partition { cond =>
+ val replaced = replaceAlias(cond, aliasMap)
+ replaced.references.subsetOf(aggregate.child.outputSet) &&
replaced.deterministic
}
+
if (pushDown.nonEmpty) {
val pushDownPredicate = pushDown.reduce(And)
val replaced = replaceAlias(pushDownPredicate, aliasMap)
- val withPushdown = aggregate.copy(child = Filter(replaced,
grandChild))
- stayUp.reduceOption(And).map(Filter(_,
withPushdown)).getOrElse(withPushdown)
+ val newAggregate = aggregate.copy(child = Filter(replaced,
aggregate.child))
+ // If there is no more filter to stay up, just eliminate the
filter.
+ // Otherwise, create Filter(pushDownPredicate) -> Aggregate ->
Filter(stayUp).
--- End diff --
Should this read `Filter(stayUp) -> Aggregate -> Filter(pushDownPredicate)` instead?
To me, the `->` reads as "has as its child".
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like to enable it, or if the feature is enabled but not
working, please contact infrastructure at infrastructure@apache.org or file a
JIRA ticket with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]