Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/14452#discussion_r94001153
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
---
@@ -1167,3 +1173,211 @@ object RemoveRepetitionFromGroupExpressions extends Rule[LogicalPlan] {
a.copy(groupingExpressions = newGrouping)
}
}
+
+/**
+ * Optimizes the logical plans wrapped in SubqueryAlias and operators on them.
+ * The SubqueryAlias which are remaining in optimization phase are common subqueries,
+ * i.e., they are duplicate in the whole query plan. The logical plans wrapped in
+ * SubqueryAlias will be executed individually later. However, some operators such as
+ * Project and Filter can be optimized with the wrapped logical plans. Thus, this rule
+ * considers the optimization of the wrapped logical plans and operators on SubqueryAlias.
+ */
+case class OptimizeCommonSubqueries(optimizer: Optimizer)
+ extends Rule[LogicalPlan] with PredicateHelper {
+ // Optimized the subqueries which all have a Project parent node and the same results.
+ private def optimizeProjectWithSubqueries(
+ plan: LogicalPlan,
+ keyPlan: LogicalPlan,
+ subqueries: ArrayBuffer[LogicalPlan]): LogicalPlan = {
+ plan transform {
+ case p @ Project(pList, s @ SubqueryAlias(alias, subquery, v, true))
+ if s.sameResult(keyPlan) =>
+ val pListForAll: Seq[NamedExpression] = subqueries.flatMap { case Project(pList, child) =>
+ val rewrites = buildRewrites(child, subquery)
+ pList.map(pushToOtherPlan(_, rewrites))
+ }
+
+ val newSubquery = Project(pListForAll, subquery)
+ val optimized = optimizer.execute(newSubquery)
+ // Check if any optimization is performed.
+ if (optimized.sameResult(newSubquery)) {
+ // No optimization happens. Let's keep original subquery.
+ p
+ } else {
+ Project(pList.map(_.toAttribute), SubqueryAlias(alias, newSubquery, v, true))
+ }
+ }
+ }
+
+ /**
+ * Maps Attributes from the source side to the corresponding Attribute on the target side.
+ */
+ private def buildRewrites(source: LogicalPlan, target: LogicalPlan): AttributeMap[Attribute] = {
+ assert(source.output.size == target.output.size)
+ AttributeMap(source.output.zip(target.output))
+ }
+
+ /**
+ * Rewrites an expression so that it can be pushed to another LogicalPlan.
+ */
+ private def pushToOtherPlan[A <: Expression](e: A, rewrites: AttributeMap[Attribute]) = {
+ val result = e transformUp {
+ case a: Attribute => rewrites.get(a).getOrElse(a)
+ }
+
+ // We must promise the compiler that we did not discard the names in the case of project
+ // expressions. This is safe since the only transformation is from Attribute => Attribute.
+ result.asInstanceOf[A]
+ }
+
+ private def optimizeFilterWithSubqueries(
+ plan: LogicalPlan,
+ keyPlan: LogicalPlan,
+ subqueries: ArrayBuffer[LogicalPlan]): LogicalPlan = {
+ var pushdownConds = splitConjunctivePredicates(subqueries(0).asInstanceOf[Filter].condition)
+ subqueries.tail.foreach {
+ case Filter(otherCond, child) =>
+ val rewrites = buildRewrites(child, subqueries(0).asInstanceOf[Filter].child)
+ // We can't simply push down all conditions from other Filter by concatenating them with
--- End diff --
This part has been extracted out as #15558 and can be removed if that PR is
merged.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org