>From Hussain Towaileb <[email protected]>:

Hussain Towaileb has submitted this change. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20555?usp=email )

Change subject: [NO ISSUE][EXT]: fix reading parquet with assume role auth
......................................................................

[NO ISSUE][EXT]: fix reading parquet with assume role auth

Ext-ref: MB-69108
Change-Id: Iaa7c38b00bf90f2388ba7b1a6555447ae8826df3
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20555
Integration-Tests: Jenkins <[email protected]>
Tested-by: Hussain Towaileb <[email protected]>
Reviewed-by: Michael Blow <[email protected]>
---
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/AwsUtils.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
A 
asterixdb/asterix-external-data/src/main/java/software/amazon/awssdk/thirdparty/org/apache/http/client/utils/URIBuilder.java
4 files changed, 115 insertions(+), 20 deletions(-)

Approvals:
  Michael Blow: Looks good to me, approved
  Jenkins: Verified
  Hussain Towaileb: Verified




diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/AwsUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/AwsUtils.java
index 974eceb..217a138 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/AwsUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/AwsUtils.java
@@ -83,8 +83,9 @@
                 SdkRequest req = context.request();
                 SdkResponse resp = context.response();
                 if (req instanceof AssumeRoleRequest assumeReq && resp 
instanceof AssumeRoleResponse assumeResp) {
-                    LOGGER.info("STS refresh success [Thread={}, Role={}, 
Expiry={}]", Thread.currentThread().getName(),
+                    LOGGER.debug("STS refresh success [Role={}, Session={}, 
Expiry={}]",
                             assumeReq.roleArn(),
+                            assumeReq.roleSessionName(),
                             assumeResp.credentials().expiration());
                 }
                 ExecutionInterceptor.super.afterExecution(context, 
executionAttributes);
@@ -140,6 +141,11 @@
     }

     public static AuthenticationType getAuthenticationType(Map<String, String> 
configuration) {
+        return getAuthenticationType(configuration, false);
+    }
+
+    public static AuthenticationType getAuthenticationType(Map<String, String> 
configuration,
+            boolean credentialsToAssumeRole) {
         String roleArn = configuration.get(ROLE_ARN_FIELD_NAME);
         String instanceProfile = 
configuration.get(INSTANCE_PROFILE_FIELD_NAME);
         String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
@@ -147,7 +153,7 @@

         if (noAuth(configuration)) {
             return AuthenticationType.ANONYMOUS;
-        } else if (roleArn != null) {
+        } else if (roleArn != null && !credentialsToAssumeRole) {
             return AuthenticationType.ARN_ASSUME_ROLE;
         } else if (instanceProfile != null) {
             return AuthenticationType.INSTANCE_PROFILE;
@@ -214,10 +220,11 @@
         awsClients.setStsClient(stsClient);

         // build refresh role request
+        String roleArn = configuration.get(ROLE_ARN_FIELD_NAME);
         String sessionName = UUID.randomUUID().toString();
-        LOGGER.info("Assuming role with session name ({}) for ({})", 
sessionName, Thread.currentThread().getName());
+        LOGGER.debug("Assuming RoleArn ({}) and SessionName ({})", roleArn, 
sessionName);
         AssumeRoleRequest.Builder refreshRequestBuilder = 
AssumeRoleRequest.builder();
-        refreshRequestBuilder.roleArn(configuration.get(ROLE_ARN_FIELD_NAME));
+        refreshRequestBuilder.roleArn(roleArn);
         
refreshRequestBuilder.externalId(configuration.get(EXTERNAL_ID_FIELD_NAME));
         refreshRequestBuilder.roleSessionName(sessionName);
         if (appCtx != null) {
@@ -335,6 +342,11 @@
         return UUID.randomUUID().toString();
     }

+    public static String buildStsUri(String regionId) {
+        // hadoop expects the endpoint without scheme, it automatically 
prepends "https://";
+        return "sts." + regionId + ".amazonaws.com";
+    }
+
     public static void closeClients(CloseableAwsClients clients) {
         if (clients == null) {
             return;
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
index dbddb38..2701c36 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
@@ -31,16 +31,22 @@
     /*
      * Hadoop-AWS
      */
+    // assume role
     public static final String HADOOP_ASSUME_ROLE_ARN = 
"fs.s3a.assumed.role.arn";
     public static final String HADOOP_ASSUME_ROLE_EXTERNAL_ID = 
"fs.s3a.assumed.role.external.id";
     public static final String HADOOP_ASSUME_ROLE_SESSION_NAME = 
"fs.s3a.assumed.role.session.name";
     public static final String HADOOP_ASSUME_ROLE_SESSION_DURATION = 
"fs.s3a.assumed.role.session.duration";
+    public static final String HADOOP_ASSUME_ROLE_ENDPOINT = 
"fs.s3a.assumed.role.sts.endpoint";
+    public static final String HADOOP_ASSUME_ROLE_REGION = 
"fs.s3a.assumed.role.sts.endpoint.region";
+
+    // basic keys
     public static final String HADOOP_ACCESS_KEY_ID = "fs.s3a.access.key";
     public static final String HADOOP_SECRET_ACCESS_KEY = "fs.s3a.secret.key";
     public static final String HADOOP_SESSION_TOKEN = "fs.s3a.session.token";
-    public static final String HADOOP_REGION = "fs.s3a.region";
+
+    // regions and endpoints
     public static final String HADOOP_SERVICE_END_POINT = "fs.s3a.endpoint";
-    public static final String HADOOP_S3_FILESYSTEM_IMPLEMENTATION = 
"fs.s3a.impl";
+    public static final String HADOOP_REGION = "fs.s3a.endpoint.region";

     /*
      * Internal configurations
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
index 6484b20..d4201be 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.external.util.aws.s3;

 import static 
org.apache.asterix.common.exceptions.ErrorCode.INVALID_PARAM_VALUE_ALLOWED_VALUE;
+import static 
org.apache.asterix.common.exceptions.ErrorCode.LONG_LIVED_CREDENTIALS_NEEDED_TO_ASSUME_ROLE;
 import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix;
 import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable;
 import static 
org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableExists;
@@ -36,6 +37,7 @@
 import static 
org.apache.asterix.external.util.aws.AwsConstants.SERVICE_END_POINT_FIELD_NAME;
 import static 
org.apache.asterix.external.util.aws.AwsConstants.SESSION_TOKEN_FIELD_NAME;
 import static 
org.apache.asterix.external.util.aws.AwsUtils.buildCredentialsProvider;
+import static org.apache.asterix.external.util.aws.AwsUtils.buildStsUri;
 import static 
org.apache.asterix.external.util.aws.AwsUtils.getAuthenticationType;
 import static 
org.apache.asterix.external.util.aws.AwsUtils.validateAndGetCrossRegion;
 import static 
org.apache.asterix.external.util.aws.AwsUtils.validateAndGetRegion;
@@ -45,7 +47,9 @@
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ANONYMOUS;
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUMED_ROLE;
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_ARN;
+import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_ENDPOINT;
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_EXTERNAL_ID;
+import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_REGION;
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_SESSION_DURATION;
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_SESSION_NAME;
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CREDENTIALS_TO_ASSUME_ROLE_KEY;
@@ -80,13 +84,14 @@
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.aws.AwsUtils;
 import org.apache.asterix.external.util.aws.AwsUtils.CloseableAwsClients;
-import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;

 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.core.exception.SdkException;
@@ -104,6 +109,8 @@
 import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;

 public class S3Utils {
+    private static final Logger LOGGER = LogManager.getLogger();
+
     private S3Utils() {
         throw new AssertionError("do not instantiate");
     }
@@ -152,16 +159,21 @@
      */
     public static void configureAwsS3HdfsJobConf(IApplicationContext appCtx, 
JobConf jobConf,
             Map<String, String> configuration, int numberOfPartitions) throws 
CompilationException {
-        setHadoopCredentials(appCtx, jobConf, configuration);
-        String serviceEndpoint = 
configuration.get(SERVICE_END_POINT_FIELD_NAME);
         Region region = 
validateAndGetRegion(configuration.get(REGION_FIELD_NAME));
-        jobConf.set(HADOOP_REGION, region.toString());
+        boolean crossRegion = 
validateAndGetCrossRegion(configuration.get(CROSS_REGION_FIELD_NAME));
+
+        // if region is set, hadoop will always try the specified region only, 
if bucket is not found, it will fail
+        // if we want to use cross-region, we do not set the endpoint, which 
will default to central region and will
+        // automatically detect the bucket
+        if (!crossRegion) {
+            jobConf.set(HADOOP_REGION, region.toString());
+        }
+        setHadoopCredentials(appCtx, jobConf, configuration, 
region.toString());
+
+        String serviceEndpoint = 
configuration.get(SERVICE_END_POINT_FIELD_NAME);
         if (serviceEndpoint != null) {
             // Validation of the URL should be done at hadoop-aws level
             jobConf.set(HADOOP_SERVICE_END_POINT, serviceEndpoint);
-        } else {
-            //Region is ignored and buckets could be found by the central 
endpoint
-            jobConf.set(HADOOP_SERVICE_END_POINT, Constants.CENTRAL_ENDPOINT);
         }

         boolean pathStyleAddressing =
@@ -187,7 +199,7 @@
      * @param configuration external details configuration
      */
     private static void setHadoopCredentials(IApplicationContext appCtx, 
JobConf jobConf,
-            Map<String, String> configuration) throws CompilationException {
+            Map<String, String> configuration, String region) throws 
CompilationException {
         AwsUtils.AuthenticationType authenticationType = 
getAuthenticationType(configuration);
         switch (authenticationType) {
             case ANONYMOUS:
@@ -195,19 +207,35 @@
                 break;
             case ARN_ASSUME_ROLE:
                 jobConf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, 
HADOOP_ASSUMED_ROLE);
-                jobConf.set(HADOOP_ASSUME_ROLE_ARN, 
configuration.get(ROLE_ARN_FIELD_NAME));
                 jobConf.set(HADOOP_ASSUME_ROLE_EXTERNAL_ID, 
configuration.get(EXTERNAL_ID_FIELD_NAME));
-                jobConf.set(HADOOP_ASSUME_ROLE_SESSION_NAME, "parquet-" + 
UUID.randomUUID());
+                jobConf.set(HADOOP_ASSUME_ROLE_REGION, region);
+                jobConf.set(HADOOP_ASSUME_ROLE_ENDPOINT, buildStsUri(region));
+
+                String roleArn = configuration.get(ROLE_ARN_FIELD_NAME);
+                String sessionName = UUID.randomUUID().toString();
+                jobConf.set(HADOOP_ASSUME_ROLE_ARN, roleArn);
+                jobConf.set(HADOOP_ASSUME_ROLE_SESSION_NAME, sessionName);
+                LOGGER.debug("Assuming RoleArn ({}) and SessionName ({})", 
roleArn, sessionName);

                 // hadoop accepts time 15m to 1h, we will base it on the 
provided configuration
                 int durationInSeconds = 
appCtx.getExternalProperties().getAwsAssumeRoleDuration();
                 String hadoopDuration = getHadoopDuration(durationInSeconds);
                 jobConf.set(HADOOP_ASSUME_ROLE_SESSION_DURATION, 
hadoopDuration);

-                // TODO: this assumes basic keys always, also support if we 
use InstanceProfile to assume a role
-                jobConf.set(HADOOP_CREDENTIALS_TO_ASSUME_ROLE_KEY, 
HADOOP_SIMPLE);
-                jobConf.set(HADOOP_ACCESS_KEY_ID, 
configuration.get(ACCESS_KEY_ID_FIELD_NAME));
-                jobConf.set(HADOOP_SECRET_ACCESS_KEY, 
configuration.get(SECRET_ACCESS_KEY_FIELD_NAME));
+                // credentials used to assume the role
+                AwsUtils.AuthenticationType assumeRoleCredsType = 
AwsUtils.getAuthenticationType(configuration, true);
+                switch (assumeRoleCredsType) {
+                    case INSTANCE_PROFILE:
+                        jobConf.set(HADOOP_CREDENTIALS_TO_ASSUME_ROLE_KEY, 
HADOOP_INSTANCE_PROFILE);
+                        break;
+                    case ACCESS_KEYS:
+                        jobConf.set(HADOOP_CREDENTIALS_TO_ASSUME_ROLE_KEY, 
HADOOP_SIMPLE);
+                        jobConf.set(HADOOP_ACCESS_KEY_ID, 
configuration.get(ACCESS_KEY_ID_FIELD_NAME));
+                        jobConf.set(HADOOP_SECRET_ACCESS_KEY, 
configuration.get(SECRET_ACCESS_KEY_FIELD_NAME));
+                        break;
+                    default:
+                        throw 
CompilationException.create(LONG_LIVED_CREDENTIALS_NEEDED_TO_ASSUME_ROLE);
+                }
                 break;
             case INSTANCE_PROFILE:
                 jobConf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, 
HADOOP_INSTANCE_PROFILE);
diff --git 
a/asterixdb/asterix-external-data/src/main/java/software/amazon/awssdk/thirdparty/org/apache/http/client/utils/URIBuilder.java
 
b/asterixdb/asterix-external-data/src/main/java/software/amazon/awssdk/thirdparty/org/apache/http/client/utils/URIBuilder.java
new file mode 100644
index 0000000..47a97f8
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/software/amazon/awssdk/thirdparty/org/apache/http/client/utils/URIBuilder.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package software.amazon.awssdk.thirdparty.org.apache.http.client.utils;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ * This class is introduced to avoid direct dependency on Apache HttpClient 
that is shaded in aws-java-sdk-bundle
+ * which is huge in size and comes with few CVEs, considering this is all we 
need from that bundle. This can be
+ * removed after upgrading hadoop to a version that includes the change that 
relies on Java's URI instead of the
+ * shaded URI builder dependency.
+ *
+ * See the following for more details:
+ * https://issues.apache.org/jira/browse/HADOOP-19282
+ */
+public class URIBuilder {
+    private final org.apache.http.client.utils.URIBuilder uriBuilder = new 
org.apache.http.client.utils.URIBuilder();
+
+    public URIBuilder setHost(String host) {
+        uriBuilder.setHost(host);
+        return this;
+    }
+
+    public URIBuilder setScheme(String scheme) {
+        uriBuilder.setScheme(scheme);
+        return this;
+    }
+
+    public URI build() throws URISyntaxException {
+        return uriBuilder.build();
+    }
+}
\ No newline at end of file

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20555?usp=email
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings?usp=email

Gerrit-MessageType: merged
Gerrit-Project: asterixdb
Gerrit-Branch: phoenix
Gerrit-Change-Id: Iaa7c38b00bf90f2388ba7b1a6555447ae8826df3
Gerrit-Change-Number: 20555
Gerrit-PatchSet: 3
Gerrit-Owner: Hussain Towaileb <[email protected]>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Hussain Towaileb <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Michael Blow <[email protected]>

Reply via email to