>From Hussain Towaileb <[email protected]>: Hussain Towaileb has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20687?usp=email )
Change subject: [ASTERIXDB-3634][EXT] Add Iceberg tables partition pruning support ...................................................................... [ASTERIXDB-3634][EXT] Add Iceberg tables partition pruning support Details: - Add partition and row group filter pushdown Ext-ref: MB-63115 Change-Id: If77144159c351dab051a6aebc2a2dea779cd8b50 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20687 Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Hussain Towaileb <[email protected]> --- M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java A asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/IcebergTableFilterBuilder.java 3 files changed, 213 insertions(+), 5 deletions(-) Approvals: Jenkins: Verified; Verified Anon. E. Moose #1000171: Hussain Towaileb: Looks good to me, approved diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java index 6c37242..0d31962 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java @@ -44,6 +44,7 @@ import org.apache.asterix.external.api.IExternalDataRuntimeContext; import org.apache.asterix.external.api.IIcebergRecordReaderFactory; import org.apache.asterix.external.api.IRecordReader; +import org.apache.asterix.external.input.filter.IcebergTableFilterEvaluatorFactory; import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.iceberg.IcebergConstants; import org.apache.asterix.external.util.iceberg.IcebergUtils; @@ -61,6 +62,7 @@ import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.Record; +import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.util.SnapshotUtil; @@ -146,7 +148,12 @@ if (projectedFields != null && projectedFields.length > 0) { projectedSchema = projectedSchema.select(projectedFields); } - scan = scan.project(projectedSchema); + scan.project(projectedSchema); + Expression filterExpression = + ((IcebergTableFilterEvaluatorFactory) filterEvaluatorFactory).getFilterExpression(); + if (filterExpression != null) { + scan = scan.filter(filterExpression); + } try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) { tasks.forEach(fileScanTasks::add); } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java index 37cc9d8..6d99f94 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java @@ -57,6 +57,7 @@ import org.apache.asterix.metadata.utils.filter.ColumnRangeFilterBuilder; import org.apache.asterix.metadata.utils.filter.DeltaTableFilterBuilder; import org.apache.asterix.metadata.utils.filter.ExternalFilterBuilder; +import org.apache.asterix.metadata.utils.filter.IcebergTableFilterBuilder; import org.apache.asterix.metadata.utils.filter.ParquetFilterBuilder; import org.apache.asterix.om.base.AString; import org.apache.asterix.om.base.IAObject; @@ -383,10 +384,9 @@ if (projectionFiltrationInfo == DefaultProjectionFiltrationInfo.INSTANCE) { return NoOpIcebergTableFilterEvaluatorFactory.INSTANCE; } else { - // IcebergTableFilterBuilder builder = new IcebergTableFilterBuilder( - // (ExternalDatasetProjectionFiltrationInfo) projectionFiltrationInfo, context, typeEnv); - // return builder.build(); - return NoOpIcebergTableFilterEvaluatorFactory.INSTANCE; + IcebergTableFilterBuilder builder = new IcebergTableFilterBuilder( + (ExternalDatasetProjectionFiltrationInfo) projectionFiltrationInfo, context, typeEnv); + return builder.build(); } } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/IcebergTableFilterBuilder.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/IcebergTableFilterBuilder.java new file mode 100644 index 0000000..94ec455 --- /dev/null +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/IcebergTableFilterBuilder.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.metadata.utils.filter; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory; +import org.apache.asterix.external.input.filter.IcebergTableFilterEvaluatorFactory; +import org.apache.asterix.om.base.ADate; +import org.apache.asterix.om.base.ADateTime; +import org.apache.asterix.om.base.ADouble; +import org.apache.asterix.om.base.AInt16; +import org.apache.asterix.om.base.AInt32; +import org.apache.asterix.om.base.AInt64; +import org.apache.asterix.om.base.AInt8; +import org.apache.asterix.om.base.AString; +import org.apache.asterix.om.constants.AsterixConstantValue; +import org.apache.asterix.om.functions.IFunctionDescriptor; +import org.apache.asterix.om.types.ARecordType; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.asterix.runtime.projection.ExternalDatasetProjectionFiltrationInfo; +import org.apache.commons.lang3.mutable.Mutable; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag; +import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression; +import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression; +import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; +import org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class IcebergTableFilterBuilder extends AbstractFilterBuilder { + + private static final Logger LOGGER = LogManager.getLogger(); + + public IcebergTableFilterBuilder(ExternalDatasetProjectionFiltrationInfo projectionFiltrationInfo, + JobGenContext context, IVariableTypeEnvironment typeEnv) { + super(projectionFiltrationInfo.getFilterPaths(), projectionFiltrationInfo.getFilterExpression(), context, + typeEnv); + } + + public IExternalFilterEvaluatorFactory build() throws AlgebricksException { + Object icebergTablePredicate = null; + if (filterExpression != null) { + try { + icebergTablePredicate = createExpression(filterExpression); + } catch (Exception e) { + LOGGER.error("Error creating IcebergTable filter expression, skipping filter pushdown", e); + } + } + return new IcebergTableFilterEvaluatorFactory((Expression) icebergTablePredicate); + } + + protected Object createExpression(ILogicalExpression expression) throws AlgebricksException { + if (filterPaths.containsKey(expression)) { + // Path expression, create a value accessor (i.e., a column reader) + return createColumnExpression(expression); + } else if (expression.getExpressionTag() == LogicalExpressionTag.CONSTANT) { + return createLiteralExpression(expression); + } else if (expression.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) { + return handleFunction(expression); + } + + /* + * A variable expression: This should not happen as the provided filter expression is inlined. + * If a variable was encountered for some reason, it should only be the record variable. If the record variable + * was encountered, that means there's a missing value path the compiler didn't provide. + */ + throw new RuntimeException("Unsupported expression " + expression + ". the provided paths are: " + filterPaths); + } + + private Object createLiteralExpression(ILogicalExpression expression) throws AlgebricksException { + ConstantExpression constExpr = (ConstantExpression) expression; + if (constExpr.getValue().isNull() || constExpr.getValue().isMissing()) { + throw new RuntimeException("Unsupported literal type: " + constExpr.getValue()); + } + AsterixConstantValue constantValue = (AsterixConstantValue) constExpr.getValue(); + switch (constantValue.getObject().getType().getTypeTag()) { + case STRING: + return ((AString) constantValue.getObject()).getStringValue(); + case TINYINT: + return ((AInt8) constantValue.getObject()).getByteValue(); + case SMALLINT: + return ((AInt16) constantValue.getObject()).getShortValue(); + case INTEGER: + return ((AInt32) constantValue.getObject()).getIntegerValue(); + case BOOLEAN: + return constantValue.isTrue(); + case BIGINT: + return ((AInt64) constantValue.getObject()).getLongValue(); + case DOUBLE: + return ((ADouble) constantValue.getObject()).getDoubleValue(); + case DATE: + return ((ADate) constantValue.getObject()).getChrononTimeInDays(); + case DATETIME: + Long millis = ((ADateTime) constantValue.getObject()).getChrononTime(); + return TimeUnit.MILLISECONDS.toMicros(millis); + default: + throw new RuntimeException("Unsupported literal type: " + constantValue.getObject().getType()); + } + } + + @Override + protected IScalarEvaluatorFactory createValueAccessor(ILogicalExpression expression) { + return null; + } + + private Expression handleFunction(ILogicalExpression expr) throws AlgebricksException { + AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr; + IFunctionDescriptor fd = resolveFunction(funcExpr); + FunctionIdentifier fid = fd.getIdentifier(); + if (funcExpr.getArguments().size() != 2 + && !(fid.equals(AlgebricksBuiltinFunctions.AND) || fid.equals(AlgebricksBuiltinFunctions.OR))) { + throw new RuntimeException("Predicate should only have 2 arguments: " + funcExpr); + } + List<Object> args = handleArgs(funcExpr); + if (fid.equals(AlgebricksBuiltinFunctions.AND)) { + return Expressions.and((Expression) args.get(0), (Expression) args.get(1)); + } else if (fid.equals(AlgebricksBuiltinFunctions.OR)) { + return Expressions.or((Expression) args.get(0), (Expression) args.get(1)); + } else if (fid.equals(AlgebricksBuiltinFunctions.EQ)) { + return Expressions.equal((String) args.get(0), args.get(1)); + } else if (fid.equals(AlgebricksBuiltinFunctions.GE)) { + return Expressions.greaterThanOrEqual((String) args.get(0), args.get(1)); + } else if (fid.equals(AlgebricksBuiltinFunctions.GT)) { + return Expressions.greaterThan((String) args.get(0), args.get(1)); + } else if (fid.equals(AlgebricksBuiltinFunctions.LE)) { + return Expressions.lessThanOrEqual((String) args.get(0), args.get(1)); + } else if (fid.equals(AlgebricksBuiltinFunctions.LT)) { + return Expressions.lessThan((String) args.get(0), args.get(1)); + } else { + throw new RuntimeException("Unsupported function: " + funcExpr); + } + } + + private List<Object> handleArgs(AbstractFunctionCallExpression funcExpr) throws AlgebricksException { + List<Mutable<ILogicalExpression>> args = funcExpr.getArguments(); + List<Object> argsExpressions = new ArrayList<>(); + for (int i = 0; i < args.size(); i++) { + ILogicalExpression expr = args.get(i).getValue(); + Object evalFactory = createExpression(expr); + argsExpressions.add(evalFactory); + } + return argsExpressions; + } + + protected Object createColumnExpression(ILogicalExpression expression) { + ARecordType path = filterPaths.get(expression); + if (path.getFieldNames().length != 1) { + throw new RuntimeException("Unsupported column expression: " + expression); + } else if (path.getFieldTypes()[0].getTypeTag() == ATypeTag.OBJECT) { + // The field could be a nested field + List<String> fieldList = new ArrayList<>(); + fieldList = createPathExpression(path, fieldList); + return String.join(".", fieldList); + } else if (path.getFieldTypes()[0].getTypeTag() == ATypeTag.ANY) { + return path.getFieldNames()[0]; + } else { + throw new RuntimeException("Unsupported column expression: " + expression); + } + } + + private List<String> createPathExpression(ARecordType path, List<String> fieldList) { + if (path.getFieldNames().length != 1) { + throw new RuntimeException("Error creating column expression"); + } else { + fieldList.add(path.getFieldNames()[0]); + } + if (path.getFieldTypes()[0].getTypeTag() == ATypeTag.OBJECT) { + return createPathExpression((ARecordType) path.getFieldTypes()[0], fieldList); + } else if (path.getFieldTypes()[0].getTypeTag() == ATypeTag.ANY) { + return fieldList; + } else { + throw new RuntimeException("Error creating column expression"); + } + } +} -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20687?usp=email To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings?usp=email Gerrit-MessageType: merged Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Change-Id: If77144159c351dab051a6aebc2a2dea779cd8b50 Gerrit-Change-Number: 20687 Gerrit-PatchSet: 7 Gerrit-Owner: Peeyush Gupta <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Hussain Towaileb <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]>
