From Michael Blow <[email protected]>:

Michael Blow has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19126 )

Change subject: Merge branch 'gerrit/goldfish' into 'master'
......................................................................

Merge branch 'gerrit/goldfish' into 'master'

Change-Id: Ia6f1478cc62fb07c2eba3c6c9828b568c43a3cc0
---
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
3 files changed, 30 insertions(+), 263 deletions(-)

Approvals:
  Jenkins: Verified
  Michael Blow: Looks good to me, approved; Verified




diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
index 2ba1844..f36d25d 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
@@ -24,6 +24,7 @@
 import static 
org.apache.asterix.common.exceptions.ErrorCode.S3_REGION_NOT_SUPPORTED;
 import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix;
 import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable;
+import static 
org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableExists;
 import static 
org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties;
 import static 
org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.ACCESS_KEY_ID_FIELD_NAME;
@@ -468,5 +469,8 @@
         if (!response.sdkHttpResponse().isSuccessful()) {
             throw new 
CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container);
         }
+        if (isDeltaTable(configuration)) {
+            validateDeltaTableExists(configuration);
+        }
     }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
index 4453e68..a2b50e1 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
@@ -19,15 +19,6 @@
 package org.apache.asterix.external.util.aws.s3;

 import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix;
-<<<<<<< HEAD   (d21a38 [ASTERIXDB-3514][EXT]: Assume role only when temporary 
crede)
-=======
-import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable;
-import static 
org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableExists;
-import static 
org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties;
-import static 
org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.ACCESS_KEY_ID_FIELD_NAME;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_INTERNAL_ERROR;
->>>>>>> BRANCH (7026fa Merge "Merge branch 'gerrit/trinity' into 
'gerrit/goldfish'")
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_METHOD_NOT_IMPLEMENTED;
 import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;

@@ -68,260 +59,6 @@
         throw new AssertionError("do not instantiate");
     }

