cloud-fan commented on a change in pull request #26629: [SPARK-29768][SQL] Column pruning through nondeterministic expressions
URL: https://github.com/apache/spark/pull/26629#discussion_r350209881
 
 

 ##########
 File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
 ##########
 @@ -74,24 +95,70 @@ object PhysicalOperation extends PredicateHelper {
       case other =>
         (None, Nil, other, AttributeMap(Seq()))
     }
+}
 
-  private def collectAliases(fields: Seq[Expression]): 
AttributeMap[Expression] =
-    AttributeMap(fields.collect {
-      case a: Alias => (a.toAttribute, a.child)
-    })
-
-  private def substitute(aliases: AttributeMap[Expression])(expr: Expression): 
Expression = {
-    expr.transform {
-      case a @ Alias(ref: AttributeReference, name) =>
-        aliases.get(ref)
-          .map(Alias(_, name)(a.exprId, a.qualifier))
-          .getOrElse(a)
+/**
+ * A variant of [[PhysicalOperation]]. It matches any number of project or filter
+ * operations even if they are non-deterministic, as long as they satisfy the
+ * requirement of CollapseProject and CombineFilters.
+ */
+object ScanOperation extends OperationHelper with PredicateHelper {
+  type ScanReturnType = Option[(Option[Seq[NamedExpression]],
+    Seq[Expression], LogicalPlan, AttributeMap[Expression])]
 
-      case a: AttributeReference =>
-        aliases.get(a)
-          .map(Alias(_, a.name)(a.exprId, a.qualifier)).getOrElse(a)
+  def unapply(plan: LogicalPlan): Option[ReturnType] = {
+    collectProjectsAndFilters(plan) match {
+      case Some((fields, filters, child, _)) =>
+        Some((fields.getOrElse(child.output), filters, child))
+      case None => None
     }
   }
+
+  private def hasCommonNonDeterministic(expr: Seq[Expression], aliases: 
AttributeMap[Expression])
+    : Boolean = {
+    expr.exists(_.collect {
+      case Alias(ref: AttributeReference, _) if aliases.contains(ref) =>
+        aliases(ref)
+      case a: AttributeReference if aliases.contains(a) =>
+        aliases(a)
+    }.exists(!_.deterministic))
+  }
+
+  private def collectProjectsAndFilters(plan: LogicalPlan): ScanReturnType =
+      plan match {
+        case Project(fields, child) =>
+          collectProjectsAndFilters(child) match {
+            case Some((_, filters, other, aliases)) =>
+              if (!hasCommonNonDeterministic(fields, aliases)) {
+                val substitutedFields =
+                  
fields.map(substitute(aliases)).asInstanceOf[Seq[NamedExpression]]
+                Some((Some(substitutedFields), filters, other, 
collectAliases(substitutedFields)))
+              } else {
+                None
+              }
+            case None => None
+          }
+
+        case Filter(condition, child) =>
+          collectProjectsAndFilters(child) match {
+            case Some((fields, filters, other, aliases)) =>
+              if (filters.forall(_.deterministic) &&
 
 Review comment:
   ditto

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to