EmmyMiao87 commented on a change in pull request #1471: Planner support push 
down predicates past agg, win and sort
URL: https://github.com/apache/incubator-doris/pull/1471#discussion_r312370964
 
 

 ##########
 File path: fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
 ##########
 @@ -1545,5 +1625,192 @@ private void 
materializeInlineViewResultExprForCrossJoinOrCountStar(InlineViewRe
         }
     }
 
+    /**
+     
------------------------------------------------------------------------------
+     */
+    /**
+     * Push down predicates rules
+     */
+
+    /**
+     * Entrance for push-down rules, it will execute possible push-down rules 
from top to down
+     * and the planner will be responsible for assigning all predicates to 
PlanNode.
+     */
+    private void pushDownPredicates(Analyzer analyzer, SelectStmt stmt) throws 
AnalysisException {
+        pushDownPredicatesPastSort(analyzer, stmt);
+        pushDownPredicatesPastWindows(analyzer, stmt);
+        pushDownPredicatesPastAggregation(analyzer, stmt);
+    }
+
+    private void pushDownPredicatesPastSort(Analyzer analyzer, SelectStmt 
stmt) throws AnalysisException {
+        // TODO chenhao, remove isEvaluateOrderBy when SubQuery's default 
limit is removed.
+        if (stmt.evaluateOrderBy() || stmt.getLimit() >= 0 || stmt.getOffset() 
> 0 || stmt.getSortInfo() == null) {
+            return;
+        } 
+
+        final List<Expr> predicates = getBoundPredicates(analyzer, 
stmt.getSortInfo().getSortTupleDescriptor());
+        if (predicates.size() <= 0) {
+            return;
+        }
+
+        final List<Expr> pushDownPredicates = 
getPredicatesReplacedSlotWithSourceExpr(predicates, analyzer);
+        if (pushDownPredicates.size() <= 0) {
+            return;
+        }
+        
+        if (putPredicatesOnWindows(stmt, analyzer, pushDownPredicates)) {
+            return;
+        }
+        if (putPredicatesOnAggregation(stmt, analyzer, pushDownPredicates)) {
+            return;
+        }
+
+        putPredicatesOnFrom(stmt, analyzer, pushDownPredicates);
+    }
+
+    private void pushDownPredicatesPastWindows(Analyzer analyzer, SelectStmt 
stmt) throws AnalysisException {
+        final AnalyticInfo analyticInfo = stmt.getAnalyticInfo();
+        if (analyticInfo == null || 
analyticInfo.getCommonPartitionExprs().size() == 0) {
+            return;
+        }
+
+        final List<Expr> predicates = getBoundPredicates(analyzer, 
analyticInfo.getOutputTupleDesc());
+        if (predicates.size() <= 0) {
+            return;
+        }
+
+        final List<Expr> pushDownPredicates = 
getPredicatesBoundedByGroupbysAndReplaceSlotWithSourceExpr(predicates, 
analyzer);
+        if (pushDownPredicates.size() <= 0) {
+            return;
+        }
+
+        if (putPredicatesOnAggregation(stmt, analyzer, pushDownPredicates)) {
+            return;
+        }
+
+        putPredicatesOnFrom(stmt, analyzer, pushDownPredicates);
+    }
+
+    private void pushDownPredicatesPastAggregation(Analyzer analyzer, 
SelectStmt stmt) throws AnalysisException {
+        final AggregateInfo aggregateInfo = stmt.getAggInfo();
+        if (aggregateInfo == null || aggregateInfo.getGroupingExprs().size() 
<= 0) {
+            return;
+        }
+
+        final List<Expr> predicates = getBoundPredicates(analyzer, 
aggregateInfo.getOutputTupleDesc());
+        if (predicates.size() <= 0) {
+            return;
+        }
+
+        final List<Expr> pushDownPredicates = 
getPredicatesBoundedByGroupbysAndReplaceSlotWithSourceExpr(predicates, 
analyzer);
+        if (pushDownPredicates.size() <= 0) {
+            return;
+        }
+
+        putPredicatesOnFrom(stmt, analyzer, pushDownPredicates);
+    }
+
+    private List<Expr> 
getPredicatesBoundedByGroupbysAndReplaceSlotWithSourceExpr(List<Expr> 
predicates, Analyzer analyzer) {
+        final List<Expr> predicatesCanPushDown = Lists.newArrayList();
+        for (Expr predicate : predicates) {
+            if (predicate.isConstant()) {
+                // Constant predicates can't be pushed down past Groupby.
+                continue;
+            }
+
+            final List<TupleId> tupleIds = Lists.newArrayList();
+            final List<SlotId> slotIds = Lists.newArrayList();
+            predicate.getIds(tupleIds, slotIds);
+
+            boolean isAllSlotReferingGroupBys = true;
+            for (SlotId slotId : slotIds) {
+                final SlotDescriptor slotDesc = 
analyzer.getDescTbl().getSlotDesc(slotId);
+                if (slotDesc.getSourceExprs().get(0).getFn() instanceof 
AggregateFunction) {
+                    isAllSlotReferingGroupBys = false;
+                }
+            }
+
+            if (isAllSlotReferingGroupBys) {
+                predicatesCanPushDown.add(predicate);
+            }
+        }
+        return getPredicatesReplacedSlotWithSourceExpr(predicatesCanPushDown, 
analyzer);
+    }
+
+    private List<Expr> getPredicatesReplacedSlotWithSourceExpr(List<Expr> 
predicates, Analyzer analyzer) {
+        final List<Expr> predicatesCanPushDown = Lists.newArrayList();
+        analyzer.markConjunctsAssigned(predicates);
+        for (Expr predicate : predicates) {
+            final Expr newPredicate = predicate.clone();
+            replacePredicateSlotRefWithSlotRefSource(newPredicate, analyzer);
+            predicatesCanPushDown.add(newPredicate);
+        }
+        return predicatesCanPushDown;
+    }
+
+    private void replacePredicateSlotRefWithSlotRefSource(Expr predicate, 
Analyzer analyzer) {
+        replacePredicateSlotRefWithSlotRefSource(null, predicate, -1, 
analyzer);
+    }
+
+    private void replacePredicateSlotRefWithSlotRefSource(Expr parent, Expr 
predicate, int childIndex, Analyzer analyzer) {
+        if (predicate instanceof SlotRef) {
+            final SlotRef slotRef =  (SlotRef)predicate;
+            if (parent != null && childIndex >= 0) {
+                final Expr newReplacedExpr = 
slotRef.getDesc().getSourceExprs().get(0).clone();
+                parent.setChild(childIndex, newReplacedExpr);
+            }
+        }
+
+        for (int i = 0; i < predicate.getChildren().size(); i++) {
+            final Expr child = predicate.getChild(i);
+            replacePredicateSlotRefWithSlotRefSource(predicate, child, i, 
analyzer);
+        }
+    }
+
+    private boolean putPredicatesOnAggregation(SelectStmt stmt, Analyzer 
analyzer,
+                                      List<Expr> predicates) throws 
AnalysisException {
+        final AggregateInfo aggregateInfo = stmt.getAggInfo();
+        if (aggregateInfo != null) {
+            analyzer.registerConjuncts(predicates, 
aggregateInfo.getOutputTupleId());
+            return true;
+        }
+        return false;
+    }
+
+    private boolean putPredicatesOnWindows(SelectStmt stmt, Analyzer analyzer,
+                                 List<Expr> predicates) throws 
AnalysisException {
+        final AnalyticInfo analyticInfo = stmt.getAnalyticInfo();
+        if (analyticInfo != null) {
+            analyzer.registerConjuncts(predicates, 
analyticInfo.getOutputTupleId());
+            return true;
+        }
+        return false;
+    }
+
+    private boolean putPredicatesOnFrom(SelectStmt stmt, Analyzer analyzer, 
List<Expr> predicates) throws AnalysisException {
 
 Review comment:
   Maybe the return of this function is useless.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@doris.apache.org
For additional commands, e-mail: dev-h...@doris.apache.org

Reply via email to