>From Peeyush Gupta <[email protected]>:

Peeyush Gupta has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19406 )


Change subject: WIP: Predicate pushdown for Delta Tables
......................................................................

WIP: Predicate pushdown for Delta Tables

Change-Id: If1a46db488ee0f26aeea27069a4668665d1781dc
---
A 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/DeltaTableFilterPushdownProcessor.java
A 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/DeltaTableFilterBuilder.java
A 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/DeltaTableFilterEvaluatorFactory.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/AbstractFilterBuilder.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
9 files changed, 334 insertions(+), 14 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/06/19406/1

diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
index 16aeecb..a101bed 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
@@ -32,6 +32,7 @@
 import 
org.apache.asterix.optimizer.rules.pushdown.processor.ColumnRangeFilterPushdownProcessor;
 import 
org.apache.asterix.optimizer.rules.pushdown.processor.ColumnValueAccessPushdownProcessor;
 import 
org.apache.asterix.optimizer.rules.pushdown.processor.ConsolidateProjectionAndFilterExpressionsProcessor;
+import 
org.apache.asterix.optimizer.rules.pushdown.processor.DeltaTableFilterPushdownProcessor;
 import 
org.apache.asterix.optimizer.rules.pushdown.processor.ExternalDatasetFilterPushdownProcessor;
 import 
org.apache.asterix.optimizer.rules.pushdown.processor.InlineAndNormalizeFilterExpressionsProcessor;
 import 
