This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 9f454cdb48537e1fc0f9dd7b66ad75470a8c514a
Author: Hussain Towaileb <hussain.towai...@couchbase.com>
AuthorDate: Fri Feb 5 16:49:11 2021 +0300

    [ASTERIXDB-2827][EXT]: S3 external dataset: properly fallback to old API
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    - Properly fall back to the old API if the new API is not supported.
      If the old API fails as well, then report the error properly.
    
    Change-Id: Ib453eb396def92218951b9e45a89b6c0f48a54f6
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/9844
    Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Hussain Towaileb <hussai...@gmail.com>
    Reviewed-by: Michael Blow <mb...@apache.org>
---
 asterixdb/NOTICE                                   |   2 +-
 asterixdb/asterix-external-data/pom.xml            |   4 +
 .../record/reader/aws/AwsS3InputStreamFactory.java | 122 ++++++++++++++++-----
 .../external/util/ExternalDataConstants.java       |   3 +-
 .../asterix/external/util/ExternalDataUtils.java   | 109 ++++++++++++------
 asterixdb/pom.xml                                  |   5 +
 hyracks-fullstack/NOTICE                           |   2 +-
 7 files changed, 182 insertions(+), 65 deletions(-)

diff --git a/asterixdb/NOTICE b/asterixdb/NOTICE
index b4729a8..4aabe27 100644
--- a/asterixdb/NOTICE
+++ b/asterixdb/NOTICE
@@ -1,5 +1,5 @@
 Apache AsterixDB
