ahshahid opened a new pull request, #49124:
URL: https://github.com/apache/spark/pull/49124

   ### What changes were proposed in this pull request?
   This PR attempts to keep the depth of the LogicalPlan tree unchanged when
   columns are added, dropped, or renamed.
   This is done via a new rule called EarlyCollapseProject.
   The rule is applied after analysis has completed, but before the analyzed
   plan is assigned to the holder variable in QueryExecution / DataFrame.
   
   **EarlyCollapseProject** does the following:
   1) If the incoming plan is Project1 -> Project2 -> X, it collapses the two
   Projects into one, so that the final plan is Project -> X.
   2) If the incoming plan is of the form Project1 -> Filter1 -> Filter2 -> ...
   -> FilterN -> Project2 -> X, it collapses Project1 and Project2 into
   Project -> Filter1 -> Filter2 -> ... -> FilterN -> X.
   Please note that in this case it is Project2 that is pulled up for the
   collapse, rather than Project1 being pushed down.
   The reason is that Project1 may contain some behaviour (like a UDF) that
   cannot handle certain data which would otherwise be removed by the filter
   chain. If Project1 were pushed below the filter chain, the unfiltered rows
   could cause issues. Existing Spark tests on UDFs include a test that is
   sensitive to this.
   
   3) EarlyCollapseProject is NOT applied if:
     a) Any of the incoming Project nodes (Project1 or Project2) has the tag
   LogicalPlan.PLAN_ID_TAG set, which implies the plan comes from a Spark
   client. This is not handled as of now, because for clients the subsequent
   resolution is tied to a direct mapping of the tag ID associated with each
   Project, and removing any Project via collapse breaks that code.

     b) The incoming Project2 has any UserDefinedExpression or
   non-deterministic expression, or Project2's child is a Window node.
   Non-deterministic expressions are excluded to ensure functionality is not
   broken, as collapsing Projects replicates the non-deterministic expression.
   Similarly, for a UserDefinedExpression, or a Window node below Project2, the
   collapse is avoided because it can cause replication, and hence
   re-evaluation, of expensive code.
   
   4) The other important thing EarlyCollapseProject does is to store, in the
   collapsed Project node, the attributes that get dropped and that, because of
   the collapse, would otherwise lose their presence in the plan completely.
   This is needed so that dropped attributes can be resurrected if the need
   arises, in order to do the resolution correctly.
   The dropped attributes are stored in a Seq using the tag
   LogicalPlan.DROPPED_NAMED_EXPRESSIONS.
   The need for this arises in the following situation:
   Say we start with a DataFrame df1 with plan Project2(a, b, c) -> X.
   Then we create a new DataFrame df2 = df1.select(b, c). Here, attribute a is
   dropped.
   Because of the EarlyCollapseProject rule, the new DataFrame df2 will have
   the logical plan Project(b, c) -> X.
   Now, Spark allows a DataFrame df3 = df2.filter(a > 7),
   which would result in the logical plan Filter(a > 7) -> Project(b, c) -> X.
   But because "a" has been dropped, its resolution is no longer possible.
   To retain the existing behaviour, and hence the resolution, Project(b, c)
   carries the dropped NamedExpression "a", which can be revived as a last
   resort for resolution.
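
To make the collapse in points 1 to 3 concrete, here is a minimal sketch on a toy plan model. It is written in Python purely for illustration and is not the Spark implementation: the classes `Relation`, `Filter`, `Project`, the tuple encoding of expressions, the `UNSAFE_OPS` set, and the function `early_collapse` are all hypothetical names. The PLAN_ID_TAG and Window checks are left out.

```python
from dataclasses import dataclass
from typing import Tuple, Union

# Expressions are nested tuples, e.g. ("attr", "a"), ("lit", 1), ("add", e1, e2).
Expr = tuple

@dataclass(frozen=True)
class Relation:          # the leaf plan "X"
    output: Tuple[str, ...]

@dataclass(frozen=True)
class Filter:
    condition: Expr
    child: "Plan"

@dataclass(frozen=True)
class Project:
    exprs: Tuple[Tuple[str, Expr], ...]  # (output name, defining expression)
    child: "Plan"

Plan = Union[Relation, Filter, Project]

# Hypothetical operator names standing in for UDFs and non-deterministic calls.
UNSAFE_OPS = {"udf", "rand"}

def substitute(expr: Expr, aliases: dict) -> Expr:
    """Replace attribute references by the expressions that define them."""
    if expr[0] == "attr":
        return aliases.get(expr[1], expr)
    return (expr[0],) + tuple(
        substitute(e, aliases) if isinstance(e, tuple) else e for e in expr[1:])

def contains_unsafe(expr: Expr) -> bool:
    if expr[0] in UNSAFE_OPS:
        return True
    return any(isinstance(e, tuple) and contains_unsafe(e) for e in expr[1:])

def early_collapse(plan: Plan) -> Plan:
    """Collapse Project1 -> Filter* -> Project2 -> X into Project -> Filter* -> X."""
    if not isinstance(plan, Project):
        return plan
    conditions, node = [], plan.child
    while isinstance(node, Filter):       # collect the filter chain, top to bottom
        conditions.append(node.condition)
        node = node.child
    if not isinstance(node, Project):
        return plan
    if any(contains_unsafe(e) for _, e in node.exprs):
        return plan                       # guard 3b: leave the plan untouched
    aliases = dict(node.exprs)            # Project2 output name -> expression over X
    child = node.child
    for cond in reversed(conditions):     # rebuild the filters bottom-up, now over X
        child = Filter(substitute(cond, aliases), child)
    exprs = tuple((name, substitute(e, aliases)) for name, e in plan.exprs)
    return Project(exprs, child)

A, ONE = ("attr", "a"), ("lit", 1)
x = Relation(("a", "b"))
project2 = Project((("a", A), ("c", ("add", A, ONE))), x)
filtered = Filter(("gt", ("attr", "c"), ("lit", 5)), project2)
project1 = Project((("d", ("add", ("attr", "c"), A)),), filtered)

collapsed = early_collapse(project1)      # Project(d) -> Filter -> X
```

Note that Project1 stays on top of the filter chain: only Project2's aliases are inlined into the filters and into Project1, which mirrors the "Project2 is pulled up" behaviour described in point 2.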
   
   **ColumnResolutionHelper** code change:
   The revival of dropped attributes for plan resolution is done in
   ColumnResolutionHelper.resolveExprsAndAddMissingAttrs, where the dropped
   attributes stored in the tag are brought back for resolution.
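
The df1 / df2 / df3 scenario can be sketched as follows. This is a toy Python model under stated assumptions, not the code in ColumnResolutionHelper: the `Project` class, its `dropped` field (a stand-in for the LogicalPlan.DROPPED_NAMED_EXPRESSIONS tag), and the helpers `collapsed_select` and `add_missing` are hypothetical, and expressions are plain strings.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional

@dataclass
class Project:
    exprs: Dict[str, str]       # output name -> defining expression over the child X
    # Stand-in for the LogicalPlan.DROPPED_NAMED_EXPRESSIONS tag.
    dropped: Dict[str, str] = field(default_factory=dict)

def collapsed_select(project: Project, names: List[str]) -> Project:
    """Model df.select(names) being collapsed into the existing Project."""
    kept = {n: project.exprs[n] for n in names}
    dropped = dict(project.dropped)
    # Remember every expression that the collapse removes from the plan.
    dropped.update({n: e for n, e in project.exprs.items() if n not in kept})
    return Project(kept, dropped)

def add_missing(project: Project, referenced: List[str]) -> Optional[Project]:
    """Widen the Project so every referenced name resolves, or return None."""
    exprs = dict(project.exprs)
    for name in referenced:
        if name in exprs:
            continue
        if name not in project.dropped:
            return None                      # genuinely unknown column
        exprs[name] = project.dropped[name]  # revive the dropped expression
    return Project(exprs, project.dropped)

df1 = Project({"a": "a", "b": "b", "c": "c"})   # Project2(a, b, c) -> X
df2 = collapsed_select(df1, ["b", "c"])         # Project(b, c) -> X, "a" remembered
widened = add_missing(df2, ["a"])               # needed to resolve df2.filter(a > 7)
```

In this sketch the filter on "a" would be evaluated on top of the widened Project; restoring the (b, c) output afterwards is assumed to be handled by an outer projection and is not modelled here.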
   
   Code changes in **CacheManager**:
   The major code change is in **CacheManager**.
   Previously, any change in projection (addition, drop, rename, reordering)
   resulted in a new Project, so it was straightforward to look up the cache
   for a matching logical plan fragment: the cached plan would always match a
   query subtree, as long as the subtree was used in immutable form to build
   the complete query tree. But since in this PR a new Project is collapsed
   with the existing Project, the subtree is no longer the same as what was
   cached. Apart from that, the presence of filters between two Projects
   further complicates the situation.
   
   **Case 1**: _using InMemoryRelation in a plan resulting from collapse of 2 
consecutive Projects._
   
   We start with a DataFrame df1 with plan = Project2 -> X
   and then we cache df1, so that the CachedRepresentation has (IMR, and the
   logical plan Project2 -> X).
   
   Now we create a new DataFrame df2 = df1.select(some Proj), which due to
   early collapse would look like
       Project -> X
   Clearly Project may no longer be the same as Project2, so a direct check
   with CacheManager will not find a matching IMR.
   But clearly X is the same in both.
   So the criterion is: an IMR can be used iff the following conditions are met:
   1) X is the same for both (i.e. the incoming Project's child and the cached
   plan's Project's child are the same).
   2) All the NamedExpressions of the incoming Project are expressible in terms
   of the output of Project2 (which is what the IMR's output is).
   To check point **_2_** above, we use the following logic.
   Given that X is the same for both, which means their outputs are
   equivalent, we remap the cached plan's Project2 in terms of the output
   attributes (expr IDs) of X in the incoming Project plan.
   This helps us identify the following:
      1) Those NamedExpressions of the incoming Project which are directly the
   same as NamedExpressions of Project2.
      2) Those NamedExpressions of the incoming Project which are functions of
   the output of Project2.
      3) Those NamedExpressions of the incoming Project which are literal
   constants, independent of the output of Project2.
      4) Those NamedExpressions of the incoming Project which are functions of
   some attributes, where those attributes are unavailable in the output of
   Project2.
   
   So as long as the set of type 4 NamedExpressions above is empty, the
   InMemoryRelation of the cached plan is usable.
   This logic is coded in CacheManager. It involves rewriting the
   NamedExpressions of the incoming Project in terms of the Seq[Attributes]
   which will be forced on the IMR.
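
The four-way classification can be sketched as below. This is a toy Python model, not the CacheManager code: expressions are nested tuples, `("imr", name)` denotes a column of the cached relation, and `rewrite`, `refs`, `classify` and `project_over_imr` are hypothetical helpers. It assumes condition 1 already holds, i.e. both Projects sit on the same X and use the same attribute names.

```python
from typing import Dict, List, Optional, Set, Tuple

Expr = tuple              # e.g. ("attr", "a"), ("lit", 2), ("add", e1, e2)
Named = Tuple[str, Expr]  # (output name, defining expression over X)

def rewrite(expr: Expr, cached_by_expr: Dict[Expr, str]) -> Expr:
    """Replace every sub-expression that Project2 already computes by an IMR column."""
    if expr in cached_by_expr:
        return ("imr", cached_by_expr[expr])
    return (expr[0],) + tuple(
        rewrite(e, cached_by_expr) if isinstance(e, tuple) else e for e in expr[1:])

def refs(expr: Expr, kind: str) -> Set[str]:
    """Names of all leaves of the given kind ("attr" or "imr") inside expr."""
    if expr[0] == kind:
        return {expr[1]}
    found: Set[str] = set()
    for e in expr[1:]:
        if isinstance(e, tuple):
            found |= refs(e, kind)
    return found

def classify(expr: Expr, cached_by_expr: Dict[Expr, str]) -> Tuple[int, Expr]:
    """Return (category 1..4 as listed above, expression rewritten over the IMR)."""
    new = rewrite(expr, cached_by_expr)
    if refs(new, "attr"):
        return 4, new     # still needs an attribute of X that Project2 does not expose
    if new[0] == "imr":
        return 1, new     # directly the same as a Project2 expression
    if refs(new, "imr"):
        return 2, new     # a function of Project2's output
    return 3, new         # a literal constant

def project_over_imr(incoming: List[Named], cached: List[Named]) -> Optional[List[Named]]:
    """Rewrite the incoming Project over the IMR output, or None if the cache is unusable."""
    cached_by_expr = {expr: name for name, expr in cached}
    result = []
    for name, expr in incoming:
        category, new = classify(expr, cached_by_expr)
        if category == 4:
            return None
        result.append((name, new))
    return result

A, B, C = ("attr", "a"), ("attr", "b"), ("attr", "c")
cached_project2 = [("a", A), ("s", ("add", A, B))]
incoming = [("a", A), ("t", ("mul", ("add", A, B), ("lit", 2))), ("k", ("lit", 7))]
rewritten = project_over_imr(incoming, cached_project2)
```

An incoming expression such as `("add", C, ("lit", 1))` falls into category 4 here, because `c` is not part of Project2's output, so `project_over_imr` returns `None` for it.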
   
   
   **Case 2**: _using InMemoryRelation in a plan resulting from collapse of 
Projects interspersed with Filters._
   
   We start with a DataFrame df1 with plan = Filter3 -> Filter4 -> Project2
   -> X
   and then we cache df1, so that the CachedRepresentation has (IMR, and the
   logical plan
   Filter3 -> Filter4 -> Project2 -> X).
   
   Now we create a new DataFrame df2 = df1.filter(f2).filter(f1).select(some
   Proj), which due to early collapse would look like
     Project -> Filter1 -> Filter2 -> Filter3 -> Filter4 -> X
   (this is because, in the case of filters, Project2 is pulled up for the
   collapse).
   
   Clearly, the cached plan chain
   Filter3 -> Filter4 -> Project2 -> X
   is no longer directly similar to
   Project -> Filter1 -> Filter2 -> Filter3 -> Filter4 -> X
   But it is still possible to use the IMR, as the cached plan's LogicalPlan
   can in fact be used as a subtree of the incoming plan.
   
   The logic for this check is partly the same as above for 2 consecutive
   Projects, with some extra handling for filters.
   The algorithm is as follows:
   1) Identify the "child" X of both the incoming plan and the cached plan's
   logical plan, for the similarity check.
   For the incoming plan, we walk down to X and store the whole chain of
   consecutive Filters along the way.
   
   For the cached plan, we identify the first Project encountered, which is
   Project2, and its child, which is X.
   
   So we have X from both the incoming and the cached plan, and we have
   identified the incoming Project "Project" and the cached plan's "Project2".
   Now we can apply the rule of **case 1** for two consecutive Projects, and
   correctly rewrite the NamedExpressions of the incoming Project in terms of
   the Seq[Attributes] which will be enforced upon the IMR.
   
   But we also need to ensure that the filter chain present in the cached plan,
   i.e. Filter3 -> Filter4, is a subset of the filter chain in the incoming
   plan, which is Filter1 -> Filter2 -> Filter3 -> Filter4.
   **The thing to note here is that**
   the incoming plan is
   Project -> Filter1 -> Filter2 -> Filter3 -> Filter4 -> X
   In the above chain, the filters are expressed in terms of the output of X.
   But for the cached plan, the filters are expressed in terms of the output of
   Project2:
   Filter3 -> Filter4 -> Project2 -> X
   
   So for the comparison we need to express the filter chain of the cached plan
   in terms of X, by pulling Project2 up above the filters, such that it
   becomes
   Project2 -> Filter3' -> Filter4' -> X
   
   Now if we compare this with
   Project -> Filter1 -> Filter2 -> Filter3 -> Filter4 -> X
   
   and find that Filter3' -> Filter4' is a subset of Filter1 -> Filter2 ->
   Filter3 -> Filter4,
   and as Project and Project2 are already compatible (by **case 1**),
   we can use the cached IMR, with a modified Project and a partial filter
   chain.
   
   i.e. we should be able to get a plan like
   
   Project -> Filter1 -> Filter2 -> IMR.
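
The filter-chain comparison above can be sketched as follows. Again this is a toy Python model rather than the CacheManager implementation: expressions are nested tuples, `("imr", name)` denotes a column of the cached relation, and `substitute`, `rewrite`, `has_attr` and `plan_over_imr` are hypothetical helpers. The sketch assumes both plans share the same X, that filters are deterministic so their order does not matter for the subset check, and that the leftover filters must themselves be expressible over the IMR output.

```python
from typing import Dict, List, Optional, Tuple

Expr = tuple              # e.g. ("attr", "a"), ("lit", 2), ("gt", e1, e2)
Named = Tuple[str, Expr]  # (output name, defining expression)

def substitute(expr: Expr, aliases: Dict[str, Expr]) -> Expr:
    """Inline Project2's aliases, turning an expression over Project2 into one over X."""
    if expr[0] == "attr":
        return aliases.get(expr[1], expr)
    return (expr[0],) + tuple(
        substitute(e, aliases) if isinstance(e, tuple) else e for e in expr[1:])

def rewrite(expr: Expr, cached_by_expr: Dict[Expr, str]) -> Expr:
    """Replace sub-expressions that Project2 already computes by IMR columns."""
    if expr in cached_by_expr:
        return ("imr", cached_by_expr[expr])
    return (expr[0],) + tuple(
        rewrite(e, cached_by_expr) if isinstance(e, tuple) else e for e in expr[1:])

def has_attr(expr: Expr) -> bool:
    """True if the expression still refers to an attribute of X."""
    return expr[0] == "attr" or any(
        isinstance(e, tuple) and has_attr(e) for e in expr[1:])

def plan_over_imr(project: List[Named], filters: List[Expr],
                  cached_filters: List[Expr], cached_project: List[Named]
                  ) -> Optional[Tuple[List[Named], List[Expr]]]:
    """Return (Project, remaining filters) over the IMR, or None if unusable."""
    aliases = dict(cached_project)
    # Pull Project2 above the cached filters: Filter3, Filter4 become Filter3', Filter4'.
    cached_over_x = [substitute(c, aliases) for c in cached_filters]
    if not all(c in filters for c in cached_over_x):
        return None                       # cached filters are not a subset
    cached_by_expr = {expr: name for name, expr in cached_project}
    remaining = [rewrite(c, cached_by_expr) for c in filters if c not in cached_over_x]
    new_project = [(n, rewrite(e, cached_by_expr)) for n, e in project]
    if any(has_attr(c) for c in remaining) or any(has_attr(e) for _, e in new_project):
        return None                       # something needs a column the IMR lacks
    return new_project, remaining

A, B = ("attr", "a"), ("attr", "b")
S = ("add", A, B)
cached_project2 = [("a", A), ("s", S)]
cached_filters = [("gt", ("attr", "s"), ("lit", 10)), ("lt", ("attr", "a"), ("lit", 100))]
incoming_filters = [("gt", A, ("lit", 0)), ("lt", S, ("lit", 50)),
                    ("gt", S, ("lit", 10)), ("lt", A, ("lit", 100))]
incoming_project = [("t", ("mul", S, ("lit", 2)))]

result = plan_over_imr(incoming_project, incoming_filters, cached_filters, cached_project2)
```

Here `result` holds the Project and the two leftover filters (the analogues of Filter1 and Filter2), all expressed over IMR columns, which corresponds to the shape Project -> Filter1 -> Filter2 -> IMR.
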
   ### Why are the changes needed?
   Due to the addition and modification of rules in Spark 3.0, clients are
   seeing an extremely large increase in query compilation time when the
   client code adds one column at a time in a loop. Even though the API doc
   does not recommend this practice, it happens, and clients are reluctant to
   change their code. So this PR attempts to handle the situation where columns
   are added not in a single shot but one at a time. This should help the
   Analyzer / Resolver rules complete faster.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   Added new tests and relying on existing tests.
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

