>From Hussain Towaileb <[email protected]>:

Hussain Towaileb has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20886?usp=email )


Change subject: [NO ISSUE][EXT]: Validate snapshot on creation and query
......................................................................

[NO ISSUE][EXT]: Validate snapshot on creation and query

Ext-ref: MB-70437
Change-Id: Ib8bb436d966a5b801f677922c3809065167f7f87
---
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
R 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/handlers/IcebergCatalogStatementHandler.java
A 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/helpers/IcebergStatementValidationHelper.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
M asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java
A 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergSnapshotUtils.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
9 files changed, 217 insertions(+), 66 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/86/20886/1

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 0c5cd98..de65cb0 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -69,7 +69,8 @@
 import org.apache.asterix.app.result.fields.ResultHandlePrinter;
 import org.apache.asterix.app.result.fields.ResultsPrinter;
 import org.apache.asterix.app.result.fields.StatusPrinter;
-import org.apache.asterix.app.translator.handlers.CatalogStatementHandler;
+import 
org.apache.asterix.app.translator.handlers.IcebergCatalogStatementHandler;
+import 
org.apache.asterix.app.translator.helpers.IcebergStatementValidationHelper;
 import org.apache.asterix.column.validation.ColumnPropertiesValidationUtil;
 import org.apache.asterix.column.validation.ColumnSupportedTypesValidator;
 import org.apache.asterix.common.api.IApplicationContext;
@@ -117,7 +118,6 @@
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.WriterValidationUtil;
 import org.apache.asterix.external.util.iceberg.IcebergConstants;
-import org.apache.asterix.external.util.iceberg.IcebergUtils;
 import 
org.apache.asterix.external.writer.printer.parquet.SchemaConverterVisitor;
 import org.apache.asterix.lang.common.base.Expression;
 import org.apache.asterix.lang.common.base.IQueryRewriter;
@@ -199,7 +199,6 @@
 import org.apache.asterix.metadata.dataset.hints.DatasetHints;
 import 
org.apache.asterix.metadata.dataset.hints.DatasetHints.DatasetNodegroupCardinalityHint;
 import org.apache.asterix.metadata.declared.MetadataProvider;
-import org.apache.asterix.metadata.entities.Catalog;
 import org.apache.asterix.metadata.entities.CompactionPolicy;
 import org.apache.asterix.metadata.entities.Database;
 import org.apache.asterix.metadata.entities.Dataset;
@@ -1074,7 +1073,7 @@
                     ExternalDataUtils.normalize(properties);
                     ExternalDataUtils.validate(properties);
                     ExternalDataUtils.validateType(properties, (ARecordType) 
itemType);
-                    validateIfIcebergTable(properties, mdTxnCtx, sourceLoc);
+                    validateIfIcebergTable(metadataProvider, properties, 
mdTxnCtx, sourceLoc);
                     validateExternalDatasetProperties(externalDetails, 
properties, dd.getSourceLocation(), mdTxnCtx,
                             appCtx, metadataProvider);
                     datasetDetails = new 
ExternalDatasetDetails(externalDetails.getAdapter(), properties, new Date(),
@@ -1187,20 +1186,9 @@
         return Optional.of(dataset);
     }