org.apache.asterix.optimizer.rules.pushdown.visitor.PushdownOperatorVisitor;
@@ -118,6 +119,7 @@
         }
         // Performs prefix pushdowns
         pushdownProcessorsExecutor.add(new 
ExternalDatasetFilterPushdownProcessor(pushdownContext, context));
+        pushdownProcessorsExecutor.add(new 
DeltaTableFilterPushdownProcessor(pushdownContext, context));
         pushdownProcessorsExecutor
                 .add(new 
ConsolidateProjectionAndFilterExpressionsProcessor(pushdownContext, context));
         // Inlines AND/OR expression (must be last to run)
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/DeltaTableFilterPushdownProcessor.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/DeltaTableFilterPushdownProcessor.java
new file mode 100644
index 0000000..8e5bf5b
--- /dev/null
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/DeltaTableFilterPushdownProcessor.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown.processor;
+
+import static org.apache.asterix.metadata.utils.PushdownUtil.ARRAY_FUNCTIONS;
+
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.optimizer.rules.pushdown.PushdownContext;
+import 
org.apache.asterix.optimizer.rules.pushdown.descriptor.ScanDefineDescriptor;
+import 
org.apache.asterix.optimizer.rules.pushdown.schema.AnyExpectedSchemaNode;
+import 
org.apache.asterix.optimizer.rules.pushdown.schema.ExpectedSchemaNodeType;
+import org.apache.asterix.optimizer.rules.pushdown.schema.IExpectedSchemaNode;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class DeltaTableFilterPushdownProcessor extends 
ColumnFilterPushdownProcessor {
+
+    public DeltaTableFilterPushdownProcessor(PushdownContext pushdownContext, 
IOptimizationContext context) {
+        super(pushdownContext, context);
+    }
+
+    @Override
+    protected boolean skip(ScanDefineDescriptor scanDefineDescriptor) throws 
AlgebricksException {
+        return !DatasetUtil.isDeltaTable(scanDefineDescriptor.getDataset());
+    }
+
+    @Override
+    protected boolean isNotPushable(AbstractFunctionCallExpression expression) 
{
+        FunctionIdentifier fid = expression.getFunctionIdentifier();
+        return ARRAY_FUNCTIONS.contains(fid) || 
super.isNotPushable(expression);
+    }
+
+    @Override
+    protected boolean handlePath(AbstractFunctionCallExpression expression) 
throws AlgebricksException {
+        IExpectedSchemaNode node = expression.accept(exprToNodeVisitor, null);
+        if (node == null || node.getType() != ExpectedSchemaNodeType.ANY) {
+            return false;
+        }
+
+        // The inferred path from the provided expression
+        ARecordType expressionPath = 
pathBuilderVisitor.buildPath((AnyExpectedSchemaNode) node);
+        paths.put(expression, expressionPath);
+        return true;
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
index 67d460c..62801b3 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
@@ -23,6 +23,8 @@
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;

@@ -195,11 +197,11 @@
             long size = writer.getDataSize();
             writer.close();

-            List<Action> actions = List.of(new AddFile("firstFile.parquet", 
new HashMap<>(), size,
-                    System.currentTimeMillis(), true, null, null));
+            List<Action> actions = List.of(new AddFile("firstFile.parquet", 
Collections.singletonMap("name", "Cooper"),
+                    size, System.currentTimeMillis(), true, null, null));
             DeltaLog log = DeltaLog.forTable(conf, DELTA_MULTI_FILE_TABLE);
             OptimisticTransaction txn = log.startTransaction();
-            Metadata metaData = 
txn.metadata().copyBuilder().partitionColumns(new ArrayList<>())
+            Metadata metaData = 
txn.metadata().copyBuilder().partitionColumns(Arrays.asList("name"))
                     .schema(new StructType().add(new StructField("id", new 
IntegerType(), true))
                             .add(new StructField("name", new StringType(), 
true))
                             .add(new StructField("age", new StringType(), 
true)))
@@ -218,8 +220,8 @@
             long size2 = writer2.getDataSize();
             writer2.close();

-            List<Action> actions2 = List.of(new AddFile("secondFile.parquet", 
new HashMap<>(), size2,
-                    System.currentTimeMillis(), true, null, null));
+            List<Action> actions2 = List.of(new AddFile("secondFile.parquet", 
Collections.singletonMap("name", "Brand"),
+                    size2, System.currentTimeMillis(), true, null, null));
             OptimisticTransaction txn2 = log.startTransaction();
             txn2.commit(actions2, new Operation(Operation.Name.WRITE), 
"deltalake-table-create");

diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/DeltaTableFilterEvaluatorFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/DeltaTableFilterEvaluatorFactory.java
new file mode 100644
index 0000000..c954f9b
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/DeltaTableFilterEvaluatorFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.filter;
+
+import org.apache.asterix.common.external.IExternalFilterEvaluator;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import 
org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+
+import io.delta.kernel.expressions.Expression;
+
+/**
+ * Filter evaluator factory for Delta tables whose main role is to carry the Delta Kernel filter
+ * {@link Expression} to the reader side, where it is retrieved through {@link #getFilterExpression()}.
+ */
+public class DeltaTableFilterEvaluatorFactory implements IExternalFilterEvaluatorFactory {
+    private static final long serialVersionUID = 1L;
+    // NOTE(review): this factory declares a serialVersionUID -- confirm that the Delta Kernel Expression
+    // instances stored here can be shipped along with it.
+    private final Expression filterExpression;
+
+    /**
+     * @param expression the Delta Kernel filter expression to carry; callers may pass {@code null}
+     */
+    public DeltaTableFilterEvaluatorFactory(Expression expression) {
+        filterExpression = expression;
+    }
+
+    /**
+     * @return the expression given at construction time (possibly {@code null})
+     */
+    public Expression getFilterExpression() {
+        return filterExpression;
+    }
+
+    /**
+     * Creates no evaluator: the result is always {@code null}.
+     * NOTE(review): confirm that no caller dereferences the returned evaluator for Delta tables.
+     */
+    @Override
+    public IExternalFilterEvaluator create(IServiceContext serviceContext, IWarningCollector warningCollector)
+            throws HyracksDataException {
+        return null;
+    }
+
+    /**
+     * @return the shared no-op value embedder
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public IExternalFilterValueEmbedder createValueEmbedder(IWarningCollector warningCollector) {
+        return NoOpFilterValueEmbedder.INSTANCE;
+    }
+}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
index e3313f7..6b51c88 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
@@ -39,6 +39,7 @@
 import org.apache.asterix.external.api.IExternalDataRuntimeContext;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordReaderFactory;
+import 
org.apache.asterix.external.input.filter.DeltaTableFilterEvaluatorFactory;
 import 
org.apache.asterix.external.input.record.reader.aws.delta.converter.DeltaConverterContext;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.HDFSUtils;
@@ -62,6 +63,8 @@
 import io.delta.kernel.defaults.engine.DefaultEngine;
 import io.delta.kernel.engine.Engine;
 import io.delta.kernel.exceptions.KernelException;
+import io.delta.kernel.expressions.Expression;
+import io.delta.kernel.expressions.Predicate;
 import io.delta.kernel.internal.InternalScanFileUtils;
 import io.delta.kernel.types.StructType;
 import io.delta.kernel.utils.CloseableIterator;
@@ -124,7 +127,14 @@
         } catch (AsterixDeltaRuntimeException e) {
             throw e.getHyracksDataException();
         }
-        Scan scan = snapshot.getScanBuilder(engine).withReadSchema(engine, 
requiredSchema).build();
+        Expression filterExpression = ((DeltaTableFilterEvaluatorFactory) 
filterEvaluatorFactory).getFilterExpression();
+        Scan scan;
+        if (filterExpression != null) {
+            scan = snapshot.getScanBuilder(engine).withReadSchema(engine, 
requiredSchema)
+                    .withFilter(engine, (Predicate) filterExpression).build();
+        } else {
+            scan = snapshot.getScanBuilder(engine).withReadSchema(engine, 
requiredSchema).build();
+        }
         scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
         CloseableIterator<FilteredColumnarBatch> iter = 
scan.getScanFiles(engine);

diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 923dbd4..7df9b47 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -744,4 +744,9 @@
         return dataset.getDatasetType() == DatasetType.INTERNAL
                 && dataset.getDatasetFormatInfo().getFormat() == 
DatasetConfig.DatasetFormat.COLUMN;
     }
+
+    public static boolean isDeltaTable(Dataset dataset) {
+        return dataset.getDatasetType() == DatasetType.EXTERNAL && 
ExternalDataUtils
+                .isDeltaTable(((ExternalDatasetDetails) 
dataset.getDatasetDetails()).getProperties());
+    }
 }
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
index dda2111..55cb4fa 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.metadata.utils;

+import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable;
 import static 
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
 import static 
org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;

@@ -49,6 +50,7 @@
 import org.apache.asterix.metadata.entities.InternalDatasetDetails;
 import org.apache.asterix.metadata.utils.filter.ColumnFilterBuilder;
 import org.apache.asterix.metadata.utils.filter.ColumnRangeFilterBuilder;
+import org.apache.asterix.metadata.utils.filter.DeltaTableFilterBuilder;
 import org.apache.asterix.metadata.utils.filter.ExternalFilterBuilder;
 import org.apache.asterix.om.base.AString;
 import org.apache.asterix.om.base.IAObject;
@@ -338,13 +340,17 @@
         if (projectionFiltrationInfo == 
DefaultProjectionFiltrationInfo.INSTANCE) {
             return NoOpExternalFilterEvaluatorFactory.INSTANCE;
         }
-
-        ExternalDataPrefix prefix = new ExternalDataPrefix(properties);
-        ExternalDatasetProjectionFiltrationInfo pfi =
-                (ExternalDatasetProjectionFiltrationInfo) 
projectionFiltrationInfo;
-        ExternalFilterBuilder build = new ExternalFilterBuilder(pfi, context, 
typeEnv, prefix);
-
-        return build.build();
+        if (isDeltaTable(properties)) {
+            DeltaTableFilterBuilder builder = new DeltaTableFilterBuilder(
+                    (ExternalDatasetProjectionFiltrationInfo) 
projectionFiltrationInfo, context, typeEnv);
+            return builder.build();
+        } else {
+            ExternalDataPrefix prefix = new ExternalDataPrefix(properties);
+            ExternalDatasetProjectionFiltrationInfo pfi =
+                    (ExternalDatasetProjectionFiltrationInfo) 
projectionFiltrationInfo;
+            ExternalFilterBuilder build = new ExternalFilterBuilder(pfi, 
context, typeEnv, prefix);
+            return build.build();
+        }
     }

 }
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/AbstractFilterBuilder.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/AbstractFilterBuilder.java
index ce39220..f41a116 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/AbstractFilterBuilder.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/AbstractFilterBuilder.java
@@ -100,7 +100,7 @@
         return argsEvalFactories;
     }

-    private IFunctionDescriptor resolveFunction(AbstractFunctionCallExpression 
funcExpr) throws AlgebricksException {
+    protected IFunctionDescriptor 
resolveFunction(AbstractFunctionCallExpression funcExpr) throws 
AlgebricksException {
         MetadataProvider metadataProvider = (MetadataProvider) 
context.getMetadataProvider();
         IFunctionManager functionManager = 
metadataProvider.getFunctionManager();
         FunctionIdentifier fnId = funcExpr.getFunctionIdentifier();
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/DeltaTableFilterBuilder.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/DeltaTableFilterBuilder.java
new file mode 100644
index 0000000..b61baef
--- /dev/null
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/DeltaTableFilterBuilder.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils.filter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import 
org.apache.asterix.external.input.filter.DeltaTableFilterEvaluatorFactory;
+import org.apache.asterix.om.base.ADouble;
+import org.apache.asterix.om.base.AInt16;
+import org.apache.asterix.om.base.AInt32;
+import org.apache.asterix.om.base.AInt64;
+import org.apache.asterix.om.base.AInt8;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.constants.AsterixConstantValue;
+import org.apache.asterix.om.functions.IFunctionDescriptor;
+import org.apache.asterix.om.types.ARecordType;
+import 
org.apache.asterix.runtime.projection.ExternalDatasetProjectionFiltrationInfo;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import 
org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.logging.log4j.LogManager;
+
+import com.microsoft.azure.storage.core.Logger;
+
+import io.delta.kernel.expressions.Column;
+import io.delta.kernel.expressions.Expression;
+import io.delta.kernel.expressions.Literal;
+import io.delta.kernel.expressions.Predicate;
+
+/**
+ * Translates the filter expression pushed down to a Delta table scan into a Delta Kernel
+ * {@link Expression} and wraps it in a {@link DeltaTableFilterEvaluatorFactory}.
+ * <p>
+ * Supported inputs are registered filter paths (turned into {@link Column}s), constants of type string,
+ * boolean, tinyint, smallint, integer, bigint and double (turned into {@link Literal}s), and the functions
+ * AND, OR, EQ, GE, GT, LE and LT (turned into {@link Predicate}s). Anything else makes the translation
+ * fail, in which case {@link #build()} produces a factory without a filter.
+ */
+public class DeltaTableFilterBuilder extends AbstractFilterBuilder {
+
+    private static final org.apache.logging.log4j.Logger LOGGER = LogManager.getLogger();
+
+    public DeltaTableFilterBuilder(ExternalDatasetProjectionFiltrationInfo projectionFiltrationInfo,
+            JobGenContext context, IVariableTypeEnvironment typeEnv) {
+        super(projectionFiltrationInfo.getFilterPaths(), projectionFiltrationInfo.getFilterExpression(), context,
+                typeEnv);
+    }
+
+    /**
+     * Builds the factory carrying the Delta Kernel predicate for the pushed-down filter.
+     *
+     * @return a factory holding the translated {@link Predicate}, or holding {@code null} when there is no
+     *         filter expression, the translation failed, or the translation did not yield a {@link Predicate}
+     */
+    public IExternalFilterEvaluatorFactory build() throws AlgebricksException {
+        Expression deltaTablePredicate = null;
+        if (filterExpression != null) {
+            try {
+                deltaTablePredicate = createExpression(filterExpression);
+            } catch (Exception e) {
+                // Deliberately best effort: an expression that cannot be translated only means that no
+                // filter is handed to Delta Kernel, so the failure is logged instead of propagated.
+                LOGGER.error("Error creating DeltaTable filter expression ", e);
+            }
+        }
+        // A bare column or literal is not a usable filter; instanceof also covers the null case.
+        if (!(deltaTablePredicate instanceof Predicate)) {
+            deltaTablePredicate = null;
+        }
+        return new DeltaTableFilterEvaluatorFactory(deltaTablePredicate);
+    }
+
+    /**
+     * Translates one logical expression into its Delta Kernel counterpart.
+     *
+     * @throws RuntimeException if the expression is neither a registered path, a constant nor a function call,
+     *                          or if one of those cannot be translated
+     */
+    protected Expression createExpression(ILogicalExpression expression) throws AlgebricksException {
+        if (filterPaths.containsKey(expression)) {
+            // Path expression, create a value accessor (i.e., a column reader)
+            return createColumnExpression(expression);
+        } else if (expression.getExpressionTag() == LogicalExpressionTag.CONSTANT) {
+            return createLiteralExpression(expression);
+        } else if (expression.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+            return handleFunction(expression);
+        }
+
+        /*
+         * A variable expression: This should not happen as the provided filter expression is inlined.
+         * If a variable was encountered for some reason, it should only be the record variable. If the record
+         * variable was encountered, that means there's a missing value path the compiler didn't provide.
+         */
+        throw new RuntimeException("Unsupported expression " + expression + ". the provided paths are: " + filterPaths);
+    }
+
+    /**
+     * Converts a constant expression into a Delta Kernel {@link Literal} of the matching primitive type.
+     *
+     * @throws RuntimeException if the constant's type has no mapping here
+     */
+    private Expression createLiteralExpression(ILogicalExpression expression) throws AlgebricksException {
+        ConstantExpression constExpr = (ConstantExpression) expression;
+        AsterixConstantValue constantValue = (AsterixConstantValue) constExpr.getValue();
+        switch (constantValue.getObject().getType().getTypeTag()) {
+            case STRING:
+                return Literal.ofString(((AString) constantValue.getObject()).getStringValue());
+            case TINYINT:
+                return Literal.ofByte(((AInt8) constantValue.getObject()).getByteValue());
+            case SMALLINT:
+                return Literal.ofShort(((AInt16) constantValue.getObject()).getShortValue());
+            case INTEGER:
+                return Literal.ofInt(((AInt32) constantValue.getObject()).getIntegerValue());
+            case BOOLEAN:
+                return Literal.ofBoolean(constantValue.isTrue());
+            case BIGINT:
+                return Literal.ofLong(((AInt64) constantValue.getObject()).getLongValue());
+            case DOUBLE:
+                return Literal.ofDouble(((ADouble) constantValue.getObject()).getDoubleValue());
+            default:
+                throw new RuntimeException("Unsupported literal type: " + constantValue.getObject().getType());
+        }
+    }
+
+    /**
+     * Not used by this builder: expressions are translated to Delta Kernel expressions instead of scalar
+     * evaluators, so no value accessor is provided.
+     */
+    @Override
+    protected IScalarEvaluatorFactory createValueAccessor(ILogicalExpression expression) {
+        return null;
+    }
+
+    /**
+     * Translates a function call into a Delta Kernel {@link Predicate}.
+     *
+     * @throws RuntimeException if the function is not one of AND, OR, EQ, GE, GT, LE, LT or is called with
+     *                          an unexpected number of arguments
+     */
+    private Expression handleFunction(ILogicalExpression expr) throws AlgebricksException {
+        AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
+        IFunctionDescriptor fd = resolveFunction(funcExpr);
+        List<Expression> args = handleArgs(funcExpr);
+        FunctionIdentifier fid = fd.getIdentifier();
+        if (fid.equals(AlgebricksBuiltinFunctions.AND)) {
+            return foldIntoBinaryPredicates("AND", args, funcExpr);
+        } else if (fid.equals(AlgebricksBuiltinFunctions.OR)) {
+            return foldIntoBinaryPredicates("OR", args, funcExpr);
+        }
+        String comparisonOperator = toComparisonOperator(fid);
+        if (comparisonOperator == null || args.size() != 2) {
+            throw new RuntimeException("Unsupported function: " + funcExpr);
+        }
+        return new Predicate(comparisonOperator, args);
+    }
+
+    /**
+     * Maps a comparison function to the Delta Kernel predicate name, or {@code null} if it is not a
+     * supported comparison.
+     */
+    private static String toComparisonOperator(FunctionIdentifier fid) {
+        if (fid.equals(AlgebricksBuiltinFunctions.EQ)) {
+            return "=";
+        } else if (fid.equals(AlgebricksBuiltinFunctions.GE)) {
+            return ">=";
+        } else if (fid.equals(AlgebricksBuiltinFunctions.GT)) {
+            return ">";
+        } else if (fid.equals(AlgebricksBuiltinFunctions.LE)) {
+            return "<=";
+        } else if (fid.equals(AlgebricksBuiltinFunctions.LT)) {
+            return "<";
+        }
+        return null;
+    }
+
+    /**
+     * Combines the operands of an AND/OR call into left-nested two-operand predicates, e.g.
+     * {@code or(a, b, c)} becomes {@code OR(OR(a, b), c)}. The incoming call may carry more than two
+     * operands, whereas Delta Kernel defines AND/OR over exactly two, so passing the operand list through
+     * unchanged would not represent every operand.
+     *
+     * @throws RuntimeException if fewer than two operands are supplied
+     */
+    private static Expression foldIntoBinaryPredicates(String name, List<Expression> operands,
+            AbstractFunctionCallExpression funcExpr) {
+        if (operands.size() < 2) {
+            throw new RuntimeException("Unsupported function: " + funcExpr);
+        }
+        Expression folded = operands.get(0);
+        for (int i = 1; i < operands.size(); i++) {
+            folded = new Predicate(name, List.of(folded, operands.get(i)));
+        }
+        return folded;
+    }
+
+    /**
+     * Translates every argument of {@code funcExpr}, preserving argument order.
+     */
+    private List<Expression> handleArgs(AbstractFunctionCallExpression funcExpr) throws AlgebricksException {
+        List<Mutable<ILogicalExpression>> args = funcExpr.getArguments();
+        List<Expression> argsExpressions = new ArrayList<>(args.size());
+        for (Mutable<ILogicalExpression> arg : args) {
+            argsExpressions.add(createExpression(arg.getValue()));
+        }
+        return argsExpressions;
+    }
+
+    /**
+     * Creates the {@link Column} for a registered filter path, named after the single field of the path's
+     * record type.
+     *
+     * @throws RuntimeException if the path's record type does not have exactly one field name
+     */
+    protected Column createColumnExpression(ILogicalExpression expression) {
+        ARecordType path = filterPaths.get(expression);
+        if (path.getFieldNames().length != 1) {
+            throw new RuntimeException("Unsupported expression: " + expression);
+        }
+        return new Column(path.getFieldNames()[0]);
+    }
+}

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19406
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: If1a46db488ee0f26aeea27069a4668665d1781dc
Gerrit-Change-Number: 19406
Gerrit-PatchSet: 1
Gerrit-Owner: Peeyush Gupta <[email protected]>
Gerrit-MessageType: newchange

Reply via email to