viirya commented on a change in pull request #29092:
URL: https://github.com/apache/spark/pull/29092#discussion_r504079357



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -868,6 +866,41 @@ object TransposeWindow extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Infers filters from [[Generate]], such that rows that would have been 
removed
+ * by this [[Generate]] can be removed earlier - before joins and in data 
sources.
+ */
+object InferFiltersFromGenerate extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    // This rule does not infer filters from foldable expressions to avoid 
constant filters
+    // like 'size([1, 2, 3]) > 0'. These do not show up in child's constraints 
and
+    // then the idempotence will break.
+    case generate @ Generate(e, _, _, _, _, _)
+      if !e.deterministic || e.children.forall(_.foldable) => generate
+
+    case generate @ Generate(g, _, false, _, _, _) if canInferFilters(g) =>
+      // Exclude child's constraints to guarantee idempotency
+      val inferredFilters = ExpressionSet(
+        Seq(
+          GreaterThan(Size(g.children.head), Literal(0)),

Review comment:
       This can be useful to filter rows before join, but can we pushdown 
`Size` expression to datasource?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to