-    protected void validateIfIcebergTable(Map<String, String> properties, 
MetadataTransactionContext mdTxnCtx,
-            SourceLocation srcLoc) throws AlgebricksException {
-        if (!IcebergUtils.isIcebergTable(properties)) {
-            return;
-        }
-        IcebergUtils.setDefaultFormat(properties);
-        IcebergUtils.validateIcebergTableProperties(properties);
-
-        // ensure the specified catalog exists
-        String catalogName = 
properties.get(IcebergConstants.ICEBERG_CATALOG_NAME);
-        Catalog catalog = MetadataManager.INSTANCE.getCatalog(mdTxnCtx, 
catalogName);
-        if (catalog == null) {
-            throw new CompilationException(ErrorCode.UNKNOWN_CATALOG, srcLoc, 
catalogName);
-        }
+    protected void validateIfIcebergTable(MetadataProvider metadataProvider, 
Map<String, String> properties,
+            MetadataTransactionContext mdTxnCtx, SourceLocation srcLoc) throws 
AlgebricksException {
+        IcebergStatementValidationHelper.validateIfIcebergTable(appCtx, 
metadataProvider, mdTxnCtx, properties, srcLoc);
     }
 
     protected boolean isDatasetWithoutTypeSpec(DatasetDecl datasetDecl, 
ARecordType aRecordType,
@@ -5897,7 +5885,7 @@

     protected void handleCatalogStatement(Statement.Kind kind, 
MetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc, IRequestParameters 
requestParameters) throws Exception {
-        CatalogStatementHandler statement = new CatalogStatementHandler(kind, 
metadataProvider, stmt,
+        IcebergCatalogStatementHandler statement = new 
IcebergCatalogStatementHandler(kind, metadataProvider, stmt,
                 Creator.DEFAULT_CREATOR, sessionConfig, lockUtil, lockManager);
         statement.handle();
     }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/handlers/CatalogStatementHandler.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/handlers/IcebergCatalogStatementHandler.java
similarity index 98%
rename from 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/handlers/CatalogStatementHandler.java
rename to 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/handlers/IcebergCatalogStatementHandler.java
index 3f88893..e0e9d02 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/handlers/CatalogStatementHandler.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/handlers/IcebergCatalogStatementHandler.java
@@ -50,7 +50,7 @@
 import org.apache.asterix.translator.SessionConfig;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;

-public class CatalogStatementHandler {
+public class IcebergCatalogStatementHandler {

     private final Statement.Kind kind;
     private final MetadataProvider metadataProvider;
@@ -60,7 +60,7 @@
     private final IMetadataLockUtil lockUtil;
     private final IMetadataLockManager lockManager;

-    public CatalogStatementHandler(Statement.Kind kind, MetadataProvider 
metadataProvider, Statement statement,
+    public IcebergCatalogStatementHandler(Statement.Kind kind, 
MetadataProvider metadataProvider, Statement statement,
             Creator creator, SessionConfig sessionConfig, IMetadataLockUtil 
lockUtil,
             IMetadataLockManager lockManager) {
         this.kind = kind;
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/helpers/IcebergStatementValidationHelper.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/helpers/IcebergStatementValidationHelper.java
new file mode 100644
index 0000000..106c93d3
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/helpers/IcebergStatementValidationHelper.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.translator.helpers;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.util.iceberg.IcebergConstants;
+import org.apache.asterix.external.util.iceberg.IcebergSnapshotUtils;
+import org.apache.asterix.external.util.iceberg.IcebergUtils;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Catalog;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+
+public class IcebergStatementValidationHelper {
+
+    public IcebergStatementValidationHelper() {
+    }
+
+    public static void validateIfIcebergTable(ICcApplicationContext appCtx, 
MetadataProvider metadataProvider,
+            MetadataTransactionContext mdTxnCtx, Map<String, String> 
collectionProperties, SourceLocation srcLoc)
+            throws AlgebricksException {
+        validateIfIcebergTable(appCtx, metadataProvider, mdTxnCtx, 
collectionProperties, Collections.emptyMap(),
+                srcLoc);
+    }
+
+    public static void validateIfIcebergTable(ICcApplicationContext appCtx, 
MetadataProvider metadataProvider,
+            MetadataTransactionContext mdTxnCtx, Map<String, String> 
collectionProperties,
+            Map<String, String> extraCollectionProperties, SourceLocation 
srcLoc) throws AlgebricksException {
+        if (!IcebergUtils.isIcebergTable(collectionProperties)) {
+            return;
+        }
+        IcebergUtils.setDefaultFormat(collectionProperties);
+        IcebergUtils.validateIcebergTableProperties(collectionProperties);
+
+        // work on a copy of the properties from now onward to avoid modifying 
the original collection properties
+        Map<String, String> propertiesCopy = new 
HashMap<>(collectionProperties);
+
+        // ensure the specified catalog exists
+        String catalogName = 
propertiesCopy.get(IcebergConstants.ICEBERG_CATALOG_NAME);
+        Catalog catalog = MetadataManager.INSTANCE.getCatalog(mdTxnCtx, 
catalogName);
+        if (catalog == null) {
+            throw new CompilationException(ErrorCode.UNKNOWN_CATALOG, srcLoc, 
catalogName);
+        }
+
+        // validate snapshot exists if provided
+        propertiesCopy.putAll(extraCollectionProperties);
+        metadataProvider.addIcebergCatalogPropertiesIfNeeded(appCtx, 
propertiesCopy);
+        IcebergSnapshotUtils.validateSnapshotExists(propertiesCopy);
+    }
+}
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index dd8d358..623de17 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -348,6 +348,7 @@
     NO_VALID_CREDENTIALS_PROVIDED_FOR_BIGLAKE_METASTORE_CATALOG(1240),
     INVALID_FRAME_BASED_MEMORY_BUDGET(1241),
     COLLECTION_IS_NOT_AN_ICEBERG_TABLE_COLLECTION(1242),
+    UNKNOWN_ICEBERG_SNAPSHOT(1243),

     // Feed errors
     DATAFLOW_ILLEGAL_STATE(3001),
diff --git 
a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties 
b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index fe1e4ab..4199603 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -350,6 +350,7 @@
 1240 = No valid credentials provided to access Biglake Metastore catalog.
 1241 = Invalid `%1$s` "%2$s" for frame size=%3$s. value should be >= %4$s * 
frame size: `%1$s` "%5$s" in %6$s
 1242 = Collection '%1$s' is not an Iceberg table external collection.
+1243 = Snapshot '%1$s' not found for table '%2$s'.

 # Feed Errors
 3001 = Illegal state.
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java
index aa1c52a..b03f83e 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java
@@ -21,10 +21,8 @@
 import static 
org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_SOURCE_ERROR;
 import static 
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_SCHEMA_ID_PROPERTY_KEY;
 import static 
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_SNAPSHOT_ID_PROPERTY_KEY;
-import static 
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY;
+import static 
org.apache.asterix.external.util.iceberg.IcebergSnapshotUtils.snapshotIdExists;
 import static 
org.apache.asterix.external.util.iceberg.IcebergUtils.getProjectedFields;
-import static 
org.apache.asterix.external.util.iceberg.IcebergUtils.snapshotIdExists;
-import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;

 import java.io.Serializable;
 import java.util.ArrayList;
@@ -34,6 +32,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.PriorityQueue;
 import java.util.Set;

@@ -48,6 +47,7 @@
 import 
org.apache.asterix.external.input.filter.IcebergTableFilterEvaluatorFactory;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.iceberg.IcebergConstants;
+import org.apache.asterix.external.util.iceberg.IcebergSnapshotUtils;
 import org.apache.asterix.external.util.iceberg.IcebergUtils;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -65,7 +65,6 @@
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.util.SnapshotUtil;

 public class IcebergParquetRecordReaderFactory implements 
IIcebergRecordReaderFactory<Record> {

@@ -215,23 +214,12 @@
     private TableScan setAndPinScanSnapshot(Map<String, String> 
configurationCopy, Table table, TableScan scan)
             throws CompilationException {
         long snapshot;
-        String snapshotIdStr = 
configurationCopy.get(ICEBERG_SNAPSHOT_ID_PROPERTY_KEY);
-        String asOfTimestampStr = 
configurationCopy.get(ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY);
-
-        if (snapshotIdStr != null) {
-            snapshot = Long.parseLong(snapshotIdStr);
+        Optional<Long> snapshotOptional = 
IcebergSnapshotUtils.getSnapshotId(configurationCopy, table);
+        if (snapshotOptional.isPresent()) {
+            snapshot = snapshotOptional.get();
             if (!snapshotIdExists(table, snapshot)) {
                 throw 
CompilationException.create(ErrorCode.ICEBERG_SNAPSHOT_ID_NOT_FOUND, snapshot);
             }
-        } else if (asOfTimestampStr != null) {
-            try {
-                snapshot = SnapshotUtil.snapshotIdAsOfTime(table, 
Long.parseLong(asOfTimestampStr));
-                if (!snapshotIdExists(table, snapshot)) {
-                    throw 
CompilationException.create(ErrorCode.ICEBERG_SNAPSHOT_ID_NOT_FOUND, snapshot);
-                }
-            } catch (IllegalArgumentException e) {
-                throw CompilationException.create(EXTERNAL_SOURCE_ERROR, e, 
getMessageOrToString(e));
-            }
         } else {
             if (table.currentSnapshot() == null) {
                 throw CompilationException.create(EXTERNAL_SOURCE_ERROR, 
"table " + table.name() + " has no snapshots");
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergSnapshotUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergSnapshotUtils.java
new file mode 100644
index 0000000..5f2b5c5
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergSnapshotUtils.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util.iceberg;
+
+import static 
org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_SOURCE_ERROR;
+import static 
org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT;
+import static 
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_SNAPSHOT_ID_PROPERTY_KEY;
+import static 
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.util.SnapshotUtil;
+
+public class IcebergSnapshotUtils {
+
+    public static Optional<Long> validateAndGetSnapshot(Map<String, String> 
properties) throws CompilationException {
+        String snapshotId = properties.get(ICEBERG_SNAPSHOT_ID_PROPERTY_KEY);
+        String snapshotTimestamp = 
properties.get(ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY);
+        if (snapshotId != null && snapshotTimestamp != null) {
+            throw new 
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT,
+                    ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY, 
ICEBERG_SNAPSHOT_ID_PROPERTY_KEY);
+        }
+
+        try {
+            if (snapshotId != null) {
+                return Optional.of(Long.parseLong(snapshotId));
+            } else if (snapshotTimestamp != null) {
+                return Optional.of(Long.parseLong(snapshotTimestamp));
+            } else {
+                return Optional.empty();
+            }
+        } catch (NumberFormatException e) {
+            throw new 
CompilationException(ErrorCode.INVALID_ICEBERG_SNAPSHOT_VALUE,
+                    snapshotId != null ? snapshotId : snapshotTimestamp);
+        }
+    }
+
+    /**
+     * Returns the snapshot ID from the configuration. If a snapshot timestamp 
is provided instead, the snapshot ID
+     * for that timestamp is returned instead, otherwise, an error is thrown
+     *
+     * @param properties properties
+     * @param table table the snapshot belongs to
+     * @return snapshot id if exists
+     * @throws CompilationException CompilationException
+     */
+    public static Optional<Long> getSnapshotId(Map<String, String> properties, 
Table table)
+            throws CompilationException {
+        Optional<Long> snapshotOptional = validateAndGetSnapshot(properties);
+        if (snapshotOptional.isEmpty()) {
+            return snapshotOptional;
+        }
+
+        // if we have a snapshot timestamp, get the snapshot id for that 
timestamp and validate it instead
+        if 
(properties.containsKey(IcebergConstants.ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY))
 {
+            try {
+                return Optional.of(SnapshotUtil.snapshotIdAsOfTime(table, 
snapshotOptional.get()));
+            } catch (IllegalArgumentException e) {
+                throw CompilationException.create(EXTERNAL_SOURCE_ERROR, e, 
getMessageOrToString(e));
+            }
+        }
+        return snapshotOptional;
+    }
+
+    public static void validateSnapshotExists(Map<String, String> 
catalogAndCollectionProperties)
+            throws AlgebricksException {
+        Optional<Long> snapshotOptional = 
validateAndGetSnapshot(catalogAndCollectionProperties);
+        if (snapshotOptional.isPresent()) {
+            String namespace = 
catalogAndCollectionProperties.get(IcebergConstants.ICEBERG_NAMESPACE_PROPERTY_KEY);
+            String tableName = 
catalogAndCollectionProperties.get(IcebergConstants.ICEBERG_TABLE_NAME_PROPERTY_KEY);
+            Map<String, String> catalogProperties =
+                    
IcebergUtils.filterCatalogProperties(catalogAndCollectionProperties);
+            Catalog icebergCatalog = 
IcebergUtils.initializeCatalog(catalogProperties, namespace, false);
+
+            Namespace actualNamespace = Namespace.of(namespace);
+            TableIdentifier tableIdentifier = 
TableIdentifier.of(actualNamespace, tableName);
+            Table table = icebergCatalog.loadTable(tableIdentifier);
+
+            Optional<Long> optional = 
getSnapshotId(catalogAndCollectionProperties, table);
+            if (optional.isEmpty()) {
+                return;
+            }
+            validateSnapshotExists(icebergCatalog, tableIdentifier, 
optional.get());
+        }
+    }
+
+    public static void validateSnapshotExists(Catalog icebergCatalog, 
TableIdentifier tableIdentifier, long snapshot)
+            throws CompilationException {
+        Table table = icebergCatalog.loadTable(tableIdentifier);
+        if (!snapshotIdExists(table, snapshot)) {
+            throw 
CompilationException.create(ErrorCode.UNKNOWN_ICEBERG_SNAPSHOT, snapshot, 
tableIdentifier.toString());
+        }
+    }
+
+    public static boolean snapshotIdExists(Table table, long snapshot) {
+        return table.snapshot(snapshot) != null;
+    }
+}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
index 20ce57b..b4dd935 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
@@ -18,15 +18,13 @@
  */
 package org.apache.asterix.external.util.iceberg;

-import static 
org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT;
+import static 
org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_SOURCE_ERROR;
 import static 
org.apache.asterix.common.exceptions.ErrorCode.UNSUPPORTED_ICEBERG_DATA_FORMAT;
 import static 
org.apache.asterix.external.util.aws.EnsureCloseClientsFactoryRegistry.FACTORY_INSTANCE_ID_KEY;
 import static 
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_AVRO_FORMAT;
 import static 
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_CATALOG_PROPERTY_PREFIX_INTERNAL;
 import static 
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_COLLECTION_PROPERTY_PREFIX_INTERNAL;
 import static 
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_PARQUET_FORMAT;
-import static 
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_SNAPSHOT_ID_PROPERTY_KEY;
-import static 
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY;
 import static 
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_TABLE_FORMAT;
 import static 
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_WAREHOUSE_PROPERTY_KEY;

@@ -53,7 +51,6 @@
 import org.apache.asterix.external.util.iceberg.rest.RestUtils;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.iceberg.CatalogProperties;
-import org.apache.iceberg.Table;
 import org.apache.iceberg.aws.AwsProperties;
 import org.apache.iceberg.aws.glue.GlueCatalog;
 import org.apache.iceberg.catalog.Catalog;
@@ -163,24 +160,7 @@
                     IcebergConstants.ICEBERG_NAMESPACE_PROPERTY_KEY);
         }

-        // validate snapshot id and timestamp
-        String snapshotId = properties.get(ICEBERG_SNAPSHOT_ID_PROPERTY_KEY);
-        String snapshotTimestamp = 
properties.get(ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY);
-        if (snapshotId != null && snapshotTimestamp != null) {
-            throw new 
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT,
-                    ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY, 
ICEBERG_SNAPSHOT_ID_PROPERTY_KEY);
-        }
-
-        try {
-            if (snapshotId != null) {
-                Long.parseLong(snapshotId);
-            } else if (snapshotTimestamp != null) {
-                Long.parseLong(snapshotTimestamp);
-            }
-        } catch (NumberFormatException e) {
-            throw new 
CompilationException(ErrorCode.INVALID_ICEBERG_SNAPSHOT_VALUE,
-                    snapshotId != null ? snapshotId : snapshotTimestamp);
-        }
+        IcebergSnapshotUtils.validateAndGetSnapshot(properties);
     }

     /**
@@ -353,10 +333,6 @@
         }
     }

-    public static boolean snapshotIdExists(Table table, long snapshot) {
-        return table.snapshot(snapshot) != null;
-    }
-
     public static String[] getProjectedFields(Map<String, String> 
configuration) throws IOException {
         String encoded = 
configuration.get(ExternalDataConstants.KEY_REQUESTED_FIELDS);
         ARecordType projectedRecordType = 
ExternalDataUtils.getExpectedType(encoded);
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 1d6d1a2..8963ed1 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -1023,7 +1023,7 @@
         }
     }

-    protected void addIcebergCatalogPropertiesIfNeeded(ICcApplicationContext 
appCtx, Map<String, String> configuration)
+    public void addIcebergCatalogPropertiesIfNeeded(ICcApplicationContext 
appCtx, Map<String, String> configuration)
             throws AlgebricksException {
         if (IcebergUtils.isIcebergTable(configuration)) {
             String catalogName = 
configuration.get(IcebergConstants.ICEBERG_CATALOG_NAME);

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20886?usp=email
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings?usp=email

Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: Ib8bb436d966a5b801f677922c3809065167f7f87
Gerrit-Change-Number: 20886
Gerrit-PatchSet: 1
Gerrit-Owner: Hussain Towaileb <[email protected]>

Reply via email to