>From Hussain Towaileb <[email protected]>:
Hussain Towaileb has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20493?usp=email )
Change subject: [ASTERIXDB-3659][EXT]: delegate assume role auth to AWS SDK
......................................................................
[ASTERIXDB-3659][EXT]: delegate assume role auth to AWS SDK
Ext-ref: MB-68987
Change-Id: I6a3c755f94d377b443f21594443fac830875610e
---
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
M asterixdb/asterix-external-data/pom.xml
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/AwsUtils.java
A
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/glue/GlueUtils.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
6 files changed, 243 insertions(+), 94 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/93/20493/1
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index c9fd485..defffe0 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -50,6 +50,8 @@
import org.apache.asterix.cloud.clients.profiler.RequestLimiterNoOpProfiler;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.external.util.aws.AwsUtils;
+import org.apache.asterix.external.util.aws.AwsUtils.CloseableAwsClients;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.util.IoUtil;
@@ -63,6 +65,7 @@
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.http.SdkHttpClient;
@@ -90,6 +93,7 @@
public final class S3CloudClient implements ICloudClient {
private static final Logger LOGGER = LogManager.getLogger();
private final S3ClientConfig config;
+ private final CloseableAwsClients awsClients;
private final S3Client s3Client;
private final ICloudGuardian guardian;
private final IRequestProfilerLimiter profiler;
@@ -99,9 +103,10 @@
this(config, buildClient(config), guardian);
}
- public S3CloudClient(S3ClientConfig config, S3Client s3Client,
ICloudGuardian guardian) {
+ public S3CloudClient(S3ClientConfig config, CloseableAwsClients
awsClients, ICloudGuardian guardian) {
this.config = config;
- this.s3Client = s3Client;
+ this.awsClients = awsClients;
+ this.s3Client = (S3Client) awsClients.getConsumingClient();
this.guardian = guardian;
this.writeBufferSize = config.getWriteBufferSize();
long profilerInterval = config.getProfilerLogInterval();
@@ -344,7 +349,7 @@
@Override
public void close() {
- s3Client.close();
+ AwsUtils.closeClients(awsClients);
}
@Override
@@ -359,9 +364,11 @@
return new S3BufferedWriter(s3Client, profiler, guardian, bucket,
config.getPrefix() + path);
}
- private static S3Client buildClient(S3ClientConfig config) {
+ private static CloseableAwsClients buildClient(S3ClientConfig config) {
+ CloseableAwsClients awsClients = new CloseableAwsClients();
S3ClientBuilder builder = S3Client.builder();
- builder.credentialsProvider(config.createCredentialsProvider());
+ AwsCredentialsProvider credentialsProvider =
config.createCredentialsProvider();
+ builder.credentialsProvider(credentialsProvider);
builder.region(Region.of(config.getRegion()));
builder.forcePathStyle(config.isForcePathStyle());
@@ -386,7 +393,10 @@
}
SdkHttpClient httpClient =
ApacheHttpClient.builder().buildWithDefaults(customHttpConfigBuilder.build());
builder.httpClient(httpClient);
- return builder.build();
+
+ awsClients.setConsumingClient(builder.build());
+ awsClients.setCredentialsProvider(credentialsProvider);
+ return awsClients;
}
private Set<CloudFile> filterAndGet(List<S3Object> contents,
FilenameFilter filter) {
diff --git a/asterixdb/asterix-external-data/pom.xml
b/asterixdb/asterix-external-data/pom.xml
index f3be28f..520642f 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -461,6 +461,10 @@
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
+ <artifactId>glue</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
<artifactId>regions</artifactId>
</dependency>
<dependency>
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
index bea0966..ded052b 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
@@ -36,6 +36,7 @@
import
org.apache.asterix.external.input.record.reader.stream.AvailableInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.aws.AwsUtils;
+import org.apache.asterix.external.util.aws.AwsUtils.CloseableAwsClients;
import org.apache.asterix.external.util.aws.s3.S3Utils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -53,15 +54,18 @@
private static final int MAX_RETRIES = 5; // We will retry 5 times in case
of internal error from AWS S3 service
private final IApplicationContext ncAppCtx;
private final String bucket;
- private S3Client s3Client;
+ private final CloseableAwsClients awsClients;
+ private final S3Client s3Client;
private ResponseInputStream<?> s3InStream;
public AwsS3InputStream(IApplicationContext ncAppCtx, Map<String, String>
configuration, List<String> filePaths,
IExternalFilterValueEmbedder valueEmbedder) throws
HyracksDataException {
super(configuration, filePaths, valueEmbedder);
this.ncAppCtx = ncAppCtx;
- this.s3Client = buildAwsS3Client(configuration);
this.bucket =
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+
+ this.awsClients = buildAwsS3Client(configuration);
+ this.s3Client = (S3Client) awsClients.getConsumingClient();
}
@Override
@@ -97,11 +101,7 @@
LOGGER.debug(() -> "Key " + userData(request.key()) + " was
not found in bucket {}" + request.bucket());
return false;
} catch (S3Exception ex) {
- if (AwsUtils.isArnAssumedRoleExpiredToken(configuration,
ex.awsErrorDetails().errorCode())) {
- LOGGER.debug(() -> "Expired AWS assume role session, will
attempt to refresh the session");
- rebuildAwsS3Client(configuration);
- LOGGER.debug(() -> "Successfully refreshed AWS assume role
session");
- } else if (shouldRetry(ex.awsErrorDetails().errorCode(),
retries++)) {
+ if (shouldRetry(ex.awsErrorDetails().errorCode(), retries++)) {
LOGGER.debug(() -> "S3 retryable error: " +
userData(ex.getMessage()));
} else {
throw
RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, ex,
getMessageOrToString(ex));
@@ -132,9 +132,7 @@
}
CleanupUtils.close(in, null);
}
- if (s3Client != null) {
- CleanupUtils.close(s3Client, null);
- }
+ AwsUtils.closeClients(awsClients);
}
@Override
@@ -147,15 +145,11 @@
return false;
}
- private S3Client buildAwsS3Client(Map<String, String> configuration)
throws HyracksDataException {
+ private CloseableAwsClients buildAwsS3Client(Map<String, String>
configuration) throws HyracksDataException {
try {
return S3Utils.buildClient(ncAppCtx, configuration);
} catch (CompilationException ex) {
throw HyracksDataException.create(ex);
}
}
-
- private void rebuildAwsS3Client(Map<String, String> configuration) throws
HyracksDataException {
- s3Client = buildAwsS3Client(configuration);
- }
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/AwsUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/AwsUtils.java
index a648853..f3d7613 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/AwsUtils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/AwsUtils.java
@@ -24,7 +24,6 @@
import static
org.apache.asterix.common.exceptions.ErrorCode.S3_REGION_NOT_SUPPORTED;
import static
org.apache.asterix.external.util.aws.AwsConstants.ACCESS_KEY_ID_FIELD_NAME;
import static
org.apache.asterix.external.util.aws.AwsConstants.CROSS_REGION_FIELD_NAME;
-import static
org.apache.asterix.external.util.aws.AwsConstants.ERROR_EXPIRED_TOKEN;
import static
org.apache.asterix.external.util.aws.AwsConstants.EXTERNAL_ID_FIELD_NAME;
import static
org.apache.asterix.external.util.aws.AwsConstants.INSTANCE_PROFILE_FIELD_NAME;
import static
org.apache.asterix.external.util.aws.AwsConstants.REGION_FIELD_NAME;
@@ -33,6 +32,7 @@
import static
org.apache.asterix.external.util.aws.AwsConstants.SESSION_TOKEN_FIELD_NAME;
import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -41,10 +41,7 @@
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.external.IExternalCredentialsCache;
-import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.CleanupUtils;
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
@@ -52,10 +49,13 @@
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
import
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.awscore.AwsClient;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.sts.StsClient;
+import software.amazon.awssdk.services.sts.StsClientBuilder;
+import
software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;
import software.amazon.awssdk.services.sts.model.Credentials;
@@ -74,19 +74,14 @@
throw new AssertionError("do not instantiate");
}
- public static boolean isArnAssumedRoleExpiredToken(Map<String, String>
configuration, String errorCode) {
- return ERROR_EXPIRED_TOKEN.equals(errorCode)
- && getAuthenticationType(configuration) ==
AuthenticationType.ARN_ASSUME_ROLE;
- }
-
public static AwsCredentialsProvider
buildCredentialsProvider(IApplicationContext appCtx,
- Map<String, String> configuration) throws CompilationException {
+ Map<String, String> configuration, CloseableAwsClients awsClients)
throws CompilationException {
AuthenticationType authenticationType =
getAuthenticationType(configuration);
switch (authenticationType) {
case ANONYMOUS:
return AnonymousCredentialsProvider.create();
case ARN_ASSUME_ROLE:
- return getTrustAccountCredentials(appCtx, configuration);
+ return getTrustAccountCredentials(appCtx, configuration,
awsClients);
case INSTANCE_PROFILE:
return getInstanceProfileCredentials(configuration);
case ACCESS_KEYS:
@@ -157,21 +152,29 @@
* @throws CompilationException CompilationException
*/
public static AwsCredentialsProvider
getTrustAccountCredentials(IApplicationContext appCtx,
- Map<String, String> configuration) throws CompilationException {
- IExternalCredentialsCache cache = appCtx.getExternalCredentialsCache();
- Object credentialsObject =
cache.get(configuration.get(ExternalDataConstants.KEY_ENTITY_ID));
- if (credentialsObject != null) {
- return () -> (AwsSessionCredentials) credentialsObject;
- }
- IExternalCredentialsCacheUpdater cacheUpdater =
appCtx.getExternalCredentialsCacheUpdater();
- AwsSessionCredentials credentials;
- try {
- credentials = (AwsSessionCredentials)
cacheUpdater.generateAndCacheCredentials(configuration);
- } catch (HyracksDataException ex) {
- throw new
CompilationException(ErrorCode.FAILED_EXTERNAL_CROSS_ACCOUNT_AUTHENTICATION,
ex, ex.getMessage());
- }
+ Map<String, String> configuration, CloseableAwsClients awsClients)
throws CompilationException {
+ AwsCredentialsProvider credentialsToAssumeRole =
getCredentialsToAssumeRole(configuration);
- return () -> credentials;
+ // build sts client used for assuming role
+ StsClientBuilder stsClientBuilder = StsClient.builder();
+ stsClientBuilder.credentialsProvider(credentialsToAssumeRole);
+
stsClientBuilder.region(validateAndGetRegion(configuration.get(REGION_FIELD_NAME)));
+ StsClient stsClient = stsClientBuilder.build();
+ awsClients.setStsClient(stsClient);
+
+ AssumeRoleRequest.Builder refreshRequestBuilder =
AssumeRoleRequest.builder();
+ refreshRequestBuilder.roleArn(configuration.get(ROLE_ARN_FIELD_NAME));
+
refreshRequestBuilder.externalId(configuration.get(EXTERNAL_ID_FIELD_NAME));
+ refreshRequestBuilder.roleSessionName(UUID.randomUUID().toString());
+ refreshRequestBuilder.durationSeconds((int)
Duration.ofHours(1).toSeconds());
+
+ StsAssumeRoleCredentialsProvider.Builder builder =
StsAssumeRoleCredentialsProvider.builder();
+ builder.refreshRequest(refreshRequestBuilder.build());
+ builder.stsClient(stsClient);
+ StsAssumeRoleCredentialsProvider credentialsProvider = builder.build();
+ awsClients.setCredentialsProvider(credentialsProvider);
+
+ return credentialsProvider;
}
/**
@@ -283,4 +286,57 @@
public static String generateExternalId() {
return UUID.randomUUID().toString();
}
+
+ public static void closeClients(CloseableAwsClients clients) {
+ if (clients == null) {
+ return;
+ }
+
+ if (clients.getConsumingClient() != null) {
+ CleanupUtils.close(clients.getConsumingClient(), null);
+ }
+
+ if (clients.getStsClient() != null) {
+ CleanupUtils.close(clients.getStsClient(), null);
+ }
+
+ AwsCredentialsProvider credentialsProvider =
clients.getCredentialsProvider();
+ if (credentialsProvider instanceof StsAssumeRoleCredentialsProvider
assumeRoleCredentialsProvider) {
+ CleanupUtils.close(assumeRoleCredentialsProvider, null);
+ }
+ }
+
+ public static class CloseableAwsClients {
+ private AwsClient consumingClient;
+ private StsClient stsClient;
+ private AwsCredentialsProvider credentialsProvider;
+
+ public CloseableAwsClients() {
+
+ }
+
+ public AwsClient getConsumingClient() {
+ return consumingClient;
+ }
+
+ public StsClient getStsClient() {
+ return stsClient;
+ }
+
+ public AwsCredentialsProvider getCredentialsProvider() {
+ return credentialsProvider;
+ }
+
+ public void setConsumingClient(AwsClient consumingClient) {
+ this.consumingClient = consumingClient;
+ }
+
+ public void setStsClient(StsClient stsClient) {
+ this.stsClient = stsClient;
+ }
+
+ public void setCredentialsProvider(AwsCredentialsProvider
credentialsProvider) {
+ this.credentialsProvider = credentialsProvider;
+ }
+ }
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/glue/GlueUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/glue/GlueUtils.java
new file mode 100644
index 0000000..f4e52b0
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/glue/GlueUtils.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util.aws.glue;
+
+import static
org.apache.asterix.external.util.aws.AwsConstants.REGION_FIELD_NAME;
+import static
org.apache.asterix.external.util.aws.AwsConstants.SERVICE_END_POINT_FIELD_NAME;
+import static
org.apache.asterix.external.util.aws.AwsUtils.buildCredentialsProvider;
+import static
org.apache.asterix.external.util.aws.AwsUtils.validateAndGetRegion;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+
+import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.util.aws.AwsUtils.CloseableAwsClients;
+
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.GlueClientBuilder;
+
+public class GlueUtils {
+ private GlueUtils() {
+ throw new AssertionError("do not instantiate");
+ }
+
+ /**
+ * Builds the S3 client using the provided configuration
+ *
+ * @param configuration properties
+ * @return S3 client
+ * @throws CompilationException CompilationException
+ */
+ public static CloseableAwsClients buildClient(IApplicationContext appCtx,
Map<String, String> configuration)
+ throws CompilationException {
+ CloseableAwsClients awsClients = new CloseableAwsClients();
+ String regionId = configuration.get(REGION_FIELD_NAME);
+ String serviceEndpoint =
configuration.get(SERVICE_END_POINT_FIELD_NAME);
+
+ Region region = validateAndGetRegion(regionId);
+ AwsCredentialsProvider credentialsProvider =
buildCredentialsProvider(appCtx, configuration, awsClients);
+
+ GlueClientBuilder builder = GlueClient.builder();
+ builder.region(region);
+ builder.credentialsProvider(credentialsProvider);
+
+ // Validate the service endpoint if present
+ if (serviceEndpoint != null) {
+ try {
+ URI uri = new URI(serviceEndpoint);
+ try {
+ builder.endpointOverride(uri);
+ } catch (NullPointerException ex) {
+ throw new
CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex,
getMessageOrToString(ex));
+ }
+ } catch (URISyntaxException ex) {
+ throw new
CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex,
+ String.format("Invalid service endpoint %s",
serviceEndpoint));
+ }
+ }
+
+ awsClients.setConsumingClient(builder.build());
+ return awsClients;
+ }
+}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
index e20e39f..46dbfae 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
@@ -79,6 +79,7 @@
import org.apache.asterix.external.util.ExternalDataPrefix;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.aws.AwsUtils;
+import org.apache.asterix.external.util.aws.AwsUtils.CloseableAwsClients;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -86,7 +87,6 @@
import org.apache.hyracks.api.exceptions.IWarningCollector;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.api.exceptions.Warning;
-import org.apache.hyracks.api.util.CleanupUtils;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.core.exception.SdkException;
@@ -115,14 +115,15 @@
* @return S3 client
* @throws CompilationException CompilationException
*/
- public static S3Client buildClient(IApplicationContext appCtx, Map<String,
String> configuration)
+ public static CloseableAwsClients buildClient(IApplicationContext appCtx,
Map<String, String> configuration)
throws CompilationException {
+ CloseableAwsClients awsClients = new CloseableAwsClients();
String regionId = configuration.get(REGION_FIELD_NAME);
String serviceEndpoint =
configuration.get(SERVICE_END_POINT_FIELD_NAME);
Region region = validateAndGetRegion(regionId);
boolean crossRegion =
validateAndGetCrossRegion(configuration.get(CROSS_REGION_FIELD_NAME));
- AwsCredentialsProvider credentialsProvider =
buildCredentialsProvider(appCtx, configuration);
+ AwsCredentialsProvider credentialsProvider =
buildCredentialsProvider(appCtx, configuration, awsClients);
S3ClientBuilder builder = S3Client.builder();
builder.region(region);
@@ -147,7 +148,8 @@
boolean pathStyleAddressing =
validateAndGetPathStyleAddressing(configuration.get(PATH_STYLE_ADDRESSING_FIELD_NAME),
serviceEndpoint);
builder.forcePathStyle(pathStyleAddressing);
- return builder.build();
+ awsClients.setConsumingClient(builder.build());
+ return awsClients;
}
public static void configureAwsS3HdfsJobConf(IApplicationContext appCtx,
JobConf conf,
@@ -293,7 +295,8 @@
}
// Check if the bucket is present
- S3Client s3Client = buildClient(appCtx, configuration);
+ CloseableAwsClients awsClients = buildClient(appCtx, configuration);
+ S3Client s3Client = (S3Client) awsClients.getConsumingClient();
S3Response response;
boolean useOldApi = false;
String container =
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
@@ -317,9 +320,7 @@
} catch (SdkException ex) {
throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR,
ex, getMessageOrToString(ex));
} finally {
- if (s3Client != null) {
- CleanupUtils.close(s3Client, null);
- }
+ AwsUtils.closeClients(awsClients);
}
boolean isEmpty = useOldApi ? ((ListObjectsResponse)
response).contents().isEmpty()
@@ -379,7 +380,8 @@
// Prepare to retrieve the objects
List<S3Object> filesOnly;
String container =
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
- S3Client s3Client = buildClient(appCtx, configuration);
+ CloseableAwsClients awsClients = buildClient(appCtx, configuration);
+ S3Client s3Client = (S3Client) awsClients.getConsumingClient();
String prefix = getPrefix(configuration);
try {
@@ -401,9 +403,7 @@
} catch (SdkException ex) {
throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR,
ex, getMessageOrToString(ex));
} finally {
- if (s3Client != null) {
- CleanupUtils.close(s3Client, null);
- }
+ AwsUtils.closeClients(awsClients);
}
// Warn if no files are returned
@@ -522,56 +522,57 @@
}
}
- public static Map<String, List<String>>
S3ObjectsOfSingleDepth(IApplicationContext appCtx,
- Map<String, String> configuration, String container, String prefix)
- throws CompilationException, HyracksDataException {
- // create s3 client
- S3Client s3Client = buildClient(appCtx, configuration);
- // fetch all the s3 objects
- return listS3ObjectsOfSingleDepth(s3Client, container, prefix);
- }
-
/**
* Uses the latest API to retrieve the objects from the storage of a
single level.
*
- * @param s3Client S3 client
- * @param container container name
- * @param prefix definition prefix
+ * @param appCtx application context
+ * @param configuration configuration
+ * @param container container name
+ * @param prefix definition prefix
*/
- private static Map<String, List<String>>
listS3ObjectsOfSingleDepth(S3Client s3Client, String container,
- String prefix) {
- Map<String, List<String>> allObjects = new HashMap<>();
- ListObjectsV2Iterable listObjectsInterable;
- ListObjectsV2Request.Builder listObjectsBuilder =
-
ListObjectsV2Request.builder().bucket(container).prefix(prefix).delimiter("/");
+ public static Map<String, List<String>>
listS3ObjectsOfSingleDepth(IApplicationContext appCtx,
+ Map<String, String> configuration, String container, String
prefix) throws CompilationException {
+ CloseableAwsClients awsClients = buildClient(appCtx, configuration);
+ S3Client s3Client = (S3Client) awsClients.getConsumingClient();
+
+ ListObjectsV2Request.Builder listObjectsBuilder =
ListObjectsV2Request.builder();
+ listObjectsBuilder.bucket(container);
listObjectsBuilder.prefix(prefix);
+ listObjectsBuilder.delimiter("/");
+ ListObjectsV2Request listObjectsV2Request = listObjectsBuilder.build();
+
+ Map<String, List<String>> allObjects = new HashMap<>();
List<String> files = new ArrayList<>();
List<String> folders = new ArrayList<>();
+
// to skip the prefix as a file from the response
boolean checkPrefixInFile = true;
- listObjectsInterable =
s3Client.listObjectsV2Paginator(listObjectsBuilder.build());
- for (ListObjectsV2Response response : listObjectsInterable) {
- // put all the files
- for (S3Object object : response.contents()) {
- String fileName = object.key();
- fileName = fileName.substring(prefix.length(),
fileName.length());
- if (checkPrefixInFile) {
- if (prefix.equals(object.key()))
+ try {
+ ListObjectsV2Iterable listObjectsIterable =
s3Client.listObjectsV2Paginator(listObjectsV2Request);
+ for (ListObjectsV2Response response : listObjectsIterable) {
+ // put all the files
+ for (S3Object object : response.contents()) {
+ String fileName = object.key();
+ fileName = fileName.substring(prefix.length());
+ if (checkPrefixInFile && prefix.equals(object.key())) {
checkPrefixInFile = false;
- else {
+ } else {
files.add(fileName);
}
- } else {
- files.add(fileName);
+ }
+
+ // put all the folders
+ for (CommonPrefix object : response.commonPrefixes()) {
+ String folderName = object.prefix();
+ folderName = folderName.substring(prefix.length());
+ folders.add(
+ folderName.endsWith("/") ? folderName.substring(0,
folderName.length() - 1) : folderName);
}
}
- // put all the folders
- for (CommonPrefix object : response.commonPrefixes()) {
- String folderName = object.prefix();
- folderName = folderName.substring(prefix.length(),
folderName.length());
- folders.add(folderName.endsWith("/") ? folderName.substring(0,
folderName.length() - 1) : folderName);
- }
+ } finally {
+ AwsUtils.closeClients(awsClients);
}
+
allObjects.put("files", files);
allObjects.put("folders", folders);
return allObjects;
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20493?usp=email
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings?usp=email
Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: phoenix
Gerrit-Change-Id: I6a3c755f94d377b443f21594443fac830875610e
Gerrit-Change-Number: 20493
Gerrit-PatchSet: 1
Gerrit-Owner: Hussain Towaileb <[email protected]>