vicennial commented on code in PR #51516:
URL: https://github.com/apache/spark/pull/51516#discussion_r2325239874
########## sql/connect/common/src/main/protobuf/spark/connect/relations.proto: ##########
@@ -81,6 +81,14 @@ message Relation {
     UnresolvedTableValuedFunction unresolved_table_valued_function = 43;
     LateralJoin lateral_join = 44;

+    // Reference to a node else where in the tree. There are two use cases for this:
+    // 1. Reduce tree duplication. In this case the tree contains two or more subtrees that are
+    //    identical. The referenced plan can only be a back reference, to a subtree that was
+    //    already visited by the planner. The planner is expected to visit the tree bottom-up from
+    //    left to right.
+    // 1. Reduce tree depth.

Review Comment:
```suggestion
    // 2. Reduce tree depth.
```

########## sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/RelationTreeUtils.scala: ##########
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.util
+
+import scala.jdk.CollectionConverters._
+
+import com.google.protobuf.Descriptors.FieldDescriptor
+import com.google.protobuf.Message
+
+import org.apache.spark.connect.proto
+import org.apache.spark.util.SparkEnvUtils
+
+/**
+ * Utility functions for visiting and transforming relation trees (a.k.a. query trees).
+ *
+ * This implementation is efficient for know Relation/Message types. For unknown message types we
+ * use proto reflection.
+ */
+private[connect] object RelationTreeUtils {
+
+  private val NO_INPUT_REL_TYPE_CASES = {
+    val typeCases = util.EnumSet.noneOf(classOf[proto.Relation.RelTypeCase])
+    typeCases.add(proto.Relation.RelTypeCase.RELTYPE_NOT_SET)
+    typeCases.add(proto.Relation.RelTypeCase.READ)
+    typeCases.add(proto.Relation.RelTypeCase.LOCAL_RELATION)
+    typeCases.add(proto.Relation.RelTypeCase.CACHED_LOCAL_RELATION)
+    typeCases.add(proto.Relation.RelTypeCase.CACHED_REMOTE_RELATION)
+    typeCases.add(proto.Relation.RelTypeCase.SQL)
+    typeCases.add(proto.Relation.RelTypeCase.RANGE)
+    typeCases.add(proto.Relation.RelTypeCase.COMMON_INLINE_USER_DEFINED_TABLE_FUNCTION)
+    typeCases.add(proto.Relation.RelTypeCase.COMMON_INLINE_USER_DEFINED_DATA_SOURCE)
+    typeCases.add(proto.Relation.RelTypeCase.UNRESOLVED_TABLE_VALUED_FUNCTION)
+    typeCases.add(proto.Relation.RelTypeCase.REFERENCED_PLAN_ID)
+    typeCases.add(proto.Relation.RelTypeCase.UNKNOWN)
+    typeCases.add(proto.Relation.RelTypeCase.CATALOG)
+    typeCases.add(proto.Relation.RelTypeCase.EXTENSION)
+    typeCases
+  }
+
+  def visit(relation: proto.Relation)(f: proto.Relation => Boolean): Unit = {
+    visit[Null](relation, null) { (current, _) =>
+      (f(current), null)
+    }
+  }
+
+  /**
+   * Visit all [[proto.Relation relations]] in a tree.
+   *
+   * @param relation
+   *   root of the tree.
+   * @param f
+   *   visit callback. The children of a relation will be visited when this function returns true.
+   */
+  def visit[T](relation: proto.Relation, context: T)(
+      f: (proto.Relation, T) => (Boolean, T)): Unit = {
+    val messages = new util.ArrayDeque[(Message, T)]

Review Comment:
Are we using this to avoid potential stack overflow on deep trees?

########## sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/RelationTreeUtils.scala: ##########
@@ -0,0 +1,489 @@
+  /**
+   * Visit all [[proto.Relation relations]] in a tree.
+   *
+   * @param relation
+   *   root of the tree.
+   * @param f
+   *   visit callback. The children of a relation will be visited when this function returns true.

Review Comment:
The callback also returns an updated context; that is worth mentioning here as well.
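For illustration, the deque-driven traversal pattern in question looks roughly like this: a pre-order walk that uses an explicit `java.util.ArrayDeque` instead of recursion, so tree depth never grows the JVM call stack, while a caller-supplied context is threaded through the callback. This is a minimal sketch with a toy `Node` type standing in for `proto.Relation`, not the PR's actual implementation:

```scala
import java.util.ArrayDeque

// Toy stand-in for proto.Relation: a named node with arbitrary children.
final case class Node(name: String, children: Seq[Node] = Nil)

object IterativeVisit {
  // Pre-order walk driven by an explicit deque. The callback returns
  // (visitChildren, updatedContext); children are only pushed when
  // visitChildren is true, and they inherit the updated context.
  def visit[T](root: Node, context: T)(f: (Node, T) => (Boolean, T)): Unit = {
    val stack = new ArrayDeque[(Node, T)]
    stack.push((root, context))
    while (!stack.isEmpty) {
      val (node, ctx) = stack.pop()
      val (visitChildren, newCtx) = f(node, ctx)
      if (visitChildren) {
        // Push in reverse so children are visited left to right.
        node.children.reverseIterator.foreach(child => stack.push((child, newCtx)))
      }
    }
  }
}
```

Because the pending work lives on the heap rather than the call stack, even a pathologically deep tree only costs deque entries, which appears to be the point of using `ArrayDeque` here.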
########## sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/PlanOptimizer.scala: ##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.immutable.SeqMap
+import scala.collection.mutable
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.WithRelations.ResolutionMethod
+
+/**
+ * Optimizer for Spark Connect plans. This optimizer moves all duplicate subtrees from a query
+ * tree (Relation) into a top level WithRelations node, the duplicates in the plan are replaced by
+ * references. This has a couple of advantages: it reduces the number of nodes in the plan, it
+ * reduces the plan size, it avoids redundant work on the server side (both during planning, and -
+ * if supported - analysis).
+ *
+ * This optimization assumes that nodes with the same plan_id are structurally equivalent.
+ *
+ * The optimization will retain all plan_ids in the input plan. This is needed because plan_ids
+ * can be referenced by UnresolvedAttribute, UnresolvedStar, UnresolvedRegex, and
+ * SubqueryExpression expressions. If the plan can be optimized, the new plan will contain an
+ * additional plan_id: the plan_id of the top-level WithRelations node.
+ *
+ * The current optimization uses a 2-pass approach. The first step identifies duplicate subtrees.
+ * This has a runtime and space complexity of O(num_unique_relations). The second step rewrites
+ * the plan. This has a runtime and space complexity of O(num_unique_relations).
+ *
+ * In theory this can be implemented as a single pass algorithm by replace duplicates with a
+ * reference once we identify them. This has two downsides: it requires that the client and the
+ * server have exactly the same traversal order, and it makes the plans much harder to read.
+ *
+ * @param nextPlanId
+ *   generator for new plan_ids.
+ */
+class PlanOptimizer(nextPlanId: () => Long) {
+  def this(planIdGenerator: AtomicLong) =
+    this(() => planIdGenerator.incrementAndGet())
+
+  /**
+   * Optimize the given plan by deduplicating subtrees.
+   *
+   * @param plan
+   *   The plan to optimize.
+   * @return
+   *   The optimized plan with deduplicated subtrees. If the plan cannot be optimized, this
+   *   returns the original plan.
+   */
+  def optimize(plan: proto.Plan): proto.Plan =
+    PlanOptimizer.optimize(plan, nextPlanId)
+
+  /**
+   * Optimize the given relation by deduplicating subtrees.
+   *
+   * @param relation
+   *   The relation to optimize.
+   * @return
+   *   The optimized relation with deduplicated subtrees. If the relation cannot be optimized,
+   *   this returns the original relation.
+   */
+  def optimize(relation: proto.Relation): proto.Relation =
+    PlanOptimizer.optimize(relation, nextPlanId)
+}
+
+private[connect] object PlanOptimizer {
+  import RelationTreeUtils._
+
+  def optimize(plan: proto.Plan, nextPlanId: () => Long): proto.Plan = {
+    if (plan.hasRoot) {
+      val relation = plan.getRoot
+      val optimizedRelation = optimize(relation, nextPlanId)
+      if (optimizedRelation ne relation) {
+        plan.toBuilder.setRoot(optimizedRelation).build()
+      } else {
+        plan
+      }
+    } else {
+      plan
+    }
+  }
+
+  def optimize(relation: proto.Relation, nextPlanId: () => Long): proto.Relation = {

Review Comment:
Rename `relation` to `root` instead?
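To make the 2-pass approach described in the scaladoc concrete, here is a toy-model sketch of the deduplication idea. The ADTs (`Op`, `Ref`, `WithRels`) are hypothetical stand-ins for the Connect protos, and the ordering and reference-resolution rules are simplified relative to the real optimizer:

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the Connect protos, for illustration only.
// planId plays the role of plan_id; per the scaladoc above, equal plan ids
// are assumed to mean structurally equivalent subtrees.
sealed trait Rel { def planId: Long }
final case class Op(planId: Long, name: String, children: Seq[Rel]) extends Rel
// A reference to a hoisted subtree; it carries the referenced plan id.
final case class Ref(planId: Long) extends Rel
// Toy WithRelations: a root plus the hoisted subtrees it references.
final case class WithRels(planId: Long, root: Rel, refs: Seq[Rel]) extends Rel

object ToyDedup {
  // Pass 1: count occurrences per plan id. The children of an already-seen
  // subtree are skipped; they were counted when the subtree was first visited.
  private def occurrences(root: Rel): mutable.LinkedHashMap[Long, (Rel, Int)] = {
    val seen = mutable.LinkedHashMap.empty[Long, (Rel, Int)]
    def go(r: Rel): Unit = seen.get(r.planId) match {
      case Some((rel, n)) => seen.update(r.planId, (rel, n + 1))
      case None =>
        seen.update(r.planId, (r, 1))
        r match {
          case Op(_, _, children)  => children.foreach(go)
          case WithRels(_, rt, rs) => rs.foreach(go); go(rt)
          case _: Ref              => ()
        }
    }
    go(root)
    seen
  }

  // Pass 2: replace every occurrence of a duplicated subtree with a Ref, and
  // hoist one rewritten copy of each duplicate into a top-level WithRels node.
  def optimize(root: Rel, nextPlanId: () => Long): Rel = {
    val dups = occurrences(root).values.collect { case (rel, n) if n > 1 => rel }.toSeq
    if (dups.isEmpty) return root
    val dupIds = dups.map(_.planId).toSet
    def rewrite(r: Rel): Rel =
      if (dupIds.contains(r.planId)) Ref(r.planId) else rewriteChildren(r)
    def rewriteChildren(r: Rel): Rel = r match {
      case Op(id, name, children) => Op(id, name, children.map(rewrite))
      case other                  => other
    }
    // Only the new WithRels node consumes a fresh plan id; all input plan ids
    // are retained, matching the contract described in the scaladoc.
    WithRels(nextPlanId(), rewriteChildren(root), dups.map(rewriteChildren))
  }
}
```

Running `ToyDedup.optimize` on a tree in which the subtree with plan id 1 occurs twice yields a `WithRels` whose `refs` hold the single hoisted copy, with both occurrences in the root replaced by `Ref(1)`; only the `WithRels` node takes a new plan id, mirroring the "one additional plan_id" guarantee above.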
########## sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/PlanOptimizer.scala: ##########
@@ -0,0 +1,224 @@
+  def optimize(relation: proto.Relation, nextPlanId: () => Long): proto.Relation = {
+    val relations = analyze(relation)
+    if (relations.nonEmpty) {
+      rewriteRelation(relation, relations, nextPlanId)
+    } else {
+      relation
+    }
+  }
+
+  /**
+   * Find all repeated (duplicate) query fragments in a query tree.
+   *
+   * @param root
+   *   node of the query tree
+   * @return
+   *   a map that contains all repeated query fragments, keyed by their plan id.
+   */
+  def analyze(root: proto.Relation): SeqMap[Long, proto.Relation] = {
+    // We can reduce memory consumption by using a bitset that tracks the planIds of nodes with a
+    // single occurrence. We only need to start tracking detailed information once there are
+    // multiple occurrences. For this we need a bitset that can deal with sparse planIds; there are
+    // libraries for this (e.g. RoaringBitMap), however that requires us to add a library to the
+    // Spark Connect client classpath which is something we need to trade off against overall size
+    // of that classpath.
+    val relationsMap = mutable.LinkedHashMap.empty[Long, RelationHolder]
+    visit(root) {
+      case relation @ PlanId(id) =>

Review Comment:
Maybe leave a comment here mentioning that this block of code is the rule that determines whether a node's children are visited. It is slightly redundant, since this can be understood from the method definition, but it would help readability.
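On that last point, a hypothetical caller makes the rule visible: the callback's boolean return value is exactly the "do we descend into this node's children?" decision. Assuming the single-argument `visit` overload quoted above and the standard `getCommon.getPlanId` accessor on `proto.Relation`, a usage sketch might look like:

```scala
package org.apache.spark.sql.connect.client

import scala.collection.mutable

import org.apache.spark.connect.proto

// Hypothetical caller of RelationTreeUtils.visit (placed in the same package,
// since the utility is private[connect]): record each plan id exactly once.
object VisitExample {
  def firstOccurrences(root: proto.Relation): Set[Long] = {
    val seen = mutable.Set.empty[Long]
    RelationTreeUtils.visit(root) { relation =>
      // This boolean is the rule the review comment asks to document:
      // descend into a node's children only on first sight of its plan id
      // (mutable.Set#add returns true only when the element was absent).
      seen.add(relation.getCommon.getPlanId)
    }
    seen.toSet
  }
}
```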