>From Hussain Towaileb <[email protected]>:

Hussain Towaileb has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20493?usp=email )


Change subject: [ASTERIXDB-3659][EXT]: delegate assume role auth to AWS SDK
......................................................................

[ASTERIXDB-3659][EXT]: delegate assume role auth to AWS SDK

Ext-ref: MB-68987
Change-Id: I6a3c755f94d377b443f21594443fac830875610e
---
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
M asterixdb/asterix-external-data/pom.xml
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/AwsUtils.java
A 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/glue/GlueUtils.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
6 files changed, 243 insertions(+), 94 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/93/20493/1

diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index c9fd485..defffe0 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -50,6 +50,8 @@
 import org.apache.asterix.cloud.clients.profiler.RequestLimiterNoOpProfiler;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.external.util.aws.AwsUtils;
+import org.apache.asterix.external.util.aws.AwsUtils.CloseableAwsClients;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.util.IoUtil;
@@ -63,6 +65,7 @@
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;

+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.core.ResponseInputStream;
 import software.amazon.awssdk.core.sync.RequestBody;
 import software.amazon.awssdk.http.SdkHttpClient;
@@ -90,6 +93,7 @@
 public final class S3CloudClient implements ICloudClient {
     private static final Logger LOGGER = LogManager.getLogger();
     private final S3ClientConfig config;
+    private final CloseableAwsClients awsClients;
     private final S3Client s3Client;
     private final ICloudGuardian guardian;
     private final IRequestProfilerLimiter profiler;
@@ -99,9 +103,10 @@
         this(config, buildClient(config), guardian);
     }

