This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 9f454cdb48537e1fc0f9dd7b66ad75470a8c514a Author: Hussain Towaileb <hussain.towai...@couchbase.com> AuthorDate: Fri Feb 5 16:49:11 2021 +0300 [ASTERIXDB-2827][EXT]: S3 external dataset: properly fallback to old API - user model changes: no - storage format changes: no - interface changes: no Details: - Properly fall back to the old API if the new API is not supported. If the old API fails as well, report the error properly. Change-Id: Ib453eb396def92218951b9e45a89b6c0f48a54f6 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/9844 Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Hussain Towaileb <hussai...@gmail.com> Reviewed-by: Michael Blow <mb...@apache.org> --- asterixdb/NOTICE | 2 +- asterixdb/asterix-external-data/pom.xml | 4 + .../record/reader/aws/AwsS3InputStreamFactory.java | 122 ++++++++++++++++----- .../external/util/ExternalDataConstants.java | 3 +- .../asterix/external/util/ExternalDataUtils.java | 109 ++++++++++++------ asterixdb/pom.xml | 5 + hyracks-fullstack/NOTICE | 2 +- 7 files changed, 182 insertions(+), 65 deletions(-) diff --git a/asterixdb/NOTICE b/asterixdb/NOTICE index b4729a8..4aabe27 100644 --- a/asterixdb/NOTICE +++ b/asterixdb/NOTICE @@ -1,5 +1,5 @@ Apache AsterixDB -Copyright 2015-2020 The Apache Software Foundation +Copyright 2015-2021 The Apache Software Foundation This product includes software developed at The Apache Software Foundation (http://www.apache.org/). 
diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml index 8270d71..169bcb6 100644 --- a/asterixdb/asterix-external-data/pom.xml +++ b/asterixdb/asterix-external-data/pom.xml @@ -437,6 +437,10 @@ </dependency> <dependency> <groupId>software.amazon.awssdk</groupId> + <artifactId>aws-core</artifactId> + </dependency> + <dependency> + <groupId>software.amazon.awssdk</groupId> <artifactId>http-client-spi</artifactId> </dependency> <dependency> diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java index f3a36ff..0bc4c40 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java @@ -50,8 +50,11 @@ import org.apache.hyracks.api.util.CleanupUtils; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.ListObjectsRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsResponse; import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; +import software.amazon.awssdk.services.s3.model.S3Exception; import software.amazon.awssdk.services.s3.model.S3Object; public class AwsS3InputStreamFactory implements IInputStreamFactory { @@ -88,10 +91,6 @@ public class AwsS3InputStreamFactory implements IInputStreamFactory { this.configuration = configuration; ICcApplicationContext ccApplicationContext = (ICcApplicationContext) ctx.getApplicationContext(); - String container = configuration.get(AwsS3.CONTAINER_NAME_FIELD_NAME); - - 
List<S3Object> filesOnly = new ArrayList<>(); - // Ensure the validity of include/exclude ExternalDataUtils.AwsS3.validateIncludeExclude(configuration); @@ -126,35 +125,24 @@ public class AwsS3InputStreamFactory implements IInputStreamFactory { p = (matchers, key) -> true; } - S3Client s3Client = ExternalDataUtils.AwsS3.buildAwsS3Client(configuration); - // Get all objects in a bucket and extract the paths to files - ListObjectsV2Request.Builder listObjectsBuilder = ListObjectsV2Request.builder().bucket(container); - ExternalDataUtils.AwsS3.setPrefix(configuration, listObjectsBuilder); - - ListObjectsV2Response listObjectsResponse; - boolean done = false; - String newMarker = null; + List<S3Object> filesOnly; + String container = configuration.get(AwsS3.CONTAINER_NAME_FIELD_NAME); + S3Client s3Client = ExternalDataUtils.AwsS3.buildAwsS3Client(configuration); try { - while (!done) { - // List the objects from the start, or from the last marker in case of truncated result - if (newMarker == null) { - listObjectsResponse = s3Client.listObjectsV2(listObjectsBuilder.build()); + filesOnly = listS3Objects(s3Client, container, matchersList, p); + } catch (S3Exception ex) { + // New API is not implemented, try falling back to old API + try { + // For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html + if (ex.awsErrorDetails().errorCode().equals("NotImplemented")) { + filesOnly = oldApiListS3Objects(s3Client, container, matchersList, p); } else { - listObjectsResponse = - s3Client.listObjectsV2(listObjectsBuilder.continuationToken(newMarker).build()); - } - - // Collect the paths to files only - collectAndFilterFiles(listObjectsResponse.contents(), p, matchersList, filesOnly); - - // Mark the flag as done if done, otherwise, get the marker of the previous response for the next request - if (!listObjectsResponse.isTruncated()) { - done = true; - } else { - newMarker = listObjectsResponse.nextContinuationToken(); + throw ex; } + } catch 
(SdkException ex2) { + throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex2.getMessage()); } } catch (SdkException ex) { throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage()); @@ -179,6 +167,84 @@ public class AwsS3InputStreamFactory implements IInputStreamFactory { } /** + * Uses the latest API to retrieve the objects from the storage. + * + * @param s3Client S3 client + * @param container container name + * @param matchersList include/exclude matchers to apply + * @param predicate predicate to use for comparison + */ + private List<S3Object> listS3Objects(S3Client s3Client, String container, List<Matcher> matchersList, + BiPredicate<List<Matcher>, String> predicate) { + String newMarker = null; + List<S3Object> filesOnly = new ArrayList<>(); + + ListObjectsV2Response listObjectsResponse; + ListObjectsV2Request.Builder listObjectsBuilder = ListObjectsV2Request.builder().bucket(container); + listObjectsBuilder.prefix(ExternalDataUtils.getPrefix(configuration)); + + while (true) { + // List the objects from the start, or from the last marker in case of truncated result + if (newMarker == null) { + listObjectsResponse = s3Client.listObjectsV2(listObjectsBuilder.build()); + } else { + listObjectsResponse = s3Client.listObjectsV2(listObjectsBuilder.continuationToken(newMarker).build()); + } + + // Collect the paths to files only + collectAndFilterFiles(listObjectsResponse.contents(), predicate, matchersList, filesOnly); + + // Mark the flag as done if done, otherwise, get the marker of the previous response for the next request + if (!listObjectsResponse.isTruncated()) { + break; + } else { + newMarker = listObjectsResponse.nextContinuationToken(); + } + } + + return filesOnly; + } + + /** + * Uses the old API (in case the new API is not implemented) to retrieve the objects from the storage + * + * @param s3Client S3 client + * @param container container name + * @param matchersList include/exclude matchers to apply + * @param 
predicate predicate to use for comparison + */ + private List<S3Object> oldApiListS3Objects(S3Client s3Client, String container, List<Matcher> matchersList, + BiPredicate<List<Matcher>, String> predicate) { + String newMarker = null; + List<S3Object> filesOnly = new ArrayList<>(); + + ListObjectsResponse listObjectsResponse; + ListObjectsRequest.Builder listObjectsBuilder = ListObjectsRequest.builder().bucket(container); + listObjectsBuilder.prefix(ExternalDataUtils.getPrefix(configuration)); + + while (true) { + // List the objects from the start, or from the last marker in case of truncated result + if (newMarker == null) { + listObjectsResponse = s3Client.listObjects(listObjectsBuilder.build()); + } else { + listObjectsResponse = s3Client.listObjects(listObjectsBuilder.marker(newMarker).build()); + } + + // Collect the paths to files only + collectAndFilterFiles(listObjectsResponse.contents(), predicate, matchersList, filesOnly); + + // Mark the flag as done if done, otherwise, get the marker of the previous response for the next request + if (!listObjectsResponse.isTruncated()) { + break; + } else { + newMarker = listObjectsResponse.nextMarker(); + } + } + + return filesOnly; + } + + /** * AWS S3 returns all the objects as paths, not differentiating between folder and files. The path is considered * a file if it does not end up with a "/" which is the separator in a folder structure. 
* diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java index 252ed5b..53306c5 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java @@ -278,6 +278,8 @@ public class ExternalDataConstants { public static final String EMPTY_FIELD = "empty value"; public static final String INVALID_VAL = "invalid value"; + public static final String DEFINITION_FIELD_NAME = "definition"; + public static class AwsS3 { private AwsS3() { throw new AssertionError("do not instantiate"); @@ -287,7 +289,6 @@ public class ExternalDataConstants { public static final String ACCESS_KEY_ID_FIELD_NAME = "accessKeyId"; public static final String SECRET_ACCESS_KEY_FIELD_NAME = "secretAccessKey"; public static final String CONTAINER_NAME_FIELD_NAME = "container"; - public static final String DEFINITION_FIELD_NAME = "definition"; public static final String SERVICE_END_POINT_FIELD_NAME = "serviceEndpoint"; } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java index fc31286..8206d4c 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java @@ -69,8 +69,12 @@ import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.S3ClientBuilder; +import 
software.amazon.awssdk.services.s3.model.ListObjectsRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsResponse; import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; +import software.amazon.awssdk.services.s3.model.S3Exception; +import software.amazon.awssdk.services.s3.model.S3Response; public class ExternalDataUtils { @@ -473,7 +477,7 @@ public class ExternalDataUtils { switch (type) { case ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3: - ExternalDataUtils.AwsS3.validateProperties(configuration, srcLoc, collector); + AwsS3.validateProperties(configuration, srcLoc, collector); break; default: // Nothing needs to be done @@ -587,6 +591,19 @@ public class ExternalDataUtils { return result.toString(); } + /** + * Adjusts the prefix (if needed) and returns it + * + * @param configuration configuration + */ + public static String getPrefix(Map<String, String> configuration) { + String definition = configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME); + if (definition != null && !definition.isEmpty()) { + return definition + (!definition.endsWith("/") ? "/" : ""); + } + return ""; + } + public static class AwsS3 { private AwsS3() { throw new AssertionError("do not instantiate"); @@ -642,23 +659,9 @@ public class ExternalDataUtils { } /** - * Sets the prefix for the list objects builder if it is available - * - * @param configuration configuration - * @param builder builder - */ - public static void setPrefix(Map<String, String> configuration, ListObjectsV2Request.Builder builder) { - String definition = configuration.get(ExternalDataConstants.AwsS3.DEFINITION_FIELD_NAME); - if (definition != null) { - builder.prefix(definition + (!definition.isEmpty() && !definition.endsWith("/") ? 
"/" : "")); - } - } - - /** * Validate external dataset properties * * @param configuration properties - * * @throws CompilationException Compilation exception */ public static void validateProperties(Map<String, String> configuration, SourceLocation srcLoc, @@ -672,26 +675,26 @@ public class ExternalDataUtils { validateIncludeExclude(configuration); // Check if the bucket is present - S3Client s3Client = null; - try { - String container = configuration.get(ExternalDataConstants.AwsS3.CONTAINER_NAME_FIELD_NAME); - s3Client = buildAwsS3Client(configuration); - ListObjectsV2Request.Builder listObjectsBuilder = ListObjectsV2Request.builder(); - setPrefix(configuration, listObjectsBuilder); - - ListObjectsV2Response response = - s3Client.listObjectsV2(listObjectsBuilder.bucket(container).maxKeys(1).build()); - - if (response.contents().isEmpty() && collector.shouldWarn()) { - Warning warning = - WarningUtil.forAsterix(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES); - collector.warn(warning); - } + S3Client s3Client = buildAwsS3Client(configuration);; + S3Response response; + boolean useOldApi = false; + String container = configuration.get(ExternalDataConstants.AwsS3.CONTAINER_NAME_FIELD_NAME); + String prefix = getPrefix(configuration); - // Returns 200 only in case the bucket exists, however, otherwise, throws an exception. 
However, to - // ensure coverage, check if the result is successful as well and not only catch exceptions - if (!response.sdkHttpResponse().isSuccessful()) { - throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container); + try { + response = isBucketEmpty(s3Client, container, prefix, false); + } catch (S3Exception ex) { + // Method not implemented, try falling back to old API + try { + // For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html + if (ex.awsErrorDetails().errorCode().equals("NotImplemented")) { + useOldApi = true; + response = isBucketEmpty(s3Client, container, prefix, true); + } else { + throw ex; + } + } catch (SdkException ex2) { + throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex2.getMessage()); } } catch (SdkException ex) { throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage()); @@ -700,6 +703,44 @@ public class ExternalDataUtils { CleanupUtils.close(s3Client, null); } } + + boolean isEmpty = useOldApi ? ((ListObjectsResponse) response).contents().isEmpty() + : ((ListObjectsV2Response) response).contents().isEmpty(); + if (isEmpty && collector.shouldWarn()) { + Warning warning = + WarningUtil.forAsterix(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES); + collector.warn(warning); + } + + // Returns 200 only in case the bucket exists, otherwise, throws an exception. However, to + // ensure coverage, check if the result is successful as well and not only catch exceptions + if (!response.sdkHttpResponse().isSuccessful()) { + throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container); + } + } + + /** + * Checks for a single object in the specified bucket to determine if the bucket is empty or not. 
+ * + * @param s3Client s3 client + * @param container the container name + * @param prefix Prefix to be used + * @param useOldApi flag whether to use the old API or not + * + * @return returns the S3 response + */ + private static S3Response isBucketEmpty(S3Client s3Client, String container, String prefix, boolean useOldApi) { + S3Response response; + if (useOldApi) { + ListObjectsRequest.Builder listObjectsBuilder = ListObjectsRequest.builder(); + listObjectsBuilder.prefix(prefix); + response = s3Client.listObjects(listObjectsBuilder.bucket(container).maxKeys(1).build()); + } else { + ListObjectsV2Request.Builder listObjectsBuilder = ListObjectsV2Request.builder(); + listObjectsBuilder.prefix(prefix); + response = s3Client.listObjectsV2(listObjectsBuilder.bucket(container).maxKeys(1).build()); + } + return response; } /** diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml index 72a658e..5b7e6c9 100644 --- a/asterixdb/pom.xml +++ b/asterixdb/pom.xml @@ -1424,6 +1424,11 @@ </dependency> <dependency> <groupId>software.amazon.awssdk</groupId> + <artifactId>aws-core</artifactId> + <version>${awsjavasdk.version}</version> + </dependency> + <dependency> + <groupId>software.amazon.awssdk</groupId> <artifactId>sdk-core</artifactId> <version>${awsjavasdk.version}</version> <exclusions> diff --git a/hyracks-fullstack/NOTICE b/hyracks-fullstack/NOTICE index 95fe98a..57c5843 100644 --- a/hyracks-fullstack/NOTICE +++ b/hyracks-fullstack/NOTICE @@ -1,5 +1,5 @@ Apache Hyracks and Algebricks -Copyright 2015-2020 The Apache Software Foundation +Copyright 2015-2021 The Apache Software Foundation This product includes software developed at The Apache Software Foundation (http://www.apache.org/).