ahshahid opened a new pull request #33983: URL: https://github.com/apache/spark/pull/33983
Re-creating a PR that was closed earlier.

This PR proposes new logic for storing constraints and tracking aliases in projections, which eliminates the need to pessimistically generate all the permutations of a given constraint. It is also more effective at correctly identifying the filters that can be pruned, and it uses less memory than the current code. It also includes changes to push compound filters when the join condition is on multiple attributes and the constraint comprises more than one attribute of the join condition.

Right now the code retains the old constraint-management logic alongside the new logic. The choice is controlled by the SQL conf property "spark.sql.optimizer.optimizedConstraintPropagation.enabled", which is true by default. Once the PR is approved, it would make sense to remove the old code, merge the code of ConstraintSet into ExpressionSet, and remove certain if/else blocks in the Optimizer along with the functions Optimizer.getAllConstraints and LogicalPlan.getAllValidConstraints. It is the code of getAllValidConstraints that generates all the permutations of the base constraint.

The new logic is as follows. In the class ConstraintSet, which extends ExpressionSet, we track the aliases along with the base constraint. Any constraint added to the ConstraintSet is stored in its most canonicalized form (i.e. consisting only of those attributes which are part of the output set and NOT the Alias's attribute). For example, consider a hypothetical plan:

    Projection1 (a, a as a1, a as a2, b, b as b1, b as b2, c, a + b as z)
    |
    Filter (a + b > 10)
    |
    base relation (a, b, c, d)

At the Filter node, the constraint set just has the constraint a + b > 10. At the Projection1 node, the constraint set has the constraint a + b > 10 and maintains the following buffers:

    buff1 -> a, a1.attribute, a2.attribute
    buff2 -> b, b1.attribute, b2.attribute
    buff3 -> a + b, z.attribute

The constraint a + b > 10 is already canonicalized in terms of output attributes.

Now suppose there are two filters on top of Projection1: Filter (z > 10) and Filter (a1 + b2 > 10). To prune the first, we canonicalize z as a + b (from the data maintained in the ConstraintSet) and check whether the underlying set contains a + b > 10. It does, so the filter can be pruned. For Filter (a1 + b2 > 10), we identify the buffers to which a1 and b2 belong and replace each attribute with the 0th element of its buffer, which yields a + b > 10, so this filter can be pruned as well.

Now suppose there is another node Project2 (a1, a2, b1, b2, z, c), i.e. attributes a and b are no longer part of the output set. The idea is to make a constraint survive for as long as possible. In Project2 the attributes a and b are being eliminated, and the constraint a + b > 10 depends on them. So in the constraint set of Project2, a + b > 10 becomes a1.attr + b1.attr > 10, and the buffers are updated as follows:

    buff1: a, a1.attribute, a2.attribute  --->  a1.attribute, a2.attribute
    buff2: b, b1.attribute, b2.attribute  --->  b1.attribute, b2.attribute
    buff3: a + b, z.attribute             --->  a1.attr + b1.attr, z.attr

This way, by tracking aliases and storing only the canonicalized base constraints, we eliminate the need to pessimistically generate all combinations of constraints.

For inferring a new Filter from constraints, we use the following logic:

    New Filter = Filter.constraints -- (Filter.child.constraints ++ Filter.constraints.convertToCanonicalizedIfRequired(Filter.conditions))

The idea is that new filter conditions without redundancy are obtained by taking the current node's constraints and removing both the child node's constraints and the filter condition itself, with the condition properly canonicalized in terms of the base attributes that are part of the Filter node's output set.

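The alias-buffer walk-through above (Projection1, the two prunable filters, then Project2) can be sketched as a small toy model. This is not the PR's Scala ConstraintSet: the class ToyConstraintSet, its methods, and the helpers attrs_of and substitute are hypothetical names invented for illustration. Attributes are plain strings and expressions are tuples, and the sketch assumes the base expression of a compound buffer survives a projection once its attributes are remapped.

```python
# Toy model only: attributes are strings, compound expressions are tuples
# such as ("+", "a", "b") or (">", ("+", "a", "b"), 10).

def attrs_of(expr):
    """Return the set of attribute names referenced by an expression."""
    if isinstance(expr, tuple):
        return set().union(*(attrs_of(arg) for arg in expr[1:]))
    return {expr} if isinstance(expr, str) else set()

def substitute(expr, mapping):
    """Rewrite an expression, replacing attributes according to mapping."""
    if isinstance(expr, tuple):
        return (expr[0], *[substitute(arg, mapping) for arg in expr[1:]])
    return mapping.get(expr, expr)

class ToyConstraintSet:
    def __init__(self, constraints=(), buffers=()):
        self.constraints = set(constraints)
        # Each buffer is [base expression, alias attribute, alias attribute, ...].
        self.buffers = [list(buf) for buf in buffers]

    def canonicalize(self, expr):
        """Replace every aliased (sub)expression by the 0th element of its buffer."""
        for buf in self.buffers:
            if expr in buf:
                return buf[0]
        if isinstance(expr, tuple):
            return (expr[0], *[self.canonicalize(arg) for arg in expr[1:]])
        return expr

    def contains(self, expr):
        """A filter condition is prunable if its canonical form is already known."""
        return self.canonicalize(expr) in self.constraints

    def project(self, output_attrs):
        """Constraint set of a projection that keeps only output_attrs."""
        output = set(output_attrs)
        mapping, new_buffers = {}, []
        # Attribute buffers: the first surviving attribute becomes the new base.
        for buf in self.buffers:
            if isinstance(buf[0], str):
                kept = [attr for attr in buf if attr in output]
                if kept and kept[0] != buf[0]:
                    mapping[buf[0]] = kept[0]
                if len(kept) > 1:
                    new_buffers.append(kept)
        # Expression buffers: rewrite the base in terms of surviving attributes.
        for buf in self.buffers:
            if isinstance(buf[0], tuple):
                base = substitute(buf[0], mapping)
                kept = [attr for attr in buf[1:] if attr in output]
                if kept and attrs_of(base) <= output:
                    new_buffers.append([base, *kept])
        # Constraints survive if they can be expressed over the output attributes.
        rewritten = {substitute(c, mapping) for c in self.constraints}
        survivors = {c for c in rewritten if attrs_of(c) <= output}
        return ToyConstraintSet(survivors, new_buffers)

# State at Projection1 in the example plan.
cs = ToyConstraintSet(
    constraints={(">", ("+", "a", "b"), 10)},
    buffers=[["a", "a1", "a2"], ["b", "b1", "b2"], [("+", "a", "b"), "z"]],
)
print(cs.contains((">", "z", 10)))                  # True: z canonicalizes to a + b
print(cs.contains((">", ("+", "a1", "b2"), 10)))    # True: a1 -> a, b2 -> b

# Project2 drops a and b; the constraint survives as a1 + b1 > 10.
p2 = cs.project(["a1", "a2", "b1", "b2", "z", "c"])
print(p2.constraints)
print(p2.buffers)
```

Only one constraint per base expression is stored, and lookups canonicalize the incoming condition instead of expanding the stored set, which is the property the PR relies on to avoid generating every alias permutation.
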
For inferring new filters for join push down, we identify all the equality conditions and then segregate the attributes into the LHS and RHS of the join. To identify filters for push down on the RHS side, we get all the equality attributes of the LHS side and ask the ConstraintSet to return all the constraints whose attributes are a subset of the passed LHS attributes. The LHS attributes are appropriately canonicalized and the constraints identified. Once the constraints are known, we replace the attributes with the corresponding RHS attributes. This helps in identifying compound filters for push down, not just single-attribute filters.

Below is a description of the changes proposed.

ExpressionSet: Apart from adding some new functions, this fixes two bugs in ExpressionSet where, in filter and filterNot, e.canonicalized was being used even though e is already canonicalized. Also, in very complex expressions, canonicalization of an already canonicalized object (especially if it contains a join expression) does not behave correctly in the current Spark code. The other methods added to ExpressionSet exist just to retain the existing constraints code.

ConstraintSet: This is the class which tracks the aliases, stores the constraints in canonicalized form, and updates the constraints using available aliases if any attribute that a constraint depends on is being eliminated. The contains method of this class is used for filter pruning. The class also identifies those constraints which can generate new filters for push down in join nodes.

The rest of the changes just integrate the new logic while retaining the old constraints logic.

Please note that, for TPCDS plan stability, I had to add new golden files for q75. The change as such is trivial.

Previously the pushed filter was generated as

    PushedFilters: [IsNotNull(cr_order_number), IsNotNull(cr_item_sk)]

and with the change it is

    PushedFilters: [IsNotNull(cr_item_sk), IsNotNull(cr_order_number)]

This PR also eliminates the need for EqualNullSafe constraints for aliases. It is also able to handle literal boolean constraints.

### Why are the changes needed?

1) If not fixed, this issue can cause OutOfMemory errors or unacceptable query compilation times. This PR adds a test, "plan equivalence with case statements and performance comparison with benefit of more than 10x conservatively", in org.apache.spark.sql.catalyst.plans.OptimizedConstraintPropagationSuite. With this PR the compilation time is 247 ms, versus 13958 ms without the change.
2) It is more effective in filter pruning, as is evident in some of the tests in org.apache.spark.sql.catalyst.plans.OptimizedConstraintPropagationSuite, where the current code is not able to identify the redundant filter in some cases.
3) It is able to generate a better optimized plan for join queries, as it can push compound predicates.

### Does this PR introduce any user-facing change?

No

### Why are the changes needed?

1) If the plan contains a lot of aliases and complex expressions involving them, the current combinatorial logic of evaluating all the constraints can cause compilation to run for hours or cause an OOM.
2) The current logic can miss a lot of possible cases of removing redundant predicates, as it fails to take into account whether the same attribute or its aliases are repeated multiple times in a complex expression.
3) There are cases where some of the optimizer rules that remove redundant predicates fail to remove them on the basis of constraint data. In some cases the rule works only because previous rules happen to cover for the inaccuracy.

### How was this patch tested?

Many new tests were added. The code has been running in our environment for many months without any issue.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