-    public S3CloudClient(S3ClientConfig config, S3Client s3Client, 
ICloudGuardian guardian) {
+    public S3CloudClient(S3ClientConfig config, CloseableAwsClients 
awsClients, ICloudGuardian guardian) {
         this.config = config;
-        this.s3Client = s3Client;
+        this.awsClients = awsClients;
+        this.s3Client = (S3Client) awsClients.getConsumingClient();
         this.guardian = guardian;
         this.writeBufferSize = config.getWriteBufferSize();
         long profilerInterval = config.getProfilerLogInterval();
@@ -344,7 +349,7 @@

     @Override
     public void close() {
-        s3Client.close();
+        AwsUtils.closeClients(awsClients);
     }

     @Override
@@ -359,9 +364,11 @@
         return new S3BufferedWriter(s3Client, profiler, guardian, bucket, 
config.getPrefix() + path);
     }

-    private static S3Client buildClient(S3ClientConfig config) {
+    private static CloseableAwsClients buildClient(S3ClientConfig config) {
+        CloseableAwsClients awsClients = new CloseableAwsClients();
         S3ClientBuilder builder = S3Client.builder();
-        builder.credentialsProvider(config.createCredentialsProvider());
+        AwsCredentialsProvider credentialsProvider = 
config.createCredentialsProvider();
+        builder.credentialsProvider(credentialsProvider);
         builder.region(Region.of(config.getRegion()));
         builder.forcePathStyle(config.isForcePathStyle());

@@ -386,7 +393,10 @@
         }
         SdkHttpClient httpClient = 
ApacheHttpClient.builder().buildWithDefaults(customHttpConfigBuilder.build());
         builder.httpClient(httpClient);
-        return builder.build();
+
+        awsClients.setConsumingClient(builder.build());
+        awsClients.setCredentialsProvider(credentialsProvider);
+        return awsClients;
     }

     private Set<CloudFile> filterAndGet(List<S3Object> contents, 
FilenameFilter filter) {
diff --git a/asterixdb/asterix-external-data/pom.xml 
b/asterixdb/asterix-external-data/pom.xml
index f3be28f..520642f 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -461,6 +461,10 @@
     </dependency>
     <dependency>
       <groupId>software.amazon.awssdk</groupId>
+      <artifactId>glue</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
       <artifactId>regions</artifactId>
     </dependency>
     <dependency>
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
index bea0966..ded052b 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
@@ -36,6 +36,7 @@
 import 
org.apache.asterix.external.input.record.reader.stream.AvailableInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.aws.AwsUtils;
+import org.apache.asterix.external.util.aws.AwsUtils.CloseableAwsClients;
 import org.apache.asterix.external.util.aws.s3.S3Utils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -53,15 +54,18 @@
     private static final int MAX_RETRIES = 5; // We will retry 5 times in case 
of internal error from AWS S3 service
     private final IApplicationContext ncAppCtx;
     private final String bucket;
-    private S3Client s3Client;
+    private final CloseableAwsClients awsClients;
+    private final S3Client s3Client;
     private ResponseInputStream<?> s3InStream;

     public AwsS3InputStream(IApplicationContext ncAppCtx, Map<String, String> 
configuration, List<String> filePaths,
             IExternalFilterValueEmbedder valueEmbedder) throws 
HyracksDataException {
         super(configuration, filePaths, valueEmbedder);
         this.ncAppCtx = ncAppCtx;
-        this.s3Client = buildAwsS3Client(configuration);
         this.bucket = 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+
+        this.awsClients = buildAwsS3Client(configuration);
+        this.s3Client = (S3Client) awsClients.getConsumingClient();
     }

     @Override
@@ -97,11 +101,7 @@
                 LOGGER.debug(() -> "Key " + userData(request.key()) + " was 
not found in bucket {}" + request.bucket());
                 return false;
             } catch (S3Exception ex) {
-                if (AwsUtils.isArnAssumedRoleExpiredToken(configuration, 
ex.awsErrorDetails().errorCode())) {
-                    LOGGER.debug(() -> "Expired AWS assume role session, will 
attempt to refresh the session");
-                    rebuildAwsS3Client(configuration);
-                    LOGGER.debug(() -> "Successfully refreshed AWS assume role 
session");
-                } else if (shouldRetry(ex.awsErrorDetails().errorCode(), 
retries++)) {
+                if (shouldRetry(ex.awsErrorDetails().errorCode(), retries++)) {
                     LOGGER.debug(() -> "S3 retryable error: " + 
userData(ex.getMessage()));
                 } else {
                     throw 
RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, 
getMessageOrToString(ex));
@@ -132,9 +132,7 @@
             }
             CleanupUtils.close(in, null);
         }
-        if (s3Client != null) {
-            CleanupUtils.close(s3Client, null);
-        }
+        AwsUtils.closeClients(awsClients);
     }

     @Override
@@ -147,15 +145,11 @@
         return false;
     }

-    private S3Client buildAwsS3Client(Map<String, String> configuration) 
throws HyracksDataException {
+    private CloseableAwsClients buildAwsS3Client(Map<String, String> 
configuration) throws HyracksDataException {
         try {
             return S3Utils.buildClient(ncAppCtx, configuration);
         } catch (CompilationException ex) {
             throw HyracksDataException.create(ex);
         }
     }
-
-    private void rebuildAwsS3Client(Map<String, String> configuration) throws 
HyracksDataException {
-        s3Client = buildAwsS3Client(configuration);
-    }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/AwsUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/AwsUtils.java
index a648853..f3d7613 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/AwsUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/AwsUtils.java
@@ -24,7 +24,6 @@
 import static 
org.apache.asterix.common.exceptions.ErrorCode.S3_REGION_NOT_SUPPORTED;
 import static 
org.apache.asterix.external.util.aws.AwsConstants.ACCESS_KEY_ID_FIELD_NAME;
 import static 
org.apache.asterix.external.util.aws.AwsConstants.CROSS_REGION_FIELD_NAME;
-import static 
org.apache.asterix.external.util.aws.AwsConstants.ERROR_EXPIRED_TOKEN;
 import static 
org.apache.asterix.external.util.aws.AwsConstants.EXTERNAL_ID_FIELD_NAME;
 import static 
org.apache.asterix.external.util.aws.AwsConstants.INSTANCE_PROFILE_FIELD_NAME;
 import static 
org.apache.asterix.external.util.aws.AwsConstants.REGION_FIELD_NAME;
@@ -33,6 +32,7 @@
 import static 
org.apache.asterix.external.util.aws.AwsConstants.SESSION_TOKEN_FIELD_NAME;
 import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;

+import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -41,10 +41,7 @@
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.external.IExternalCredentialsCache;
-import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.CleanupUtils;

 import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
@@ -52,10 +49,13 @@
 import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
 import 
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.awscore.AwsClient;
 import software.amazon.awssdk.core.exception.SdkException;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.sts.StsClient;
+import software.amazon.awssdk.services.sts.StsClientBuilder;
+import 
software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
 import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
 import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;
 import software.amazon.awssdk.services.sts.model.Credentials;
@@ -74,19 +74,14 @@
         throw new AssertionError("do not instantiate");
     }

-    public static boolean isArnAssumedRoleExpiredToken(Map<String, String> 
configuration, String errorCode) {
-        return ERROR_EXPIRED_TOKEN.equals(errorCode)
-                && getAuthenticationType(configuration) == 
AuthenticationType.ARN_ASSUME_ROLE;
-    }
-
     public static AwsCredentialsProvider 
buildCredentialsProvider(IApplicationContext appCtx,
-            Map<String, String> configuration) throws CompilationException {
+            Map<String, String> configuration, CloseableAwsClients awsClients) 
throws CompilationException {
         AuthenticationType authenticationType = 
getAuthenticationType(configuration);
         switch (authenticationType) {
             case ANONYMOUS:
                 return AnonymousCredentialsProvider.create();
             case ARN_ASSUME_ROLE:
-                return getTrustAccountCredentials(appCtx, configuration);
+                return getTrustAccountCredentials(appCtx, configuration, 
awsClients);
             case INSTANCE_PROFILE:
                 return getInstanceProfileCredentials(configuration);
             case ACCESS_KEYS:
@@ -157,21 +152,29 @@
      * @throws CompilationException CompilationException
      */
     public static AwsCredentialsProvider 
getTrustAccountCredentials(IApplicationContext appCtx,
-            Map<String, String> configuration) throws CompilationException {
-        IExternalCredentialsCache cache = appCtx.getExternalCredentialsCache();
-        Object credentialsObject = 
cache.get(configuration.get(ExternalDataConstants.KEY_ENTITY_ID));
-        if (credentialsObject != null) {
-            return () -> (AwsSessionCredentials) credentialsObject;
-        }
-        IExternalCredentialsCacheUpdater cacheUpdater = 
appCtx.getExternalCredentialsCacheUpdater();
-        AwsSessionCredentials credentials;
-        try {
-            credentials = (AwsSessionCredentials) 
cacheUpdater.generateAndCacheCredentials(configuration);
-        } catch (HyracksDataException ex) {
-            throw new 
CompilationException(ErrorCode.FAILED_EXTERNAL_CROSS_ACCOUNT_AUTHENTICATION, 
ex, ex.getMessage());
-        }
+            Map<String, String> configuration, CloseableAwsClients awsClients) 
throws CompilationException {
+        AwsCredentialsProvider credentialsToAssumeRole = 
getCredentialsToAssumeRole(configuration);

-        return () -> credentials;
+        // build sts client used for assuming role
+        StsClientBuilder stsClientBuilder = StsClient.builder();
+        stsClientBuilder.credentialsProvider(credentialsToAssumeRole);
+        
stsClientBuilder.region(validateAndGetRegion(configuration.get(REGION_FIELD_NAME)));
+        StsClient stsClient = stsClientBuilder.build();
+        awsClients.setStsClient(stsClient);
+
+        AssumeRoleRequest.Builder refreshRequestBuilder = 
AssumeRoleRequest.builder();
+        refreshRequestBuilder.roleArn(configuration.get(ROLE_ARN_FIELD_NAME));
+        
refreshRequestBuilder.externalId(configuration.get(EXTERNAL_ID_FIELD_NAME));
+        refreshRequestBuilder.roleSessionName(UUID.randomUUID().toString());
+        refreshRequestBuilder.durationSeconds((int) 
Duration.ofHours(1).toSeconds());
+
+        StsAssumeRoleCredentialsProvider.Builder builder = 
StsAssumeRoleCredentialsProvider.builder();
+        builder.refreshRequest(refreshRequestBuilder.build());
+        builder.stsClient(stsClient);
+        StsAssumeRoleCredentialsProvider credentialsProvider = builder.build();
+        awsClients.setCredentialsProvider(credentialsProvider);
+
+        return credentialsProvider;
     }

     /**
@@ -283,4 +286,57 @@
     public static String generateExternalId() {
         return UUID.randomUUID().toString();
     }
+
+    public static void closeClients(CloseableAwsClients clients) {
+        if (clients == null) {
+            return;
+        }
+
+        if (clients.getConsumingClient() != null) {
+            CleanupUtils.close(clients.getConsumingClient(), null);
+        }
+
+        if (clients.getStsClient() != null) {
+            CleanupUtils.close(clients.getStsClient(), null);
+        }
+
+        AwsCredentialsProvider credentialsProvider = 
clients.getCredentialsProvider();
+        if (credentialsProvider instanceof StsAssumeRoleCredentialsProvider 
assumeRoleCredentialsProvider) {
+            CleanupUtils.close(assumeRoleCredentialsProvider, null);
+        }
+    }
+
+    public static class CloseableAwsClients {
+        private AwsClient consumingClient;
+        private StsClient stsClient;
+        private AwsCredentialsProvider credentialsProvider;
+
+        public CloseableAwsClients() {
+
+        }
+
+        public AwsClient getConsumingClient() {
+            return consumingClient;
+        }
+
+        public StsClient getStsClient() {
+            return stsClient;
+        }
+
+        public AwsCredentialsProvider getCredentialsProvider() {
+            return credentialsProvider;
+        }
+
+        public void setConsumingClient(AwsClient consumingClient) {
+            this.consumingClient = consumingClient;
+        }
+
+        public void setStsClient(StsClient stsClient) {
+            this.stsClient = stsClient;
+        }
+
+        public void setCredentialsProvider(AwsCredentialsProvider 
credentialsProvider) {
+            this.credentialsProvider = credentialsProvider;
+        }
+    }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/glue/GlueUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/glue/GlueUtils.java
new file mode 100644
index 0000000..f4e52b0
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/glue/GlueUtils.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util.aws.glue;
+
+import static 
org.apache.asterix.external.util.aws.AwsConstants.REGION_FIELD_NAME;
+import static 
org.apache.asterix.external.util.aws.AwsConstants.SERVICE_END_POINT_FIELD_NAME;
+import static 
org.apache.asterix.external.util.aws.AwsUtils.buildCredentialsProvider;
+import static 
org.apache.asterix.external.util.aws.AwsUtils.validateAndGetRegion;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+
+import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.util.aws.AwsUtils.CloseableAwsClients;
+
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.GlueClientBuilder;
+
+public class GlueUtils {
+    private GlueUtils() {
+        throw new AssertionError("do not instantiate");
+    }
+
+    /**
+     * Builds the Glue client using the provided configuration
+     *
+     * @param configuration properties
+     * @return closeable AWS clients holding the built Glue client
+     * @throws CompilationException CompilationException
+     */
+    public static CloseableAwsClients buildClient(IApplicationContext appCtx, 
Map<String, String> configuration)
+            throws CompilationException {
+        CloseableAwsClients awsClients = new CloseableAwsClients();
+        String regionId = configuration.get(REGION_FIELD_NAME);
+        String serviceEndpoint = 
configuration.get(SERVICE_END_POINT_FIELD_NAME);
+
+        Region region = validateAndGetRegion(regionId);
+        AwsCredentialsProvider credentialsProvider = 
buildCredentialsProvider(appCtx, configuration, awsClients);
+
+        GlueClientBuilder builder = GlueClient.builder();
+        builder.region(region);
+        builder.credentialsProvider(credentialsProvider);
+
+        // Validate the service endpoint if present
+        if (serviceEndpoint != null) {
+            try {
+                URI uri = new URI(serviceEndpoint);
+                try {
+                    builder.endpointOverride(uri);
+                } catch (NullPointerException ex) {
+                    throw new 
CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, 
getMessageOrToString(ex));
+                }
+            } catch (URISyntaxException ex) {
+                throw new 
CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex,
+                        String.format("Invalid service endpoint %s", 
serviceEndpoint));
+            }
+        }
+
+        awsClients.setConsumingClient(builder.build());
+        return awsClients;
+    }
+}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
index e20e39f..46dbfae 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
@@ -79,6 +79,7 @@
 import org.apache.asterix.external.util.ExternalDataPrefix;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.aws.AwsUtils;
+import org.apache.asterix.external.util.aws.AwsUtils.CloseableAwsClients;
 import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -86,7 +87,6 @@
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.api.exceptions.Warning;
-import org.apache.hyracks.api.util.CleanupUtils;

 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.core.exception.SdkException;
@@ -115,14 +115,15 @@
      * @return S3 client
      * @throws CompilationException CompilationException
      */
-    public static S3Client buildClient(IApplicationContext appCtx, Map<String, 
String> configuration)
+    public static CloseableAwsClients buildClient(IApplicationContext appCtx, 
Map<String, String> configuration)
             throws CompilationException {
+        CloseableAwsClients awsClients = new CloseableAwsClients();
         String regionId = configuration.get(REGION_FIELD_NAME);
         String serviceEndpoint = 
configuration.get(SERVICE_END_POINT_FIELD_NAME);

         Region region = validateAndGetRegion(regionId);
         boolean crossRegion = 
validateAndGetCrossRegion(configuration.get(CROSS_REGION_FIELD_NAME));
-        AwsCredentialsProvider credentialsProvider = 
buildCredentialsProvider(appCtx, configuration);
+        AwsCredentialsProvider credentialsProvider = 
buildCredentialsProvider(appCtx, configuration, awsClients);

         S3ClientBuilder builder = S3Client.builder();
         builder.region(region);
@@ -147,7 +148,8 @@
         boolean pathStyleAddressing =
                 
validateAndGetPathStyleAddressing(configuration.get(PATH_STYLE_ADDRESSING_FIELD_NAME),
 serviceEndpoint);
         builder.forcePathStyle(pathStyleAddressing);
-        return builder.build();
+        awsClients.setConsumingClient(builder.build());
+        return awsClients;
     }

     public static void configureAwsS3HdfsJobConf(IApplicationContext appCtx, 
JobConf conf,
@@ -293,7 +295,8 @@
         }

         // Check if the bucket is present
-        S3Client s3Client = buildClient(appCtx, configuration);
+        CloseableAwsClients awsClients = buildClient(appCtx, configuration);
+        S3Client s3Client = (S3Client) awsClients.getConsumingClient();
         S3Response response;
         boolean useOldApi = false;
         String container = 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
@@ -317,9 +320,7 @@
         } catch (SdkException ex) {
             throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, 
ex, getMessageOrToString(ex));
         } finally {
-            if (s3Client != null) {
-                CleanupUtils.close(s3Client, null);
-            }
+            AwsUtils.closeClients(awsClients);
         }

         boolean isEmpty = useOldApi ? ((ListObjectsResponse) 
response).contents().isEmpty()
@@ -379,7 +380,8 @@
         // Prepare to retrieve the objects
         List<S3Object> filesOnly;
         String container = 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-        S3Client s3Client = buildClient(appCtx, configuration);
