>From Hussain Towaileb <[email protected]>: Hussain Towaileb has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20886?usp=email )
Change subject: [ASTERIXDB-3634][EXT]: Validate snapshot on creation and query ...................................................................... [ASTERIXDB-3634][EXT]: Validate snapshot on creation and query Ext-ref: MB-70437 Change-Id: Ib8bb436d966a5b801f677922c3809065167f7f87 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20886 Reviewed-by: Ali Alsuliman <[email protected]> Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java R asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/handlers/IcebergCatalogStatementHandler.java A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/helpers/IcebergStatementValidationHelper.java M asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergFileRecordReader.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java A asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergSnapshotUtils.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java 9 files changed, 210 insertions(+), 69 deletions(-) Approvals: Jenkins: Verified; Verified Ali Alsuliman: Looks good to me, approved diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index 0c5cd98..de65cb0 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -69,7 +69,8 @@ import org.apache.asterix.app.result.fields.ResultHandlePrinter; import org.apache.asterix.app.result.fields.ResultsPrinter; import org.apache.asterix.app.result.fields.StatusPrinter; -import org.apache.asterix.app.translator.handlers.CatalogStatementHandler; +import org.apache.asterix.app.translator.handlers.IcebergCatalogStatementHandler; +import org.apache.asterix.app.translator.helpers.IcebergStatementValidationHelper; import org.apache.asterix.column.validation.ColumnPropertiesValidationUtil; import org.apache.asterix.column.validation.ColumnSupportedTypesValidator; import org.apache.asterix.common.api.IApplicationContext; @@ -117,7 +118,6 @@ import org.apache.asterix.external.util.ExternalDataUtils; import org.apache.asterix.external.util.WriterValidationUtil; import org.apache.asterix.external.util.iceberg.IcebergConstants; -import org.apache.asterix.external.util.iceberg.IcebergUtils; import org.apache.asterix.external.writer.printer.parquet.SchemaConverterVisitor; import org.apache.asterix.lang.common.base.Expression; import org.apache.asterix.lang.common.base.IQueryRewriter; @@ -199,7 +199,6 @@ import org.apache.asterix.metadata.dataset.hints.DatasetHints; import org.apache.asterix.metadata.dataset.hints.DatasetHints.DatasetNodegroupCardinalityHint; import org.apache.asterix.metadata.declared.MetadataProvider; -import org.apache.asterix.metadata.entities.Catalog; import org.apache.asterix.metadata.entities.CompactionPolicy; import org.apache.asterix.metadata.entities.Database; import org.apache.asterix.metadata.entities.Dataset; @@ -1074,7 +1073,7 @@ ExternalDataUtils.normalize(properties); ExternalDataUtils.validate(properties); ExternalDataUtils.validateType(properties, (ARecordType) itemType); - validateIfIcebergTable(properties, mdTxnCtx, sourceLoc); + validateIfIcebergTable(metadataProvider, properties, mdTxnCtx, sourceLoc); validateExternalDatasetProperties(externalDetails, properties, dd.getSourceLocation(), mdTxnCtx, appCtx, metadataProvider); datasetDetails = new ExternalDatasetDetails(externalDetails.getAdapter(), properties, new Date(), @@ -1187,20 +1186,9 @@ return Optional.of(dataset); } - protected void validateIfIcebergTable(Map<String, String> properties, MetadataTransactionContext mdTxnCtx, - SourceLocation srcLoc) throws AlgebricksException { - if (!IcebergUtils.isIcebergTable(properties)) { - return; - } - IcebergUtils.setDefaultFormat(properties); - IcebergUtils.validateIcebergTableProperties(properties); - - // ensure the specified catalog exists - String catalogName = properties.get(IcebergConstants.ICEBERG_CATALOG_NAME); - Catalog catalog = MetadataManager.INSTANCE.getCatalog(mdTxnCtx, catalogName); - if (catalog == null) { - throw new CompilationException(ErrorCode.UNKNOWN_CATALOG, srcLoc, catalogName); - } + protected void validateIfIcebergTable(MetadataProvider metadataProvider, Map<String, String> properties, + MetadataTransactionContext mdTxnCtx, SourceLocation srcLoc) throws AlgebricksException { + IcebergStatementValidationHelper.validateIfIcebergTable(appCtx, metadataProvider, mdTxnCtx, properties, srcLoc); } protected boolean isDatasetWithoutTypeSpec(DatasetDecl datasetDecl, ARecordType aRecordType, @@ -5897,7 +5885,7 @@ protected void handleCatalogStatement(Statement.Kind kind, MetadataProvider metadataProvider, Statement stmt, IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception { - CatalogStatementHandler statement = new CatalogStatementHandler(kind, metadataProvider, stmt, + IcebergCatalogStatementHandler statement = new IcebergCatalogStatementHandler(kind, metadataProvider, stmt, Creator.DEFAULT_CREATOR, sessionConfig, lockUtil, lockManager); statement.handle(); } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/handlers/CatalogStatementHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/handlers/IcebergCatalogStatementHandler.java similarity index 98% rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/handlers/CatalogStatementHandler.java rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/handlers/IcebergCatalogStatementHandler.java index 3f88893..e0e9d02 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/handlers/CatalogStatementHandler.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/handlers/IcebergCatalogStatementHandler.java @@ -50,7 +50,7 @@ import org.apache.asterix.translator.SessionConfig; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -public class CatalogStatementHandler { +public class IcebergCatalogStatementHandler { private final Statement.Kind kind; private final MetadataProvider metadataProvider; @@ -60,7 +60,7 @@ private final IMetadataLockUtil lockUtil; private final IMetadataLockManager lockManager; - public CatalogStatementHandler(Statement.Kind kind, MetadataProvider metadataProvider, Statement statement, + public IcebergCatalogStatementHandler(Statement.Kind kind, MetadataProvider metadataProvider, Statement statement, Creator creator, SessionConfig sessionConfig, IMetadataLockUtil lockUtil, IMetadataLockManager lockManager) { this.kind = kind; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/helpers/IcebergStatementValidationHelper.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/helpers/IcebergStatementValidationHelper.java new file mode 100644 index 0000000..106c93d3 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/helpers/IcebergStatementValidationHelper.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.translator.helpers; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.exceptions.CompilationException; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.external.util.iceberg.IcebergConstants; +import org.apache.asterix.external.util.iceberg.IcebergSnapshotUtils; +import org.apache.asterix.external.util.iceberg.IcebergUtils; +import org.apache.asterix.metadata.MetadataManager; +import org.apache.asterix.metadata.MetadataTransactionContext; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.metadata.entities.Catalog; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.api.exceptions.SourceLocation; + +public class IcebergStatementValidationHelper { + + public IcebergStatementValidationHelper() { + } + + public static void validateIfIcebergTable(ICcApplicationContext appCtx, MetadataProvider metadataProvider, + MetadataTransactionContext mdTxnCtx, Map<String, String> collectionProperties, SourceLocation srcLoc) + throws AlgebricksException { + validateIfIcebergTable(appCtx, metadataProvider, mdTxnCtx, collectionProperties, Collections.emptyMap(), + srcLoc); + } + + public static void validateIfIcebergTable(ICcApplicationContext appCtx, MetadataProvider metadataProvider, + MetadataTransactionContext mdTxnCtx, Map<String, String> collectionProperties, + Map<String, String> extraCollectionProperties, SourceLocation srcLoc) throws AlgebricksException { + if (!IcebergUtils.isIcebergTable(collectionProperties)) { + return; + } + IcebergUtils.setDefaultFormat(collectionProperties); + IcebergUtils.validateIcebergTableProperties(collectionProperties); + + // work on a copy of the properties from now onward to avoid modifying the original collection properties + Map<String, String> propertiesCopy = new HashMap<>(collectionProperties); + + // ensure the specified catalog exists + String catalogName = propertiesCopy.get(IcebergConstants.ICEBERG_CATALOG_NAME); + Catalog catalog = MetadataManager.INSTANCE.getCatalog(mdTxnCtx, catalogName); + if (catalog == null) { + throw new CompilationException(ErrorCode.UNKNOWN_CATALOG, srcLoc, catalogName); + } + + // validate snapshot exists if provided + propertiesCopy.putAll(extraCollectionProperties); + metadataProvider.addIcebergCatalogPropertiesIfNeeded(appCtx, propertiesCopy); + IcebergSnapshotUtils.validateSnapshotExists(propertiesCopy); + } +} diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties index fe1e4ab..23a66ff 100644 --- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties +++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties @@ -346,7 +346,7 @@ 1236 = The provided Iceberg table '%1$s' does not exist 1237 = The provided data format '%1$s' is not supported for Iceberg tables 1238 = Iceberg snapshot '%1$s' not found -1239 = Invalid Iceberg snapshot value: '%1$s' +1239 = Snapshot '%1$s' not found for table '%2$s'. 1240 = No valid credentials provided to access Biglake Metastore catalog. 1241 = Invalid `%1$s` "%2$s" for frame size=%3$s. value should be >= %4$s * frame size: `%1$s` "%5$s" in %6$s 1242 = Collection '%1$s' is not an Iceberg table external collection. diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergFileRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergFileRecordReader.java index fddf692..76309da 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergFileRecordReader.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergFileRecordReader.java @@ -101,7 +101,7 @@ Snapshot snapshot = table.snapshot(snapshotId); if (snapshot == null) { // Snapshot might have been expired/GC'd between compile and runtime - throw CompilationException.create(ErrorCode.ICEBERG_SNAPSHOT_ID_NOT_FOUND, snapshotId); + throw CompilationException.create(ErrorCode.ICEBERG_SNAPSHOT_ID_NOT_FOUND, snapshotId, table.name()); } this.schemaAtSnapshot = table.schemas().get(snapshot.schemaId()); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java index aa1c52a..705a906 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java @@ -21,10 +21,8 @@ import static org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_SOURCE_ERROR; import static org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_SCHEMA_ID_PROPERTY_KEY; import static org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_SNAPSHOT_ID_PROPERTY_KEY; -import static org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY; +import static org.apache.asterix.external.util.iceberg.IcebergSnapshotUtils.snapshotIdExists; import static org.apache.asterix.external.util.iceberg.IcebergUtils.getProjectedFields; -import static org.apache.asterix.external.util.iceberg.IcebergUtils.snapshotIdExists; -import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString; import java.io.Serializable; import java.util.ArrayList; @@ -34,6 +32,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.PriorityQueue; import java.util.Set; @@ -48,6 +47,7 @@ import org.apache.asterix.external.input.filter.IcebergTableFilterEvaluatorFactory; import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.iceberg.IcebergConstants; +import org.apache.asterix.external.util.iceberg.IcebergSnapshotUtils; import org.apache.asterix.external.util.iceberg.IcebergUtils; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; @@ -65,7 +65,6 @@ import org.apache.iceberg.data.Record; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.io.CloseableIterable; -import org.apache.iceberg.util.SnapshotUtil; public class IcebergParquetRecordReaderFactory implements IIcebergRecordReaderFactory<Record> { @@ -215,22 +214,11 @@ private TableScan setAndPinScanSnapshot(Map<String, String> configurationCopy, Table table, TableScan scan) throws CompilationException { long snapshot; - String snapshotIdStr = configurationCopy.get(ICEBERG_SNAPSHOT_ID_PROPERTY_KEY); - String asOfTimestampStr = configurationCopy.get(ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY); - - if (snapshotIdStr != null) { - snapshot = Long.parseLong(snapshotIdStr); + Optional<Long> snapshotOptional = IcebergSnapshotUtils.getSnapshotId(configurationCopy, table); + if (snapshotOptional.isPresent()) { + snapshot = snapshotOptional.get(); if (!snapshotIdExists(table, snapshot)) { - throw CompilationException.create(ErrorCode.ICEBERG_SNAPSHOT_ID_NOT_FOUND, snapshot); - } - } else if (asOfTimestampStr != null) { - try { - snapshot = SnapshotUtil.snapshotIdAsOfTime(table, Long.parseLong(asOfTimestampStr)); - if (!snapshotIdExists(table, snapshot)) { - throw CompilationException.create(ErrorCode.ICEBERG_SNAPSHOT_ID_NOT_FOUND, snapshot); - } - } catch (IllegalArgumentException e) { - throw CompilationException.create(EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e)); + throw CompilationException.create(ErrorCode.ICEBERG_SNAPSHOT_ID_NOT_FOUND, snapshot, table.name()); } } else { if (table.currentSnapshot() == null) { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergSnapshotUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergSnapshotUtils.java new file mode 100644 index 0000000..41bd253 --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergSnapshotUtils.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.util.iceberg; + +import static org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_SOURCE_ERROR; +import static org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT; +import static org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_SNAPSHOT_ID_PROPERTY_KEY; +import static org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY; +import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString; + +import java.util.Map; +import java.util.Optional; + +import org.apache.asterix.common.exceptions.CompilationException; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.util.SnapshotUtil; + +public class IcebergSnapshotUtils { + + public static Optional<Long> validateAndGetSnapshot(Map<String, String> properties) throws CompilationException { + String snapshotId = properties.get(ICEBERG_SNAPSHOT_ID_PROPERTY_KEY); + String snapshotTimestamp = properties.get(ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY); + if (snapshotId != null && snapshotTimestamp != null) { + throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, + ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY, ICEBERG_SNAPSHOT_ID_PROPERTY_KEY); + } + + try { + if (snapshotId != null) { + return Optional.of(Long.parseLong(snapshotId)); + } else if (snapshotTimestamp != null) { + return Optional.of(Long.parseLong(snapshotTimestamp)); + } else { + return Optional.empty(); + } + } catch (NumberFormatException e) { + throw new CompilationException(ErrorCode.INVALID_ICEBERG_SNAPSHOT_VALUE, + snapshotId != null ? snapshotId : snapshotTimestamp); + } + } + + /** + * Returns the snapshot ID from the configuration. If a snapshot timestamp is provided instead, the snapshot ID + * for that timestamp is returned instead, otherwise, an error is thrown + * + * @param properties properties + * @param table table the snapshot belongs to + * @return snapshot id if exists + * @throws CompilationException CompilationException + */ + public static Optional<Long> getSnapshotId(Map<String, String> properties, Table table) + throws CompilationException { + Optional<Long> snapshotOptional = validateAndGetSnapshot(properties); + if (snapshotOptional.isEmpty()) { + return snapshotOptional; + } + + // if we have a snapshot timestamp, get the snapshot id for that timestamp and validate it instead + if (properties.containsKey(IcebergConstants.ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY)) { + try { + return Optional.of(SnapshotUtil.snapshotIdAsOfTime(table, snapshotOptional.get())); + } catch (IllegalArgumentException e) { + throw CompilationException.create(EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e)); + } + } + return snapshotOptional; + } + + public static void validateSnapshotExists(Map<String, String> catalogAndCollectionProperties) + throws AlgebricksException { + Optional<Long> snapshotOptional = validateAndGetSnapshot(catalogAndCollectionProperties); + if (snapshotOptional.isPresent()) { + String namespace = catalogAndCollectionProperties.get(IcebergConstants.ICEBERG_NAMESPACE_PROPERTY_KEY); + String tableName = catalogAndCollectionProperties.get(IcebergConstants.ICEBERG_TABLE_NAME_PROPERTY_KEY); + Map<String, String> catalogProperties = + IcebergUtils.filterCatalogProperties(catalogAndCollectionProperties); + Catalog icebergCatalog = IcebergUtils.initializeCatalog(catalogProperties, namespace, false); + + Namespace actualNamespace = Namespace.of(namespace); + TableIdentifier tableIdentifier = TableIdentifier.of(actualNamespace, tableName); + Table table = icebergCatalog.loadTable(tableIdentifier); + + Optional<Long> snapshotIdOptional = getSnapshotId(catalogAndCollectionProperties, table); + if (snapshotIdOptional.isPresent() && !snapshotIdExists(table, snapshotIdOptional.get())) { + throw CompilationException.create(ErrorCode.ICEBERG_SNAPSHOT_ID_NOT_FOUND, snapshotIdOptional.get(), + tableIdentifier.toString()); + } + } + } + + public static boolean snapshotIdExists(Table table, long snapshot) { + return table.snapshot(snapshot) != null; + } +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java index c9a4e5b..20b5781 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java @@ -18,15 +18,13 @@ */ package org.apache.asterix.external.util.iceberg; -import static org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT; +import static org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_SOURCE_ERROR; import static org.apache.asterix.common.exceptions.ErrorCode.UNSUPPORTED_ICEBERG_DATA_FORMAT; import static org.apache.asterix.external.util.aws.EnsureCloseClientsFactoryRegistry.FACTORY_INSTANCE_ID_KEY; import static org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_AVRO_FORMAT; import static org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_CATALOG_PROPERTY_PREFIX_INTERNAL; import static org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_COLLECTION_PROPERTY_PREFIX_INTERNAL; import static org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_PARQUET_FORMAT; -import static org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_SNAPSHOT_ID_PROPERTY_KEY; -import static org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY; import static org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_TABLE_FORMAT; import static org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_WAREHOUSE_PROPERTY_KEY; @@ -53,7 +51,6 @@ import org.apache.asterix.external.util.iceberg.rest.RestUtils; import org.apache.asterix.om.types.ARecordType; import org.apache.iceberg.CatalogProperties; -import org.apache.iceberg.Table; import org.apache.iceberg.aws.AwsProperties; import org.apache.iceberg.aws.glue.GlueCatalog; import org.apache.iceberg.catalog.Catalog; @@ -163,24 +160,7 @@ IcebergConstants.ICEBERG_NAMESPACE_PROPERTY_KEY); } - // validate snapshot id and timestamp - String snapshotId = properties.get(ICEBERG_SNAPSHOT_ID_PROPERTY_KEY); - String snapshotTimestamp = properties.get(ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY); - if (snapshotId != null && snapshotTimestamp != null) { - throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, - ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY, ICEBERG_SNAPSHOT_ID_PROPERTY_KEY); - } - - try { - if (snapshotId != null) { - Long.parseLong(snapshotId); - } else if (snapshotTimestamp != null) { - Long.parseLong(snapshotTimestamp); - } - } catch (NumberFormatException e) { - throw new CompilationException(ErrorCode.INVALID_ICEBERG_SNAPSHOT_VALUE, - snapshotId != null ? snapshotId : snapshotTimestamp); - } + IcebergSnapshotUtils.validateAndGetSnapshot(properties); } /** @@ -355,10 +335,6 @@ } } - public static boolean snapshotIdExists(Table table, long snapshot) { - return table.snapshot(snapshot) != null; - } - public static String[] getProjectedFields(Map<String, String> configuration) throws IOException { String encoded = configuration.get(ExternalDataConstants.KEY_REQUESTED_FIELDS); ARecordType projectedRecordType = ExternalDataUtils.getExpectedType(encoded); diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java index 1d6d1a2..8963ed1 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java @@ -1023,7 +1023,7 @@ } } - protected void addIcebergCatalogPropertiesIfNeeded(ICcApplicationContext appCtx, Map<String, String> configuration) + public void addIcebergCatalogPropertiesIfNeeded(ICcApplicationContext appCtx, Map<String, String> configuration) throws AlgebricksException { if (IcebergUtils.isIcebergTable(configuration)) { String catalogName = configuration.get(IcebergConstants.ICEBERG_CATALOG_NAME); -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20886?usp=email To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings?usp=email Gerrit-MessageType: merged Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Change-Id: Ib8bb436d966a5b801f677922c3809065167f7f87 Gerrit-Change-Number: 20886 Gerrit-PatchSet: 5 Gerrit-Owner: Hussain Towaileb <[email protected]> Gerrit-Reviewer: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Hussain Towaileb <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]>
