>From Mehnaz Tabassum Mahin <[email protected]>:

Mehnaz Tabassum Mahin has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17759 )


Change subject: PLEASE EDIT to provide a meaningful commit message!
......................................................................

PLEASE EDIT to provide a meaningful commit message!

The following commits from your working branch will be included:

[NO ISSUE][COMP] Add Distinct Cardinality Estimator from samples

Details:
 - Add sampling queries for DISTINCT and GROUPBY operators
 - Create DISTINCT operators to submit sampling queries for each dataset of 
JOIN queries
 - Add the Method-of-Moments (MMO) estimator for distinct values from samples
 - The estimator can estimate the number of distinct values in any attribute of a
   dataset, i.e., in the attributes given in DISTINCT or GROUPBY operators.

Change-Id: I9f90bddc0fb1b19923a4ceac7436504d8d264689

# Do not delete the following line!
Change-Id: I7443ac65d53aaa3d3579baf3874e660bf562e925
---
A 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/CBODistinctOperatorUtils.java
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinEnum.java
A 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/DistinctCardinalityEstimation.java
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/EnumerateJoinsRule.java
4 files changed, 598 insertions(+), 0 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/59/17759/1

diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/CBODistinctOperatorUtils.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/CBODistinctOperatorUtils.java
new file mode 100644
index 0000000..3a9e725
--- /dev/null
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/CBODistinctOperatorUtils.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.optimizer.rules.cbo;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.constants.AsterixConstantValue;
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+
+public class CBODistinctOperatorUtils {
+
+    /**
+     * Tells whether a GROUP BY / DISTINCT operator references every variable of a data-source scan except one.
+     * The single unmatched scan variable is taken to be the variable for the dataset name or alias.
+     *
+     * @param scanOp the data-source scan whose variables are inspected
+     * @param grpByDistinctOp the operator to test; anything other than GROUP or DISTINCT yields {@code false}
+     * @return {@code true} when the number of scan variables found among the grouping/distinct variables equals
+     *         the number of scan variables minus one; {@code false} otherwise, including when the operator
+     *         yields no variables at all
+     */
+    public static boolean containsAllGroupByDistinctVarsInScanOp(DataSourceScanOperator scanOp,
+            ILogicalOperator grpByDistinctOp) {
+        LogicalOperatorTag opTag = grpByDistinctOp.getOperatorTag();
+        if (opTag != LogicalOperatorTag.GROUP && opTag != LogicalOperatorTag.DISTINCT) {
+            return false;
+        }
+        List<LogicalVariable> wantedVars = getGroupByDistinctVarFuncPair(grpByDistinctOp).first;
+        if (wantedVars.size() == 0) {
+            return false;
+        }
+        List<LogicalVariable> scanVars = scanOp.getVariables();
+        int matched = 0;
+        for (LogicalVariable candidate : scanVars) {
+            if (wantedVars.contains(candidate)) {
+                matched++;
+            }
+        }
+        // one scan variable (dataset name or alias) is deliberately left out of the comparison
+        return matched == scanVars.size() - 1;
+    }
+
+    /**
+     * Builds, for each data-source scan reachable from {@code op}, a DISTINCT operator over the GROUP BY /
+     * DISTINCT variables found on that scan's branch and records it in {@code map} under the scan.
+     * <p>
+     * When {@code op} is an ASSIGN, SELECT or DATASOURCESCAN, its input-0 chain is walked down to the
+     * EMPTYTUPLESOURCE. When {@code op} is an inner or left-outer join, the method recurses into every join
+     * input. Any other operator is ignored.
+     *
+     * @param op root of the branch to inspect
+     * @param grpByDistinctOp the GROUP BY or DISTINCT operator supplying the variables of interest
+     * @param context optimization context, handed on to {@code createDistinctOp}
+     * @param map output map from a scan operator to the DISTINCT operator created for it
+     */
+    public static void createDistinctOpsForJoinNodes(ILogicalOperator op, 
ILogicalOperator grpByDistinctOp,
+            IOptimizationContext context, HashMap<DataSourceScanOperator, 
ILogicalOperator> map) {
+        List<LogicalVariable> foundDistinctVars = new ArrayList<>();
+        ILogicalOperator selOp = null, assignOp = null;
+
+        LogicalOperatorTag tag = op.getOperatorTag();
+        // add DistinctOp to count distinct values in an attribute (except all PK attribute(s))
+        if (tag == LogicalOperatorTag.ASSIGN || tag == 
LogicalOperatorTag.SELECT
+                || tag == LogicalOperatorTag.DATASOURCESCAN) {
+            Pair<List<LogicalVariable>, List<AbstractFunctionCallExpression>> 
distinctPair =
+                    getGroupByDistinctVarFuncPair(grpByDistinctOp);
+            List<LogicalVariable> distinctVars = distinctPair.first;
+            if (distinctVars.size() == 0) {
+                return;
+            }
+
+            DataSourceScanOperator scanOp = null;
+            LogicalVariable assignVar;
+            // walk the input-0 chain; the loop relies on the chain ending in an EMPTYTUPLESOURCE
+            while (tag != LogicalOperatorTag.EMPTYTUPLESOURCE) {
+                if (tag == LogicalOperatorTag.SELECT) {
+                    // overwritten on every SELECT, so the last one met on the way down is kept
+                    selOp = op;
+                } else if (tag == LogicalOperatorTag.ASSIGN) {
+                    // NOTE(review): only the first variable of the AssignOp is examined - confirm this is intended
+                    assignVar = ((AssignOperator) op).getVariables().get(0);
+                    int idx = distinctVars.indexOf(assignVar);
+                    if (idx != -1 && assignOp == null) { // first corresponding AssignOp found
+                        assignOp = op;
+                    }
+                    if (idx != -1) { // remember the matching variable of this AssignOp
+                        foundDistinctVars.add(assignVar);
+                    }
+                } else if (tag == LogicalOperatorTag.DATASOURCESCAN) {
+                    scanOp = (DataSourceScanOperator) op;
+                    // will work for any attributes present in GroupByOp or DistinctOp
+                    // Note: uncomment the following if-else-statement if CBO doesn't need to estimate the number
+                    // of distinct values when GroupByOp or DistinctOp contains all PK attributes
+                    // (in that case the estimated cardinality from samples can be mostly the same as the original
+                    // dataset cardinality)
+                    /*if (containsAllGroupByDistinctVarsInScanOp(scanOp, grpByDistinctOp)) {
+                        // contains all PK attribute(s), so estimated distinct cardinality is same as original dataset cardinality
+                        scanOp = null;
+                    } else { // at least one PK attribute is not in GroupByOp or DistinctOp variables */
+                    List<LogicalVariable> scanVars = scanOp.getVariables();
+                    for (LogicalVariable scanVar : scanVars) { // add all required variables of the DataSourceScanOp
+                        if (distinctVars.contains(scanVar)) {
+                            foundDistinctVars.add(scanVar);
+                        }
+                    }
+                    // foundDistinctVars also holds variables collected from AssignOps met earlier on this chain
+                    if (foundDistinctVars.size() == 0) {
+                        scanOp = null; // GroupByOp or DistinctOp doesn't contain any attributes of the dataset
+                    }
+                    //}
+                }
+                op = op.getInputs().get(0).getValue();
+                tag = op.getOperatorTag();
+            }
+
+            if (scanOp != null) {
+                // the new DistinctOp is placed on top of the SelectOp if any, else the AssignOp, else the scan
+                ILogicalOperator inputOp = (selOp != null) ? selOp : 
((assignOp != null) ? assignOp : scanOp);
+                SourceLocation sourceLocation = inputOp.getSourceLocation();
+                DistinctOperator distinctOp =
+                        createDistinctOp(foundDistinctVars, inputOp, 
sourceLocation, distinctPair.second, context);
+                if (distinctOp != null) {
+                    map.put(scanOp, distinctOp);
+                }
+            }
+        } else if (tag == LogicalOperatorTag.INNERJOIN || tag == 
LogicalOperatorTag.LEFTOUTERJOIN) {
+            // recurse into every input of the join
+            for (int i = 0; i < op.getInputs().size(); i++) {
+                ILogicalOperator nextOp = op.getInputs().get(i).getValue();
+                createDistinctOpsForJoinNodes(nextOp, grpByDistinctOp, 
context, map);
+            }
+        }
+    }
+
+    /**
+     * Recursively gathers the variables referenced by the arguments of a function call.
+     * Variable arguments contribute their variable, nested function calls contribute the variables they
+     * reference in turn, and any other kind of argument is skipped.
+     *
+     * @param funcExpr the function call to inspect
+     * @return the referenced variables in argument order (duplicates are kept)
+     */
+    private static List<LogicalVariable> getFunctionVariables(AbstractFunctionCallExpression funcExpr) {
+        List<LogicalVariable> collected = new ArrayList<>();
+        for (Mutable<ILogicalExpression> argRef : funcExpr.getArguments()) {
+            ILogicalExpression argExpr = argRef.getValue();
+            LogicalExpressionTag argTag = argExpr.getExpressionTag();
+            if (argTag == LogicalExpressionTag.VARIABLE) {
+                collected.add(((VariableReferenceExpression) argExpr).getVariableReference());
+            } else if (argTag == LogicalExpressionTag.FUNCTION_CALL) {
+                collected.addAll(getFunctionVariables((AbstractFunctionCallExpression) argExpr));
+            }
+        }
+        return collected;
+    }
+
+    /**
+     * Extracts, for a DISTINCT or GROUP BY operator, the variables it is computed over together with the
+     * function-call expressions those variables come from.
+     * <p>
+     * For DISTINCT, the operator's first input is inspected: when it is an ASSIGN whose first expression is a
+     * function call, the call's arguments are read as (name, value) pairs and every value contributes either its
+     * variable or, for a nested function call, the call itself plus all variables it references.
+     * For GROUP BY, the group-by variables are taken and the input-0 chain is walked down to the first
+     * data-source scan or join; each group-by variable that is the first variable of an ASSIGN computing a
+     * function call (referencing at least one variable) is replaced by the variables of that call.
+     *
+     * @param grpByDistinctOp the DISTINCT or GROUP BY operator; any other operator yields two empty lists
+     * @return a pair of (variables, function-call expressions)
+     */
+    private static Pair<List<LogicalVariable>, List<AbstractFunctionCallExpression>> getGroupByDistinctVarFuncPair(
+            ILogicalOperator grpByDistinctOp) {
+        List<LogicalVariable> distinctVars = new ArrayList<>();
+        List<AbstractFunctionCallExpression> distinctFunctions = new ArrayList<>();
+        LogicalOperatorTag rootTag = grpByDistinctOp.getOperatorTag();
+        if (rootTag == LogicalOperatorTag.DISTINCT) {
+            ILogicalOperator nextOp = grpByDistinctOp.getInputs().get(0).getValue();
+            if (nextOp.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
+                ILogicalExpression assignExpr = ((AssignOperator) nextOp).getExpressions().get(0).getValue();
+                if (assignExpr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) { // FId: open-object-constructor
+                    List<Mutable<ILogicalExpression>> argList =
+                            ((AbstractFunctionCallExpression) assignExpr).getArguments();
+                    // arguments alternate (field name, field value); the bound on i + 1 keeps a trailing
+                    // unpaired argument from causing an IndexOutOfBoundsException
+                    for (int i = 0; i + 1 < argList.size(); i += 2) {
+                        ILogicalExpression valueExpr = argList.get(i + 1).getValue();
+                        if (valueExpr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+                            distinctVars.add(((VariableReferenceExpression) valueExpr).getVariableReference());
+                        } else if (valueExpr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+                            distinctVars.addAll(getFunctionVariables((AbstractFunctionCallExpression) valueExpr));
+                            distinctFunctions.add((AbstractFunctionCallExpression) valueExpr);
+                        }
+                    }
+                }
+            }
+        } else if (rootTag == LogicalOperatorTag.GROUP) {
+            // copy before mutating so the list handed out by the GroupByOperator is never modified here
+            distinctVars = new ArrayList<>(((GroupByOperator) grpByDistinctOp).getGroupByVarList());
+            ILogicalOperator nextOp = grpByDistinctOp.getInputs().get(0).getValue();
+            LogicalOperatorTag tag = nextOp.getOperatorTag();
+            while (tag != LogicalOperatorTag.DATASOURCESCAN && tag != LogicalOperatorTag.INNERJOIN
+                    && tag != LogicalOperatorTag.LEFTOUTERJOIN) {
+                if (tag == LogicalOperatorTag.ASSIGN) {
+                    ILogicalExpression assignExpr = ((AssignOperator) nextOp).getExpressions().get(0).getValue();
+                    if (assignExpr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+                        List<LogicalVariable> fVars = getFunctionVariables((AbstractFunctionCallExpression) assignExpr);
+                        LogicalVariable assignVar = ((AssignOperator) nextOp).getVariables().get(0);
+                        int idx = distinctVars.indexOf(assignVar);
+                        if (idx != -1 && fVars.size() > 0) {
+                            // swap the computed variable for the variables its function call reads
+                            distinctVars.remove(idx);
+                            distinctVars.addAll(fVars);
+                            distinctFunctions.add((AbstractFunctionCallExpression) assignExpr);
+                        }
+                    }
+                }
+                if (nextOp.getInputs().isEmpty()) {
+                    break; // leaf reached without meeting a scan or a join; nothing further to walk
+                }
+                nextOp = nextOp.getInputs().get(0).getValue();
+                tag = nextOp.getOperatorTag();
+            }
+        }
+        return new Pair<>(distinctVars, distinctFunctions);
+    }
+
+    /**
+     * Creates an ASSIGN operator that packs the applicable function expressions, plus the distinct variables not
+     * covered by any of them, into one open-record-constructor call whose fields are named "1", "2", ...
+     * A function expression is applicable when every variable it references is contained in
+     * {@code distinctVars}.
+     *
+     * @param optCtx optimization context used to allocate the variable assigned by the new operator
+     * @param distinctVars the distinct variables found for the current dataset
+     * @param funcExpr candidate function expressions of the GROUP BY / DISTINCT operator
+     * @param sourceLocation source location set on the new ASSIGN operator
+     * @return the new ASSIGN operator (without inputs), or {@code null} when no function expression is applicable
+     */
+    private static AssignOperator createAssignOpForFunctionExpr(IOptimizationContext optCtx,
+            List<LogicalVariable> distinctVars, List<AbstractFunctionCallExpression> funcExpr,
+            SourceLocation sourceLocation) {
+        int counter = 1;
+        // distinctVars is not modified below, so the lookup set is built once instead of once per expression
+        HashSet<LogicalVariable> datasetVars = new HashSet<>(distinctVars);
+        List<LogicalVariable> notFoundDistinctVars = new ArrayList<>(distinctVars);
+        List<Mutable<ILogicalExpression>> openRecConsArgs = new ArrayList<>();
+        for (AbstractFunctionCallExpression expr : funcExpr) {
+            List<LogicalVariable> funcVars = getFunctionVariables(expr);
+            if (datasetVars.containsAll(funcVars)) { // all variables in the function are of the current dataset
+                openRecConsArgs.add(new MutableObject<>(
+                        new ConstantExpression(new AsterixConstantValue(new AString(String.valueOf(counter))))));
+                openRecConsArgs.add(new MutableObject<>(expr));
+                counter++;
+                notFoundDistinctVars.removeAll(funcVars); // these variables are covered by the function
+            }
+        }
+        if (openRecConsArgs.isEmpty()) { // no Function expression is available/applicable
+            return null;
+        }
+        // variables that no applicable function covers become plain fields of the record
+        for (LogicalVariable uncoveredVar : notFoundDistinctVars) {
+            openRecConsArgs.add(new MutableObject<>(
+                    new ConstantExpression(new AsterixConstantValue(new AString(String.valueOf(counter))))));
+            openRecConsArgs.add(new MutableObject<>(new VariableReferenceExpression(uncoveredVar)));
+            counter++;
+        }
+        AbstractFunctionCallExpression openRecFunc = new ScalarFunctionCallExpression(
+                BuiltinFunctions.getBuiltinFunctionInfo(BuiltinFunctions.OPEN_RECORD_CONSTRUCTOR), openRecConsArgs);
+        LogicalVariable assignVar = optCtx.newVar();
+        AssignOperator assignOp = new AssignOperator(assignVar, new MutableObject<>(openRecFunc));
+        assignOp.setSourceLocation(sourceLocation);
+        return assignOp;
+    }
+
+    /**
+     * Creates a DISTINCT operator on top of {@code inputOp}.
+     * When function expressions apply, an ASSIGN built by {@code createAssignOpForFunctionExpr} is inserted
+     * between the new DISTINCT and {@code inputOp}, and the DISTINCT is computed over that ASSIGN's first
+     * variable; otherwise the DISTINCT is computed directly over {@code distinctVars}.
+     *
+     * @param distinctVars variables to compute distinct values over
+     * @param inputOp operator to place the DISTINCT above; must be an ASSIGN, SELECT or DATASOURCESCAN
+     * @param sourceLocation source location set on the created operators and variable references
+     * @param funcExpr function expressions of the originating GROUP BY / DISTINCT operator
+     * @param optCtx optimization context
+     * @return the new DISTINCT operator, or {@code null} when there are no variables, no input operator, or
+     *         the input operator is of an unsupported kind
+     */
+    private static DistinctOperator createDistinctOp(List<LogicalVariable> distinctVars, ILogicalOperator inputOp,
+            SourceLocation sourceLocation, List<AbstractFunctionCallExpression> funcExpr, IOptimizationContext optCtx) {
+        if (distinctVars.size() == 0 || inputOp == null) {
+            return null;
+        }
+        LogicalOperatorTag inputTag = inputOp.getOperatorTag();
+        boolean supportedInput = inputTag == LogicalOperatorTag.ASSIGN || inputTag == LogicalOperatorTag.SELECT
+                || inputTag == LogicalOperatorTag.DATASOURCESCAN;
+        if (!supportedInput) {
+            return null;
+        }
+
+        // an AssignOp is only produced when function expressions of the GroupByOp/DistinctOp are applicable
+        AssignOperator funcAssignOp = createAssignOpForFunctionExpr(optCtx, distinctVars, funcExpr, sourceLocation);
+
+        List<Mutable<ILogicalExpression>> distinctExprs = new ArrayList<>();
+        ILogicalOperator distinctInput;
+        if (funcAssignOp != null) {
+            // distinct over the single record variable; the AssignOp sits between DistinctOp and inputOp
+            VariableReferenceExpression recordRef =
+                    new VariableReferenceExpression(funcAssignOp.getVariables().get(0));
+            recordRef.setSourceLocation(sourceLocation);
+            distinctExprs.add(new MutableObject<>(recordRef));
+            funcAssignOp.getInputs().add(new MutableObject<>(inputOp));
+            distinctInput = funcAssignOp;
+        } else {
+            // distinct directly over the given variables
+            for (LogicalVariable distinctVar : distinctVars) {
+                VariableReferenceExpression varRef = new VariableReferenceExpression(distinctVar);
+                varRef.setSourceLocation(sourceLocation);
+                distinctExprs.add(new MutableObject<>(varRef));
+            }
+            distinctInput = inputOp;
+        }
+
+        DistinctOperator newDistinctOp = new DistinctOperator(distinctExprs);
+        newDistinctOp.setSourceLocation(sourceLocation);
+        newDistinctOp.getInputs().add(new MutableObject<>(distinctInput));
+        newDistinctOp.setExecutionMode(inputOp.getExecutionMode());
+        return newDistinctOp;
+    }
+}
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/DistinctCardinalityEstimation.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/DistinctCardinalityEstimation.java
new file mode 100644
index 0000000..93173b5
--- /dev/null
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/DistinctCardinalityEstimation.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.optimizer.rules.cbo;
+
+import java.util.List;
+
+import org.apache.asterix.metadata.declared.DataSource;
+import org.apache.asterix.metadata.declared.SampleDataSource;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.om.base.AInt64;
+import org.apache.asterix.om.base.IAObject;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
+import 
org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
+
+public class DistinctCardinalityEstimation {
+    private final IOptimizationContext optCtx;
+    private final JoinEnum joinEnum;
+    private final Stats stats;
+    private long totalSamples;
+    private double distinctFromSamples;
+
+    /**
+     * Creates an estimator bound to the given optimization context and join enumerator.
+     * The statistics handle is obtained from the join enumerator at construction time.
+     *
+     * @param context the optimization context used when running sampling queries
+     * @param joinE the join enumerator providing the statistics handle and sample data sources
+     */
+    public DistinctCardinalityEstimation(IOptimizationContext context, JoinEnum joinE) {
+        this.optCtx = context;
+        this.joinEnum = joinE;
+        this.stats = joinE.getStatsHandle();
+    }
+
+    /**
+     * Sets the sample size used by the estimator.
+     *
+     * @param numSamples the number of samples
+     */
+    public void setTotalSamples(long numSamples) {
+        this.totalSamples = numSamples;
+    }
+
+    /**
+     * Sets the number of distinct values observed in the samples, as used by the estimator.
+     *
+     * @param distinctSamples the number of distinct values in the samples
+     */
+    public void setDistinctFromSamples(double distinctSamples) {
+        this.distinctFromSamples = distinctSamples;
+    }
+
+    /**
+     * Estimates the number of distinct values produced by a GROUP BY or DISTINCT operator from the sample of
+     * the dataset scanned beneath it.
+     * <p>
+     * As a side effect the sample size is stored through {@link #setTotalSamples(long)}: first the sample
+     * cardinality target of the sample index and, when a SELECT is present in the branch, the number of samples
+     * that satisfy its predicates.
+     *
+     * @param grpByDistinctOp the GROUP BY or DISTINCT operator
+     * @return the estimated distinct cardinality, or {@code 0} when statistics are not initialized, the operator
+     *         is neither GROUP BY nor DISTINCT, no data-source scan or sample index is found, or the data source
+     *         is not an internal or external dataset
+     * @throws AlgebricksException if a sampling query fails
+     */
+    public long findDistinctCardinality(ILogicalOperator grpByDistinctOp) throws AlgebricksException {
+        if (stats == null) {
+            return 0L; // stats object is not initialized yet
+        }
+        LogicalOperatorTag tag = grpByDistinctOp.getOperatorTag();
+        // distinct cardinality supported only for GroupByOp and DistinctOp
+        if (tag != LogicalOperatorTag.DISTINCT && tag != LogicalOperatorTag.GROUP) {
+            return 0L;
+        }
+
+        ILogicalOperator parent = joinEnum.findDataSourceScanOperatorParent(grpByDistinctOp);
+        if (parent == null) {
+            return 0L; // no scan below this operator; must be checked before parent is dereferenced
+        }
+        DataSourceScanOperator scanOp = (DataSourceScanOperator) parent.getInputs().get(0).getValue();
+        if (scanOp == null) {
+            return 0L; // this may happen in case of in lists
+        }
+
+        Index index = stats.findSampleIndex(scanOp, optCtx);
+        if (index == null) {
+            return 0L;
+        }
+        Index.SampleIndexDetails idxDetails = (Index.SampleIndexDetails) index.getIndexDetails();
+        double origDatasetCard = idxDetails.getSourceCardinality();
+
+        byte dsType = ((DataSource) scanOp.getDataSource()).getDatasourceType();
+        if (dsType != DataSource.Type.INTERNAL_DATASET && dsType != DataSource.Type.EXTERNAL_DATASET) {
+            return 0L; // Datasource must be of a dataset, not supported for other datasource types
+        }
+        SampleDataSource sampleDataSource = joinEnum.getSampleDataSource(scanOp);
+
+        ILogicalOperator parentOfSelectOp = findParentOfSelectOp(grpByDistinctOp);
+        SelectOperator selOp = (parentOfSelectOp == null) ? null
+                : ((SelectOperator) parentOfSelectOp.getInputs().get(0).getValue());
+
+        setTotalSamples(idxDetails.getSampleCardinalityTarget()); // sample size without predicates (i.e., n)
+        if (selOp != null) {
+            // set totalSamples to the sample size with predicates (i.e., n_f)
+            setTotalSamples(findSampleSizeWithPredicates(selOp, sampleDataSource));
+        }
+        // Estimated distinct cardinality for the dataset (i.e., D_est or D_est_f).
+        // To verify estimator accuracy, findEstDistinctWithoutPredicates(grpByDistinctOp, origDatasetCard,
+        // sampleDataSource) yields D_est without predicates once totalSamples is reset to the
+        // sample cardinality target.
+        return findEstDistinctWithPredicates(grpByDistinctOp, origDatasetCard, sampleDataSource);
+    }
+
+    /**
+     * Runs the SELECT branch against the sample data source and returns the first value of the first result
+     * row, used by the caller as the number of samples that satisfy the predicates.
+     *
+     * @param selOp the SELECT operator heading the branch to evaluate (a copy is executed, not the original)
+     * @param sampleDataSource the sample data source substituted for the scanned dataset
+     * @return the value reported by the sampling query, or {@link Long#MAX_VALUE} when the sample data source
+     *         could not be installed in the copied plan
+     * @throws AlgebricksException if the sampling query fails
+     */
+    private long findSampleSizeWithPredicates(SelectOperator selOp, SampleDataSource sampleDataSource)
+            throws AlgebricksException {
+        ILogicalOperator selOpCopy = OperatorManipulationUtil.bottomUpCopyOperators(selOp);
+        if (!setSampleDataSource(selOpCopy, sampleDataSource)) {
+            return Long.MAX_VALUE;
+        }
+        List<List<IAObject>> rows = stats.runSamplingQuery(optCtx, selOpCopy);
+        AInt64 sampleCount = (AInt64) rows.get(0).get(0);
+        return sampleCount.getLongValue();
+    }
+
+    /**
+     * Runs a copy of the GROUP BY / DISTINCT plan against the sample data source and scales the result up to
+     * the whole dataset with {@code distinctEstimator}.
+     *
+     * @param grpByDistinctOp the GROUP BY or DISTINCT operator heading the plan
+     * @param origDatasetCardinality cardinality of the original dataset
+     * @param sampleDataSource the sample data source substituted for the scanned dataset
+     * @return the rounded, non-negative estimate; {@code 0} when the operator is of another kind or the sample
+     *         data source could not be installed
+     * @throws AlgebricksException if the sampling query fails
+     */
+    private long findEstDistinctWithPredicates(ILogicalOperator grpByDistinctOp, double origDatasetCardinality,
+            SampleDataSource sampleDataSource) throws AlgebricksException {
+        double sampleDistinct = -1.0; // -1.0 marks "no value obtained from the samples"
+        LogicalOperatorTag opTag = grpByDistinctOp.getOperatorTag();
+        if (opTag == LogicalOperatorTag.GROUP || opTag == LogicalOperatorTag.DISTINCT) {
+            ILogicalOperator planCopy = OperatorManipulationUtil.bottomUpCopyOperators(grpByDistinctOp);
+            if (setSampleDataSource(planCopy, sampleDataSource)) {
+                // get distinct cardinality from the sampling source
+                List<List<IAObject>> rows = stats.runSamplingQuery(optCtx, planCopy);
+                AInt64 sampledCount = (AInt64) rows.get(0).get(0);
+                sampleDistinct = (double) sampledCount.getLongValue();
+            }
+        }
+        // estimate distinct cardinality for the dataset from the sampled cardinality, if one was obtained
+        double estimate = (sampleDistinct != -1.0) ? distinctEstimator(sampleDistinct, origDatasetCardinality)
+                : sampleDistinct;
+        return Math.round(Math.max(0.0, estimate));
+    }
+
+    /**
+     * Same as {@code findEstDistinctWithPredicates}, but the first SELECT found on the input-0 chain is cut out
+     * of the copied plan before it is run against the sample data source, so the estimate ignores the
+     * predicates of that SELECT.
+     *
+     * @param grpByDistinctOp the GROUP BY or DISTINCT operator heading the plan
+     * @param origDatasetCardinality cardinality of the original dataset
+     * @param sampleDataSource the sample data source substituted for the scanned dataset
+     * @return the rounded, non-negative estimate; {@code 0} when the operator is of another kind, no SELECT is
+     *         found, or the sample data source could not be installed
+     * @throws AlgebricksException if the sampling query fails
+     */
+    private long findEstDistinctWithoutPredicates(ILogicalOperator grpByDistinctOp, double origDatasetCardinality,
+            SampleDataSource sampleDataSource) throws AlgebricksException {
+        double sampleDistinct = -1.0; // -1.0 marks "no value obtained from the samples"
+        LogicalOperatorTag opTag = grpByDistinctOp.getOperatorTag();
+        if (opTag == LogicalOperatorTag.GROUP || opTag == LogicalOperatorTag.DISTINCT) {
+            ILogicalOperator selectParent = findParentOfSelectOp(grpByDistinctOp);
+            ILogicalOperator candidate = null;
+            if (selectParent != null) {
+                candidate = selectParent.getInputs().get(0).getValue();
+            }
+
+            if (candidate != null && candidate.getOperatorTag() == LogicalOperatorTag.SELECT) {
+                // skip the SelectOp; what lies below must be an AssignOp or DataSourceScanOp
+                ILogicalOperator belowSelect = candidate.getInputs().get(0).getValue();
+
+                ILogicalOperator planCopy = OperatorManipulationUtil.bottomUpCopyOperators(grpByDistinctOp);
+                ILogicalOperator selectParentInCopy = findParentOfSelectOp(planCopy);
+                ILogicalOperator belowSelectCopy = OperatorManipulationUtil.bottomUpCopyOperators(belowSelect);
+                assert selectParentInCopy != null;
+                // re-link the copied plan so the SelectOp is bypassed
+                selectParentInCopy.getInputs().get(0).setValue(belowSelectCopy);
+
+                if (setSampleDataSource(planCopy, sampleDataSource)) {
+                    // get distinct cardinality from the sampling source
+                    List<List<IAObject>> rows = stats.runSamplingQuery(optCtx, planCopy);
+                    AInt64 sampledCount = (AInt64) rows.get(0).get(0);
+                    sampleDistinct = (double) sampledCount.getLongValue();
+                }
+            }
+        }
+        // estimate distinct cardinality for the dataset from the sampled cardinality, if one was obtained
+        double estimate = (sampleDistinct != -1.0) ? distinctEstimator(sampleDistinct, origDatasetCardinality)
+                : sampleDistinct;
+        return Math.round(Math.max(0.0, estimate));
+    }
+
+    /**
+     * Scales the number of distinct values seen in the samples up to the whole dataset by solving
+     * {@code functionForMMO(x) = 0} with the Newton-Raphson method, starting from the sampled value.
+     * <p>
+     * Side effects: {@code distinctFromSamples} is set to the (possibly adjusted) sampled value, and
+     * {@code totalSamples} is increased by 2 when it is 0 or 1.
+     * <p>
+     * The iteration stops when the step drops below 0.001, the derivative is zero, the estimate exceeds the
+     * dataset cardinality (it is then clamped to it), or the iteration bound is reached. The bound is a safety
+     * net: rounding noise in the step can stay above the tolerance, and the loop would otherwise never end.
+     *
+     * @param estCardinality number of distinct values observed in the samples
+     * @param origDatasetCardinality cardinality of the original dataset, used as upper limit of the estimate
+     * @return the estimated number of distinct values in the dataset
+     */
+    private double distinctEstimator(double estCardinality, double origDatasetCardinality) {
+        final int maxNewtonIterations = 200;
+        final double stepTolerance = 0.001;
+
+        if (totalSamples <= 1) {
+            totalSamples += 2;
+            estCardinality = totalSamples - 1;
+        } else if (estCardinality == totalSamples) {
+            estCardinality--; // keep the sampled distinct count strictly below the sample size
+        }
+        setDistinctFromSamples(estCardinality);
+
+        double denominator = derivativeFunctionForMMO(estCardinality);
+        if (denominator == 0.0) { // Newton-Raphson method requires it to be non-zero
+            return estCardinality;
+        }
+        double fraction = functionForMMO(estCardinality) / denominator;
+        for (int iteration = 0; iteration < maxNewtonIterations
+                && Math.abs(fraction) >= stepTolerance; iteration++) {
+            denominator = derivativeFunctionForMMO(estCardinality);
+            if (denominator == 0.0) {
+                break;
+            }
+            fraction = functionForMMO(estCardinality) / denominator;
+            estCardinality = estCardinality - fraction;
+            if (estCardinality > origDatasetCardinality) {
+                estCardinality = origDatasetCardinality; // for preventing infinite growth beyond N
+                break;
+            }
+        }
+        return estCardinality;
+    }
+
+    /**
+     * Evaluates {@code x * (1 - e^(-totalSamples / x)) - distinctFromSamples}, the function whose root is
+     * sought by {@code distinctEstimator}.
+     * <p>
+     * {@code 1 - e^(-a)} is computed as {@code -Math.expm1(-a)}, which stays accurate when {@code a} is close
+     * to zero (i.e. when {@code x} is much larger than the sample size), where the direct form suffers from
+     * cancellation.
+     *
+     * @param x the candidate number of distinct values
+     * @return the function value at {@code x}
+     */
+    private double functionForMMO(double x) {
+        double oneMinusDecay = -Math.expm1(-1.0 * (double) totalSamples / x);
+        return (x * oneMinusDecay - distinctFromSamples);
+    }
+
+    /**
+     * Evaluates {@code 1 - (a + 1) * e^(-a)} with {@code a = totalSamples / x}, used by
+     * {@code distinctEstimator} as the denominator of the Newton-Raphson step.
+     *
+     * @param x the candidate number of distinct values
+     * @return the value of the expression at {@code x}
+     */
+    private double derivativeFunctionForMMO(double x) {
+        double ratio = (double) totalSamples / x;
+        double decay = Math.exp(-1.0 * ratio);
+        return 1.0 - (ratio + 1.0) * decay;
+    }
+
+    /**
+     * Replaces the data source of the scan found beneath {@code op} with the given sample data source.
+     *
+     * @param op root of the (copied) plan whose scan is to be redirected
+     * @param sampleDataSource the sample data source to install
+     * @return {@code true} when the scan was found and updated; {@code false} when no scan parent or no scan
+     *         operator is available
+     */
+    private boolean setSampleDataSource(ILogicalOperator op, SampleDataSource sampleDataSource) {
+        ILogicalOperator parent = joinEnum.findDataSourceScanOperatorParent(op);
+        if (parent == null) {
+            return false; // must be checked before parent is dereferenced
+        }
+        DataSourceScanOperator scanOp = (DataSourceScanOperator) parent.getInputs().get(0).getValue();
+        if (scanOp == null) {
+            return false;
+        }
+        // replace the DataSourceScanOp with the sampling source
+        scanOp.setDataSource(sampleDataSource);
+        return true;
+    }
+
+    /**
+     * Walks the input-0 chain from {@code op} towards the data-source scan and returns the operator directly
+     * above the first SELECT met on the way.
+     *
+     * @param op operator to start from
+     * @return the parent of the first SELECT; {@code null} when {@code op} itself is that SELECT, when the scan
+     *         is reached without meeting a SELECT, or when the chain ends before a scan is reached
+     */
+    private ILogicalOperator findParentOfSelectOp(ILogicalOperator op) {
+        ILogicalOperator parent = null;
+        LogicalOperatorTag tag = op.getOperatorTag();
+        while (tag != LogicalOperatorTag.DATASOURCESCAN) {
+            if (tag == LogicalOperatorTag.SELECT) {
+                return parent;
+            }
+            if (op.getInputs().isEmpty()) {
+                return null; // leaf reached without a scan; avoids indexing into an empty input list
+            }
+            parent = op;
+            op = op.getInputs().get(0).getValue();
+            tag = op.getOperatorTag();
+        }
+        return null; // no SelectOp in the query tree
+    }
+}
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/EnumerateJoinsRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/EnumerateJoinsRule.java
index 4c9a10e..fe54164 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/EnumerateJoinsRule.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/EnumerateJoinsRule.java
@@ -78,6 +78,8 @@
     List<JoinOperator> allJoinOps; // can be inner join or left outer join
     // Will be in the order of the from clause. Important for position 
