ahshahid opened a new pull request, #49150:
URL: https://github.com/apache/spark/pull/49150

   ### What changes were proposed in this pull request?
   Currently the  CacheLookup relies on the fragment ( sub plan of incoming 
query)  matching exactly the analyzed Logical Plan in CacheManager for which 
InMemoryRelation exists.
   This limits efficiency of the lookup.. for eg  
   consider a simple case
   we have an InMemoryRelation available for a plan as
   
    Project(a, b, c, d)                                                ->  
Cached IMR
         |
   Project( x as a, b,  b+a as c, b - c as d) 
   
   and the incoming plan is say
     Project( a, b, c)
     |
   Project( x as a, b,  b+a as c, b - c as d) 
   
   Currently the incoming look up will not be able to use Cached IMR, as top 
level Projects do not match.
   But it is possible to use the IMR by putting a projection of   (a, b c)
   i.e Project( a, b, c)
            |
       Cached IMR
   
   Below describes how more complex plans can use cache IMR , with filters 
present in between Projects.
   **The main idea is : for each incoming fragment of incoming subplan, check 
if there is an exact match plan in the cache data. ( this is the current code). 
 If matches use that, else  look for a partial match ** 
   
   ** A partial match, among other requirements, also mandates that 
incoming_plan.child  is canonically same as cachePlan.child **
   
   It can be argued that why only 1  level depth equality is needed, as it is 
possible that the cache plan is usable even if the child do not match, but 
grand child match.
   
    i.e incoming_plan.child.child == cachePlan.child .child
   and in that sense the argument can be extended to the leaf..
   
   So we limit to 1 child level check for partial match for following reasons:
   1) Reduce the complexity 
   2) Keep lookup time in reasonable limit
   3) Hopefully the next PR in line 
[PR-SPARK-45959](https://github.com/apache/spark/pull/43854), which aggresively 
collapses two adjacents Projects or 2 Projects interspersed with Filters, into 
a single project in analyzer phase, if possible, means that in most cases, the 
child match would be good enough for partial match requirement..
   
   
   
   Case 1: using InMemoryRelation in a plan having 2 consecutive Projects.
   
   We start with a DataFrame df1 with plan = Project2 -> X
   and then we cache this df1. So that the CachedRepresentation has ( IMR and 
the logical Plan as Project2 -> X)
   
   Now we create a new data frame Df2 =  Some Project -> X 
   Clearly Project may no longer be same as Project2, so a direct check with 
CacheManager will not result in matching IMR.
   But clearly X are same .
   So the criteria is : an IMR can be used IFF following conditions are met
   
   X for both is same ( i.e incoming Project's child and CachedPlan's Project's 
child are same)
   All the NamedExpressions of Incoming Project are expressable in terms of 
output of Project2 ( which is what IMR's output is )
   To do the check for above point 2, we consider following logic
   Now given that X for both are same, which means their outputs are 
equivalent, so we remap the cached plan's Project2 in terms of output attribute 
( Expr IDs) of X of incoming Project Plan
   This will help us find out following
   Those NamedExpressions of incoming Project which are directly same as 
NamedExpressions of Project 2
   Those NamedExpressions of incoming Project which are some functions of 
output of Project 2
   Those NamedExpressions of incoming Project which are sort of Literal 
Constants and independent of output of Project2
   Those NamedExpressions of incoming Project which are functions of some 
attributes but those attributes are unavailable in the output of Project2
   So so long as above # 4 types of NamedExpressions are empty, it means that 
InMemoryRelation of the CachedPlan is usable.
   and this above logic is coded in CacheManager. The logic involves modifying 
the NamedExpressions in incoming Project, in terms of the Seq[Attributes] which 
will be forced on the IMR.
   
   Case 2: using InMemoryRelation in a plan resulting from collapse of Projects 
interspersed with Filters.
   
   We start with a DataFrame df1 with plan = Filter3 -> Filter4 -> Project2 -> X
   and then we cache this df1. So that the CachedRepresentation has ( IMR and 
the logical Plan as
   Filter3 -> Filter4 -> Project2 -> X )
   
   Now we create a new data frame Df2 = Project -> Filter1 -> Filter2 -> 
Filter3 -> Filter4  -> X 
   
   Clearly here the cached plan chain
   Filter3 -> Filter4 -> Project2 -> X
   is no longer directly similar to
   Project -> Filter1 -> Filter2 -> Filter3 - Filter4 - > X
   But it is still possible to use IMR as actually the cached plan's 
LogicalPlan can be used as a subtree of the incoming Plan.
   
   The logic for such check is partly the same as above for 2 consecutive 
Projects, with some handling for filters.
   The algo for this is as follows
   
   Identify the "child" X from the incoming plan and the Cached Plan 's Logical 
Plan. for similarity check.
   For incoming Plan, we reach X, and store all the consecutive Filter Chain.
   For the Cached Plan, we identify the first encountered Project , which is 
Project 2, and its child which is X.
   
   so we have X from both incoming and cached plan, and we identify the 
incoming project "Project" and the CachedPlan's "Project2".
   Now we can apply the Rule of case 1 of two consecutive Projects, and 
correctly modify the NamedExpressions of incoming Project , in terms of 
Seq[Attributes] which will be enforced upon the IMR.
   
   But we also need to ensure that the filter chain present in Cached Plan i.e 
Filter3 -> Filter4 is a subset of filter chain in the incoming Plan , which is 
Filter1 -> Filter2 -> Filter3 - Filter4.
   Now thing to note is that
   for incoming plan it is
   Project -> Filter1 -> Filter2 -> Filter3 - Filter4 - > X
   In the above chain, the filters are expressed in terms of output of X
   But for cached plan the filters are expressed in terms of output of Project2.
   Filter3 -> Filter4 -> Project2 -> X
   
   So for comparison we need to express the filter chain of Cached Plan in 
terms of X, by pulling up the P2 above filters such that
   it is now
   Project2 -> Filter3' -> Filter4' -> -> X
   
   Now we can see that if we compare
   Project -> Filter1 -> Filter2 -> Filter3 - Filter4 - > X
   
   and find that Filter3' -> Filter4' is a subset of Filter1 -> Filter2 -> 
Filter3 - Filter4
   and as the Project and Project2 are already compatible ( by case point 1)
   we can use cached IMR , with a modified Project with partial filter chain.
   
   i.e we should be able to get a plan like
   
   Project -> Filter1 -> Filter2 -> IMR.
   
   
   ### Why are the changes needed?
   This PR attempts to make cache look up more robust to make use of existing 
IMR as much as possible.
   And this change is a neccessary requirement for another opened PR which 
collapses projects aggresively 
[PR_SPARK-45959](https://github.com/apache/spark/pull/43854)
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   Added new tests and relying on existing tests.
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to