zml1206 commented on code in PR #44460:
URL: https://github.com/apache/spark/pull/44460#discussion_r1473727151


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -1718,22 +1718,26 @@ object PushPredicateThroughNonJoin extends Rule[LogicalPlan] with PredicateHelpe
     // implies that, for a given input row, the output are determined by the expression's initial
     // state and all the input rows processed before. In another word, the order of input rows
     // matters for non-deterministic expressions, while pushing down predicates changes the order.
-    // This also applies to Aggregate.
     case Filter(condition, project @ Project(fields, grandChild))
       if fields.forall(_.deterministic) && canPushThroughCondition(grandChild, condition) =>
       val aliasMap = getAliasMap(project)
       project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))
 
+    // Push `Filter` operators through `Aggregate` operators. Parts of the predicates that can
+    // be beneath must satisfy the following conditions:
+    // 1. Grouping expressions are not empty.
+    // 2. Predicate expression is deterministic.
+    // 3. References of predicate expression are subset of aggregate's child.
     case filter @ Filter(condition, aggregate: Aggregate)
-      if aggregate.aggregateExpressions.forall(_.deterministic)
+      if aggregate.aggregateExpressions.exists(_.deterministic)
         && aggregate.groupingExpressions.nonEmpty =>
       val aliasMap = getAliasMap(aggregate)
 
       // For each filter, expand the alias and check if the filter can be evaluated using
       // attributes produced by the aggregate operator's child operator.
       val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond =>
         val replaced = replaceAlias(cond, aliasMap)
-        cond.deterministic && !cond.throwable &&
+        replaced.deterministic && !cond.throwable &&

Review Comment:
   I think we can push a throwable filter down through the aggregate; it does
   not seem to affect whether the exception is thrown. What do you think,
   @cloud-fan?
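
To make the reasoning above concrete, here is a minimal sketch using plain Scala collections rather than Catalyst. The `Row` case class, the `predicate` function, and both helper methods are hypothetical names invented for illustration. It assumes the predicate references only the grouping key and that the grouping expressions are non-empty. It does not model Spark's execution, the evaluation order of other conjuncts, or which exception would surface first.

```scala
import scala.util.Try

// Hypothetical stand-in for a row with one grouping column and one value column.
final case class Row(key: Int, value: Int)

object ThrowableFilterSketch {
  // A "throwable" predicate on the grouping key: integer division by zero
  // raises ArithmeticException when key == 0.
  def predicate(key: Int): Boolean = 100 / key > 10

  // Filter kept above the aggregate: group and sum first, then filter each group.
  def filterAboveAggregate(rows: Seq[Row]): Map[Int, Int] =
    rows.groupBy(_.key)
      .map { case (k, rs) => k -> rs.map(_.value).sum }
      .filter { case (k, _) => predicate(k) }

  // Filter pushed below the aggregate: filter each input row, then group and sum.
  def filterBelowAggregate(rows: Seq[Row]): Map[Int, Int] =
    rows.filter(r => predicate(r.key))
      .groupBy(_.key)
      .map { case (k, rs) => k -> rs.map(_.value).sum }

  def main(args: Array[String]): Unit = {
    val safe = Seq(Row(1, 10), Row(1, 5), Row(20, 7))
    val unsafe = safe :+ Row(0, 1)

    // Without a failing key, both plans return the same groups.
    assert(filterAboveAggregate(safe) == filterBelowAggregate(safe))

    // With a failing key, both plans fail: every group comes from at least
    // one input row, so the same key values reach the predicate either way.
    assert(Try(filterAboveAggregate(unsafe)).isFailure)
    assert(Try(filterBelowAggregate(unsafe)).isFailure)
  }
}
```

In this toy model the pushed-down plan evaluates the predicate once per input row instead of once per group, but the set of distinct keys it sees is the same, which is why the failure behaviour matches. Whether that carries over to Catalyst in all cases is exactly the open question in the comment.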



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

