>From Hussain Towaileb <[email protected]>: Hussain Towaileb has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19365 )
Change subject: [ASTERIXDB-3514][EXT]: Pass entity-id for all creds required operations ...................................................................... [ASTERIXDB-3514][EXT]: Pass entity-id for all creds required operations - user model changes: no - storage format changes: no - interface changes: yes Details: - Pass entity-id to uniquely identify the cached credentials for all external operations. Ext-ref: MB-63505 Change-Id: If5bd9ed7c1fc0aedba7f7bb90acb48c2edf3d580 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19365 Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Hussain Towaileb <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> --- M asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm M asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java M asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java 12 files changed, 199 insertions(+), 192 deletions(-) Approvals: Murtadha Hubail: Looks good to me, approved Hussain Towaileb: Looks good to me, but someone else must approve Jenkins: Verified; Verified diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java index 8cb3a47..ed26f7c 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java @@ -24,14 +24,7 @@ import java.util.concurrent.TimeUnit; import org.apache.asterix.common.api.IApplicationContext; -import org.apache.asterix.common.exceptions.AsterixException; -import org.apache.asterix.common.exceptions.CompilationException; -import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.external.IExternalCredentialsCache; -import org.apache.asterix.common.metadata.DatasetFullyQualifiedName; -import org.apache.asterix.common.metadata.DataverseName; -import org.apache.asterix.common.metadata.IFullyQualifiedName; -import org.apache.asterix.common.metadata.MetadataConstants; import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.aws.s3.S3Constants; import org.apache.commons.lang3.tuple.Pair; @@ -46,41 +39,35 @@ private static final Logger LOGGER = LogManager.getLogger(); private final ConcurrentMap<String, Pair<Span, Object>> cache = new ConcurrentHashMap<>(); private final int awsAssumeRoleDuration; - private final double refreshAwsAssumeRoleThreshold; + private final int refreshAwsAssumeRoleThresholdPercentage; public ExternalCredentialsCache(IApplicationContext appCtx) { this.awsAssumeRoleDuration = 
appCtx.getExternalProperties().getAwsAssumeRoleDuration(); - this.refreshAwsAssumeRoleThreshold = appCtx.getExternalProperties().getAwsRefreshAssumeRoleThreshold(); + this.refreshAwsAssumeRoleThresholdPercentage = + appCtx.getExternalProperties().getAwsRefreshAssumeRoleThresholdPercentage(); } @Override - public synchronized Object getCredentials(Map<String, String> configuration) throws CompilationException { - IFullyQualifiedName fqn = getFullyQualifiedNameFromConfiguration(configuration); - return getCredentials(fqn); - } - - @Override - public synchronized Object getCredentials(IFullyQualifiedName fqn) { - String name = getName(fqn); - if (cache.containsKey(name) && !needsRefresh(cache.get(name).getLeft())) { - return cache.get(name).getRight(); + public synchronized Object get(String key) { + invalidateCache(); + if (cache.containsKey(key)) { + return cache.get(key).getRight(); } return null; } @Override - public synchronized void updateCache(Map<String, String> configuration, Map<String, String> credentials) - throws CompilationException { - IFullyQualifiedName fqn = getFullyQualifiedNameFromConfiguration(configuration); - String type = configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE); - updateCache(fqn, type, credentials); + public void delete(String key) { + Object removed = cache.remove(key); + if (removed != null) { + LOGGER.info("Removed cached credentials for {} because it got deleted", key); + } } @Override - public synchronized void updateCache(IFullyQualifiedName fqn, String type, Map<String, String> credentials) { - String name = getName(fqn); + public synchronized void put(String key, String type, Map<String, String> credentials) { if (ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3.equalsIgnoreCase(type)) { - updateAwsCache(name, credentials); + updateAwsCache(key, credentials); } } @@ -88,34 +75,23 @@ String accessKeyId = credentials.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME); String secretAccessKey = 
credentials.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME); String sessionToken = credentials.get(S3Constants.SESSION_TOKEN_FIELD_NAME); - doUpdateAwsCache(name, AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken)); - } - private void doUpdateAwsCache(String name, AwsSessionCredentials credentials) { - cache.put(name, Pair.of(Span.start(awsAssumeRoleDuration, TimeUnit.SECONDS), credentials)); + AwsSessionCredentials sessionCreds = AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken); + cache.put(name, Pair.of(Span.start(awsAssumeRoleDuration, TimeUnit.SECONDS), sessionCreds)); LOGGER.info("Received and cached new credentials for {}", name); } - @Override - public void deleteCredentials(IFullyQualifiedName fqn) { - String name = getName(fqn); - Object removed = cache.remove(name); - if (removed != null) { - LOGGER.info("Removed cached credentials for {}", name); - } else { - LOGGER.info("No cached credentials found for {}, nothing to remove", name); - } - } - - @Override - public String getName(Map<String, String> configuration) throws CompilationException { - IFullyQualifiedName fqn = getFullyQualifiedNameFromConfiguration(configuration); - return getName(fqn); - } - - @Override - public String getName(IFullyQualifiedName fqn) { - return fqn.toString(); + /** + * Iterates the cache and removes the credentials that are considered expired + */ + private void invalidateCache() { + cache.entrySet().removeIf(entry -> { + boolean shouldRemove = needsRefresh(entry.getValue().getLeft()); + if (shouldRemove) { + LOGGER.info("Removing cached credentials for {} because it expired", entry.getKey()); + } + return shouldRemove; + }); } /** @@ -125,27 +101,9 @@ * @return true if the remaining time is less than the configured refresh percentage, false otherwise */ private boolean needsRefresh(Span span) { - return (double) span.remaining(TimeUnit.SECONDS) - / span.getSpan(TimeUnit.SECONDS) < refreshAwsAssumeRoleThreshold; - } - - protected 
IFullyQualifiedName getFullyQualifiedNameFromConfiguration(Map<String, String> configuration) - throws CompilationException { - String database = configuration.get(ExternalDataConstants.KEY_DATASET_DATABASE); - if (database == null) { - database = MetadataConstants.DEFAULT_DATABASE; - } - String stringDataverse = configuration.get(ExternalDataConstants.KEY_DATASET_DATAVERSE); - DataverseName dataverse = getDataverseName(stringDataverse); - String dataset = configuration.get(ExternalDataConstants.KEY_DATASET); - return new DatasetFullyQualifiedName(database, dataverse, dataset); - } - - protected DataverseName getDataverseName(String dataverse) throws CompilationException { - try { - return DataverseName.createSinglePartName(dataverse); - } catch (AsterixException ex) { - throw new CompilationException(ErrorCode.INVALID_DATABASE_OBJECT_NAME, dataverse); - } + double remaining = (double) span.remaining(TimeUnit.SECONDS) / span.getSpan(TimeUnit.SECONDS); + double passed = 1 - remaining; + int passedPercentage = (int) (passed * 100); + return passedPercentage > refreshAwsAssumeRoleThresholdPercentage; } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java index f07caaa..29e3239 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java @@ -40,6 +40,7 @@ import org.apache.asterix.common.external.IExternalCredentialsCache; import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater; import org.apache.asterix.common.messaging.api.MessageFuture; +import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.aws.s3.S3AuthUtils; import 
org.apache.asterix.external.util.aws.s3.S3Constants; import org.apache.asterix.messaging.CCMessageBroker; @@ -65,7 +66,8 @@ public synchronized Object generateAndCacheCredentials(Map<String, String> configuration) throws HyracksDataException, CompilationException { IExternalCredentialsCache cache = appCtx.getExternalCredentialsCache(); - Object credentials = cache.getCredentials(configuration); + String name = configuration.get(ExternalDataConstants.KEY_ENTITY_ID); + Object credentials = cache.get(name); if (credentials != null) { return credentials; } @@ -74,9 +76,7 @@ * if we are the CC, generate new creds and ask all NCs to update their cache * if we are the NC, send a message to the CC to generate new creds and ask all NCs to update their cache */ - String name = cache.getName(configuration); - if (appCtx instanceof ICcApplicationContext) { - ICcApplicationContext ccAppCtx = (ICcApplicationContext) appCtx; + if (appCtx instanceof ICcApplicationContext ccAppCtx) { IClusterManagementWork.ClusterState state = ccAppCtx.getClusterStateManager().getState(); if (!(state == ACTIVE || state == REBALANCE_REQUIRED)) { throw new RuntimeDataException(REJECT_BAD_CLUSTER_STATE, state); @@ -106,7 +106,8 @@ // request all NCs to update their credentials cache with the latest creds updateNcsCredentialsCache(ccAppCtx, name, credentialsMap, configuration); - cache.updateCache(configuration, credentialsMap); + String type = configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE); + cache.put(name, type, credentialsMap); credentials = AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken); } else { NCMessageBroker broker = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker(); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java index d1a9ebf..438e425 100644 --- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java @@ -21,9 +21,8 @@ import java.util.Map; import org.apache.asterix.common.api.INcApplicationContext; -import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.messaging.api.INcAddressedMessage; -import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -40,13 +39,10 @@ } @Override - public void handle(INcApplicationContext appCtx) throws HyracksDataException { - try { - appCtx.getExternalCredentialsCache().updateCache(configuration, credentials); - } catch (CompilationException ex) { - LOGGER.info("Failed to process request", ex); - throw HyracksDataException.create(ex); - } + public void handle(INcApplicationContext appCtx) { + String name = configuration.get(ExternalDataConstants.KEY_ENTITY_ID); + String type = configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE); + appCtx.getExternalCredentialsCache().put(name, type, credentials); } @Override diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index 51371da..97bc3cd 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -274,6 +274,7 @@ import org.apache.hyracks.api.job.profiling.IOperatorStats; import org.apache.hyracks.api.result.IResultSet; import org.apache.hyracks.api.result.ResultSetId; +import org.apache.hyracks.api.util.ExceptionUtils; import 
org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.common.controllers.CCConfig; import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption; @@ -578,11 +579,17 @@ // async queries are completed after their job completes if (ResultDelivery.ASYNC != resultDelivery) { appCtx.getRequestTracker().complete(requestParameters.getRequestReference().getUuid()); + postRequestCompleteCleanup(requestParameters); } Thread.currentThread().setName(threadName); } } + protected void postRequestCompleteCleanup(IRequestParameters requestParameters) { + String uuid = requestParameters.getRequestReference().getUuid(); + appCtx.getExternalCredentialsCache().delete(uuid); + } + protected void configureMetadataProvider(MetadataProvider metadataProvider, Map<String, String> config, Counter resultSetIdCounter, FileSplit outputFile, IRequestParameters requestParameters, Statement statement) { @@ -1031,8 +1038,12 @@ ExternalDataUtils.normalize(properties); ExternalDataUtils.validate(properties); ExternalDataUtils.validateType(properties, (ARecordType) itemType); - validateExternalDatasetProperties(externalDetails, properties, dd.getSourceLocation(), mdTxnCtx, - appCtx, metadataProvider); + Map<String, String> propertiesCopy = preparePropertiesCopyForValidation(externalDetails, properties, + dd.getSourceLocation(), mdTxnCtx, appCtx, metadataProvider); + // do any necessary validation on the copy to avoid changing the original and storing it in the metadata + metadataProvider.setExternalEntityIdFromParts(propertiesCopy, databaseName, dataverseName, + datasetName, false); + validateAdapterSpecificProperties(propertiesCopy, dd.getSourceLocation(), appCtx); datasetDetails = new ExternalDatasetDetails(externalDetails.getAdapter(), properties, new Date(), TransactionState.COMMIT); break; @@ -2448,9 +2459,7 @@ sourceLoc, EnumSet.of(DropOption.IF_EXISTS), requestParameters.isForceDropDataset()); 
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue()); - if (ds.getDatasetType().equals(DatasetType.EXTERNAL)) { - appCtx.getExternalCredentialsCache().deleteCredentials(ds.getDatasetFullyQualifiedName()); - } + deleteDatasetCachedCredentials(ds); return true; } catch (Exception e) { LOGGER.error("failed to drop dataset; executing compensating operations", e); @@ -2498,6 +2507,10 @@ } } + protected void deleteDatasetCachedCredentials(Dataset dataset) throws CompilationException { + appCtx.getExternalCredentialsCache().delete(dataset.getDatasetFullyQualifiedName().toString()); + } + protected void handleIndexDropStatement(MetadataProvider metadataProvider, Statement stmt, IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception { IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt; @@ -3637,7 +3650,7 @@ protected CreateResult doCreateLibrary(MetadataProvider metadataProvider, String databaseName, DataverseName dataverseName, String libraryName, String libraryHash, CreateLibraryStatement cls, IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception { - JobUtils.ProgressState progress = ProgressState.NO_PROGRESS; + ProgressState progress = ProgressState.NO_PROGRESS; boolean prepareJobSuccessful = false; JobSpecification abortJobSpec = null; Library existingLibrary = null; @@ -3773,7 +3786,7 @@ protected boolean doDropLibrary(MetadataProvider metadataProvider, LibraryDropStatement stmtDropLibrary, String databaseName, DataverseName dataverseName, String libraryName, IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception { - JobUtils.ProgressState progress = ProgressState.NO_PROGRESS; + ProgressState progress = ProgressState.NO_PROGRESS; MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); boolean bActiveTxn = true; metadataProvider.setMetadataTxnContext(mdTxnCtx); @@ -4044,8 +4057,11 @@ copyStmt, itemType, mdTxnCtx, metadataProvider); 
ExternalDataUtils.normalize(properties); ExternalDataUtils.validate(properties); - validateExternalDatasetProperties(externalDetails, properties, copyStmt.getSourceLocation(), mdTxnCtx, - appCtx, metadataProvider); + Map<String, String> propertiesCopy = preparePropertiesCopyForValidation(externalDetails, properties, + copyStmt.getSourceLocation(), mdTxnCtx, appCtx, metadataProvider); + // do any necessary validation on the copy to avoid changing the original and storing it in the metadata + metadataProvider.setExternalEntityId(propertiesCopy, dataset); + validateAdapterSpecificProperties(propertiesCopy, copyStmt.getSourceLocation(), appCtx); CompiledCopyFromFileStatement cls = new CompiledCopyFromFileStatement(databaseName, dataverseName, copyStmt.getDatasetName(), itemType, externalDetails.getAdapter(), properties); cls.setSourceLocation(stmt.getSourceLocation()); @@ -4102,6 +4118,9 @@ ResultMetadata outMetadata, IRequestParameters requestParameters, Map<String, IAObject> stmtParams, Stats stats) throws Exception { CopyToStatement copyTo = (CopyToStatement) stmt; + Namespace namespace = copyTo.getNamespace(); + DataverseName dataverseName = namespace.getDataverseName(); + String databaseName = namespace.getDatabaseName(); final IRequestTracker requestTracker = appCtx.getRequestTracker(); final ClientRequest clientRequest = (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid()); @@ -4130,16 +4149,21 @@ metadataProvider.setMetadataTxnContext(mdTxnCtx); try { ExternalDetailsDecl edd = copyTo.getExternalDetailsDecl(); - edd.setProperties(createAndValidateAdapterConfigurationForCopyToStmt(edd, + Map<String, String> properties = createAndValidateAdapterConfigurationForCopyToStmt(edd, ExternalDataConstants.WRITER_SUPPORTED_ADAPTERS, copyTo.getSourceLocation(), mdTxnCtx, - metadataProvider)); + metadataProvider); + + // request id is used to cache credentials if needed, and clear them after request is done + String uuid = 
requestParameters.getRequestReference().getUuid(); + metadataProvider.setExternalEntityIdFromParts(properties, databaseName, dataverseName, uuid, true); + edd.setProperties(properties); if (ExternalDataConstants.FORMAT_PARQUET .equalsIgnoreCase(edd.getProperties().get(ExternalDataConstants.KEY_FORMAT))) { if (copyTo.getType() != null) { - DataverseName dataverseName = + DataverseName dummyDataverse = DataverseName.createFromCanonicalForm(ExternalDataConstants.DUMMY_DATAVERSE_NAME); - IAType iaType = translateType(ExternalDataConstants.DUMMY_DATABASE_NAME, dataverseName, + IAType iaType = translateType(ExternalDataConstants.DUMMY_DATABASE_NAME, dummyDataverse, ExternalDataConstants.DUMMY_TYPE_NAME, copyTo.getType(), mdTxnCtx); edd.getProperties().put(ExternalDataConstants.PARQUET_SCHEMA_KEY, SchemaConverterVisitor.convertToParquetSchemaString((ARecordType) iaType)); @@ -4148,14 +4172,14 @@ if (edd.getProperties().get(ExternalDataConstants.KEY_FORMAT) .equalsIgnoreCase(ExternalDataConstants.FORMAT_CSV_LOWER_CASE)) { - DataverseName dataverseName = + DataverseName dummyDataverse = DataverseName.createFromCanonicalForm(ExternalDataConstants.DUMMY_DATAVERSE_NAME); IAType iaType; if (copyTo.getType() != null) { - iaType = translateType(ExternalDataConstants.DUMMY_DATABASE_NAME, dataverseName, + iaType = translateType(ExternalDataConstants.DUMMY_DATABASE_NAME, dummyDataverse, ExternalDataConstants.DUMMY_TYPE_NAME, copyTo.getType(), mdTxnCtx); } else if (copyTo.getTypeExpressionItemType() != null) { - iaType = translateType(ExternalDataConstants.DUMMY_DATABASE_NAME, dataverseName, + iaType = translateType(ExternalDataConstants.DUMMY_DATABASE_NAME, dummyDataverse, ExternalDataConstants.DUMMY_TYPE_NAME, copyTo.getTypeExpressionItemType(), mdTxnCtx); } else { throw new CompilationException(ErrorCode.COMPILATION_ERROR, @@ -5552,7 +5576,7 @@ if (atomic && jobId != null) { globalTxManager.abortTransaction(jobId); } - if (org.apache.hyracks.api.util.ExceptionUtils.getRootCause(e) 
instanceof InterruptedException) { + if (ExceptionUtils.getRootCause(e) instanceof InterruptedException) { Thread.currentThread().interrupt(); throw new RuntimeDataException(ErrorCode.REQUEST_CANCELLED, clientRequest.getId()); } @@ -5561,6 +5585,7 @@ // complete async jobs after their job completes if (ResultDelivery.ASYNC == resultDelivery) { requestTracker.complete(clientRequest.getId()); + postRequestCompleteCleanup(requestParameters); } locker.unlock(); } @@ -5777,33 +5802,39 @@ * @param details external details * @param sourceLoc source location */ - private void normalizeAdapters(ExternalDetailsDecl details, SourceLocation sourceLoc) throws CompilationException { + protected void normalizeAdapters(ExternalDetailsDecl details, SourceLocation sourceLoc) + throws CompilationException { String adapter = details.getAdapter(); - Optional<String> normalizedAdapter = ExternalDataConstants.EXTERNAL_READ_ADAPTERS.stream() - .filter(k -> k.equalsIgnoreCase(adapter)).findFirst(); + Optional<String> normalizedAdapter = + getSupportedAdapters().stream().filter(k -> k.equalsIgnoreCase(adapter)).findFirst(); if (normalizedAdapter.isEmpty()) { throw CompilationException.create(ErrorCode.UNKNOWN_ADAPTER, sourceLoc, adapter); } details.setAdapter(normalizedAdapter.get()); } - protected void validateExternalDatasetProperties(ExternalDetailsDecl externalDetails, + protected Set<String> getSupportedAdapters() { + return ExternalDataConstants.EXTERNAL_READ_ADAPTERS; + } + + protected Map<String, String> preparePropertiesCopyForValidation(ExternalDetailsDecl externalDetails, Map<String, String> properties, SourceLocation srcLoc, MetadataTransactionContext mdTxnCtx, IApplicationContext appCtx, MetadataProvider metadataProvider) throws AlgebricksException, HyracksDataException { - // Validate adapter specific properties normalizeAdapters(externalDetails, srcLoc); String adapter = externalDetails.getAdapter(); Map<String, String> details = new HashMap<>(properties); 
details.put(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE, adapter); - validateAdapterSpecificProperties(details, srcLoc, appCtx); + return details; } protected Map<String, String> createAndValidateAdapterConfigurationForCopyToStmt( ExternalDetailsDecl externalDetailsDecl, Set<String> supportedAdapters, SourceLocation sourceLocation, MetadataTransactionContext mdTxnCtx, MetadataProvider md) throws AlgebricksException { + normalizeAdapters(externalDetailsDecl, sourceLocation); String adapterName = externalDetailsDecl.getAdapter(); Map<String, String> properties = externalDetailsDecl.getProperties(); + properties.put(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE, adapterName); WriterValidationUtil.validateWriterConfiguration(adapterName, supportedAdapters, properties, sourceLocation); return properties; } diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm index 29a4a6e..3ffb080 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm @@ -9,7 +9,7 @@ "active\.stop\.timeout" : 3600, "active\.suspend\.timeout" : 3600, "aws.assume.role.duration" : 900, - "aws.refresh.assume.role.threshold" : 0.5, + "aws.refresh.assume.role.threshold.percentage" : 75, "azure.request.timeout" : 120, "cloud.acquire.token.timeout" : 100, "cloud.deployment" : false, diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm index dbbf83f..aeb3cea 100644 --- 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm @@ -9,7 +9,7 @@ "active\.stop\.timeout" : 3600, "active\.suspend\.timeout" : 3600, "aws.assume.role.duration" : 900, - "aws.refresh.assume.role.threshold" : 0.5, + "aws.refresh.assume.role.threshold.percentage" : 75, "azure.request.timeout" : 120, "cloud.acquire.token.timeout" : 100, "cloud.deployment" : false, diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm index 4778cb8..b475612 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm @@ -9,7 +9,7 @@ "active\.stop\.timeout" : 3600, "active\.suspend\.timeout" : 3600, "aws.assume.role.duration" : 900, - "aws.refresh.assume.role.threshold" : 0.5, + "aws.refresh.assume.role.threshold.percentage" : 75, "azure.request.timeout" : 120, "cloud.acquire.token.timeout" : 100, "cloud.deployment" : false, diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java index f6559f0..62437b2 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java @@ -18,12 +18,12 @@ */ package org.apache.asterix.common.config; -import static org.apache.hyracks.control.common.config.OptionTypes.DOUBLE; import static 
org.apache.hyracks.control.common.config.OptionTypes.LEVEL; import static org.apache.hyracks.control.common.config.OptionTypes.NONNEGATIVE_INTEGER; import static org.apache.hyracks.control.common.config.OptionTypes.POSITIVE_INTEGER; import static org.apache.hyracks.control.common.config.OptionTypes.POSITIVE_INTEGER_BYTE_UNIT; import static org.apache.hyracks.control.common.config.OptionTypes.STRING; +import static org.apache.hyracks.control.common.config.OptionTypes.getRangedIntegerType; import org.apache.hyracks.api.config.IOption; import org.apache.hyracks.api.config.IOptionType; @@ -55,14 +55,16 @@ LIBRARY_DEPLOY_TIMEOUT(POSITIVE_INTEGER, 1800, "Timeout to upload a UDF in seconds"), AZURE_REQUEST_TIMEOUT(POSITIVE_INTEGER, 120, "Timeout for Azure client requests in seconds"), AWS_ASSUME_ROLE_DURATION( - POSITIVE_INTEGER, + getRangedIntegerType(900, 43200), 900, "AWS assuming role duration in seconds. " + "Range from 900 seconds (15 mins) to 43200 seconds (12 hours)"), - AWS_REFRESH_ASSUME_ROLE_THRESHOLD( - DOUBLE, - .5, - "Percentage of left duration before assume role credentials " + "needs to be refreshed"); + AWS_REFRESH_ASSUME_ROLE_THRESHOLD_PERCENTAGE( + getRangedIntegerType(25, 90), + 75, + "Percentage of duration passed before assume role credentials need to be refreshed, the value ranges " + + "from 25 to 90, default is 75. 
For example, if the value is set to 65, this means the " + + "credentials need to be refreshed if 65% of the total expiration duration is already passed"); private final IOptionType type; private final Object defaultValue; @@ -91,7 +93,7 @@ case LIBRARY_DEPLOY_TIMEOUT: case AZURE_REQUEST_TIMEOUT: case AWS_ASSUME_ROLE_DURATION: - case AWS_REFRESH_ASSUME_ROLE_THRESHOLD: + case AWS_REFRESH_ASSUME_ROLE_THRESHOLD_PERCENTAGE: return Section.COMMON; case CC_JAVA_OPTS: case NC_JAVA_OPTS: @@ -177,7 +179,7 @@ return accessor.getInt(Option.AWS_ASSUME_ROLE_DURATION); } - public double getAwsRefreshAssumeRoleThreshold() { - return accessor.getDouble(Option.AWS_REFRESH_ASSUME_ROLE_THRESHOLD); + public int getAwsRefreshAssumeRoleThresholdPercentage() { + return accessor.getInt(Option.AWS_REFRESH_ASSUME_ROLE_THRESHOLD_PERCENTAGE); } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java index 3ff444d..3a9ae1c 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java @@ -20,64 +20,29 @@ import java.util.Map; -import org.apache.asterix.common.exceptions.CompilationException; -import org.apache.asterix.common.metadata.IFullyQualifiedName; - public interface IExternalCredentialsCache { /** - * Returns the cached credentials. Can be of any supported external credentials type + * Returns the cached credentials. 
* - * @param configuration configuration containing external collection details - * @return credentials if present, null otherwise + * @param key credentials key + * @return credentials if present and not expired/need refreshing, null otherwise */ - Object getCredentials(Map<String, String> configuration) throws CompilationException; + Object get(String key); /** - * Returns the cached credentials. Can be of any supported external credentials type + * Deletes the cache for the provided entity * - * @param fqn fully qualified name of the credentials entity - * @return credentials if present, null otherwise + * @param key credentials key */ - Object getCredentials(IFullyQualifiedName fqn) throws CompilationException; + void delete(String key); /** * Updates the credentials cache with the provided credentials for the specified name * - * @param configuration configuration containing external collection details + * @param key credentials key + * @param type credentials type * @param credentials credentials to cache */ - void updateCache(Map<String, String> configuration, Map<String, String> credentials) throws CompilationException; - - /** - * Updates the credentials cache with the provided credentials for the specified name - * - * @param fqn fully qualified name for the credentials entity - * @param type type of the entity - * @param credentials credentials to cache - */ - void updateCache(IFullyQualifiedName fqn, String type, Map<String, String> credentials); - - /** - * Deletes the cache for the provided enitty - * - * @param fqn fully qualified name of entity for which the credentials are to be deleted - */ - void deleteCredentials(IFullyQualifiedName fqn); - - /** - * Returns the name of the entity which the cached credentials belong to - * - * @param configuration configuration containing external collection details - * @return name of entity which credentials belong to - */ - String getName(Map<String, String> configuration) throws CompilationException; - - /** 
- * Returns the name of the entity which the cached credentials belong to - * - * @param fqn fully qualified name for the credentials entity - * @return name of entity which credentials belong to - */ - String getName(IFullyQualifiedName fqn) throws CompilationException; + void put(String key, String type, Map<String, String> credentials); } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java index 4cc1656..9e9df8b 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java @@ -105,6 +105,7 @@ public static final String KEY_WAIT_FOR_DATA = "wait-for-data"; public static final String KEY_FEED_NAME = "feed"; // a string representing external bucket name + public static final String KEY_ENTITY_ID = "entity-id"; public static final String KEY_EXTERNAL_SOURCE_TYPE = "type"; // a comma delimited list of nodes public static final String KEY_NODES = "nodes"; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java index 57889ea..4e0a435 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java @@ -241,7 +241,7 @@ public static AwsCredentialsProvider getTrustAccountCredentials(IApplicationContext appCtx, Map<String, String> configuration) throws CompilationException { IExternalCredentialsCache cache = appCtx.getExternalCredentialsCache(); - Object credentialsObject = cache.getCredentials(configuration); + 
Object credentialsObject = cache.get(configuration.get(ExternalDataConstants.KEY_ENTITY_ID)); if (credentialsObject != null) { return () -> (AwsSessionCredentials) credentialsObject; } @@ -389,7 +389,7 @@ */ public static void configureAwsS3HdfsJobConf(IApplicationContext appCtx, JobConf jobConf, Map<String, String> configuration, int numberOfPartitions) throws CompilationException { - setHadoopCredentials(jobConf, configuration); + setHadoopCredentials(appCtx, jobConf, configuration); String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME); Region region = validateAndGetRegion(configuration.get(REGION_FIELD_NAME)); jobConf.set(HADOOP_REGION, region.toString()); @@ -421,7 +421,8 @@ * @param jobConf hadoop job config * @param configuration external details configuration */ - private static void setHadoopCredentials(JobConf jobConf, Map<String, String> configuration) { + private static void setHadoopCredentials(IApplicationContext appCtx, JobConf jobConf, + Map<String, String> configuration) { AuthenticationType authenticationType = getAuthenticationType(configuration); switch (authenticationType) { case ANONYMOUS: @@ -432,7 +433,11 @@ jobConf.set(HADOOP_ASSUME_ROLE_ARN, configuration.get(ROLE_ARN_FIELD_NAME)); jobConf.set(HADOOP_ASSUME_ROLE_EXTERNAL_ID, configuration.get(EXTERNAL_ID_FIELD_NAME)); jobConf.set(HADOOP_ASSUME_ROLE_SESSION_NAME, "parquet-" + UUID.randomUUID()); - jobConf.set(HADOOP_ASSUME_ROLE_SESSION_DURATION, "15m"); + + // hadoop accepts time 15m to 1h, we will base it on the provided configuration + int durationInSeconds = appCtx.getExternalProperties().getAwsAssumeRoleDuration(); + String hadoopDuration = getHadoopDuration(durationInSeconds); + jobConf.set(HADOOP_ASSUME_ROLE_SESSION_DURATION, hadoopDuration); // TODO: this assumes basic keys always, also support if we use InstanceProfile to assume a role jobConf.set(HADOOP_CREDENTIALS_TO_ASSUME_ROLE_KEY, HADOOP_SIMPLE); @@ -456,6 +461,36 @@ } /** + * Hadoop accepts duration values 
from 15m to 1h (in this format). We will base this on the configured + * duration in seconds, clamped to that range: below 15 minutes we return 15m, above 1 hour we return 1h + * + * @param seconds configured duration in seconds + * @return duration in hadoop format, in whole minutes (truncated), e.g. "15m", "30m" or "1h" + */ + private static String getHadoopDuration(int seconds) { + // constants for time thresholds + final int FIFTEEN_MINUTES_IN_SECONDS = 15 * 60; + final int ONE_HOUR_IN_SECONDS = 60 * 60; + + // Adjust seconds to fit within bounds + if (seconds < FIFTEEN_MINUTES_IN_SECONDS) { + seconds = FIFTEEN_MINUTES_IN_SECONDS; + } else if (seconds > ONE_HOUR_IN_SECONDS) { + seconds = ONE_HOUR_IN_SECONDS; + } + + // Convert seconds to minutes + int minutes = seconds / 60; + + // Format the result + if (minutes == 60) { + return "1h"; + } else { + return minutes + "m"; + } + } + + /** * Validate external dataset properties * * @param configuration properties @@ -471,27 +506,6 @@ throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT); } - String arnRole = configuration.get(ROLE_ARN_FIELD_NAME); - String externalId = configuration.get(EXTERNAL_ID_FIELD_NAME); - String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME); - String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME); - - if (arnRole != null) { - return; - } else if (externalId != null) { - throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ROLE_ARN_FIELD_NAME, - EXTERNAL_ID_FIELD_NAME); - } else if (accessKeyId == null || secretAccessKey == null) { - // If one is passed, the other is required - if (accessKeyId != null) { - throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, SECRET_ACCESS_KEY_FIELD_NAME, - ACCESS_KEY_ID_FIELD_NAME); - } else if (secretAccessKey != null) { - throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCESS_KEY_ID_FIELD_NAME, - SECRET_ACCESS_KEY_FIELD_NAME); - } - } - validateIncludeExclude(configuration); try { // TODO(htowaileb): maybe something
better, this will check to ensure type is supported before creation diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java index ef056c9..6f3a9b6 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java @@ -48,6 +48,7 @@ import org.apache.asterix.common.external.IDataSourceAdapter; import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory; import org.apache.asterix.common.functions.FunctionSignature; +import org.apache.asterix.common.metadata.DatasetFullyQualifiedName; import org.apache.asterix.common.metadata.DataverseName; import org.apache.asterix.common.metadata.LockList; import org.apache.asterix.common.metadata.MetadataConstants; @@ -998,6 +999,7 @@ configuration.put(ExternalDataConstants.KEY_DATASET_DATABASE, dataset.getDatabaseName()); configuration.put(ExternalDataConstants.KEY_DATASET_DATAVERSE, dataset.getDataverseName().getCanonicalForm()); + setExternalEntityId(configuration, dataset); return AdapterFactoryProvider.getAdapterFactory(getApplicationContext().getServiceContext(), adapterName, configuration, itemType, null, warningCollector, filterEvaluatorFactory); } catch (Exception e) { @@ -1005,6 +1007,20 @@ } } + public void setExternalEntityId(Map<String, String> configuration, Dataset dataset) throws AlgebricksException { + configuration.put(ExternalDataConstants.KEY_ENTITY_ID, dataset.getDatasetFullyQualifiedName().toString()); + } + + public void setExternalEntityIdFromParts(Map<String, String> configuration, String database, + DataverseName dataverse, String dataset, boolean isUuid) throws AlgebricksException { + if (isUuid) { + configuration.put(ExternalDataConstants.KEY_ENTITY_ID, dataset); + } else { + 
DatasetFullyQualifiedName fqn = new DatasetFullyQualifiedName(database, dataverse, dataset); + configuration.put(ExternalDataConstants.KEY_ENTITY_ID, fqn.toString()); + } + } + public TxnId getTxnId() { return txnId; } -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19365 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Change-Id: If5bd9ed7c1fc0aedba7f7bb90acb48c2edf3d580 Gerrit-Change-Number: 19365 Gerrit-PatchSet: 9 Gerrit-Owner: Hussain Towaileb <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Hussain Towaileb <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-MessageType: merged