-Copyright 2015-2020 The Apache Software Foundation
+Copyright 2015-2021 The Apache Software Foundation
 
 This product includes software developed at
 The Apache Software Foundation (http://www.apache.org/).
diff --git a/asterixdb/asterix-external-data/pom.xml 
b/asterixdb/asterix-external-data/pom.xml
index 8270d71..169bcb6 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -437,6 +437,10 @@
     </dependency>
     <dependency>
       <groupId>software.amazon.awssdk</groupId>
+      <artifactId>aws-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
       <artifactId>http-client-spi</artifactId>
     </dependency>
     <dependency>
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
index f3a36ff..0bc4c40 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
@@ -50,8 +50,11 @@ import org.apache.hyracks.api.util.CleanupUtils;
 
 import software.amazon.awssdk.core.exception.SdkException;
 import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
 import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
 import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.S3Exception;
 import software.amazon.awssdk.services.s3.model.S3Object;
 
 public class AwsS3InputStreamFactory implements IInputStreamFactory {
@@ -88,10 +91,6 @@ public class AwsS3InputStreamFactory implements 
IInputStreamFactory {
         this.configuration = configuration;
         ICcApplicationContext ccApplicationContext = (ICcApplicationContext) 
ctx.getApplicationContext();
 
-        String container = configuration.get(AwsS3.CONTAINER_NAME_FIELD_NAME);
-
-        List<S3Object> filesOnly = new ArrayList<>();
-
         // Ensure the validity of include/exclude
         ExternalDataUtils.AwsS3.validateIncludeExclude(configuration);
 
@@ -126,35 +125,24 @@ public class AwsS3InputStreamFactory implements 
IInputStreamFactory {
             p = (matchers, key) -> true;
         }
 
-        S3Client s3Client = 
ExternalDataUtils.AwsS3.buildAwsS3Client(configuration);
-
         // Get all objects in a bucket and extract the paths to files
-        ListObjectsV2Request.Builder listObjectsBuilder = 
ListObjectsV2Request.builder().bucket(container);
-        ExternalDataUtils.AwsS3.setPrefix(configuration, listObjectsBuilder);
-
-        ListObjectsV2Response listObjectsResponse;
-        boolean done = false;
-        String newMarker = null;
+        List<S3Object> filesOnly;
+        String container = configuration.get(AwsS3.CONTAINER_NAME_FIELD_NAME);
+        S3Client s3Client = 
ExternalDataUtils.AwsS3.buildAwsS3Client(configuration);
 
         try {
-            while (!done) {
-                // List the objects from the start, or from the last marker in 
case of truncated result
-                if (newMarker == null) {
-                    listObjectsResponse = 
s3Client.listObjectsV2(listObjectsBuilder.build());
+            filesOnly = listS3Objects(s3Client, container, matchersList, p);
+        } catch (S3Exception ex) {
+            // New API is not implemented, try falling back to old API
+            try {
+                // For error code, see 
https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
+                if (ex.awsErrorDetails().errorCode().equals("NotImplemented")) 
{
+                    filesOnly = oldApiListS3Objects(s3Client, container, 
matchersList, p);
                 } else {
-                    listObjectsResponse =
-                            
s3Client.listObjectsV2(listObjectsBuilder.continuationToken(newMarker).build());
-                }
-
-                // Collect the paths to files only
-                collectAndFilterFiles(listObjectsResponse.contents(), p, 
matchersList, filesOnly);
-
-                // Mark the flag as done if done, otherwise, get the marker of 
the previous response for the next request
-                if (!listObjectsResponse.isTruncated()) {
-                    done = true;
-                } else {
-                    newMarker = listObjectsResponse.nextContinuationToken();
+                    throw ex;
                 }
+            } catch (SdkException ex2) {
+                throw new 
CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex2.getMessage());
             }
         } catch (SdkException ex) {
             throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, 
ex.getMessage());
@@ -179,6 +167,84 @@ public class AwsS3InputStreamFactory implements 
IInputStreamFactory {
     }
 
     /**
+     * Uses the latest API to retrieve the objects from the storage.
+     *
+     * @param s3Client S3 client
+     * @param container container name
+     * @param matchersList include/exclude matchers to apply
+     * @param predicate predicate to use for comparison
+     */
+    private List<S3Object> listS3Objects(S3Client s3Client, String container, 
List<Matcher> matchersList,
+            BiPredicate<List<Matcher>, String> predicate) {
+        String newMarker = null;
+        List<S3Object> filesOnly = new ArrayList<>();
+
+        ListObjectsV2Response listObjectsResponse;
+        ListObjectsV2Request.Builder listObjectsBuilder = 
ListObjectsV2Request.builder().bucket(container);
+        listObjectsBuilder.prefix(ExternalDataUtils.getPrefix(configuration));
+
+        while (true) {
+            // List the objects from the start, or from the last marker in 
case of truncated result
+            if (newMarker == null) {
+                listObjectsResponse = 
s3Client.listObjectsV2(listObjectsBuilder.build());
+            } else {
+                listObjectsResponse = 
s3Client.listObjectsV2(listObjectsBuilder.continuationToken(newMarker).build());
+            }
+
+            // Collect the paths to files only
+            collectAndFilterFiles(listObjectsResponse.contents(), predicate, 
matchersList, filesOnly);
+
+            // Mark the flag as done if done, otherwise, get the marker of the 
previous response for the next request
+            if (!listObjectsResponse.isTruncated()) {
+                break;
+            } else {
+                newMarker = listObjectsResponse.nextContinuationToken();
+            }
+        }
+
+        return filesOnly;
+    }
+
+    /**
+     * Uses the old API (in case the new API is not implemented) to retrieve 
the objects from the storage
+     *
+     * @param s3Client S3 client
+     * @param container container name
+     * @param matchersList include/exclude matchers to apply
+     * @param predicate predicate to use for comparison
+     */
+    private List<S3Object> oldApiListS3Objects(S3Client s3Client, String 
container, List<Matcher> matchersList,
+            BiPredicate<List<Matcher>, String> predicate) {
+        String newMarker = null;
+        List<S3Object> filesOnly = new ArrayList<>();
+
+        ListObjectsResponse listObjectsResponse;
+        ListObjectsRequest.Builder listObjectsBuilder = 
ListObjectsRequest.builder().bucket(container);
+        listObjectsBuilder.prefix(ExternalDataUtils.getPrefix(configuration));
+
+        while (true) {
+            // List the objects from the start, or from the last marker in 
case of truncated result
+            if (newMarker == null) {
+                listObjectsResponse = 
s3Client.listObjects(listObjectsBuilder.build());
+            } else {
+                listObjectsResponse = 
s3Client.listObjects(listObjectsBuilder.marker(newMarker).build());
+            }
+
+            // Collect the paths to files only
+            collectAndFilterFiles(listObjectsResponse.contents(), predicate, 
matchersList, filesOnly);
+
+            // Mark the flag as done if done, otherwise, get the marker of the 
previous response for the next request
+            if (!listObjectsResponse.isTruncated()) {
+                break;
+            } else {
+                newMarker = listObjectsResponse.nextMarker();
+            }
+        }
+
+        return filesOnly;
+    }
+
+    /**
      * AWS S3 returns all the objects as paths, not differentiating between 
folder and files. The path is considered
      * a file if it does not end up with a "/" which is the separator in a 
folder structure.
      *
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 252ed5b..53306c5 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -278,6 +278,8 @@ public class ExternalDataConstants {
     public static final String EMPTY_FIELD = "empty value";
     public static final String INVALID_VAL = "invalid value";
 
+    public static final String DEFINITION_FIELD_NAME = "definition";
+
     public static class AwsS3 {
         private AwsS3() {
             throw new AssertionError("do not instantiate");
@@ -287,7 +289,6 @@ public class ExternalDataConstants {
         public static final String ACCESS_KEY_ID_FIELD_NAME = "accessKeyId";
         public static final String SECRET_ACCESS_KEY_FIELD_NAME = 
"secretAccessKey";
         public static final String CONTAINER_NAME_FIELD_NAME = "container";
-        public static final String DEFINITION_FIELD_NAME = "definition";
         public static final String SERVICE_END_POINT_FIELD_NAME = 
"serviceEndpoint";
     }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index fc31286..8206d4c 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -69,8 +69,12 @@ import software.amazon.awssdk.core.exception.SdkException;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
 import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
 import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+import software.amazon.awssdk.services.s3.model.S3Response;
 
 public class ExternalDataUtils {
 
@@ -473,7 +477,7 @@ public class ExternalDataUtils {
 
         switch (type) {
             case ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3:
-                ExternalDataUtils.AwsS3.validateProperties(configuration, 
srcLoc, collector);
+                AwsS3.validateProperties(configuration, srcLoc, collector);
                 break;
             default:
                 // Nothing needs to be done
@@ -587,6 +591,19 @@ public class ExternalDataUtils {
         return result.toString();
     }
 
+    /**
+     * Adjusts the prefix (if needed) and returns it
+     *
+     * @param configuration configuration
+     */
+    public static String getPrefix(Map<String, String> configuration) {
+        String definition = 
configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
+        if (definition != null && !definition.isEmpty()) {
+            return definition + (!definition.endsWith("/") ? "/" : "");
+        }
+        return "";
+    }
+
     public static class AwsS3 {
         private AwsS3() {
             throw new AssertionError("do not instantiate");
@@ -642,23 +659,9 @@ public class ExternalDataUtils {
         }
 
         /**
-         * Sets the prefix for the list objects builder if it is available
-         *
-         * @param configuration configuration
-         * @param builder builder
-         */
-        public static void setPrefix(Map<String, String> configuration, 
ListObjectsV2Request.Builder builder) {
-            String definition = 
configuration.get(ExternalDataConstants.AwsS3.DEFINITION_FIELD_NAME);
-            if (definition != null) {
-                builder.prefix(definition + (!definition.isEmpty() && 
!definition.endsWith("/") ? "/" : ""));
-            }
-        }
-
-        /**
          * Validate external dataset properties
          *
          * @param configuration properties
-         *
          * @throws CompilationException Compilation exception
          */
         public static void validateProperties(Map<String, String> 
configuration, SourceLocation srcLoc,
@@ -672,26 +675,26 @@ public class ExternalDataUtils {
             validateIncludeExclude(configuration);
 
             // Check if the bucket is present
-            S3Client s3Client = null;
-            try {
-                String container = 
configuration.get(ExternalDataConstants.AwsS3.CONTAINER_NAME_FIELD_NAME);
-                s3Client = buildAwsS3Client(configuration);
-                ListObjectsV2Request.Builder listObjectsBuilder = 
ListObjectsV2Request.builder();
-                setPrefix(configuration, listObjectsBuilder);
-
-                ListObjectsV2Response response =
-                        
s3Client.listObjectsV2(listObjectsBuilder.bucket(container).maxKeys(1).build());
-
-                if (response.contents().isEmpty() && collector.shouldWarn()) {
-                    Warning warning =
-                            WarningUtil.forAsterix(srcLoc, 
ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
-                    collector.warn(warning);
-                }
+            S3Client s3Client = buildAwsS3Client(configuration);;
+            S3Response response;
+            boolean useOldApi = false;
+            String container = 
configuration.get(ExternalDataConstants.AwsS3.CONTAINER_NAME_FIELD_NAME);
+            String prefix = getPrefix(configuration);
 
-                // Returns 200 only in case the bucket exists, however, 
otherwise, throws an exception. However, to
-                // ensure coverage, check if the result is successful as well 
and not only catch exceptions
-                if (!response.sdkHttpResponse().isSuccessful()) {
-                    throw new 
CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container);
+            try {
+                response = isBucketEmpty(s3Client, container, prefix, false);
+            } catch (S3Exception ex) {
+                // Method not implemented, try falling back to old API
+                try {
+                    // For error code, see 
https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
+                    if 
(ex.awsErrorDetails().errorCode().equals("NotImplemented")) {
+                        useOldApi = true;
+                        response = isBucketEmpty(s3Client, container, prefix, 
true);
+                    } else {
+                        throw ex;
+                    }
+                } catch (SdkException ex2) {
+                    throw new 
CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex2.getMessage());
                 }
             } catch (SdkException ex) {
                 throw new 
CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
@@ -700,6 +703,44 @@ public class ExternalDataUtils {
                     CleanupUtils.close(s3Client, null);
                 }
             }
+
+            boolean isEmpty = useOldApi ? ((ListObjectsResponse) 
response).contents().isEmpty()
+                    : ((ListObjectsV2Response) response).contents().isEmpty();
+            if (isEmpty && collector.shouldWarn()) {
+                Warning warning =
+                        WarningUtil.forAsterix(srcLoc, 
ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
+                collector.warn(warning);
+            }
+
+            // Returns 200 only in case the bucket exists, otherwise, throws 
an exception. However, to
+            // ensure coverage, check if the result is successful as well and 
not only catch exceptions
+            if (!response.sdkHttpResponse().isSuccessful()) {
+                throw new 
CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container);
+            }
+        }
+
+        /**
+         * Checks for a single object in the specified bucket to determine if 
the bucket is empty or not.
+         *
+         * @param s3Client s3 client
+         * @param container the container name
+         * @param prefix Prefix to be used
+         * @param useOldApi flag whether to use the old API or not
+         *
+         * @return returns the S3 response
+         */
+        private static S3Response isBucketEmpty(S3Client s3Client, String 
container, String prefix, boolean useOldApi) {
+            S3Response response;
+            if (useOldApi) {
+                ListObjectsRequest.Builder listObjectsBuilder = 
ListObjectsRequest.builder();
+                listObjectsBuilder.prefix(prefix);
+                response = 
s3Client.listObjects(listObjectsBuilder.bucket(container).maxKeys(1).build());
+            } else {
+                ListObjectsV2Request.Builder listObjectsBuilder = 
ListObjectsV2Request.builder();
+                listObjectsBuilder.prefix(prefix);
+                response = 
s3Client.listObjectsV2(listObjectsBuilder.bucket(container).maxKeys(1).build());
+            }
+            return response;
         }
 
         /**
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index 72a658e..5b7e6c9 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -1424,6 +1424,11 @@
       </dependency>
       <dependency>
         <groupId>software.amazon.awssdk</groupId>
+        <artifactId>aws-core</artifactId>
+        <version>${awsjavasdk.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>software.amazon.awssdk</groupId>
         <artifactId>sdk-core</artifactId>
         <version>${awsjavasdk.version}</version>
         <exclusions>
diff --git a/hyracks-fullstack/NOTICE b/hyracks-fullstack/NOTICE
index 95fe98a..57c5843 100644
--- a/hyracks-fullstack/NOTICE
+++ b/hyracks-fullstack/NOTICE
@@ -1,5 +1,5 @@
 Apache Hyracks and Algebricks
-Copyright 2015-2020 The Apache Software Foundation
+Copyright 2015-2021 The Apache Software Foundation
 
 This product includes software developed at
 The Apache Software Foundation (http://www.apache.org/).

Reply via email to