ordering when assigning bits to join expressions.
     List<ILogicalOperator> leafInputs;
+    // The Distinct operators for each Select or DataSourceScan operator (if 
applicable)
+    HashMap<DataSourceScanOperator, ILogicalOperator> 
dataScanOrSelectAndDistinctOps;
     HashMap<LogicalVariable, Integer> varLeafInputIds;
     List<Triple<Integer, Integer, Boolean>> buildSets; // the first is the 
bits and the second is the number of tables.
     List<Quadruple<Integer, Integer, JoinOperator, Integer>> 
outerJoinsDependencyList;
@@ -86,6 +88,7 @@

     public EnumerateJoinsRule(JoinEnum joinEnum) {
         this.joinEnum = joinEnum;
+        dataScanOrSelectAndDistinctOps = new HashMap<>(); // initialized only 
once at the beginning of the rule
     }

     @Override
@@ -118,6 +121,11 @@
             return false;
         }

+        // If cboMode or cboTestMode is true, identify each DistinctOp or 
GroupByOp for the corresponding DataScanOp
+        if (op.getOperatorTag() == LogicalOperatorTag.DISTRIBUTE_RESULT) {
+            getDistinctOpsForJoinNodes(op, context);
+        }
+
         // if this join has already been seen before, no need to apply the 