-<<<<<<< HEAD   (d21a38 [ASTERIXDB-3514][EXT]: Assume role only when temporary 
crede)
-=======
-    public static boolean isRetryableError(String errorCode) {
-        return errorCode.equals(ERROR_INTERNAL_ERROR) || 
errorCode.equals(ERROR_SLOW_DOWN);
-    }
-
-    /**
-     * Builds the S3 client using the provided configuration
-     *
-     * @param configuration properties
-     * @return S3 client
-     * @throws CompilationException CompilationException
-     */
-    public static S3Client buildAwsS3Client(Map<String, String> configuration) 
throws CompilationException {
-        // TODO(Hussain): Need to ensure that all required parameters are 
present in a previous step
-        String instanceProfile = 
configuration.get(INSTANCE_PROFILE_FIELD_NAME);
-        String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
-        String secretAccessKey = 
configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
-        String sessionToken = configuration.get(SESSION_TOKEN_FIELD_NAME);
-        String regionId = configuration.get(REGION_FIELD_NAME);
-        String serviceEndpoint = 
configuration.get(SERVICE_END_POINT_FIELD_NAME);
-
-        S3ClientBuilder builder = S3Client.builder();
-
-        // Credentials
-        AwsCredentialsProvider credentialsProvider =
-                buildCredentialsProvider(instanceProfile, accessKeyId, 
secretAccessKey, sessionToken);
-
-        builder.credentialsProvider(credentialsProvider);
-
-        // Validate the region
-        List<Region> regions = S3Client.serviceMetadata().regions();
-        Optional<Region> selectedRegion = regions.stream().filter(region -> 
region.id().equals(regionId)).findFirst();
-
-        if (selectedRegion.isEmpty()) {
-            throw new CompilationException(S3_REGION_NOT_SUPPORTED, regionId);
-        }
-        builder.region(selectedRegion.get());
-
-        // Validate the service endpoint if present
-        if (serviceEndpoint != null) {
-            try {
-                URI uri = new URI(serviceEndpoint);
-                try {
-                    builder.endpointOverride(uri);
-                } catch (NullPointerException ex) {
-                    throw new 
CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, 
getMessageOrToString(ex));
-                }
-            } catch (URISyntaxException ex) {
-                throw new 
CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex,
-                        String.format("Invalid service endpoint %s", 
serviceEndpoint));
-            }
-        }
-
-        return builder.build();
-    }
-
-    public static AwsCredentialsProvider buildCredentialsProvider(String 
instanceProfile, String accessKeyId,
-            String secretAccessKey, String sessionToken) throws 
CompilationException {
-
-        // Credentials
-        AwsCredentialsProvider credentialsProvider;
-
-        // nothing provided, anonymous authentication
-        if (instanceProfile == null && accessKeyId == null && secretAccessKey 
== null && sessionToken == null) {
-            credentialsProvider = AnonymousCredentialsProvider.create();
-        } else if (instanceProfile != null) {
-
-            // only "true" value is allowed
-            if (!instanceProfile.equalsIgnoreCase("true")) {
-                throw new 
CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE, 
INSTANCE_PROFILE_FIELD_NAME, "true");
-            }
-
-            // no other authentication parameters are allowed
-            if (accessKeyId != null) {
-                throw new 
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, 
ACCESS_KEY_ID_FIELD_NAME,
-                        INSTANCE_PROFILE_FIELD_NAME);
-            }
-            if (secretAccessKey != null) {
-                throw new 
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, 
SECRET_ACCESS_KEY_FIELD_NAME,
-                        INSTANCE_PROFILE_FIELD_NAME);
-            }
-            if (sessionToken != null) {
-                throw new 
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, 
SESSION_TOKEN_FIELD_NAME,
-                        INSTANCE_PROFILE_FIELD_NAME);
-            }
-            credentialsProvider = InstanceProfileCredentialsProvider.create();
-        } else if (accessKeyId != null || secretAccessKey != null) {
-            // accessKeyId authentication
-            if (accessKeyId == null) {
-                throw new 
CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, 
ACCESS_KEY_ID_FIELD_NAME,
-                        SECRET_ACCESS_KEY_FIELD_NAME);
-            }
-            if (secretAccessKey == null) {
-                throw new 
CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, 
SECRET_ACCESS_KEY_FIELD_NAME,
-                        ACCESS_KEY_ID_FIELD_NAME);
-            }
-
-            // use session token if provided
-            if (sessionToken != null) {
-                credentialsProvider = StaticCredentialsProvider
-                        .create(AwsSessionCredentials.create(accessKeyId, 
secretAccessKey, sessionToken));
-            } else {
-                credentialsProvider =
-                        
StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKeyId, 
secretAccessKey));
-            }
-        } else {
-            // if only session token is provided, accessKeyId is required
-            throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, 
ACCESS_KEY_ID_FIELD_NAME,
-                    SESSION_TOKEN_FIELD_NAME);
-        }
-        return credentialsProvider;
-    }
-
-    /**
-     * Builds the S3 client using the provided configuration
-     *
-     * @param configuration      properties
-     * @param numberOfPartitions number of partitions in the cluster
-     */
-    public static void configureAwsS3HdfsJobConf(JobConf conf, Map<String, 
String> configuration,
-            int numberOfPartitions) {
-        String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
-        String secretAccessKey = 
configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
-        String sessionToken = configuration.get(SESSION_TOKEN_FIELD_NAME);
-        String serviceEndpoint = 
configuration.get(SERVICE_END_POINT_FIELD_NAME);
-
-        //Disable caching S3 FileSystem
-        HDFSUtils.disableHadoopFileSystemCache(conf, HADOOP_S3_PROTOCOL);
-
-        /*
-         * Authentication Methods:
-         * 1- Anonymous: no accessKeyId and no secretAccessKey
-         * 2- Temporary: has to provide accessKeyId, secretAccessKey and 
sessionToken
-         * 3- Private: has to provide accessKeyId and secretAccessKey
-         */
-        if (accessKeyId == null) {
-            //Tells hadoop-aws it is an anonymous access
-            conf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_ANONYMOUS_ACCESS);
-        } else {
-            conf.set(HADOOP_ACCESS_KEY_ID, accessKeyId);
-            conf.set(HADOOP_SECRET_ACCESS_KEY, secretAccessKey);
-            if (sessionToken != null) {
-                conf.set(HADOOP_SESSION_TOKEN, sessionToken);
-                //Tells hadoop-aws it is a temporary access
-                conf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_TEMP_ACCESS);
-            }
-        }
-
-        /*
-         * This is to allow S3 definition to have path-style form. Should 
always be true to match the current
-         * way we access files in S3
-         */
-        conf.set(HADOOP_PATH_STYLE_ACCESS, ExternalDataConstants.TRUE);
-
-        /*
-         * Set the size of S3 connection pool to be the number of partitions
-         */
-        conf.set(HADOOP_S3_CONNECTION_POOL_SIZE, 
String.valueOf(numberOfPartitions));
-
-        if (serviceEndpoint != null) {
-            // Validation of the URL should be done at hadoop-aws level
-            conf.set(HADOOP_SERVICE_END_POINT, serviceEndpoint);
-        } else {
-            //Region is ignored and buckets could be found by the central 
endpoint
-            conf.set(HADOOP_SERVICE_END_POINT, Constants.CENTRAL_ENDPOINT);
-        }
-    }
-
-    /**
-     * Validate external dataset properties
-     *
-     * @param configuration properties
-     * @throws CompilationException Compilation exception
-     */
-    public static void validateProperties(Map<String, String> configuration, 
SourceLocation srcLoc,
-            IWarningCollector collector) throws CompilationException {
-        if (isDeltaTable(configuration)) {
-            validateDeltaTableProperties(configuration);
-        }
-        // check if the format property is present
-        else if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
-            throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, 
srcLoc, ExternalDataConstants.KEY_FORMAT);
-        }
-        // Both parameters should be passed, or neither should be passed (for 
anonymous/no auth)
-        String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
-        String secretAccessKey = 
configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
-        if (accessKeyId == null || secretAccessKey == null) {
-            // If one is passed, the other is required
-            if (accessKeyId != null) {
-                throw new 
CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, 
SECRET_ACCESS_KEY_FIELD_NAME,
-                        ACCESS_KEY_ID_FIELD_NAME);
-            } else if (secretAccessKey != null) {
-                throw new 
CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, 
ACCESS_KEY_ID_FIELD_NAME,
-                        SECRET_ACCESS_KEY_FIELD_NAME);
-            }
-        }
-
-        validateIncludeExclude(configuration);
-        try {
-            // TODO(htowaileb): maybe something better, this will check to 
ensure type is supported before creation
-            new ExternalDataPrefix(configuration);
-        } catch (AlgebricksException ex) {
-            throw new 
CompilationException(ErrorCode.FAILED_TO_CALCULATE_COMPUTED_FIELDS, ex);
-        }
-
-        // Check if the bucket is present
-        S3Client s3Client = buildAwsS3Client(configuration);
-        S3Response response;
-        boolean useOldApi = false;
-        String container = 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-        String prefix = getPrefix(configuration);
-
-        try {
-            response = isBucketEmpty(s3Client, container, prefix, false);
-        } catch (S3Exception ex) {
-            // Method not implemented, try falling back to old API
-            try {
-                // For error code, see 
https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
-                if 
(ex.awsErrorDetails().errorCode().equals(ERROR_METHOD_NOT_IMPLEMENTED)) {
-                    useOldApi = true;
-                    response = isBucketEmpty(s3Client, container, prefix, 
true);
-                } else {
-                    throw ex;
-                }
-            } catch (SdkException ex2) {
-                throw new 
CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex2, 
getMessageOrToString(ex));
-            }
-        } catch (SdkException ex) {
-            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, 
ex, getMessageOrToString(ex));
-        } finally {
-            if (s3Client != null) {
-                CleanupUtils.close(s3Client, null);
-            }
-        }
-
-        boolean isEmpty = useOldApi ? ((ListObjectsResponse) 
response).contents().isEmpty()
-                : ((ListObjectsV2Response) response).contents().isEmpty();
-        if (isEmpty && collector.shouldWarn()) {
-            Warning warning = Warning.of(srcLoc, 
ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
-            collector.warn(warning);
-        }
-
-        // Returns 200 only in case the bucket exists, otherwise, throws an 
exception. However, to
-        // ensure coverage, check if the result is successful as well and not 
only catch exceptions
-        if (!response.sdkHttpResponse().isSuccessful()) {
-            throw new 
CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container);
-        }
-        if (isDeltaTable(configuration)) {
-            validateDeltaTableExists(configuration);
-        }
-    }
-
->>>>>>> BRANCH (7026fa Merge "Merge branch 'gerrit/trinity' into 
'gerrit/goldfish'")
     /**
      * Checks for a single object in the specified bucket to determine if the 
bucket is empty or not.
      *

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19126
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: Ia6f1478cc62fb07c2eba3c6c9828b568c43a3cc0
Gerrit-Change-Number: 19126
Gerrit-PatchSet: 1
Gerrit-Owner: Michael Blow <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Michael Blow <[email protected]>
Gerrit-MessageType: merged

Reply via email to