peter-toth commented on a change in pull request #32298: URL: https://github.com/apache/spark/pull/32298#discussion_r628046114
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala
##########

```diff
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LeafNode, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.{MULTI_SCALAR_SUBQUERY, SCALAR_SUBQUERY}
+
+/**
+ * This rule tries to merge multiple non-correlated [[ScalarSubquery]]s into a
+ * [[MultiScalarSubquery]] to compute multiple scalar values once.
+ *
+ * The process is the following:
+ * - While traversing through the plan each [[ScalarSubquery]] plan is tried to merge into the
+ *   cache of already seen subquery plans. If merge is possible then cache is updated with the
+ *   merged subquery plan, if not then the new subquery plan is added to the cache.
```
```diff
+ * - The original [[ScalarSubquery]] expression is replaced to a reference pointing to its cached
+ *   version in this form: `GetStructField(MultiScalarSubquery(SubqueryReference(...)))`.
+ * - A second traversal checks if a [[SubqueryReference]] is pointing to a subquery plan that
+ *   returns multiple values and either replaces only [[SubqueryReference]] to the cached plan or
+ *   restores the whole expression to its original [[ScalarSubquery]] form.
+ * - [[ReuseSubquery]] rule makes sure that merged subqueries are computed once.
+ *
+ * Eg. the following query:
+ *
+ * SELECT
+ *   (SELECT avg(a) FROM t GROUP BY b),
+ *   (SELECT sum(b) FROM t GROUP BY b)
+ *
+ * is optimized from:
+ *
+ * Project [scalar-subquery#231 [] AS scalarsubquery()#241,
+ *          scalar-subquery#232 [] AS scalarsubquery()#242L]
+ * :  :- Aggregate [b#234], [avg(a#233) AS avg(a)#236]
+ * :  :  +- Relation default.t[a#233,b#234] parquet
+ * :  +- Aggregate [b#240], [sum(b#240) AS sum(b)#238L]
+ * :     +- Project [b#240]
+ * :        +- Relation default.t[a#239,b#240] parquet
```

Review comment:

> The proposed rule augments two subqueries, makes them look identical, and hopes (a) column-pruning doesn't prune too aggressively and (b) physical de-dup could dedup them. In case (a) changes later and the two aggregate trees are not deduped in the physical plan, there could potentially be regressions -- each aggregation then becomes more expensive.

In this PR the new `MergeScalarSubqueries` rule runs in a separate batch after column pruning, close to the end of optimization. This is by design, to make sure that no subsequent rule changes the structure of the different instances of a merged subquery plan differently at their different places in the logical plan. So physical planning creates the same physical plan for these instances and there shouldn't be any dedup issues.
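The merging idea itself is engine-agnostic and easy to demonstrate outside Spark. Below is a minimal sketch using Python's `sqlite3` (the table and data are made up, and the scaladoc example is simplified to global aggregates so the scalar subqueries stay single-row): two scalar subqueries, each a separate scan, versus one merged subquery that computes both aggregates in a single scan and lets the outer query project the individual values, analogous to `GetStructField` over a `MultiScalarSubquery` result.

```python
import sqlite3

# Hypothetical toy table standing in for `default.t`.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE t (a REAL, b INTEGER)")
con.executemany("INSERT INTO t VALUES (?, ?)", [(1.0, 10), (2.0, 20), (3.0, 30)])

# Unmerged form: two scalar subqueries, i.e. two separate scans of t.
unmerged = con.execute(
    "SELECT (SELECT avg(a) FROM t), (SELECT sum(b) FROM t)"
).fetchone()

# Merged form: one subquery computes both aggregates in a single scan; the
# outer query then picks out the individual fields.
merged = con.execute(
    "SELECT m.avg_a, m.sum_b "
    "FROM (SELECT avg(a) AS avg_a, sum(b) AS sum_b FROM t) AS m"
).fetchone()

assert unmerged == merged == (2.0, 60)
```

The point of the sketch is only the semantic equivalence of the two forms; the rule does this rewrite on logical plan trees, not SQL text.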
I think the downside of my current PR is that the physical planning of merged subqueries happens multiple times (as many times as they appear in the logical plan) and physical dedup comes only after that. This could be improved if we had subquery references in the logical plan as well (something like `ReuseSubqueryExec`). But I think that's what your (1) is about: move the merged subqueries to a special top logical plan node and add subquery references at the places where they are actually used.

> SELECT y FROM LATERAL VIEW explode(ARRAY(ARRAY(1), ARRAY(1, 2), ARRAY(1, 2, 3))) AS y
> WHERE (SELECT COUNT(*) FROM LATERAL VIEW explode(y) AS element) > 1
>   AND (SELECT SUM(element) FROM LATERAL VIEW explode(y) AS element) > 3
>
> I noticed that such subqueries do not work for now. But they align with the language spec and have well-defined semantics. Once we support them, we want your proposed rule to be able to speed them up as well.

Ah ok, but what should the optimized plan of that query be? It looks like we have 2 correlated subqueries here, and (2) makes perfect sense to merge them.
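The "physically plan each merged subquery only once" improvement mentioned above essentially amounts to memoizing planning on a canonical plan key, which is what a logical-plan-level subquery reference would enable. A toy, non-Spark sketch of that idea (all names hypothetical; a string stands in for a canonicalized plan tree):

```python
# Toy model, not Spark code: memoize "physical planning" on a canonicalized
# plan key so that N identical merged-subquery instances are planned once.
plan_cache = {}
plan_calls = []

def plan_subquery(canonical_plan: str) -> str:
    """Hypothetical planner entry point."""
    if canonical_plan not in plan_cache:
        plan_calls.append(canonical_plan)  # the expensive step runs only here
        plan_cache[canonical_plan] = f"PhysicalPlan({canonical_plan})"
    return plan_cache[canonical_plan]

# Two instances of the same merged subquery at different places in the plan:
p1 = plan_subquery("Aggregate [avg(a), sum(b)] <- Relation t")
p2 = plan_subquery("Aggregate [avg(a), sum(b)] <- Relation t")

assert p1 == p2              # deduped to one physical plan
assert len(plan_calls) == 1  # planned exactly once
```

Without such a reference node, planning runs per instance and dedup only happens afterwards, which is the overhead described above.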
But I don't think we need lateral views; just take the following query:

```
SELECT
  (SELECT avg(a) FROM t WHERE t.a = outer.a),
  (SELECT sum(b) FROM t WHERE t.a = outer.a)
FROM t AS outer
```

which is

```
Project [scalar-subquery#231 [a#233] AS scalarsubquery(a)#243,
         scalar-subquery#232 [a#233] AS scalarsubquery(a)#244L]
:  :- Aggregate [avg(a#239) AS avg(a)#236]
:  :  +- Filter (a#239 = outer(a#233))
:  :     +- SubqueryAlias spark_catalog.default.t
:  :        +- Relation default.t[a#239,b#240] parquet
:  +- Aggregate [sum(b#242) AS sum(b)#238L]
:     +- Filter (a#241 = outer(a#233))
:        +- SubqueryAlias spark_catalog.default.t
:           +- Relation default.t[a#241,b#242] parquet
+- SubqueryAlias outer
   +- SubqueryAlias spark_catalog.default.t
      +- Relation default.t[a#233,b#234] parquet
```

/

```
Project [avg(a)#236 AS scalarsubquery(a)#243, sum(b)#238L AS scalarsubquery(a)#244L]
+- Join LeftOuter, (a#241 = a#233)
   :- Project [a#233, avg(a)#236]
   :  +- Join LeftOuter, (a#239 = a#233)
   :     :- Project [a#233]
   :     :  +- Relation default.t[a#233,b#234] parquet
   :     +- Aggregate [a#239], [avg(a#239) AS avg(a)#236, a#239]
   :        +- Project [a#239]
   :           +- Filter isnotnull(a#239)
   :              +- Relation default.t[a#239,b#240] parquet
   +- Aggregate [a#241], [sum(b#242) AS sum(b)#238L, a#241]
      +- Filter isnotnull(a#241)
         +- Relation default.t[a#241,b#242] parquet
```

now, and this PR doesn't help at all, but it could be optimized using your (2). I wonder if the following steps (tickets/PRs) would make sense:

1. Finish this PR and support only non-correlated mergeable subqueries. Mainly focus on merging the plans and keep the dependency on physical reuse for simplicity. This supports subquery merging within a plan regardless of whether the subqueries are in the same logical node.
2. Add a performance improvement to 1. so as to physically plan a merged subquery only once. This is basically your (1): move the merged subqueries to a top node and introduce subquery references in the logical plan.
3. Add support for correlated subqueries using your (2).
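The correlated rewrite in step 3 (your (2)) can also be sketched in plain SQL: merge the two correlated aggregates into one grouped aggregate on the correlation key and join it back once. A `sqlite3` sketch with made-up data (alias `o` instead of `outer`, which is a reserved word there):

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE t (a INTEGER, b INTEGER)")
con.executemany("INSERT INTO t VALUES (?, ?)", [(1, 10), (1, 20), (2, 30)])

# Original form: two correlated scalar subqueries, each re-evaluated per outer row.
correlated = con.execute("""
    SELECT (SELECT avg(a) FROM t WHERE t.a = o.a),
           (SELECT sum(b) FROM t WHERE t.a = o.a)
    FROM t AS o
""").fetchall()

# Merged form: one grouped aggregate computes both values per correlation key
# and is joined back to the outer relation once.
merged = con.execute("""
    SELECT m.avg_a, m.sum_b
    FROM t AS o
    LEFT OUTER JOIN (
        SELECT a, avg(a) AS avg_a, sum(b) AS sum_b FROM t GROUP BY a
    ) AS m ON m.a = o.a
""").fetchall()

assert sorted(correlated) == sorted(merged) == [(1.0, 30), (1.0, 30), (2.0, 30)]
```

This is only a semantic illustration of the decorrelated target shape; the actual rewrite would have to be done on logical plans and handle empty groups, nulls, and count bugs carefully.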
As you mentioned, this will only support merging subqueries within the same logical node. We should probably implement separate rules for 1. + 2. and for 3., but the plan merging logic can be common.