rule again
         if (context.checkIfInDontApplySet(this, op)) {
             return false;
@@ -159,6 +167,7 @@
         }
         joinEnum.initEnum((AbstractLogicalOperator) op, cboMode, cboTestMode, 
numberOfFromTerms, leafInputs, allJoinOps,
                 assignOps, outerJoinsDependencyList, buildSets, 
varLeafInputIds, context);
+        joinEnum.dataScanOrSelectAndDistinctOps = 
this.dataScanOrSelectAndDistinctOps;

         if (cboMode) {
             if (!doAllDataSourcesHaveSamples(leafInputs, context)) {
@@ -388,6 +397,45 @@
         }
     }

+    /**
+     * Starting at a DISTRIBUTE_RESULT operator, follows the first-input chain downwards and remembers the
+     * most recently visited DISTINCT or GROUP operator. The descent ends at the first join, at the first
+     * DATASOURCESCAN, or when the next operator is an EMPTYTUPLESOURCE.
+     * <ul>
+     * <li>At an INNERJOIN or LEFTOUTERJOIN, every input of the join is handed to
+     * {@code CBODistinctOperatorUtils.createDistinctOpsForJoinNodes} together with the remembered
+     * operator, provided one was seen.</li>
+     * <li>At a DATASOURCESCAN (single table query), the scan is mapped to the remembered operator in
+     * {@code dataScanOrSelectAndDistinctOps}, provided one was seen.</li>
+     * </ul>
+     *
+     * @param op      the operator to start from; anything other than DISTRIBUTE_RESULT is ignored
+     * @param context the optimization context forwarded to the helper
+     */
+    private void getDistinctOpsForJoinNodes(ILogicalOperator op, IOptimizationContext context) {
+        if (op.getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT) {
+            return;
+        }
+        // Last DISTINCT/GROUP operator met on the way down; null means none has been met.
+        ILogicalOperator distinctOrGroupOp = null;
+        ILogicalOperator current = op;
+        do {
+            switch (current.getOperatorTag()) {
+                case DISTINCT:
+                case GROUP:
+                    // An operator found lower in the plan replaces the one remembered so far.
+                    distinctOrGroupOp = current;
+                    break;
+                case INNERJOIN:
+                case LEFTOUTERJOIN:
+                    if (distinctOrGroupOp != null) {
+                        for (int i = 0; i < current.getInputs().size(); i++) {
+                            ILogicalOperator joinInput = current.getInputs().get(i).getValue();
+                            CBODistinctOperatorUtils.createDistinctOpsForJoinNodes(joinInput, distinctOrGroupOp,
+                                    context, dataScanOrSelectAndDistinctOps);
+                        }
+                    }
+                    return;
+                case DATASOURCESCAN: // single table queries
+                    DataSourceScanOperator scan = (DataSourceScanOperator) current;
+                    // Works for any attributes present in the GroupByOp or DistinctOp.
+                    // Note: additionally require
+                    //      "!CBODistinctOperatorUtils.containsAllGroupByDistinctVarsInScanOp(scan, distinctOrGroupOp)"
+                    // below if CBO should not estimate the number of distinct values when the GroupByOp or
+                    // DistinctOp contains all PK attributes (the cardinality estimated from samples can then be
+                    // mostly the same as the original dataset cardinality).
+                    if (distinctOrGroupOp != null) {
+                        dataScanOrSelectAndDistinctOps.put(scan, distinctOrGroupOp);
+                    }
+                    return;
+                default:
+                    break;
+            }
+            current = current.getInputs().get(0).getValue();
+            // Reaching an EMPTYTUPLESOURCE means there is no DataSourceScan, so CBO has nothing to do here.
+        } while (current.getOperatorTag() != LogicalOperatorTag.EMPTYTUPLESOURCE);
+    }
+
     private int getLeafInputId(LogicalVariable lv) {
         if (varLeafInputIds.containsKey(lv))
             return varLeafInputIds.get(lv);
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinEnum.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinEnum.java
index 3ce4e9f..54d8ed8 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinEnum.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinEnum.java
@@ -103,6 +103,8 @@
     protected JoinNode[] jnArray; // array of all join nodes
     protected int jnArraySize;
     protected List<ILogicalOperator> leafInputs;
+    // The Distinct operators for each Select or DataScan operator (if 
applicable)
+    HashMap<DataSourceScanOperator, ILogicalOperator> 
dataScanOrSelectAndDistinctOps;
     protected List<ILogicalExpression> singleDatasetPreds;
     protected List<AssignOperator> assignOps;
     List<Quadruple<Integer, Integer, JoinOperator, Integer>> 
