peter-toth commented on code in PR #42223: URL: https://github.com/apache/spark/pull/42223#discussion_r1282023520
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CombineJoinedAggregates.scala: ##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, AttributeMap, Expression, NamedExpression, Or}
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
+import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, JoinType, LeftOuter, RightOuter}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Join, LeafNode, LogicalPlan, Project, SerializeFromObject}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.{AGGREGATE, JOIN}
+
+/**
+ * This rule eliminates the [[Join]] if all the join side are [[Aggregate]]s by combine these
+ * [[Aggregate]]s. This rule also support the nested [[Join]], as long as all the join sides for
+ * every [[Join]] are [[Aggregate]]s.
+ *
+ * Note: this rule doesn't following cases:
+ * 1. One of the to be merged two [[Aggregate]]s with child [[Filter]] and the other one is not.
Review Comment:

I tend to agree with you that merging is beneficial in most cases. But a negative case, which I mentioned in https://github.com/apache/spark/pull/42223#issuecomment-1657990533, is when `<condition 1>` is `p = 1`, `<condition 2>` is `p = 2`, and `p` is a partitioning column. The original scans then don't overlap, so when we calculate the sums of the `a` and `b` columns, the scan in the merged query returns twice as much data (both `a` and `b` columns from both partitions) as the original scans did (`a` from partition `p = 1` and `b` from partition `p = 2`). That extra data "flows through" the whole plan and is filtered out only in the aggregate node, which comes with additional costs. Also, in this case the merged query still has to scan the data files of both partitions once, just like the original individual queries did, so there is no benefit on the scan side either.

But when those conditions can't be pushed down (or can be pushed down "only" as data filters), I think merging is more likely to be beneficial than in the partition filtering case above. This is because the merged query needs to scan all the data files only once, while the individual queries had to scan all the files twice. (Obviously this doesn't hold in all cases, as pushed-down data filters can also help avoid scanning through all the data files.)

So this is why in https://github.com/apache/spark/pull/37630 I:
- allowed aggregate merging if no filters can be pushed down to the scans,
- disabled merging (to be on the safe side) if any filters are pushed down to the scans as partitioning or bucketing filters,
- allowed merging depending on a new `spark.sql.planMerge.ignorePushedDataFilters` config if filters can be pushed down as data filters. (The config is enabled by default, as `Q9` with Parquet files falls into this category and the improvement from merging is significant.)
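To make the scan-side trade-off above concrete, here is a toy Scala model. It is a sketch under simplifying assumptions, not Spark's cost model: each partition is a single data file with the same row count, "data read" is rows scanned times non-partition columns read, and the merged query reads the union of both queries' partitions and columns (conceptually something like `SELECT sum(a) FILTER (WHERE p = 1), sum(b) FILTER (WHERE p = 2) FROM t WHERE p = 1 OR p = 2` for a hypothetical table `t`). The data column `c` and the names `Scan`, `dataRead`, `filesOpened`, `merge`, `PushedFilter` and `allowMerge` are hypothetical; `allowMerge` only mirrors the three rules listed for PR #37630 and is not its actual code.

```scala
// Toy model of the scan-side trade-off (hypothetical, not Spark's cost model).
// Assumptions: one data file per partition; "data read" = rows scanned x columns read,
// where partition columns such as `p` cost nothing to read.
final case class Scan(partitions: Set[Int], columns: Set[String])

// Rows in the scanned partitions multiplied by the number of data columns read.
def dataRead(rowsPerPartition: Map[Int, Long], scan: Scan): Long =
  scan.partitions.toSeq.map(rowsPerPartition).sum * scan.columns.size

// With one file per partition, the number of files the scan has to open.
def filesOpened(scan: Scan): Int = scan.partitions.size

// The merged scan must cover the union of partitions and columns (conditions are ORed).
def merge(s1: Scan, s2: Scan): Scan =
  Scan(s1.partitions ++ s2.partitions, s1.columns ++ s2.columns)

val rows = Map(1 -> 100L, 2 -> 100L)

// Case 1: `p = 1` vs `p = 2` on partitioning column `p`, pushed down as partition filters.
val pq1 = Scan(Set(1), Set("a"))
val pq2 = Scan(Set(2), Set("b"))
val pMerged = merge(pq1, pq2)

// Case 2: `c = 1` vs `c = 2` on a hypothetical data column `c` that can't be pushed down,
// so each individual query reads every partition plus column `c`.
val dq1 = Scan(Set(1, 2), Set("a", "c"))
val dq2 = Scan(Set(1, 2), Set("b", "c"))
val dMerged = merge(dq1, dq2)

println(s"partition filters: separate=${dataRead(rows, pq1) + dataRead(rows, pq2)}, merged=${dataRead(rows, pMerged)}")
println(s"non-pushed filters: separate=${dataRead(rows, dq1) + dataRead(rows, dq2)}, merged=${dataRead(rows, dMerged)}")

// Hypothetical classification of how a subquery's filter ended up in its scan.
sealed trait PushedFilter
case object PartitionOrBucketFilter extends PushedFilter
case object DataFilter extends PushedFilter

// Mirrors the three rules above; `ignorePushedDataFilters` stands in for the
// `spark.sql.planMerge.ignorePushedDataFilters` config (enabled by default per the comment).
def allowMerge(pushed: Seq[PushedFilter], ignorePushedDataFilters: Boolean = true): Boolean =
  if (pushed.contains(PartitionOrBucketFilter)) false // stay on the safe side
  else if (pushed.contains(DataFilter)) ignorePushedDataFilters
  else true // nothing pushed down: one pass over the files instead of two
```

With 100 rows per partition, the partition-filter case reads 200 units separately versus 400 merged while opening the same two files, whereas the non-pushed case reads 800 versus 600 and opens half as many files, which is the asymmetry the policy encodes.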
