>From Peeyush Gupta <[email protected]>:
Peeyush Gupta has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19406 )
Change subject: WIP: Predicate pushdown for Delta Tables
......................................................................
WIP: Predicate pushdown for Delta Tables
Change-Id: If1a46db488ee0f26aeea27069a4668665d1781dc
---
A
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/DeltaTableFilterPushdownProcessor.java
A
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/DeltaTableFilterBuilder.java
A
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/DeltaTableFilterEvaluatorFactory.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/AbstractFilterBuilder.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
9 files changed, 334 insertions(+), 14 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/06/19406/1
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
index 16aeecb..a101bed 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
@@ -32,6 +32,7 @@
import
org.apache.asterix.optimizer.rules.pushdown.processor.ColumnRangeFilterPushdownProcessor;
import
org.apache.asterix.optimizer.rules.pushdown.processor.ColumnValueAccessPushdownProcessor;
import
org.apache.asterix.optimizer.rules.pushdown.processor.ConsolidateProjectionAndFilterExpressionsProcessor;
+import
org.apache.asterix.optimizer.rules.pushdown.processor.DeltaTableFilterPushdownProcessor;
import
org.apache.asterix.optimizer.rules.pushdown.processor.ExternalDatasetFilterPushdownProcessor;
import
org.apache.asterix.optimizer.rules.pushdown.processor.InlineAndNormalizeFilterExpressionsProcessor;
import
org.apache.asterix.optimizer.rules.pushdown.visitor.PushdownOperatorVisitor;
@@ -118,6 +119,7 @@
}
// Performs prefix pushdowns
pushdownProcessorsExecutor.add(new
ExternalDatasetFilterPushdownProcessor(pushdownContext, context));
+ pushdownProcessorsExecutor.add(new
DeltaTableFilterPushdownProcessor(pushdownContext, context));
pushdownProcessorsExecutor
.add(new
ConsolidateProjectionAndFilterExpressionsProcessor(pushdownContext, context));
// Inlines AND/OR expression (must be last to run)
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/DeltaTableFilterPushdownProcessor.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/DeltaTableFilterPushdownProcessor.java
new file mode 100644
index 0000000..8e5bf5b
--- /dev/null
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/DeltaTableFilterPushdownProcessor.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown.processor;
+
+import static org.apache.asterix.metadata.utils.PushdownUtil.ARRAY_FUNCTIONS;
+
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.optimizer.rules.pushdown.PushdownContext;
+import
org.apache.asterix.optimizer.rules.pushdown.descriptor.ScanDefineDescriptor;
+import
org.apache.asterix.optimizer.rules.pushdown.schema.AnyExpectedSchemaNode;
+import
org.apache.asterix.optimizer.rules.pushdown.schema.ExpectedSchemaNodeType;
+import org.apache.asterix.optimizer.rules.pushdown.schema.IExpectedSchemaNode;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class DeltaTableFilterPushdownProcessor extends
ColumnFilterPushdownProcessor {
+
+ public DeltaTableFilterPushdownProcessor(PushdownContext pushdownContext,
IOptimizationContext context) {
+ super(pushdownContext, context);
+ }
+
+ @Override
+ protected boolean skip(ScanDefineDescriptor scanDefineDescriptor) throws
AlgebricksException {
+ return !DatasetUtil.isDeltaTable(scanDefineDescriptor.getDataset());
+ }
+
+ @Override
+ protected boolean isNotPushable(AbstractFunctionCallExpression expression)
{
+ FunctionIdentifier fid = expression.getFunctionIdentifier();
+ return ARRAY_FUNCTIONS.contains(fid) ||
super.isNotPushable(expression);
+ }
+
+ @Override
+ protected boolean handlePath(AbstractFunctionCallExpression expression)
throws AlgebricksException {
+ IExpectedSchemaNode node = expression.accept(exprToNodeVisitor, null);
+ if (node == null || node.getType() != ExpectedSchemaNodeType.ANY) {
+ return false;
+ }
+
+ // The inferred path from the provided expression
+ ARecordType expressionPath =
pathBuilderVisitor.buildPath((AnyExpectedSchemaNode) node);
+ paths.put(expression, expressionPath);
+ return true;
+ }
+}
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
index 67d460c..62801b3 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
@@ -23,6 +23,8 @@
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -195,11 +197,11 @@
long size = writer.getDataSize();
writer.close();
- List<Action> actions = List.of(new AddFile("firstFile.parquet",
new HashMap<>(), size,
- System.currentTimeMillis(), true, null, null));
+ List<Action> actions = List.of(new AddFile("firstFile.parquet",
Collections.singletonMap("name", "Cooper"),
+ size, System.currentTimeMillis(), true, null, null));
DeltaLog log = DeltaLog.forTable(conf, DELTA_MULTI_FILE_TABLE);
OptimisticTransaction txn = log.startTransaction();
- Metadata metaData =
txn.metadata().copyBuilder().partitionColumns(new ArrayList<>())
+ Metadata metaData =
txn.metadata().copyBuilder().partitionColumns(Arrays.asList("name"))
.schema(new StructType().add(new StructField("id", new
IntegerType(), true))
.add(new StructField("name", new StringType(),
true))
.add(new StructField("age", new StringType(),
true)))
@@ -218,8 +220,8 @@
long size2 = writer2.getDataSize();
writer2.close();
- List<Action> actions2 = List.of(new AddFile("secondFile.parquet",
new HashMap<>(), size2,
- System.currentTimeMillis(), true, null, null));
+ List<Action> actions2 = List.of(new AddFile("secondFile.parquet",
Collections.singletonMap("name", "Brand"),
+ size2, System.currentTimeMillis(), true, null, null));
OptimisticTransaction txn2 = log.startTransaction();
txn2.commit(actions2, new Operation(Operation.Name.WRITE),
"deltalake-table-create");
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/DeltaTableFilterEvaluatorFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/DeltaTableFilterEvaluatorFactory.java
new file mode 100644
index 0000000..c954f9b
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/DeltaTableFilterEvaluatorFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.filter;
+
+import org.apache.asterix.common.external.IExternalFilterEvaluator;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import
org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+
+import io.delta.kernel.expressions.Expression;
+
+public class DeltaTableFilterEvaluatorFactory implements
IExternalFilterEvaluatorFactory {
+ private static final long serialVersionUID = 1L;
+ private final Expression filterExpression;
+
+ public DeltaTableFilterEvaluatorFactory(Expression expression) {
+ this.filterExpression = expression;
+ }
+
+ @Override
+ public IExternalFilterEvaluator create(IServiceContext serviceContext,
IWarningCollector warningCollector)
+ throws HyracksDataException {
+ return null;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public IExternalFilterValueEmbedder createValueEmbedder(IWarningCollector
warningCollector) {
+ return NoOpFilterValueEmbedder.INSTANCE;
+ }
+
+ public Expression getFilterExpression() {
+ return filterExpression;
+ }
+}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
index e3313f7..6b51c88 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
@@ -39,6 +39,7 @@
import org.apache.asterix.external.api.IExternalDataRuntimeContext;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
+import
org.apache.asterix.external.input.filter.DeltaTableFilterEvaluatorFactory;
import
org.apache.asterix.external.input.record.reader.aws.delta.converter.DeltaConverterContext;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.HDFSUtils;
@@ -62,6 +63,8 @@
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.KernelException;
+import io.delta.kernel.expressions.Expression;
+import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.internal.InternalScanFileUtils;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterator;
@@ -124,7 +127,14 @@
} catch (AsterixDeltaRuntimeException e) {
throw e.getHyracksDataException();
}
- Scan scan = snapshot.getScanBuilder(engine).withReadSchema(engine,
requiredSchema).build();
+ Expression filterExpression = ((DeltaTableFilterEvaluatorFactory)
filterEvaluatorFactory).getFilterExpression();
+ Scan scan;
+ if (filterExpression != null) {
+ scan = snapshot.getScanBuilder(engine).withReadSchema(engine,
requiredSchema)
+ .withFilter(engine, (Predicate) filterExpression).build();
+ } else {
+ scan = snapshot.getScanBuilder(engine).withReadSchema(engine,
requiredSchema).build();
+ }
scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
CloseableIterator<FilteredColumnarBatch> iter =
scan.getScanFiles(engine);
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 923dbd4..7df9b47 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -744,4 +744,9 @@
return dataset.getDatasetType() == DatasetType.INTERNAL
&& dataset.getDatasetFormatInfo().getFormat() ==
DatasetConfig.DatasetFormat.COLUMN;
}
+
+ public static boolean isDeltaTable(Dataset dataset) {
+ return dataset.getDatasetType() == DatasetType.EXTERNAL &&
ExternalDataUtils
+ .isDeltaTable(((ExternalDatasetDetails)
dataset.getDatasetDetails()).getProperties());
+ }
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
index dda2111..55cb4fa 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.metadata.utils;
+import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable;
import static
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
import static
org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
@@ -49,6 +50,7 @@
import org.apache.asterix.metadata.entities.InternalDatasetDetails;
import org.apache.asterix.metadata.utils.filter.ColumnFilterBuilder;
import org.apache.asterix.metadata.utils.filter.ColumnRangeFilterBuilder;
+import org.apache.asterix.metadata.utils.filter.DeltaTableFilterBuilder;
import org.apache.asterix.metadata.utils.filter.ExternalFilterBuilder;
import org.apache.asterix.om.base.AString;
import org.apache.asterix.om.base.IAObject;
@@ -338,13 +340,17 @@
if (projectionFiltrationInfo ==
DefaultProjectionFiltrationInfo.INSTANCE) {
return NoOpExternalFilterEvaluatorFactory.INSTANCE;
}
-
- ExternalDataPrefix prefix = new ExternalDataPrefix(properties);
- ExternalDatasetProjectionFiltrationInfo pfi =
- (ExternalDatasetProjectionFiltrationInfo)
projectionFiltrationInfo;
- ExternalFilterBuilder build = new ExternalFilterBuilder(pfi, context,
typeEnv, prefix);
-
- return build.build();
+ if (isDeltaTable(properties)) {
+ DeltaTableFilterBuilder builder = new DeltaTableFilterBuilder(
+ (ExternalDatasetProjectionFiltrationInfo)
projectionFiltrationInfo, context, typeEnv);
+ return builder.build();
+ } else {
+ ExternalDataPrefix prefix = new ExternalDataPrefix(properties);
+ ExternalDatasetProjectionFiltrationInfo pfi =
+ (ExternalDatasetProjectionFiltrationInfo)
projectionFiltrationInfo;
+ ExternalFilterBuilder build = new ExternalFilterBuilder(pfi,
context, typeEnv, prefix);
+ return build.build();
+ }
}
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/AbstractFilterBuilder.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/AbstractFilterBuilder.java
index ce39220..f41a116 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/AbstractFilterBuilder.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/AbstractFilterBuilder.java
@@ -100,7 +100,7 @@
return argsEvalFactories;
}
- private IFunctionDescriptor resolveFunction(AbstractFunctionCallExpression
funcExpr) throws AlgebricksException {
+ protected IFunctionDescriptor
resolveFunction(AbstractFunctionCallExpression funcExpr) throws
AlgebricksException {
MetadataProvider metadataProvider = (MetadataProvider)
context.getMetadataProvider();
IFunctionManager functionManager =
metadataProvider.getFunctionManager();
FunctionIdentifier fnId = funcExpr.getFunctionIdentifier();
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/DeltaTableFilterBuilder.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/DeltaTableFilterBuilder.java
new file mode 100644
index 0000000..b61baef
--- /dev/null
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/DeltaTableFilterBuilder.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils.filter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import
org.apache.asterix.external.input.filter.DeltaTableFilterEvaluatorFactory;
+import org.apache.asterix.om.base.ADouble;
+import org.apache.asterix.om.base.AInt16;
+import org.apache.asterix.om.base.AInt32;
+import org.apache.asterix.om.base.AInt64;
+import org.apache.asterix.om.base.AInt8;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.constants.AsterixConstantValue;
+import org.apache.asterix.om.functions.IFunctionDescriptor;
+import org.apache.asterix.om.types.ARecordType;
+import
org.apache.asterix.runtime.projection.ExternalDatasetProjectionFiltrationInfo;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import
org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.logging.log4j.LogManager;
+
+import com.microsoft.azure.storage.core.Logger;
+
+import io.delta.kernel.expressions.Column;
+import io.delta.kernel.expressions.Expression;
+import io.delta.kernel.expressions.Literal;
+import io.delta.kernel.expressions.Predicate;
+
+public class DeltaTableFilterBuilder extends AbstractFilterBuilder {
+
+ private static final org.apache.logging.log4j.Logger LOGGER =
LogManager.getLogger();
+
+ public DeltaTableFilterBuilder(ExternalDatasetProjectionFiltrationInfo
projectionFiltrationInfo,
+ JobGenContext context, IVariableTypeEnvironment typeEnv) {
+ super(projectionFiltrationInfo.getFilterPaths(),
projectionFiltrationInfo.getFilterExpression(), context,
+ typeEnv);
+ }
+
+ public IExternalFilterEvaluatorFactory build() throws AlgebricksException {
+ Expression deltaTablePredicate = null;
+ if (filterExpression != null) {
+ try {
+ deltaTablePredicate = createExpression(filterExpression);
+ } catch (Exception e) {
+ LOGGER.error("Error creating DeltaTable filter expression ",
e);
+ }
+ }
+ if (deltaTablePredicate != null && !(deltaTablePredicate instanceof
Predicate)) {
+ deltaTablePredicate = null;
+ }
+ return new DeltaTableFilterEvaluatorFactory(deltaTablePredicate);
+ }
+
+ protected Expression createExpression(ILogicalExpression expression)
throws AlgebricksException {
+ if (filterPaths.containsKey(expression)) {
+ // Path expression, create a value accessor (i.e., a column reader)
+ return createColumnExpression(expression);
+ } else if (expression.getExpressionTag() ==
LogicalExpressionTag.CONSTANT) {
+ return createLiteralExpression(expression);
+ } else if (expression.getExpressionTag() ==
LogicalExpressionTag.FUNCTION_CALL) {
+ return handleFunction(expression);
+ }
+
+ /*
+ * A variable expression: This should not happen as the provided
filter expression is inlined.
+ * If a variable was encountered for some reason, it should only be
the record variable. If the record variable
+ * was encountered, that means there's a missing value path the
compiler didn't provide.
+ */
+ throw new RuntimeException("Unsupported expression " + expression + ".
the provided paths are: " + filterPaths);
+ }
+
+ private Expression createLiteralExpression(ILogicalExpression expression)
throws AlgebricksException {
+ ConstantExpression constExpr = (ConstantExpression) expression;
+ AsterixConstantValue constantValue = (AsterixConstantValue)
constExpr.getValue();
+ switch (constantValue.getObject().getType().getTypeTag()) {
+ case STRING:
+ return Literal.ofString(((AString)
constantValue.getObject()).getStringValue());
+ case TINYINT:
+ return Literal.ofByte(((AInt8)
constantValue.getObject()).getByteValue());
+ case SMALLINT:
+ return Literal.ofShort(((AInt16)
constantValue.getObject()).getShortValue());
+ case INTEGER:
+ return Literal.ofInt(((AInt32)
constantValue.getObject()).getIntegerValue());
+ case BOOLEAN:
+ return Literal.ofBoolean(constantValue.isTrue());
+ case BIGINT:
+ return Literal.ofLong(((AInt64)
constantValue.getObject()).getLongValue());
+ case DOUBLE:
+ return Literal.ofDouble(((ADouble)
constantValue.getObject()).getDoubleValue());
+ default:
+ throw new RuntimeException("Unsupported literal type: " +
constantValue.getObject().getType());
+ }
+ }
+
+ @Override
+ protected IScalarEvaluatorFactory createValueAccessor(ILogicalExpression
expression) {
+ return null;
+ }
+
+ private Expression handleFunction(ILogicalExpression expr) throws
AlgebricksException {
+ AbstractFunctionCallExpression funcExpr =
(AbstractFunctionCallExpression) expr;
+ IFunctionDescriptor fd = resolveFunction(funcExpr);
+ List<Expression> args = handleArgs(funcExpr);
+ FunctionIdentifier fid = fd.getIdentifier();
+ if (fid.equals(AlgebricksBuiltinFunctions.AND)) {
+ return new Predicate("AND", args);
+ } else if (fid.equals(AlgebricksBuiltinFunctions.OR)) {
+ return new Predicate("OR", args);
+ } else if (fid.equals(AlgebricksBuiltinFunctions.EQ)) {
+ return new Predicate("=", args);
+ } else if (fid.equals(AlgebricksBuiltinFunctions.GE)) {
+ return new Predicate(">=", args);
+ } else if (fid.equals(AlgebricksBuiltinFunctions.GT)) {
+ return new Predicate(">", args);
+ } else if (fid.equals(AlgebricksBuiltinFunctions.LE)) {
+ return new Predicate("<=", args);
+ } else if (fid.equals(AlgebricksBuiltinFunctions.LT)) {
+ return new Predicate("<", args);
+ } else {
+ throw new RuntimeException("Unsupported function: " + funcExpr);
+ }
+ }
+
+ private List<Expression> handleArgs(AbstractFunctionCallExpression
funcExpr) throws AlgebricksException {
+ List<Mutable<ILogicalExpression>> args = funcExpr.getArguments();
+ List<Expression> argsExpressions = new ArrayList<>();
+ for (int i = 0; i < args.size(); i++) {
+ ILogicalExpression expr = args.get(i).getValue();
+ Expression evalFactory = createExpression(expr);
+ argsExpressions.add(evalFactory);
+ }
+ return argsExpressions;
+ }
+
+ protected Column createColumnExpression(ILogicalExpression expression) {
+ ARecordType path = filterPaths.get(expression);
+ if (path.getFieldNames().length != 1) {
+ throw new RuntimeException("Unsupported expression: " +
expression);
+ }
+ return new Column(path.getFieldNames()[0]);
+ }
+}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19406
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: If1a46db488ee0f26aeea27069a4668665d1781dc
Gerrit-Change-Number: 19406
Gerrit-PatchSet: 1
Gerrit-Owner: Peeyush Gupta <[email protected]>
Gerrit-MessageType: newchange