>From Hussain Towaileb <[email protected]>:
Hussain Towaileb has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19365 )
Change subject: [ASTERIXDB-3514][EXT]: Introduce external entity name resolver
......................................................................
[ASTERIXDB-3514][EXT]: Introduce external entity name resolver
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Used to provide a name resolving mechanism. extensions
can extend them to resolve names as desired.
Ext-ref: MB-63505
Change-Id: If5bd9ed7c1fc0aedba7f7bb90acb48c2edf3d580
---
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
A
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalEntityNameResolver.java
A
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalEntityNameResolverProvider.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java
A
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalEntityNameResolver.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
A
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalEntityNameResolverProvider.java
14 files changed, 338 insertions(+), 140 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/65/19365/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
index 9cecaa7..d88ff1d 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
@@ -26,6 +26,7 @@
import org.apache.asterix.app.external.ExternalCredentialsCache;
import org.apache.asterix.app.external.ExternalCredentialsCacheUpdater;
+import org.apache.asterix.app.external.ExternalEntityNameResolverProvider;
import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.common.api.IConfigValidator;
import org.apache.asterix.common.api.IConfigValidatorFactory;
@@ -59,6 +60,7 @@
import org.apache.asterix.common.external.IAdapterFactoryService;
import org.apache.asterix.common.external.IExternalCredentialsCache;
import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater;
+import org.apache.asterix.common.external.IExternalEntityNameResolverProvider;
import org.apache.asterix.common.metadata.IMetadataBootstrap;
import org.apache.asterix.common.metadata.IMetadataLockUtil;
import org.apache.asterix.common.replication.INcLifecycleCoordinator;
@@ -131,6 +133,7 @@
private final IOManager ioManager;
private final INamespacePathResolver namespacePathResolver;
private final INamespaceResolver namespaceResolver;
+ private final IExternalEntityNameResolverProvider
externalEntityNameResolverProvider;
private final IExternalCredentialsCache externalCredentialsCache;
private final IExternalCredentialsCacheUpdater
externalCredentialsCacheUpdater;
@@ -183,6 +186,7 @@
this.globalTxManager = globalTxManager;
this.ioManager = ioManager;
dataPartitioningProvider = DataPartitioningProvider.create(this);
+ externalEntityNameResolverProvider = new
ExternalEntityNameResolverProvider();
externalCredentialsCache = new ExternalCredentialsCache(this);
externalCredentialsCacheUpdater = new
ExternalCredentialsCacheUpdater(this);
}
@@ -425,6 +429,11 @@
}
@Override
+ public IExternalEntityNameResolverProvider
getExternalEntityNameResolverProvider() {
+ return externalEntityNameResolverProvider;
+ }
+
+ @Override
public IExternalCredentialsCache getExternalCredentialsCache() {
return externalCredentialsCache;
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java
index 8cb3a47..3a6325e 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java
@@ -24,14 +24,9 @@
import java.util.concurrent.TimeUnit;
import org.apache.asterix.common.api.IApplicationContext;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
-import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.external.IExternalCredentialsCache;
-import org.apache.asterix.common.metadata.DatasetFullyQualifiedName;
-import org.apache.asterix.common.metadata.DataverseName;
-import org.apache.asterix.common.metadata.IFullyQualifiedName;
-import org.apache.asterix.common.metadata.MetadataConstants;
+import org.apache.asterix.common.external.IExternalEntityNameResolver;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.aws.s3.S3Constants;
import org.apache.commons.lang3.tuple.Pair;
@@ -46,22 +41,17 @@
private static final Logger LOGGER = LogManager.getLogger();
private final ConcurrentMap<String, Pair<Span, Object>> cache = new
ConcurrentHashMap<>();
private final int awsAssumeRoleDuration;
- private final double refreshAwsAssumeRoleThreshold;
+ private final int refreshAwsAssumeRoleThresholdPercentage;
public ExternalCredentialsCache(IApplicationContext appCtx) {
this.awsAssumeRoleDuration =
appCtx.getExternalProperties().getAwsAssumeRoleDuration();
- this.refreshAwsAssumeRoleThreshold =
appCtx.getExternalProperties().getAwsRefreshAssumeRoleThreshold();
+ this.refreshAwsAssumeRoleThresholdPercentage =
+
appCtx.getExternalProperties().getAwsRefreshAssumeRoleThresholdPercentage();
}
@Override
- public synchronized Object getCredentials(Map<String, String>
configuration) throws CompilationException {
- IFullyQualifiedName fqn =
getFullyQualifiedNameFromConfiguration(configuration);
- return getCredentials(fqn);
- }
-
- @Override
- public synchronized Object getCredentials(IFullyQualifiedName fqn) {
- String name = getName(fqn);
+ public synchronized Object getCredentials(IExternalEntityNameResolver
resolver) throws CompilationException {
+ String name = resolver.resolveName();
if (cache.containsKey(name) &&
!needsRefresh(cache.get(name).getLeft())) {
return cache.get(name).getRight();
}
@@ -69,36 +59,8 @@
}
@Override
- public synchronized void updateCache(Map<String, String> configuration,
Map<String, String> credentials)
- throws CompilationException {
- IFullyQualifiedName fqn =
getFullyQualifiedNameFromConfiguration(configuration);
- String type =
configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE);
- updateCache(fqn, type, credentials);
- }
-
- @Override
- public synchronized void updateCache(IFullyQualifiedName fqn, String type,
Map<String, String> credentials) {
- String name = getName(fqn);
- if
(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3.equalsIgnoreCase(type)) {
- updateAwsCache(name, credentials);
- }
- }
-
- private void updateAwsCache(String name, Map<String, String> credentials) {
- String accessKeyId =
credentials.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME);
- String secretAccessKey =
credentials.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME);
- String sessionToken =
credentials.get(S3Constants.SESSION_TOKEN_FIELD_NAME);
- doUpdateAwsCache(name, AwsSessionCredentials.create(accessKeyId,
secretAccessKey, sessionToken));
- }
-
- private void doUpdateAwsCache(String name, AwsSessionCredentials
credentials) {
- cache.put(name, Pair.of(Span.start(awsAssumeRoleDuration,
TimeUnit.SECONDS), credentials));
- LOGGER.info("Received and cached new credentials for {}", name);
- }
-
- @Override
- public void deleteCredentials(IFullyQualifiedName fqn) {
- String name = getName(fqn);
+ public void deleteCredentials(IExternalEntityNameResolver resolver) throws
CompilationException {
+ String name = resolver.resolveName();
Object removed = cache.remove(name);
if (removed != null) {
LOGGER.info("Removed cached credentials for {}", name);
@@ -108,14 +70,22 @@
}
@Override
- public String getName(Map<String, String> configuration) throws
CompilationException {
- IFullyQualifiedName fqn =
getFullyQualifiedNameFromConfiguration(configuration);
- return getName(fqn);
+ public synchronized void updateCache(IExternalEntityNameResolver resolver,
String type,
+ Map<String, String> credentials) throws CompilationException {
+ String name = resolver.resolveName();
+ if
(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3.equalsIgnoreCase(type)) {
+ updateAwsCache(name, credentials);
+ }
}
- @Override
- public String getName(IFullyQualifiedName fqn) {
- return fqn.toString();
+ private void updateAwsCache(String name, Map<String, String> credentials) {
+ String accessKeyId =
credentials.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME);
+ String secretAccessKey =
credentials.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME);
+ String sessionToken =
credentials.get(S3Constants.SESSION_TOKEN_FIELD_NAME);
+
+ AwsSessionCredentials sessionCreds =
AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken);
+ cache.put(name, Pair.of(Span.start(awsAssumeRoleDuration,
TimeUnit.SECONDS), sessionCreds));
+ LOGGER.info("Received and cached new credentials for {}", name);
}
/**
@@ -125,27 +95,9 @@
* @return true if the remaining time is less than the configured refresh
percentage, false otherwise
*/
private boolean needsRefresh(Span span) {
- return (double) span.remaining(TimeUnit.SECONDS)
- / span.getSpan(TimeUnit.SECONDS) <
refreshAwsAssumeRoleThreshold;
- }
-
- protected IFullyQualifiedName
getFullyQualifiedNameFromConfiguration(Map<String, String> configuration)
- throws CompilationException {
- String database =
configuration.get(ExternalDataConstants.KEY_DATASET_DATABASE);
- if (database == null) {
- database = MetadataConstants.DEFAULT_DATABASE;
- }
- String stringDataverse =
configuration.get(ExternalDataConstants.KEY_DATASET_DATAVERSE);
- DataverseName dataverse = getDataverseName(stringDataverse);
- String dataset = configuration.get(ExternalDataConstants.KEY_DATASET);
- return new DatasetFullyQualifiedName(database, dataverse, dataset);
- }
-
- protected DataverseName getDataverseName(String dataverse) throws
CompilationException {
- try {
- return DataverseName.createSinglePartName(dataverse);
- } catch (AsterixException ex) {
- throw new
CompilationException(ErrorCode.INVALID_DATABASE_OBJECT_NAME, dataverse);
- }
+ double remaining = (double) span.remaining(TimeUnit.SECONDS) /
span.getSpan(TimeUnit.SECONDS);
+ double passed = 1 - remaining;
+ int passedPercentage = (int) (passed * 100);
+ return passedPercentage > refreshAwsAssumeRoleThresholdPercentage;
}
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
index f07caaa..3d72bc7 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
@@ -39,7 +39,9 @@
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.external.IExternalCredentialsCache;
import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater;
+import org.apache.asterix.common.external.IExternalEntityNameResolver;
import org.apache.asterix.common.messaging.api.MessageFuture;
+import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.aws.s3.S3AuthUtils;
import org.apache.asterix.external.util.aws.s3.S3Constants;
import org.apache.asterix.messaging.CCMessageBroker;
@@ -65,7 +67,9 @@
public synchronized Object generateAndCacheCredentials(Map<String, String>
configuration)
throws HyracksDataException, CompilationException {
IExternalCredentialsCache cache = appCtx.getExternalCredentialsCache();
- Object credentials = cache.getCredentials(configuration);
+ IExternalEntityNameResolver nameResolver =
appCtx.getExternalEntityNameResolverProvider().create(configuration);
+ String name = nameResolver.resolveName();
+ Object credentials = cache.getCredentials(nameResolver);
if (credentials != null) {
return credentials;
}
@@ -74,9 +78,7 @@
* if we are the CC, generate new creds and ask all NCs to update
their cache
* if we are the NC, send a message to the CC to generate new creds
and ask all NCs to update their cache
*/
- String name = cache.getName(configuration);
- if (appCtx instanceof ICcApplicationContext) {
- ICcApplicationContext ccAppCtx = (ICcApplicationContext) appCtx;
+ if (appCtx instanceof ICcApplicationContext ccAppCtx) {
IClusterManagementWork.ClusterState state =
ccAppCtx.getClusterStateManager().getState();
if (!(state == ACTIVE || state == REBALANCE_REQUIRED)) {
throw new RuntimeDataException(REJECT_BAD_CLUSTER_STATE,
state);
@@ -106,7 +108,8 @@
// request all NCs to update their credentials cache with the
latest creds
updateNcsCredentialsCache(ccAppCtx, name, credentialsMap,
configuration);
- cache.updateCache(configuration, credentialsMap);
+ String type =
configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE);
+ cache.updateCache(nameResolver, type, credentialsMap);
credentials = AwsSessionCredentials.create(accessKeyId,
secretAccessKey, sessionToken);
} else {
NCMessageBroker broker = (NCMessageBroker)
appCtx.getServiceContext().getMessageBroker();
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalEntityNameResolver.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalEntityNameResolver.java
new file mode 100644
index 0000000..25fd64c
--- /dev/null
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalEntityNameResolver.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.external;
+
+import java.util.Map;
+
+import org.apache.asterix.common.config.DatasetConfig;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.external.IExternalEntityNameResolver;
+import org.apache.asterix.common.metadata.DatasetFullyQualifiedName;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.metadata.MetadataConstants;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.metadata.entities.Dataset;
+
+public class ExternalEntityNameResolver implements IExternalEntityNameResolver
{
+ protected Object entity;
+ protected Map<String, String> configuration;
+
+ protected ExternalEntityNameResolver(Object entity) {
+ this.entity = entity;
+ }
+
+ protected ExternalEntityNameResolver(Map<String, String> configuration) {
+ this.configuration = configuration;
+ }
+
+ @Override
+ public String resolveName() throws CompilationException {
+ if (entity == null && configuration == null) {
+ throw new IllegalStateException("No external entity provided");
+ }
+
+ if (entity != null) {
+ return resolveNameFromEntity();
+ } else {
+ return resolveNameFromConfiguration();
+ }
+ }
+
+ protected String resolveNameFromEntity() {
+ if (entity instanceof Dataset dataset && dataset.getDatasetType() ==
DatasetConfig.DatasetType.EXTERNAL) {
+ return dataset.getDatasetFullyQualifiedName().toString();
+ }
+ throw new IllegalArgumentException(entity.getClass() + " is an invalid
external entity type");
+ }
+
+ protected String resolveNameFromConfiguration() throws
CompilationException {
+ String database =
configuration.get(ExternalDataConstants.KEY_DATASET_DATABASE);
+ if (database == null) {
+ database = MetadataConstants.DEFAULT_DATABASE;
+ }
+ DataverseName dataverseName;
+ String stringDataverse =
configuration.get(ExternalDataConstants.KEY_DATASET_DATAVERSE);
+ try {
+ dataverseName =
DataverseName.createSinglePartName(stringDataverse);
+ } catch (AsterixException ex) {
+ throw new
CompilationException(ErrorCode.INVALID_DATABASE_OBJECT_NAME, stringDataverse);
+ }
+ String dataset = configuration.get(ExternalDataConstants.KEY_DATASET);
+ return new DatasetFullyQualifiedName(database, dataverseName,
dataset).toString();
+ }
+}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalEntityNameResolverProvider.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalEntityNameResolverProvider.java
new file mode 100644
index 0000000..77db93d
--- /dev/null
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalEntityNameResolverProvider.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.external;
+
+import java.util.Map;
+
+import org.apache.asterix.common.external.IExternalEntityNameResolver;
+import org.apache.asterix.common.external.IExternalEntityNameResolverProvider;
+
+public class ExternalEntityNameResolverProvider implements
IExternalEntityNameResolverProvider {
+
+ public ExternalEntityNameResolverProvider() {
+ }
+
+ public IExternalEntityNameResolver create(Object entity) {
+ return new ExternalEntityNameResolver(entity);
+ }
+
+ public IExternalEntityNameResolver create(Map<String, String>
configuration) {
+ return new ExternalEntityNameResolver(configuration);
+ }
+}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java
index d1a9ebf..f054589 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java
@@ -22,7 +22,9 @@
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.external.IExternalEntityNameResolver;
import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -42,7 +44,10 @@
@Override
public void handle(INcApplicationContext appCtx) throws
HyracksDataException {
try {
- appCtx.getExternalCredentialsCache().updateCache(configuration,
credentials);
+ IExternalEntityNameResolver nameResolver =
+
appCtx.getExternalEntityNameResolverProvider().create(configuration);
+ String type =
configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE);
+ appCtx.getExternalCredentialsCache().updateCache(nameResolver,
type, credentials);
} catch (CompilationException ex) {
LOGGER.info("Failed to process request", ex);
throw HyracksDataException.create(ex);
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 343baf0..e68a6e2 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -34,6 +34,7 @@
import org.apache.asterix.active.ActiveManager;
import org.apache.asterix.app.external.ExternalCredentialsCache;
import org.apache.asterix.app.external.ExternalCredentialsCacheUpdater;
+import org.apache.asterix.app.external.ExternalEntityNameResolverProvider;
import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.cloud.CloudConfigurator;
import org.apache.asterix.cloud.LocalPartitionBootstrapper;
@@ -67,6 +68,7 @@
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.external.IExternalCredentialsCache;
import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater;
+import org.apache.asterix.common.external.IExternalEntityNameResolverProvider;
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.common.replication.IReplicationChannel;
import org.apache.asterix.common.replication.IReplicationManager;
@@ -190,6 +192,7 @@
private final INamespacePathResolver namespacePathResolver;
private final INamespaceResolver namespaceResolver;
private IDiskCacheMonitoringService diskCacheService;
+ protected IExternalEntityNameResolverProvider
externalEntityNameResolverProvider;
protected IExternalCredentialsCache externalCredentialsCache;
protected IExternalCredentialsCacheUpdater externalCredentialsCacheUpdater;
@@ -216,6 +219,7 @@
cacheManager = new CacheManager();
this.namespacePathResolver = namespacePathResolver;
this.namespaceResolver = namespaceResolver;
+ this.externalEntityNameResolverProvider = new
ExternalEntityNameResolverProvider();
this.externalCredentialsCache = new ExternalCredentialsCache(this);
this.externalCredentialsCacheUpdater = new
ExternalCredentialsCacheUpdater(this);
}
@@ -758,6 +762,11 @@
}
@Override
+ public IExternalEntityNameResolverProvider
getExternalEntityNameResolverProvider() {
+ return externalEntityNameResolverProvider;
+ }
+
+ @Override
public IExternalCredentialsCache getExternalCredentialsCache() {
return externalCredentialsCache;
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 51371da..a36e83a 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -2448,9 +2448,7 @@
sourceLoc, EnumSet.of(DropOption.IF_EXISTS),
requestParameters.isForceDropDataset());
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
- if (ds.getDatasetType().equals(DatasetType.EXTERNAL)) {
-
appCtx.getExternalCredentialsCache().deleteCredentials(ds.getDatasetFullyQualifiedName());
- }
+ deleteDatasetCachedCredentials(ds);
return true;
} catch (Exception e) {
LOGGER.error("failed to drop dataset; executing compensating
operations", e);
@@ -2498,6 +2496,13 @@
}
}
+ protected void deleteDatasetCachedCredentials(Dataset dataset) throws
CompilationException {
+ if (dataset.getDatasetType().equals(DatasetType.EXTERNAL)) {
+ appCtx.getExternalCredentialsCache()
+
.deleteCredentials(appCtx.getExternalEntityNameResolverProvider().create(dataset));
+ }
+ }
+
protected void handleIndexDropStatement(MetadataProvider metadataProvider,
Statement stmt,
IHyracksClientConnection hcc, IRequestParameters
requestParameters) throws Exception {
IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
index 19e4ad7..439d58b 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
@@ -31,6 +31,7 @@
import org.apache.asterix.common.config.TransactionProperties;
import org.apache.asterix.common.external.IExternalCredentialsCache;
import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater;
+import org.apache.asterix.common.external.IExternalEntityNameResolverProvider;
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -110,6 +111,12 @@
INamespacePathResolver getNamespacePathResolver();
/**
+ * Enables extensions to provide their own name resolvers for external
entities
+ * @return external entity name resolver factory
+ */
+ IExternalEntityNameResolverProvider
getExternalEntityNameResolverProvider();
+
+ /**
* @return external credentials cache
*/
IExternalCredentialsCache getExternalCredentialsCache();
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
index f6559f0..62437b2 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
@@ -18,12 +18,12 @@
*/
package org.apache.asterix.common.config;
-import static org.apache.hyracks.control.common.config.OptionTypes.DOUBLE;
import static org.apache.hyracks.control.common.config.OptionTypes.LEVEL;
import static
org.apache.hyracks.control.common.config.OptionTypes.NONNEGATIVE_INTEGER;
import static
org.apache.hyracks.control.common.config.OptionTypes.POSITIVE_INTEGER;
import static
org.apache.hyracks.control.common.config.OptionTypes.POSITIVE_INTEGER_BYTE_UNIT;
import static org.apache.hyracks.control.common.config.OptionTypes.STRING;
+import static
org.apache.hyracks.control.common.config.OptionTypes.getRangedIntegerType;
import org.apache.hyracks.api.config.IOption;
import org.apache.hyracks.api.config.IOptionType;
@@ -55,14 +55,16 @@
LIBRARY_DEPLOY_TIMEOUT(POSITIVE_INTEGER, 1800, "Timeout to upload a
UDF in seconds"),
AZURE_REQUEST_TIMEOUT(POSITIVE_INTEGER, 120, "Timeout for Azure client
requests in seconds"),
AWS_ASSUME_ROLE_DURATION(
- POSITIVE_INTEGER,
+ getRangedIntegerType(900, 43200),
900,
"AWS assuming role duration in seconds. "
+ "Range from 900 seconds (15 mins) to 43200 seconds
(12 hours)"),
- AWS_REFRESH_ASSUME_ROLE_THRESHOLD(
- DOUBLE,
- .5,
- "Percentage of left duration before assume role credentials "
+ "needs to be refreshed");
+ AWS_REFRESH_ASSUME_ROLE_THRESHOLD_PERCENTAGE(
+ getRangedIntegerType(25, 90),
+ 75,
+ "Percentage of duration passed before assume role credentials
need to be refreshed, the value ranges "
+ + "from 25 to 90, default is 75. For example, if the
value is set to 65, this means the "
+ + "credentials need to be refreshed if 65% of the
total expiration duration is already passed");
private final IOptionType type;
private final Object defaultValue;
@@ -91,7 +93,7 @@
case LIBRARY_DEPLOY_TIMEOUT:
case AZURE_REQUEST_TIMEOUT:
case AWS_ASSUME_ROLE_DURATION:
- case AWS_REFRESH_ASSUME_ROLE_THRESHOLD:
+ case AWS_REFRESH_ASSUME_ROLE_THRESHOLD_PERCENTAGE:
return Section.COMMON;
case CC_JAVA_OPTS:
case NC_JAVA_OPTS:
@@ -177,7 +179,7 @@
return accessor.getInt(Option.AWS_ASSUME_ROLE_DURATION);
}
- public double getAwsRefreshAssumeRoleThreshold() {
- return accessor.getDouble(Option.AWS_REFRESH_ASSUME_ROLE_THRESHOLD);
+ public int getAwsRefreshAssumeRoleThresholdPercentage() {
+ return
accessor.getInt(Option.AWS_REFRESH_ASSUME_ROLE_THRESHOLD_PERCENTAGE);
}
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java
index 3ff444d..5f33b53 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java
@@ -21,63 +21,31 @@
import java.util.Map;
import org.apache.asterix.common.exceptions.CompilationException;
-import org.apache.asterix.common.metadata.IFullyQualifiedName;
public interface IExternalCredentialsCache {
/**
- * Returns the cached credentials. Can be of any supported external
credentials type
+ * Returns the cached credentials.
*
- * @param configuration configuration containing external collection
details
- * @return credentials if present, null otherwise
+ * @param resolver external entity name resolver
+ * @return credentials if present and not expired/need refreshing, null
otherwise
*/
- Object getCredentials(Map<String, String> configuration) throws
CompilationException;
-
- /**
- * Returns the cached credentials. Can be of any supported external
credentials type
- *
- * @param fqn fully qualified name of the credentials entity
- * @return credentials if present, null otherwise
- */
- Object getCredentials(IFullyQualifiedName fqn) throws CompilationException;
-
- /**
- * Updates the credentials cache with the provided credentials for the
specified name
- *
- * @param configuration configuration containing external collection
details
- * @param credentials credentials to cache
- */
- void updateCache(Map<String, String> configuration, Map<String, String>
credentials) throws CompilationException;
-
- /**
- * Updates the credentials cache with the provided credentials for the
specified name
- *
- * @param fqn fully qualified name for the credentials entity
- * @param type type of the entity
- * @param credentials credentials to cache
- */
- void updateCache(IFullyQualifiedName fqn, String type, Map<String, String>
credentials);
+ Object getCredentials(IExternalEntityNameResolver resolver) throws
CompilationException;
/**
* Deletes the cache for the provided enitty
*
- * @param fqn fully qualified name of entity for which the credentials are
to be deleted
+ * @param resolver external entity name resolver
*/
- void deleteCredentials(IFullyQualifiedName fqn);
+ void deleteCredentials(IExternalEntityNameResolver resolver) throws
CompilationException;
/**
- * Returns the name of the entity which the cached credentials belong to
+ * Updates the credentials cache with the provided credentials for the
specified name
*
- * @param configuration configuration containing external collection
details
- * @return name of entity which credentials belong to
+ * @param resolver external entity name resolver
+ * @param type type of the entity
+ * @param credentials credentials to cache
*/
- String getName(Map<String, String> configuration) throws
CompilationException;
-
- /**
- * Returns the name of the entity which the cached credentials belong to
- *
- * @param fqn fully qualified name for the credentials entity
- * @return name of entity which credentials belong to
- */
- String getName(IFullyQualifiedName fqn) throws CompilationException;
+ void updateCache(IExternalEntityNameResolver resolver, String type,
Map<String, String> credentials)
+ throws CompilationException;
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalEntityNameResolver.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalEntityNameResolver.java
new file mode 100644
index 0000000..1d9caf7
--- /dev/null
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalEntityNameResolver.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.external;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+
+@FunctionalInterface
+public interface IExternalEntityNameResolver {
+
+ String resolveName() throws CompilationException;
+}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalEntityNameResolverProvider.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalEntityNameResolverProvider.java
new file mode 100644
index 0000000..a6a5a36
--- /dev/null
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalEntityNameResolverProvider.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.external;
+
+import java.util.Map;
+
+public interface IExternalEntityNameResolverProvider {
+
+ /**
+ * Creates an external entity name resolver based on the provided object
entity
+ * @param entity object entity
+ * @return external entity name resolver
+ */
+ IExternalEntityNameResolver create(Object entity);
+
+ /**
+ * Creates an external entity name resolver based on the provided
configuration
+ * @param configuration configuration
+ * @return external entity name resolver
+ */
+ IExternalEntityNameResolver create(Map<String, String> configuration);
+}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
index 57889ea..f5b7a76 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
@@ -241,7 +241,8 @@
public static AwsCredentialsProvider
getTrustAccountCredentials(IApplicationContext appCtx,
Map<String, String> configuration) throws CompilationException {
IExternalCredentialsCache cache = appCtx.getExternalCredentialsCache();
- Object credentialsObject = cache.getCredentials(configuration);
+ Object credentialsObject =
+
cache.getCredentials(appCtx.getExternalEntityNameResolverProvider().create(configuration));
if (credentialsObject != null) {
return () -> (AwsSessionCredentials) credentialsObject;
}
@@ -389,7 +390,7 @@
*/
public static void configureAwsS3HdfsJobConf(IApplicationContext appCtx,
JobConf jobConf,
Map<String, String> configuration, int numberOfPartitions) throws
CompilationException {
- setHadoopCredentials(jobConf, configuration);
+ setHadoopCredentials(appCtx, jobConf, configuration);
String serviceEndpoint =
configuration.get(SERVICE_END_POINT_FIELD_NAME);
Region region =
validateAndGetRegion(configuration.get(REGION_FIELD_NAME));
jobConf.set(HADOOP_REGION, region.toString());
@@ -421,7 +422,8 @@
* @param jobConf hadoop job config
* @param configuration external details configuration
*/
- private static void setHadoopCredentials(JobConf jobConf, Map<String,
String> configuration) {
+ private static void setHadoopCredentials(IApplicationContext appCtx,
JobConf jobConf,
+ Map<String, String> configuration) {
AuthenticationType authenticationType =
getAuthenticationType(configuration);
switch (authenticationType) {
case ANONYMOUS:
@@ -432,7 +434,11 @@
jobConf.set(HADOOP_ASSUME_ROLE_ARN,
configuration.get(ROLE_ARN_FIELD_NAME));
jobConf.set(HADOOP_ASSUME_ROLE_EXTERNAL_ID,
configuration.get(EXTERNAL_ID_FIELD_NAME));
jobConf.set(HADOOP_ASSUME_ROLE_SESSION_NAME, "parquet-" +
UUID.randomUUID());
- jobConf.set(HADOOP_ASSUME_ROLE_SESSION_DURATION, "15m");
+
+ // hadoop accepts time 15m to 1h, we will base it on the
provided configuration
+ int durationInSeconds =
appCtx.getExternalProperties().getAwsAssumeRoleDuration();
+ String hadoopDuration = getHadoopDuration(durationInSeconds);
+ jobConf.set(HADOOP_ASSUME_ROLE_SESSION_DURATION,
hadoopDuration);
// TODO: this assumes basic keys always, also support if we
use InstanceProfile to assume a role
jobConf.set(HADOOP_CREDENTIALS_TO_ASSUME_ROLE_KEY,
HADOOP_SIMPLE);
@@ -456,6 +462,36 @@
}
/**
+ * Hadoop accepts duration values from 15m to 1h (in this format). We will
base this on the configured
+ * duration in seconds. If the time exceeds 1 hour, we will return 1h
+ *
+ * @param seconds configured duration in seconds
+ * @return hadoop updated duration
+ */
+ private static String getHadoopDuration(int seconds) {
+ // constants for time thresholds
+ final int FIFTEEN_MINUTES_IN_SECONDS = 15 * 60;
+ final int ONE_HOUR_IN_SECONDS = 60 * 60;
+
+ // Adjust seconds to fit within bounds
+ if (seconds < FIFTEEN_MINUTES_IN_SECONDS) {
+ seconds = FIFTEEN_MINUTES_IN_SECONDS;
+ } else if (seconds > ONE_HOUR_IN_SECONDS) {
+ seconds = ONE_HOUR_IN_SECONDS;
+ }
+
+ // Convert seconds to minutes
+ int minutes = seconds / 60;
+
+ // Format the result
+ if (minutes == 60) {
+ return "1h";
+ } else {
+ return minutes + "m";
+ }
+ }
+
+ /**
* Validate external dataset properties
*
* @param configuration properties
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19365
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: If5bd9ed7c1fc0aedba7f7bb90acb48c2edf3d580
Gerrit-Change-Number: 19365
Gerrit-PatchSet: 1
Gerrit-Owner: Hussain Towaileb <[email protected]>
Gerrit-MessageType: newchange