EmmyMiao87 commented on a change in pull request #1471: Planner support push down predicates past agg, win and sort URL: https://github.com/apache/incubator-doris/pull/1471#discussion_r312370964
########## File path: fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java ########## @@ -1545,5 +1625,192 @@ private void materializeInlineViewResultExprForCrossJoinOrCountStar(InlineViewRe } } + /** + ------------------------------------------------------------------------------ + */ + /** + * Predicate push-down rules + */ + + /** + * Entry point for the push-down rules; it executes the applicable push-down rules from top to bottom, + * and the planner will be responsible for assigning all predicates to PlanNode. + */ + private void pushDownPredicates(Analyzer analyzer, SelectStmt stmt) throws AnalysisException { + pushDownPredicatesPastSort(analyzer, stmt); + pushDownPredicatesPastWindows(analyzer, stmt); + pushDownPredicatesPastAggregation(analyzer, stmt); + } + + private void pushDownPredicatesPastSort(Analyzer analyzer, SelectStmt stmt) throws AnalysisException { + // TODO chenhao, remove isEvaluateOrderBy when SubQuery's default limit is removed. + if (stmt.evaluateOrderBy() || stmt.getLimit() >= 0 || stmt.getOffset() > 0 || stmt.getSortInfo() == null) { + return; + } + + final List<Expr> predicates = getBoundPredicates(analyzer, stmt.getSortInfo().getSortTupleDescriptor()); + if (predicates.size() <= 0) { + return; + } + + final List<Expr> pushDownPredicates = getPredicatesReplacedSlotWithSourceExpr(predicates, analyzer); + if (pushDownPredicates.size() <= 0) { + return; + } + + if (putPredicatesOnWindows(stmt, analyzer, pushDownPredicates)) { + return; + } + if (putPredicatesOnAggregation(stmt, analyzer, pushDownPredicates)) { + return; + } + + putPredicatesOnFrom(stmt, analyzer, pushDownPredicates); + } + + private void pushDownPredicatesPastWindows(Analyzer analyzer, SelectStmt stmt) throws AnalysisException { + final AnalyticInfo analyticInfo = stmt.getAnalyticInfo(); + if (analyticInfo == null || analyticInfo.getCommonPartitionExprs().size() == 0) { + return; + } + + final List<Expr> predicates = getBoundPredicates(analyzer, 
analyticInfo.getOutputTupleDesc()); + if (predicates.size() <= 0) { + return; + } + + final List<Expr> pushDownPredicates = getPredicatesBoundedByGroupbysAndReplaceSlotWithSourceExpr(predicates, analyzer); + if (pushDownPredicates.size() <= 0) { + return; + } + + if (putPredicatesOnAggregation(stmt, analyzer, pushDownPredicates)) { + return; + } + + putPredicatesOnFrom(stmt, analyzer, pushDownPredicates); + } + + private void pushDownPredicatesPastAggregation(Analyzer analyzer, SelectStmt stmt) throws AnalysisException { + final AggregateInfo aggregateInfo = stmt.getAggInfo(); + if (aggregateInfo == null || aggregateInfo.getGroupingExprs().size() <= 0) { + return; + } + + final List<Expr> predicates = getBoundPredicates(analyzer, aggregateInfo.getOutputTupleDesc()); + if (predicates.size() <= 0) { + return; + } + + final List<Expr> pushDownPredicates = getPredicatesBoundedByGroupbysAndReplaceSlotWithSourceExpr(predicates, analyzer); + if (pushDownPredicates.size() <= 0) { + return; + } + + putPredicatesOnFrom(stmt, analyzer, pushDownPredicates); + } + + private List<Expr> getPredicatesBoundedByGroupbysAndReplaceSlotWithSourceExpr(List<Expr> predicates, Analyzer analyzer) { + final List<Expr> predicatesCanPushDown = Lists.newArrayList(); + for (Expr predicate : predicates) { + if (predicate.isConstant()) { + // Constant predicates can't be pushed down past Groupby. 
+ continue; + } + + final List<TupleId> tupleIds = Lists.newArrayList(); + final List<SlotId> slotIds = Lists.newArrayList(); + predicate.getIds(tupleIds, slotIds); + + boolean isAllSlotReferingGroupBys = true; + for (SlotId slotId : slotIds) { + final SlotDescriptor slotDesc = analyzer.getDescTbl().getSlotDesc(slotId); + if (slotDesc.getSourceExprs().get(0).getFn() instanceof AggregateFunction) { + isAllSlotReferingGroupBys = false; + } + } + + if (isAllSlotReferingGroupBys) { + predicatesCanPushDown.add(predicate); + } + } + return getPredicatesReplacedSlotWithSourceExpr(predicatesCanPushDown, analyzer); + } + + private List<Expr> getPredicatesReplacedSlotWithSourceExpr(List<Expr> predicates, Analyzer analyzer) { + final List<Expr> predicatesCanPushDown = Lists.newArrayList(); + analyzer.markConjunctsAssigned(predicates); + for (Expr predicate : predicates) { + final Expr newPredicate = predicate.clone(); + replacePredicateSlotRefWithSlotRefSource(newPredicate, analyzer); + predicatesCanPushDown.add(newPredicate); + } + return predicatesCanPushDown; + } + + private void replacePredicateSlotRefWithSlotRefSource(Expr predicate, Analyzer analyzer) { + replacePredicateSlotRefWithSlotRefSource(null, predicate, -1, analyzer); + } + + private void replacePredicateSlotRefWithSlotRefSource(Expr parent, Expr predicate, int childIndex, Analyzer analyzer) { + if (predicate instanceof SlotRef) { + final SlotRef slotRef = (SlotRef)predicate; + if (parent != null && childIndex >= 0) { + final Expr newReplacedExpr = slotRef.getDesc().getSourceExprs().get(0).clone(); + parent.setChild(childIndex, newReplacedExpr); + } + } + + for (int i = 0; i < predicate.getChildren().size(); i++) { + final Expr child = predicate.getChild(i); + replacePredicateSlotRefWithSlotRefSource(predicate, child, i, analyzer); + } + } + + private boolean putPredicatesOnAggregation(SelectStmt stmt, Analyzer analyzer, + List<Expr> predicates) throws AnalysisException { + final AggregateInfo aggregateInfo = 
stmt.getAggInfo(); + if (aggregateInfo != null) { + analyzer.registerConjuncts(predicates, aggregateInfo.getOutputTupleId()); + return true; + } + return false; + } + + private boolean putPredicatesOnWindows(SelectStmt stmt, Analyzer analyzer, + List<Expr> predicates) throws AnalysisException { + final AnalyticInfo analyticInfo = stmt.getAnalyticInfo(); + if (analyticInfo != null) { + analyzer.registerConjuncts(predicates, analyticInfo.getOutputTupleId()); + return true; + } + return false; + } + + private boolean putPredicatesOnFrom(SelectStmt stmt, Analyzer analyzer, List<Expr> predicates) throws AnalysisException { Review comment: Maybe the return value of this function is unnecessary. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@doris.apache.org For additional commands, e-mail: dev-h...@doris.apache.org