viirya commented on a change in pull request #33958:
URL: https://github.com/apache/spark/pull/33958#discussion_r708952359
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
##########
@@ -26,46 +26,32 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
-trait OperationHelper {
- type ReturnType = (Seq[NamedExpression], Seq[Expression], LogicalPlan)
-
- protected def collectAliases(fields: Seq[Expression]):
AttributeMap[Expression] =
- AttributeMap(fields.collect {
- case a: Alias => (a.toAttribute, a.child)
- })
-
- protected def substitute(aliases: AttributeMap[Expression])(expr:
Expression): Expression = {
- // use transformUp instead of transformDown to avoid dead loop
- // in case of there's Alias whose exprId is the same as its child
attribute.
- expr.transformUp {
- case a @ Alias(ref: AttributeReference, name) =>
- aliases.get(ref)
- .map(Alias(_, name)(a.exprId, a.qualifier))
- .getOrElse(a)
-
- case a: AttributeReference =>
- aliases.get(a)
- .map(Alias(_, a.name)(a.exprId, a.qualifier)).getOrElse(a)
- }
- }
-}
+trait OperationHelper extends AliasHelper with PredicateHelper {
+ import
org.apache.spark.sql.catalyst.optimizer.CollapseProject.canCollapseExpressions
-/**
- * A pattern that matches any number of project or filter operations on top of
another relational
- * operator. All filter operators are collected and their conditions are
broken up and returned
- * together with the top project operator.
- * [[org.apache.spark.sql.catalyst.expressions.Alias Aliases]] are
in-lined/substituted if
- * necessary.
- */
-object PhysicalOperation extends OperationHelper with PredicateHelper {
+ type ReturnType =
+ (Seq[NamedExpression], Seq[Expression], LogicalPlan)
+ type IntermediateType =
+ (Option[Seq[NamedExpression]], Seq[Expression], LogicalPlan,
AttributeMap[Alias])
def unapply(plan: LogicalPlan): Option[ReturnType] = {
- val (fields, filters, child, _) = collectProjectsAndFilters(plan)
+ val alwaysInline =
SQLConf.get.getConf(SQLConf.COLLAPSE_PROJECT_ALWAYS_INLINE)
+ val (fields, filters, child, _) = collectProjectsAndFilters(plan,
alwaysInline)
Some((fields.getOrElse(child.output), filters, child))
}
/**
- * Collects all deterministic projects and filters, in-lining/substituting
aliases if necessary.
+ * This legacy mode is for PhysicalOperation which has been there for years
and we want to be
+ * extremely safe to not change its behavior. There are two differences when
legacy mode is off:
+ * 1. We postpone the deterministic check to the very end, so that it's
more likely to collect
Review comment:
Could we mention in this doc comment that the deterministic check is postponed to `canCollapseExpressions`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]