From Michael Blow <[email protected]>:
Michael Blow has submitted this change. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19126 )
Change subject: Merge branch 'gerrit/goldfish' into 'master'
......................................................................
Merge branch 'gerrit/goldfish' into 'master'
Change-Id: Ia6f1478cc62fb07c2eba3c6c9828b568c43a3cc0
---
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
3 files changed, 30 insertions(+), 263 deletions(-)
Approvals:
Jenkins: Verified
Michael Blow: Looks good to me, approved; Verified
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
index 2ba1844..f36d25d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
@@ -24,6 +24,7 @@
import static
org.apache.asterix.common.exceptions.ErrorCode.S3_REGION_NOT_SUPPORTED;
import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix;
import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable;
+import static org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableExists;
import static
org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties;
import static
org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
import static
org.apache.asterix.external.util.aws.s3.S3Constants.ACCESS_KEY_ID_FIELD_NAME;
@@ -468,5 +469,8 @@
if (!response.sdkHttpResponse().isSuccessful()) {
throw new
CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container);
}
+ if (isDeltaTable(configuration)) {
+ validateDeltaTableExists(configuration);
+ }
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
index 4453e68..a2b50e1 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
@@ -19,15 +19,6 @@
package org.apache.asterix.external.util.aws.s3;
import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix;
-<<<<<<< HEAD (d21a38 [ASTERIXDB-3514][EXT]: Assume role only when temporary
crede)
-=======
-import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable;
-import static
org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableExists;
-import static
org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties;
-import static
org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
-import static
org.apache.asterix.external.util.aws.s3.S3Constants.ACCESS_KEY_ID_FIELD_NAME;
-import static
org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_INTERNAL_ERROR;
->>>>>>> BRANCH (7026fa Merge "Merge branch 'gerrit/trinity' into
'gerrit/goldfish'")
import static
org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_METHOD_NOT_IMPLEMENTED;
import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
@@ -68,260 +59,6 @@
throw new AssertionError("do not instantiate");
}
-<<<<<<< HEAD (d21a38 [ASTERIXDB-3514][EXT]: Assume role only when temporary
crede)
-=======
- public static boolean isRetryableError(String errorCode) {
- return errorCode.equals(ERROR_INTERNAL_ERROR) ||
errorCode.equals(ERROR_SLOW_DOWN);
- }
-
- /**
- * Builds the S3 client using the provided configuration
- *
- * @param configuration properties
- * @return S3 client
- * @throws CompilationException CompilationException
- */
- public static S3Client buildAwsS3Client(Map<String, String> configuration)
throws CompilationException {
- // TODO(Hussain): Need to ensure that all required parameters are
present in a previous step
- String instanceProfile =
configuration.get(INSTANCE_PROFILE_FIELD_NAME);
- String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
- String secretAccessKey =
configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
- String sessionToken = configuration.get(SESSION_TOKEN_FIELD_NAME);
- String regionId = configuration.get(REGION_FIELD_NAME);
- String serviceEndpoint =
configuration.get(SERVICE_END_POINT_FIELD_NAME);
-
- S3ClientBuilder builder = S3Client.builder();
-
- // Credentials
- AwsCredentialsProvider credentialsProvider =
- buildCredentialsProvider(instanceProfile, accessKeyId,
secretAccessKey, sessionToken);
-
- builder.credentialsProvider(credentialsProvider);
-
- // Validate the region
- List<Region> regions = S3Client.serviceMetadata().regions();
- Optional<Region> selectedRegion = regions.stream().filter(region ->
region.id().equals(regionId)).findFirst();
-
- if (selectedRegion.isEmpty()) {
- throw new CompilationException(S3_REGION_NOT_SUPPORTED, regionId);
- }
- builder.region(selectedRegion.get());
-
- // Validate the service endpoint if present
- if (serviceEndpoint != null) {
- try {
- URI uri = new URI(serviceEndpoint);
- try {
- builder.endpointOverride(uri);
- } catch (NullPointerException ex) {
- throw new
CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex,
getMessageOrToString(ex));
- }
- } catch (URISyntaxException ex) {
- throw new
CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex,
- String.format("Invalid service endpoint %s",
serviceEndpoint));
- }
- }
-
- return builder.build();
- }
-
- public static AwsCredentialsProvider buildCredentialsProvider(String
instanceProfile, String accessKeyId,
- String secretAccessKey, String sessionToken) throws
CompilationException {
-
- // Credentials
- AwsCredentialsProvider credentialsProvider;
-
- // nothing provided, anonymous authentication
- if (instanceProfile == null && accessKeyId == null && secretAccessKey
== null && sessionToken == null) {
- credentialsProvider = AnonymousCredentialsProvider.create();
- } else if (instanceProfile != null) {
-
- // only "true" value is allowed
- if (!instanceProfile.equalsIgnoreCase("true")) {
- throw new
CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE,
INSTANCE_PROFILE_FIELD_NAME, "true");
- }
-
- // no other authentication parameters are allowed
- if (accessKeyId != null) {
- throw new
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT,
ACCESS_KEY_ID_FIELD_NAME,
- INSTANCE_PROFILE_FIELD_NAME);
- }
- if (secretAccessKey != null) {
- throw new
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT,
SECRET_ACCESS_KEY_FIELD_NAME,
- INSTANCE_PROFILE_FIELD_NAME);
- }
- if (sessionToken != null) {
- throw new
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT,
SESSION_TOKEN_FIELD_NAME,
- INSTANCE_PROFILE_FIELD_NAME);
- }
- credentialsProvider = InstanceProfileCredentialsProvider.create();
- } else if (accessKeyId != null || secretAccessKey != null) {
- // accessKeyId authentication
- if (accessKeyId == null) {
- throw new
CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT,
ACCESS_KEY_ID_FIELD_NAME,
- SECRET_ACCESS_KEY_FIELD_NAME);
- }
- if (secretAccessKey == null) {
- throw new
CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT,
SECRET_ACCESS_KEY_FIELD_NAME,
- ACCESS_KEY_ID_FIELD_NAME);
- }
-
- // use session token if provided
- if (sessionToken != null) {
- credentialsProvider = StaticCredentialsProvider
- .create(AwsSessionCredentials.create(accessKeyId,
secretAccessKey, sessionToken));
- } else {
- credentialsProvider =
-
StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKeyId,
secretAccessKey));
- }
- } else {
- // if only session token is provided, accessKeyId is required
- throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT,
ACCESS_KEY_ID_FIELD_NAME,
- SESSION_TOKEN_FIELD_NAME);
- }
- return credentialsProvider;
- }
-
- /**
- * Builds the S3 client using the provided configuration
- *
- * @param configuration properties
- * @param numberOfPartitions number of partitions in the cluster
- */
- public static void configureAwsS3HdfsJobConf(JobConf conf, Map<String,
String> configuration,
- int numberOfPartitions) {
- String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
- String secretAccessKey =
configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
- String sessionToken = configuration.get(SESSION_TOKEN_FIELD_NAME);
- String serviceEndpoint =
configuration.get(SERVICE_END_POINT_FIELD_NAME);
-
- //Disable caching S3 FileSystem
- HDFSUtils.disableHadoopFileSystemCache(conf, HADOOP_S3_PROTOCOL);
-
- /*
- * Authentication Methods:
- * 1- Anonymous: no accessKeyId and no secretAccessKey
- * 2- Temporary: has to provide accessKeyId, secretAccessKey and
sessionToken
- * 3- Private: has to provide accessKeyId and secretAccessKey
- */
- if (accessKeyId == null) {
- //Tells hadoop-aws it is an anonymous access
- conf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_ANONYMOUS_ACCESS);
- } else {
- conf.set(HADOOP_ACCESS_KEY_ID, accessKeyId);
- conf.set(HADOOP_SECRET_ACCESS_KEY, secretAccessKey);
- if (sessionToken != null) {
- conf.set(HADOOP_SESSION_TOKEN, sessionToken);
- //Tells hadoop-aws it is a temporary access
- conf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_TEMP_ACCESS);
- }
- }
-
- /*
- * This is to allow S3 definition to have path-style form. Should
always be true to match the current
- * way we access files in S3
- */
- conf.set(HADOOP_PATH_STYLE_ACCESS, ExternalDataConstants.TRUE);
-
- /*
- * Set the size of S3 connection pool to be the number of partitions
- */
- conf.set(HADOOP_S3_CONNECTION_POOL_SIZE,
String.valueOf(numberOfPartitions));
-
- if (serviceEndpoint != null) {
- // Validation of the URL should be done at hadoop-aws level
- conf.set(HADOOP_SERVICE_END_POINT, serviceEndpoint);
- } else {
- //Region is ignored and buckets could be found by the central
endpoint
- conf.set(HADOOP_SERVICE_END_POINT, Constants.CENTRAL_ENDPOINT);
- }
- }
-
- /**
- * Validate external dataset properties
- *
- * @param configuration properties
- * @throws CompilationException Compilation exception
- */
- public static void validateProperties(Map<String, String> configuration,
SourceLocation srcLoc,
- IWarningCollector collector) throws CompilationException {
- if (isDeltaTable(configuration)) {
- validateDeltaTableProperties(configuration);
- }
- // check if the format property is present
- else if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
- throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED,
srcLoc, ExternalDataConstants.KEY_FORMAT);
- }
- // Both parameters should be passed, or neither should be passed (for
anonymous/no auth)
- String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
- String secretAccessKey =
configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
- if (accessKeyId == null || secretAccessKey == null) {
- // If one is passed, the other is required
- if (accessKeyId != null) {
- throw new
CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT,
SECRET_ACCESS_KEY_FIELD_NAME,
- ACCESS_KEY_ID_FIELD_NAME);
- } else if (secretAccessKey != null) {
- throw new
CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT,
ACCESS_KEY_ID_FIELD_NAME,
- SECRET_ACCESS_KEY_FIELD_NAME);
- }
- }
-
- validateIncludeExclude(configuration);
- try {
- // TODO(htowaileb): maybe something better, this will check to
ensure type is supported before creation
- new ExternalDataPrefix(configuration);
- } catch (AlgebricksException ex) {
- throw new
CompilationException(ErrorCode.FAILED_TO_CALCULATE_COMPUTED_FIELDS, ex);
- }
-
- // Check if the bucket is present
- S3Client s3Client = buildAwsS3Client(configuration);
- S3Response response;
- boolean useOldApi = false;
- String container =
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
- String prefix = getPrefix(configuration);
-
- try {
- response = isBucketEmpty(s3Client, container, prefix, false);
- } catch (S3Exception ex) {
- // Method not implemented, try falling back to old API
- try {
- // For error code, see
https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
- if
(ex.awsErrorDetails().errorCode().equals(ERROR_METHOD_NOT_IMPLEMENTED)) {
- useOldApi = true;
- response = isBucketEmpty(s3Client, container, prefix,
true);
- } else {
- throw ex;
- }
- } catch (SdkException ex2) {
- throw new
CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex2,
getMessageOrToString(ex));
- }
- } catch (SdkException ex) {
- throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR,
ex, getMessageOrToString(ex));
- } finally {
- if (s3Client != null) {
- CleanupUtils.close(s3Client, null);
- }
- }
-
- boolean isEmpty = useOldApi ? ((ListObjectsResponse)
response).contents().isEmpty()
- : ((ListObjectsV2Response) response).contents().isEmpty();
- if (isEmpty && collector.shouldWarn()) {
- Warning warning = Warning.of(srcLoc,
ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
- collector.warn(warning);
- }
-
- // Returns 200 only in case the bucket exists, otherwise, throws an
exception. However, to
- // ensure coverage, check if the result is successful as well and not
only catch exceptions
- if (!response.sdkHttpResponse().isSuccessful()) {
- throw new
CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container);
- }
- if (isDeltaTable(configuration)) {
- validateDeltaTableExists(configuration);
- }
- }
-
->>>>>>> BRANCH (7026fa Merge "Merge branch 'gerrit/trinity' into
'gerrit/goldfish'")
/**
* Checks for a single object in the specified bucket to determine if the
bucket is empty or not.
*
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19126
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: Ia6f1478cc62fb07c2eba3c6c9828b568c43a3cc0
Gerrit-Change-Number: 19126
Gerrit-PatchSet: 1
Gerrit-Owner: Michael Blow <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Michael Blow <[email protected]>
Gerrit-MessageType: merged