>From Hussain Towaileb <[email protected]>:
Hussain Towaileb has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20886?usp=email )
Change subject: [NO ISSUE][EXT]: Validate snapshot on creation and query
......................................................................
[NO ISSUE][EXT]: Validate snapshot on creation and query
Ext-ref: MB-70437
Change-Id: Ib8bb436d966a5b801f677922c3809065167f7f87
---
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
R
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/handlers/IcebergCatalogStatementHandler.java
A
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/helpers/IcebergStatementValidationHelper.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
M asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java
A
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergSnapshotUtils.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
9 files changed, 217 insertions(+), 66 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/86/20886/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 0c5cd98..de65cb0 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -69,7 +69,8 @@
import org.apache.asterix.app.result.fields.ResultHandlePrinter;
import org.apache.asterix.app.result.fields.ResultsPrinter;
import org.apache.asterix.app.result.fields.StatusPrinter;
-import org.apache.asterix.app.translator.handlers.CatalogStatementHandler;
+import
org.apache.asterix.app.translator.handlers.IcebergCatalogStatementHandler;
+import
org.apache.asterix.app.translator.helpers.IcebergStatementValidationHelper;
import org.apache.asterix.column.validation.ColumnPropertiesValidationUtil;
import org.apache.asterix.column.validation.ColumnSupportedTypesValidator;
import org.apache.asterix.common.api.IApplicationContext;
@@ -117,7 +118,6 @@
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.WriterValidationUtil;
import org.apache.asterix.external.util.iceberg.IcebergConstants;
-import org.apache.asterix.external.util.iceberg.IcebergUtils;
import
org.apache.asterix.external.writer.printer.parquet.SchemaConverterVisitor;
import org.apache.asterix.lang.common.base.Expression;
import org.apache.asterix.lang.common.base.IQueryRewriter;
@@ -199,7 +199,6 @@
import org.apache.asterix.metadata.dataset.hints.DatasetHints;
import
org.apache.asterix.metadata.dataset.hints.DatasetHints.DatasetNodegroupCardinalityHint;
import org.apache.asterix.metadata.declared.MetadataProvider;
-import org.apache.asterix.metadata.entities.Catalog;
import org.apache.asterix.metadata.entities.CompactionPolicy;
import org.apache.asterix.metadata.entities.Database;
import org.apache.asterix.metadata.entities.Dataset;
@@ -1074,7 +1073,7 @@
ExternalDataUtils.normalize(properties);
ExternalDataUtils.validate(properties);
ExternalDataUtils.validateType(properties, (ARecordType)
itemType);
- validateIfIcebergTable(properties, mdTxnCtx, sourceLoc);
+ validateIfIcebergTable(metadataProvider, properties,
mdTxnCtx, sourceLoc);
validateExternalDatasetProperties(externalDetails,
properties, dd.getSourceLocation(), mdTxnCtx,
appCtx, metadataProvider);
datasetDetails = new
ExternalDatasetDetails(externalDetails.getAdapter(), properties, new Date(),
@@ -1187,20 +1186,9 @@
return Optional.of(dataset);
}
- protected void validateIfIcebergTable(Map<String, String> properties,
MetadataTransactionContext mdTxnCtx,
- SourceLocation srcLoc) throws AlgebricksException {
- if (!IcebergUtils.isIcebergTable(properties)) {
- return;
- }
- IcebergUtils.setDefaultFormat(properties);
- IcebergUtils.validateIcebergTableProperties(properties);
-
- // ensure the specified catalog exists
- String catalogName =
properties.get(IcebergConstants.ICEBERG_CATALOG_NAME);
- Catalog catalog = MetadataManager.INSTANCE.getCatalog(mdTxnCtx,
catalogName);
- if (catalog == null) {
- throw new CompilationException(ErrorCode.UNKNOWN_CATALOG, srcLoc,
catalogName);
- }
+ protected void validateIfIcebergTable(MetadataProvider metadataProvider,
Map<String, String> properties,
+ MetadataTransactionContext mdTxnCtx, SourceLocation srcLoc) throws
AlgebricksException {
+ IcebergStatementValidationHelper.validateIfIcebergTable(appCtx,
metadataProvider, mdTxnCtx, properties, srcLoc);
}
protected boolean isDatasetWithoutTypeSpec(DatasetDecl datasetDecl,
ARecordType aRecordType,
@@ -5897,7 +5885,7 @@
protected void handleCatalogStatement(Statement.Kind kind,
MetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc, IRequestParameters
requestParameters) throws Exception {
- CatalogStatementHandler statement = new CatalogStatementHandler(kind,
metadataProvider, stmt,
+ IcebergCatalogStatementHandler statement = new
IcebergCatalogStatementHandler(kind, metadataProvider, stmt,
Creator.DEFAULT_CREATOR, sessionConfig, lockUtil, lockManager);
statement.handle();
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/handlers/CatalogStatementHandler.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/handlers/IcebergCatalogStatementHandler.java
similarity index 98%
rename from
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/handlers/CatalogStatementHandler.java
rename to
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/handlers/IcebergCatalogStatementHandler.java
index 3f88893..e0e9d02 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/handlers/CatalogStatementHandler.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/handlers/IcebergCatalogStatementHandler.java
@@ -50,7 +50,7 @@
import org.apache.asterix.translator.SessionConfig;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-public class CatalogStatementHandler {
+public class IcebergCatalogStatementHandler {
private final Statement.Kind kind;
private final MetadataProvider metadataProvider;
@@ -60,7 +60,7 @@
private final IMetadataLockUtil lockUtil;
private final IMetadataLockManager lockManager;
- public CatalogStatementHandler(Statement.Kind kind, MetadataProvider
metadataProvider, Statement statement,
+ public IcebergCatalogStatementHandler(Statement.Kind kind,
MetadataProvider metadataProvider, Statement statement,
Creator creator, SessionConfig sessionConfig, IMetadataLockUtil
lockUtil,
IMetadataLockManager lockManager) {
this.kind = kind;
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/helpers/IcebergStatementValidationHelper.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/helpers/IcebergStatementValidationHelper.java
new file mode 100644
index 0000000..106c93d3
--- /dev/null
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/helpers/IcebergStatementValidationHelper.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.translator.helpers;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.util.iceberg.IcebergConstants;
+import org.apache.asterix.external.util.iceberg.IcebergSnapshotUtils;
+import org.apache.asterix.external.util.iceberg.IcebergUtils;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Catalog;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+
+public class IcebergStatementValidationHelper {
+
+ public IcebergStatementValidationHelper() {
+ }
+
+ public static void validateIfIcebergTable(ICcApplicationContext appCtx,
MetadataProvider metadataProvider,
+ MetadataTransactionContext mdTxnCtx, Map<String, String>
collectionProperties, SourceLocation srcLoc)
+ throws AlgebricksException {
+ validateIfIcebergTable(appCtx, metadataProvider, mdTxnCtx,
collectionProperties, Collections.emptyMap(),
+ srcLoc);
+ }
+
+ public static void validateIfIcebergTable(ICcApplicationContext appCtx,
MetadataProvider metadataProvider,
+ MetadataTransactionContext mdTxnCtx, Map<String, String>
collectionProperties,
+ Map<String, String> extraCollectionProperties, SourceLocation
srcLoc) throws AlgebricksException {
+ if (!IcebergUtils.isIcebergTable(collectionProperties)) {
+ return;
+ }
+ IcebergUtils.setDefaultFormat(collectionProperties);
+ IcebergUtils.validateIcebergTableProperties(collectionProperties);
+
+ // work on a copy of the properties from now onward to avoid modifying
the original collection properties
+ Map<String, String> propertiesCopy = new
HashMap<>(collectionProperties);
+
+ // ensure the specified catalog exists
+ String catalogName =
propertiesCopy.get(IcebergConstants.ICEBERG_CATALOG_NAME);
+ Catalog catalog = MetadataManager.INSTANCE.getCatalog(mdTxnCtx,
catalogName);
+ if (catalog == null) {
+ throw new CompilationException(ErrorCode.UNKNOWN_CATALOG, srcLoc,
catalogName);
+ }
+
+ // validate snapshot exists if provided
+ propertiesCopy.putAll(extraCollectionProperties);
+ metadataProvider.addIcebergCatalogPropertiesIfNeeded(appCtx,
propertiesCopy);
+ IcebergSnapshotUtils.validateSnapshotExists(propertiesCopy);
+ }
+}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index dd8d358..623de17 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -348,6 +348,7 @@
NO_VALID_CREDENTIALS_PROVIDED_FOR_BIGLAKE_METASTORE_CATALOG(1240),
INVALID_FRAME_BASED_MEMORY_BUDGET(1241),
COLLECTION_IS_NOT_AN_ICEBERG_TABLE_COLLECTION(1242),
+ UNKNOWN_ICEBERG_SNAPSHOT(1243),
// Feed errors
DATAFLOW_ILLEGAL_STATE(3001),
diff --git
a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index fe1e4ab..4199603 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -350,6 +350,7 @@
1240 = No valid credentials provided to access Biglake Metastore catalog.
1241 = Invalid `%1$s` "%2$s" for frame size=%3$s. value should be >= %4$s *
frame size: `%1$s` "%5$s" in %6$s
1242 = Collection '%1$s' is not an Iceberg table external collection.
+1243 = Snapshot '%1$s' not found for table '%2$s'.
# Feed Errors
3001 = Illegal state.
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java
index aa1c52a..b03f83e 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java
@@ -21,10 +21,8 @@
import static
org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_SOURCE_ERROR;
import static
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_SCHEMA_ID_PROPERTY_KEY;
import static
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_SNAPSHOT_ID_PROPERTY_KEY;
-import static
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY;
+import static
org.apache.asterix.external.util.iceberg.IcebergSnapshotUtils.snapshotIdExists;
import static
org.apache.asterix.external.util.iceberg.IcebergUtils.getProjectedFields;
-import static
org.apache.asterix.external.util.iceberg.IcebergUtils.snapshotIdExists;
-import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
import java.io.Serializable;
import java.util.ArrayList;
@@ -34,6 +32,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Set;
@@ -48,6 +47,7 @@
import
org.apache.asterix.external.input.filter.IcebergTableFilterEvaluatorFactory;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.iceberg.IcebergConstants;
+import org.apache.asterix.external.util.iceberg.IcebergSnapshotUtils;
import org.apache.asterix.external.util.iceberg.IcebergUtils;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -65,7 +65,6 @@
import org.apache.iceberg.data.Record;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.util.SnapshotUtil;
public class IcebergParquetRecordReaderFactory implements
IIcebergRecordReaderFactory<Record> {
@@ -215,23 +214,12 @@
private TableScan setAndPinScanSnapshot(Map<String, String>
configurationCopy, Table table, TableScan scan)
throws CompilationException {
long snapshot;
- String snapshotIdStr =
configurationCopy.get(ICEBERG_SNAPSHOT_ID_PROPERTY_KEY);
- String asOfTimestampStr =
configurationCopy.get(ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY);
-
- if (snapshotIdStr != null) {
- snapshot = Long.parseLong(snapshotIdStr);
+ Optional<Long> snapshotOptional =
IcebergSnapshotUtils.getSnapshotId(configurationCopy, table);
+ if (snapshotOptional.isPresent()) {
+ snapshot = snapshotOptional.get();
if (!snapshotIdExists(table, snapshot)) {
throw
CompilationException.create(ErrorCode.ICEBERG_SNAPSHOT_ID_NOT_FOUND, snapshot);
}
- } else if (asOfTimestampStr != null) {
- try {
- snapshot = SnapshotUtil.snapshotIdAsOfTime(table,
Long.parseLong(asOfTimestampStr));
- if (!snapshotIdExists(table, snapshot)) {
- throw
CompilationException.create(ErrorCode.ICEBERG_SNAPSHOT_ID_NOT_FOUND, snapshot);
- }
- } catch (IllegalArgumentException e) {
- throw CompilationException.create(EXTERNAL_SOURCE_ERROR, e,
getMessageOrToString(e));
- }
} else {
if (table.currentSnapshot() == null) {
throw CompilationException.create(EXTERNAL_SOURCE_ERROR,
"table " + table.name() + " has no snapshots");
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergSnapshotUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergSnapshotUtils.java
new file mode 100644
index 0000000..5f2b5c5
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergSnapshotUtils.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util.iceberg;
+
+import static
org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_SOURCE_ERROR;
+import static
org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT;
+import static
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_SNAPSHOT_ID_PROPERTY_KEY;
+import static
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.util.SnapshotUtil;
+
+public class IcebergSnapshotUtils {
+
+ public static Optional<Long> validateAndGetSnapshot(Map<String, String>
properties) throws CompilationException {
+ String snapshotId = properties.get(ICEBERG_SNAPSHOT_ID_PROPERTY_KEY);
+ String snapshotTimestamp =
properties.get(ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY);
+ if (snapshotId != null && snapshotTimestamp != null) {
+ throw new
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT,
+ ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY,
ICEBERG_SNAPSHOT_ID_PROPERTY_KEY);
+ }
+
+ try {
+ if (snapshotId != null) {
+ return Optional.of(Long.parseLong(snapshotId));
+ } else if (snapshotTimestamp != null) {
+ return Optional.of(Long.parseLong(snapshotTimestamp));
+ } else {
+ return Optional.empty();
+ }
+ } catch (NumberFormatException e) {
+ throw new
CompilationException(ErrorCode.INVALID_ICEBERG_SNAPSHOT_VALUE,
+ snapshotId != null ? snapshotId : snapshotTimestamp);
+ }
+ }
+
+ /**
+ * Returns the snapshot ID from the configuration. If a snapshot timestamp
is provided instead, the snapshot ID
+ * for that timestamp is returned instead, otherwise, an error is thrown
+ *
+ * @param properties properties
+ * @param table table the snapshot belongs to
+ * @return snapshot id if exists
+ * @throws CompilationException CompilationException
+ */
+ public static Optional<Long> getSnapshotId(Map<String, String> properties,
Table table)
+ throws CompilationException {
+ Optional<Long> snapshotOptional = validateAndGetSnapshot(properties);
+ if (snapshotOptional.isEmpty()) {
+ return snapshotOptional;
+ }
+
+ // if we have a snapshot timestamp, get the snapshot id for that
timestamp and validate it instead
+ if
(properties.containsKey(IcebergConstants.ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY))
{
+ try {
+ return Optional.of(SnapshotUtil.snapshotIdAsOfTime(table,
snapshotOptional.get()));
+ } catch (IllegalArgumentException e) {
+ throw CompilationException.create(EXTERNAL_SOURCE_ERROR, e,
getMessageOrToString(e));
+ }
+ }
+ return snapshotOptional;
+ }
+
+ public static void validateSnapshotExists(Map<String, String>
catalogAndCollectionProperties)
+ throws AlgebricksException {
+ Optional<Long> snapshotOptional =
validateAndGetSnapshot(catalogAndCollectionProperties);
+ if (snapshotOptional.isPresent()) {
+ String namespace =
catalogAndCollectionProperties.get(IcebergConstants.ICEBERG_NAMESPACE_PROPERTY_KEY);
+ String tableName =
catalogAndCollectionProperties.get(IcebergConstants.ICEBERG_TABLE_NAME_PROPERTY_KEY);
+ Map<String, String> catalogProperties =
+
IcebergUtils.filterCatalogProperties(catalogAndCollectionProperties);
+ Catalog icebergCatalog =
IcebergUtils.initializeCatalog(catalogProperties, namespace, false);
+
+ Namespace actualNamespace = Namespace.of(namespace);
+ TableIdentifier tableIdentifier =
TableIdentifier.of(actualNamespace, tableName);
+ Table table = icebergCatalog.loadTable(tableIdentifier);
+
+ Optional<Long> optional =
getSnapshotId(catalogAndCollectionProperties, table);
+ if (optional.isEmpty()) {
+ return;
+ }
+ validateSnapshotExists(icebergCatalog, tableIdentifier,
optional.get());
+ }
+ }
+
+ public static void validateSnapshotExists(Catalog icebergCatalog,
TableIdentifier tableIdentifier, long snapshot)
+ throws CompilationException {
+ Table table = icebergCatalog.loadTable(tableIdentifier);
+ if (!snapshotIdExists(table, snapshot)) {
+ throw
CompilationException.create(ErrorCode.UNKNOWN_ICEBERG_SNAPSHOT, snapshot,
tableIdentifier.toString());
+ }
+ }
+
+ public static boolean snapshotIdExists(Table table, long snapshot) {
+ return table.snapshot(snapshot) != null;
+ }
+}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
index 20ce57b..b4dd935 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
@@ -18,15 +18,13 @@
*/
package org.apache.asterix.external.util.iceberg;
-import static
org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT;
+import static
org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_SOURCE_ERROR;
import static
org.apache.asterix.common.exceptions.ErrorCode.UNSUPPORTED_ICEBERG_DATA_FORMAT;
import static
org.apache.asterix.external.util.aws.EnsureCloseClientsFactoryRegistry.FACTORY_INSTANCE_ID_KEY;
import static
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_AVRO_FORMAT;
import static
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_CATALOG_PROPERTY_PREFIX_INTERNAL;
import static
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_COLLECTION_PROPERTY_PREFIX_INTERNAL;
import static
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_PARQUET_FORMAT;
-import static
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_SNAPSHOT_ID_PROPERTY_KEY;
-import static
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY;
import static
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_TABLE_FORMAT;
import static
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_WAREHOUSE_PROPERTY_KEY;
@@ -53,7 +51,6 @@
import org.apache.asterix.external.util.iceberg.rest.RestUtils;
import org.apache.asterix.om.types.ARecordType;
import org.apache.iceberg.CatalogProperties;
-import org.apache.iceberg.Table;
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.aws.glue.GlueCatalog;
import org.apache.iceberg.catalog.Catalog;
@@ -163,24 +160,7 @@
IcebergConstants.ICEBERG_NAMESPACE_PROPERTY_KEY);
}
- // validate snapshot id and timestamp
- String snapshotId = properties.get(ICEBERG_SNAPSHOT_ID_PROPERTY_KEY);
- String snapshotTimestamp =
properties.get(ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY);
- if (snapshotId != null && snapshotTimestamp != null) {
- throw new
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT,
- ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY,
ICEBERG_SNAPSHOT_ID_PROPERTY_KEY);
- }
-
- try {
- if (snapshotId != null) {
- Long.parseLong(snapshotId);
- } else if (snapshotTimestamp != null) {
- Long.parseLong(snapshotTimestamp);
- }
- } catch (NumberFormatException e) {
- throw new
CompilationException(ErrorCode.INVALID_ICEBERG_SNAPSHOT_VALUE,
- snapshotId != null ? snapshotId : snapshotTimestamp);
- }
+ IcebergSnapshotUtils.validateAndGetSnapshot(properties);
}
/**
@@ -353,10 +333,6 @@
}
}
- public static boolean snapshotIdExists(Table table, long snapshot) {
- return table.snapshot(snapshot) != null;
- }
-
public static String[] getProjectedFields(Map<String, String>
configuration) throws IOException {
String encoded =
configuration.get(ExternalDataConstants.KEY_REQUESTED_FIELDS);
ARecordType projectedRecordType =
ExternalDataUtils.getExpectedType(encoded);
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 1d6d1a2..8963ed1 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -1023,7 +1023,7 @@
}
}
- protected void addIcebergCatalogPropertiesIfNeeded(ICcApplicationContext
appCtx, Map<String, String> configuration)
+ public void addIcebergCatalogPropertiesIfNeeded(ICcApplicationContext
appCtx, Map<String, String> configuration)
throws AlgebricksException {
if (IcebergUtils.isIcebergTable(configuration)) {
String catalogName =
configuration.get(IcebergConstants.ICEBERG_CATALOG_NAME);
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20886?usp=email
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings?usp=email
Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: Ib8bb436d966a5b801f677922c3809065167f7f87
Gerrit-Change-Number: 20886
Gerrit-PatchSet: 1
Gerrit-Owner: Hussain Towaileb <[email protected]>