+        CloseableAwsClients awsClients = buildClient(appCtx, configuration);
+        S3Client s3Client = (S3Client) awsClients.getConsumingClient();
         String prefix = getPrefix(configuration);

         try {
@@ -401,9 +403,7 @@
         } catch (SdkException ex) {
             throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, 
ex, getMessageOrToString(ex));
         } finally {
-            if (s3Client != null) {
-                CleanupUtils.close(s3Client, null);
-            }
+            AwsUtils.closeClients(awsClients);
         }

         // Warn if no files are returned
@@ -522,56 +522,57 @@
         }
     }

-    public static Map<String, List<String>> 
S3ObjectsOfSingleDepth(IApplicationContext appCtx,
-            Map<String, String> configuration, String container, String prefix)
-            throws CompilationException, HyracksDataException {
-        // create s3 client
-        S3Client s3Client = buildClient(appCtx, configuration);
-        // fetch all the s3 objects
-        return listS3ObjectsOfSingleDepth(s3Client, container, prefix);
-    }
-
     /**
      * Uses the latest API to retrieve the objects from the storage of a 
single level.
      *
-     * @param s3Client              S3 client
-     * @param container             container name
-     * @param prefix                definition prefix
+     * @param appCtx application context
+     * @param configuration configuration
+     * @param container container name
+     * @param prefix definition prefix
      */
