cloud-fan commented on a change in pull request #32072: URL: https://github.com/apache/spark/pull/32072#discussion_r616361074
########## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala ########## @@ -0,0 +1,484 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.SubExprUtils._ +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical._ + +/** + * Decorrelate the inner query by eliminating outer references and creating domain joins. + * The implementation is based on the paper: Unnesting Arbitrary Queries by Thomas Neumann + * and Alfons Kemper. https://dl.gi.de/handle/20.500.12116/2418. + * + * A correlated subquery can be viewed as a "dependent" nested loop join between the outer and + * the inner query. For each row produced by the outer query, we bind the [[OuterReference]]s + * in the inner query with the corresponding values in the row, and then evaluate the inner query. + * + * Dependent Join + * :- Outer Query + * +- Inner Query + * + * If the [[OuterReference]]s are bound to the same value, the inner query will return the same + * result. 
Based on this, we can reduce the number of times the inner query is evaluated by first getting + * all distinct values of the [[OuterReference]]s. + * + * Normal Join + * :- Outer Query + * +- Dependent Join + * :- Inner Query + * +- Distinct Aggregate (outer_ref1, outer_ref2, ...) + * +- Outer Query + * + * The distinct aggregate of the outer references is called a "domain", and the dependent join + * between the inner query and the domain is called a "domain join". We need to push down the + * domain join through the inner query until there is no outer reference in the sub-tree and + * the domain join will turn into a normal join. + * + * The decorrelation function returns a new query plan with optional placeholder [[DomainJoin]]s + * added and a list of join conditions with the outer query. [[DomainJoin]]s need to be rewritten + * into an actual inner join between the inner query sub-tree and the outer query. + * + * E.g. decorrelate an inner query with equality predicates: + * + * SELECT (SELECT MIN(b) FROM t1 WHERE t2.c = t1.a) FROM t2 + * + * Aggregate [] [min(b)] Aggregate [a] [min(b), a] + * +- Filter (outer(c) = a) => +- Relation [t1] + * +- Relation [t1] + * + * Join conditions: [c = a] + * + * E.g. decorrelate an inner query with non-equality predicates: + * + * SELECT (SELECT MIN(b) FROM t1 WHERE t2.c > t1.a) FROM t2 + * + * Aggregate [] [min(b)] Aggregate [c'] [min(b), c'] + * +- Filter (outer(c) > a) => +- Filter (c' > a) + * +- Relation [t1] +- DomainJoin [c'] + * +- Relation [t1] + * + * Join conditions: [c <=> c'] + */ +object DecorrelateInnerQuery extends PredicateHelper { + + /** + * Check if an expression contains any attribute. Note OuterReference is a + * leaf node and will not be found here. + */ + private def containsAttribute(expression: Expression): Boolean = { + expression.find(_.isInstanceOf[Attribute]).isDefined + } + + /** + * Check if an expression can be pulled up over an [[Aggregate]] without changing the + * semantics of the plan. 
The expression must be an equality predicate that guarantees + * one-to-one mapping between inner and outer attributes. More specifically, one side + * of the predicate must be an attribute and another side of the predicate must not + * contain other attributes from the inner query. + * For example: + * (a = outer(c)) -> true + * (a > outer(c)) -> false + * (a + b = outer(c)) -> false + * (a = outer(c) - b) -> false + */ + private def canPullUpOverAgg(expression: Expression): Boolean = expression match { Review comment: Can we reuse this method in `CheckAnalysis`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