outerJoinsDependencyList;
@@ -116,6 +118,7 @@
     protected int maxBits; // the joinNode where the dataset bits are the 
highest is where all the tables have been joined

     protected Stats stats;
+    protected DistinctCardinalityEstimation distinctEst;
     private boolean cboMode;
     private boolean cboTestMode;
     protected int cboFullEnumLevel;
@@ -148,6 +151,7 @@
         this.connectedJoinGraph = true;
         this.optCtx = context;
         this.leafInputs = leafInputs;
+        this.dataScanOrSelectAndDistinctOps = new HashMap<>();
         this.assignOps = assignOps;
         this.outerJoin = false; // assume no outerjoins anywhere in the query 
at first.
         this.outerJoinsDependencyList = outerJoinsDependencyList;
@@ -166,6 +170,7 @@
         this.cost = new Cost();
         this.costMethods = new CostMethods(context);
         this.stats = new Stats(optCtx, this);
+        this.distinctEst = new DistinctCardinalityEstimation(optCtx, this);
         this.jnArraySize = (int) Math.pow(2.0, this.numberOfTerms);
         this.jnArray = new JoinNode[this.jnArraySize];
         // initialize all the join nodes
@@ -836,6 +841,9 @@
                 if (scanOp == null) {
                     continue; // what happens to the cards and sizes then? 
this may happen in case of in lists
                 }
+                ILogicalOperator grpByDistinctOp = 
this.dataScanOrSelectAndDistinctOps.get(scanOp);
+                long distinctCardinality =
+                        (grpByDistinctOp != null) ? 
distinctEst.findDistinctCardinality(grpByDistinctOp) : 0L;

                 finalDatasetCard = origDatasetCard = 
idxDetails.getSourceCardinality();


--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17759
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I7443ac65d53aaa3d3579baf3874e660bf562e925
Gerrit-Change-Number: 17759
Gerrit-PatchSet: 1
Gerrit-Owner: Mehnaz Tabassum Mahin <[email protected]>
Gerrit-MessageType: newchange

Reply via email to