peter-toth opened a new pull request, #37630:
URL: https://github.com/apache/spark/pull/37630

   ### What changes were proposed in this pull request?
   After https://github.com/apache/spark/pull/32298 we are able to merge scalar subquery plans. This PR is a follow-up improvement to the merging logic: it can now combine `Filter` nodes with different conditions when those conditions can be merged in an ancestor `Aggregate` node.
   
   Consider the following query:
   ```
   SELECT
     (SELECT avg(a) FROM t WHERE c = 1),
     (SELECT sum(b) FROM t WHERE c = 2)
   ```
   After this PR the two subqueries can be merged into this optimized form:
   ```
   == Optimized Logical Plan ==
   Project [scalar-subquery#260 [].avg(a) AS scalarsubquery()#277, scalar-subquery#261 [].sum(b) AS scalarsubquery()#278L]
   :  :- Project [named_struct(avg(a), avg(a)#268, sum(b), sum(b)#271L) AS mergedValue#286]
   :  :  +- Aggregate [avg(a#264) FILTER (WHERE propagatedFilter#285) AS avg(a)#268, sum(b#265) FILTER (WHERE propagatedFilter#284) AS sum(b)#271L]
   :  :     +- Project [a#264, b#265, (isnotnull(c#266) AND (c#266 = 2)) AS propagatedFilter#284, (isnotnull(c#266) AND (c#266 = 1)) AS propagatedFilter#285]
   :  :        +- Filter ((isnotnull(c#266) AND (c#266 = 1)) OR (isnotnull(c#266) AND (c#266 = 2)))
   :  :           +- Relation spark_catalog.default.t[a#264,b#265,c#266] parquet
   :  +- Project [named_struct(avg(a), avg(a)#268, sum(b), sum(b)#271L) AS mergedValue#286]
   :     +- Aggregate [avg(a#264) FILTER (WHERE propagatedFilter#285) AS avg(a)#268, sum(b#265) FILTER (WHERE propagatedFilter#284) AS sum(b)#271L]
   :        +- Project [a#264, b#265, (isnotnull(c#266) AND (c#266 = 2)) AS propagatedFilter#284, (isnotnull(c#266) AND (c#266 = 1)) AS propagatedFilter#285]
   :           +- Filter ((isnotnull(c#266) AND (c#266 = 1)) OR (isnotnull(c#266) AND (c#266 = 2)))
   :              +- Relation spark_catalog.default.t[a#264,b#265,c#266] parquet
   +- OneRowRelation
   ```
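   Conceptually, the merged plan keeps every row that matches either subquery's condition and then lets each aggregate consume only the rows flagged by its own propagated filter. A minimal plain-Python sketch of that single-pass evaluation (the table contents are made up for illustration):
   ```python
   # Hypothetical contents of table t as (a, b, c) rows.
   rows = [(1, 10, 1), (2, 20, 1), (3, 30, 2), (4, 40, 2), (5, 50, 3)]

   # Merged Filter: keep rows matching either subquery's condition.
   kept = [(a, b, c) for a, b, c in rows if c == 1 or c == 2]

   # Propagated filters: each aggregate only sees its own rows.
   a_vals = [a for a, _, c in kept if c == 1]
   avg_a = sum(a_vals) / len(a_vals)              # avg(a) FILTER (WHERE c = 1)
   sum_b = sum(b for _, b, c in kept if c == 2)   # sum(b) FILTER (WHERE c = 2)

   print(avg_a, sum_b)  # 1.5 70
   ```
   Both aggregates are thus computed in one scan of `t` instead of two.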
   and physical form:
   ```
   == Physical Plan ==
   *(1) Project [Subquery scalar-subquery#260, [id=#148].avg(a) AS scalarsubquery()#277, ReusedSubquery Subquery scalar-subquery#260, [id=#148].sum(b) AS scalarsubquery()#278L]
   :  :- Subquery scalar-subquery#260, [id=#148]
   :  :  +- *(2) Project [named_struct(avg(a), avg(a)#268, sum(b), sum(b)#271L) AS mergedValue#286]
   :  :     +- *(2) HashAggregate(keys=[], functions=[avg(a#264), sum(b#265)], output=[avg(a)#268, sum(b)#271L])
   :  :        +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=143]
   :  :           +- *(1) HashAggregate(keys=[], functions=[partial_avg(a#264) FILTER (WHERE propagatedFilter#285), partial_sum(b#265) FILTER (WHERE propagatedFilter#284)], output=[sum#288, count#289L, sum#290L])
   :  :              +- *(1) Project [a#264, b#265, (isnotnull(c#266) AND (c#266 = 2)) AS propagatedFilter#284, (isnotnull(c#266) AND (c#266 = 1)) AS propagatedFilter#285]
   :  :                 +- *(1) Filter ((isnotnull(c#266) AND (c#266 = 1)) OR (isnotnull(c#266) AND (c#266 = 2)))
   :  :                    +- *(1) ColumnarToRow
   :  :                       +- FileScan parquet spark_catalog.default.t[a#264,b#265,c#266] Batched: true, DataFilters: [((isnotnull(c#266) AND (c#266 = 1)) OR (isnotnull(c#266) AND (c#266 = 2)))], Format: Parquet, Location: ..., PartitionFilters: [], PushedFilters: [Or(And(IsNotNull(c),EqualTo(c,1)),And(IsNotNull(c),EqualTo(c,2)))], ReadSchema: struct<a:int,b:int,c:int>
   :  +- ReusedSubquery Subquery scalar-subquery#260, [id=#148]
   +- *(1) Scan OneRowRelation[]
   ```
   
   The optimization in this PR doesn't kick in if the filters (`c = 1`, `c = 2`) are partition or bucket filters, to avoid possible performance degradation, but it can merge pushed-down data filters, controlled by a new `spark.sql.planMerge.ignorePushedDataFilters` config value (default `true`).
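   
   The new flag can be toggled per session; for example, in PySpark (a config fragment, assuming an existing `SparkSession` named `spark`):
   ```python
   # Disable merging of subqueries whose pushed-down data filters differ;
   # the config defaults to "true" (merging allowed).
   spark.conf.set("spark.sql.planMerge.ignorePushedDataFilters", "false")
   ```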
   
   ### Why are the changes needed?
   Performance improvement.
   ```
   [info] TPCDS Snappy:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
   [info] ------------------------------------------------------------------------------------------------------------------------
   [info] q9 - Merge different filters off                   9526           9634          97          0.0   244257993.6       1.0X
   [info] q9 - Merge different filters on                    3798           3881         133          0.0    97381735.1       2.5X
   ```
   The performance improvement in the case of `q9` comes from merging 15 subqueries into 1 subquery (https://github.com/apache/spark/pull/32298 was able to merge the 15 subqueries into 5).
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   ### How was this patch tested?
   Existing and new UTs.
   

