peter-toth opened a new pull request, #37630: URL: https://github.com/apache/spark/pull/37630
### What changes were proposed in this pull request? After https://github.com/apache/spark/pull/32298 we were able to merge scalar subquery plans. This PR is a follow-up improvement to the merging logic to be able to combine `Filter` nodes with different conditions if those conditions can be merged in an ancestor `Aggregate` node. Consider the following query: ``` SELECT (SELECT avg(a) FROM t WHERE c = 1) (SELECT sum(a) FROM t WHERE c = 2) ``` After this PR the 2 subqueries can be merged to this optimized form: ``` == Optimized Logical Plan == Project [scalar-subquery#260 [].avg(a) AS scalarsubquery()#277, scalar-subquery#261 [].sum(b) AS scalarsubquery()#278L] : :- Project [named_struct(avg(a), avg(a)#268, sum(b), sum(b)#271L) AS mergedValue#286] : : +- Aggregate [avg(a#264) FILTER (WHERE propagatedFilter#285) AS avg(a)#268, sum(b#265) FILTER (WHERE propagatedFilter#284) AS sum(b)#271L] : : +- Project [a#264, b#265, (isnotnull(c#266) AND (c#266 = 2)) AS propagatedFilter#284, (isnotnull(c#266) AND (c#266 = 1)) AS propagatedFilter#285] : : +- Filter ((isnotnull(c#266) AND (c#266 = 1)) OR (isnotnull(c#266) AND (c#266 = 2))) : : +- Relation spark_catalog.default.t[a#264,b#265,c#266] parquet : +- Project [named_struct(avg(a), avg(a)#268, sum(b), sum(b)#271L) AS mergedValue#286] : +- Aggregate [avg(a#264) FILTER (WHERE propagatedFilter#285) AS avg(a)#268, sum(b#265) FILTER (WHERE propagatedFilter#284) AS sum(b)#271L] : +- Project [a#264, b#265, (isnotnull(c#266) AND (c#266 = 2)) AS propagatedFilter#284, (isnotnull(c#266) AND (c#266 = 1)) AS propagatedFilter#285] : +- Filter ((isnotnull(c#266) AND (c#266 = 1)) OR (isnotnull(c#266) AND (c#266 = 2))) : +- Relation spark_catalog.default.t[a#264,b#265,c#266] parquet +- OneRowRelation ``` and physical form: ``` == Physical Plan == *(1) Project [Subquery scalar-subquery#260, [id=#148].avg(a) AS scalarsubquery()#277, ReusedSubquery Subquery scalar-subquery#260, [id=#148].sum(b) AS scalarsubquery()#278L] : :- Subquery scalar-subquery#260, [id=#148] : : +- *(2) Project [named_struct(avg(a), avg(a)#268, sum(b), sum(b)#271L) AS mergedValue#286] : : +- *(2) HashAggregate(keys=[], functions=[avg(a#264), sum(b#265)], output=[avg(a)#268, sum(b)#271L]) : : +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=143] : : +- *(1) HashAggregate(keys=[], functions=[partial_avg(a#264) FILTER (WHERE propagatedFilter#285), partial_sum(b#265) FILTER (WHERE propagatedFilter#284)], output=[sum#288, count#289L, sum#290L]) : : +- *(1) Project [a#264, b#265, (isnotnull(c#266) AND (c#266 = 2)) AS propagatedFilter#284, (isnotnull(c#266) AND (c#266 = 1)) AS propagatedFilter#285] : : +- *(1) Filter ((isnotnull(c#266) AND (c#266 = 1)) OR (isnotnull(c#266) AND (c#266 = 2))) : : +- *(1) ColumnarToRow : : +- FileScan parquet spark_catalog.default.t[a#264,b#265,c#266] Batched: true, DataFilters: [((isnotnull(c#266) AND (c#266 = 1)) OR (isnotnull(c#266) AND (c#266 = 2)))], Format: Parquet, Location: ..., PartitionFilters: [], PushedFilters: [Or(And(IsNotNull(c),EqualTo(c,1)),And(IsNotNull(c),EqualTo(c,2)))], ReadSchema: struct<a:int,b:int,c:int> : +- ReusedSubquery Subquery scalar-subquery#260, [id=#148] +- *(1) Scan OneRowRelation[] ``` The optimization in this PR doesn't kick in if the filters (`c = 1`, `c = 2`) are partition or bucket filters (to avoid possible performance degradation), but allows merging pushed-down data filters depending on a new `spark.sql.planMerge.ignorePushedDataFilters` config value (default `true`). ### Why are the changes needed? Performance improvement. ``` [info] TPCDS Snappy: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative [info] ------------------------------------------------------------------------------------------------------------------------ [info] q9 - Merge different filters off 9526 9634 97 0.0 244257993.6 1.0X [info] q9 - Merge different filters on 3798 3881 133 0.0 97381735.1 2.5X ``` The performance improvement in case of `q9` comes from merging 15 subqueries into 1 subquery (https://github.com/apache/spark/pull/32298 was able to merge 15 subqueries into 5). ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing and new UTs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