-    private static Map<String, List<String>> 
listS3ObjectsOfSingleDepth(S3Client s3Client, String container,
-            String prefix) {
-        Map<String, List<String>> allObjects = new HashMap<>();
-        ListObjectsV2Iterable listObjectsInterable;
-        ListObjectsV2Request.Builder listObjectsBuilder =
-                
ListObjectsV2Request.builder().bucket(container).prefix(prefix).delimiter("/");
+    public static Map<String, List<String>> 
listS3ObjectsOfSingleDepth(IApplicationContext appCtx,
+            Map<String, String> configuration, String container, String 
prefix) throws CompilationException {
+        CloseableAwsClients awsClients = buildClient(appCtx, configuration);
+        S3Client s3Client = (S3Client) awsClients.getConsumingClient();
+
+        ListObjectsV2Request.Builder listObjectsBuilder = 
ListObjectsV2Request.builder();
+        listObjectsBuilder.bucket(container);
         listObjectsBuilder.prefix(prefix);
+        listObjectsBuilder.delimiter("/");
+        ListObjectsV2Request listObjectsV2Request = listObjectsBuilder.build();
+
+        Map<String, List<String>> allObjects = new HashMap<>();
         List<String> files = new ArrayList<>();
         List<String> folders = new ArrayList<>();
+
         // to skip the prefix as a file from the response
         boolean checkPrefixInFile = true;
-        listObjectsInterable = 
s3Client.listObjectsV2Paginator(listObjectsBuilder.build());
-        for (ListObjectsV2Response response : listObjectsInterable) {
-            // put all the files
-            for (S3Object object : response.contents()) {
-                String fileName = object.key();
-                fileName = fileName.substring(prefix.length(), 
fileName.length());
-                if (checkPrefixInFile) {
-                    if (prefix.equals(object.key()))
+        try {
+            ListObjectsV2Iterable listObjectsIterable = 
s3Client.listObjectsV2Paginator(listObjectsV2Request);
+            for (ListObjectsV2Response response : listObjectsIterable) {
+                // put all the files
+                for (S3Object object : response.contents()) {
+                    String fileName = object.key();
+                    fileName = fileName.substring(prefix.length());
+                    if (checkPrefixInFile && prefix.equals(object.key())) {
                         checkPrefixInFile = false;
-                    else {
+                    } else {
                         files.add(fileName);
                     }
-                } else {
-                    files.add(fileName);
+                }
+
+                // put all the folders
+                for (CommonPrefix object : response.commonPrefixes()) {
+                    String folderName = object.prefix();
+                    folderName = folderName.substring(prefix.length());
+                    folders.add(
+                            folderName.endsWith("/") ? folderName.substring(0, 
folderName.length() - 1) : folderName);
                 }
             }
-            // put all the folders
-            for (CommonPrefix object : response.commonPrefixes()) {
-                String folderName = object.prefix();
-                folderName = folderName.substring(prefix.length(), 
folderName.length());
-                folders.add(folderName.endsWith("/") ? folderName.substring(0, 
folderName.length() - 1) : folderName);
-            }
+        } finally {
+            AwsUtils.closeClients(awsClients);
         }
+
         allObjects.put("files", files);
         allObjects.put("folders", folders);
         return allObjects;

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20493?usp=email
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings?usp=email

Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: phoenix
Gerrit-Change-Id: I6a3c755f94d377b443f21594443fac830875610e
Gerrit-Change-Number: 20493
Gerrit-PatchSet: 1
Gerrit-Owner: Hussain Towaileb <[email protected]>

Reply via email to