>From Ritik Raj <[email protected]>:

Ritik Raj has submitted this change. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20549?usp=email )

Change subject: [ASTERIXDB-3669][CLOUD] Introduced S3 sync client for parallel 
downloader
......................................................................

[ASTERIXDB-3669][CLOUD] Introduced S3 sync client for parallel downloader

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
Some S3-compatible object storage systems
do not support the S3 CRT or Async clients.

This patch introduces a synchronous S3 client
implementation for the parallel downloader,
which serves as a fallback in such cases
to ensure compatibility and reliability.

Additionally, a new configuration option has been
added to allow selection of the desired parallel
downloader client type (CRT, Async, or Sync).

Also, Introduced s3 client read timeout config.

Ext-ref: MB-69226

Change-Id: Id16c530916e7e223201e2395d7aa38a4640367b6
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20549
Reviewed-by: Murtadha Hubail <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Tested-by: Ritik Raj <[email protected]>
---
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/config/ConfigValidator.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
A 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
M 
asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
9 files changed, 282 insertions(+), 23 deletions(-)

Approvals:
  Ritik Raj: Verified
  Murtadha Hubail: Looks good to me, approved
  Jenkins: Verified




diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/config/ConfigValidator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/config/ConfigValidator.java
index 138a2b4..b77db22 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/config/ConfigValidator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/config/ConfigValidator.java
@@ -18,7 +18,9 @@
  */
 package org.apache.asterix.app.config;

+import org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig;
 import org.apache.asterix.common.api.IConfigValidator;
+import org.apache.asterix.common.config.CloudProperties;
 import org.apache.asterix.common.config.StorageProperties;
 import org.apache.asterix.runtime.compression.CompressionManager;
 import org.apache.hyracks.api.config.IOption;
@@ -30,7 +32,10 @@
         boolean valid = true;
         if (option == StorageProperties.Option.STORAGE_COMPRESSION_BLOCK) {
             valid = CompressionManager.isRegisteredScheme((String) value);
+        } else if (option == 
CloudProperties.Option.CLOUD_STORAGE_S3_PARALLEL_DOWNLOADER_CLIENT_TYPE) {
+            valid = 
S3ClientConfig.S3ParallelDownloaderClientType.validate((String) value);
         }
+
         if (!valid) {
             throw new IllegalArgumentException("Invalid value " + value + " 
for option " + option.name());
         }
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java
index 22de1e9..6f1c453 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java
@@ -24,6 +24,7 @@
 import org.apache.hyracks.api.io.FileReference;

 public interface IParallelDownloader extends AutoCloseable {
+    String STORAGE_SUB_DIR = "storage";

     /**
      * Downloads files in all partitions
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
index b6683ad..a75eb4b 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
@@ -47,19 +47,22 @@
     private final boolean forcePathStyle;
     private final boolean disableSslVerify;
     private final boolean storageListEventuallyConsistent;
-    private final boolean enableCrtClient;
+    private final int s3ReadTimeoutInSeconds;
+    private final S3ParallelDownloaderClientType parallelDownloaderClientType;

     public S3ClientConfig(String region, String endpoint, String prefix, 
boolean anonymousAuth,
-            long profilerLogInterval, int writeBufferSize, boolean 
enableCrtClient) {
+            long profilerLogInterval, int writeBufferSize,
+            S3ParallelDownloaderClientType parallelDownloaderClientType) {
         this(region, endpoint, prefix, anonymousAuth, profilerLogInterval, 
writeBufferSize, 1, 0, 0, 0, false, false,
-                false, 0, 0, enableCrtClient);
+                false, 0, 0, -1, parallelDownloaderClientType);
     }

     private S3ClientConfig(String region, String endpoint, String prefix, 
boolean anonymousAuth,
             long profilerLogInterval, int writeBufferSize, long 
tokenAcquireTimeout, int writeMaxRequestsPerSeconds,
             int readMaxRequestsPerSeconds, int requestsMaxHttpConnections, 
boolean forcePathStyle,
             boolean disableSslVerify, boolean storageListEventuallyConsistent, 
int requestsMaxPendingHttpConnections,
-            int requestsHttpConnectionAcquireTimeout, boolean enableCrtClient) 
{
+            int requestsHttpConnectionAcquireTimeout, int 
s3ReadTimeoutInSeconds,
+            S3ParallelDownloaderClientType parallelDownloaderClientType) {
         this.region = Objects.requireNonNull(region, "region");
         this.endpoint = endpoint;
         this.prefix = Objects.requireNonNull(prefix, "prefix");
@@ -75,7 +78,8 @@
         this.forcePathStyle = forcePathStyle;
         this.disableSslVerify = disableSslVerify;
         this.storageListEventuallyConsistent = storageListEventuallyConsistent;
-        this.enableCrtClient = enableCrtClient;
+        this.s3ReadTimeoutInSeconds = s3ReadTimeoutInSeconds;
+        this.parallelDownloaderClientType = parallelDownloaderClientType;
     }

     public static S3ClientConfig of(CloudProperties cloudProperties) {
@@ -87,7 +91,26 @@
                 cloudProperties.isStorageForcePathStyle(), 
cloudProperties.isStorageDisableSSLVerify(),
                 cloudProperties.isStorageListEventuallyConsistent(),
                 cloudProperties.getRequestsMaxPendingHttpConnections(),
-                cloudProperties.getRequestsHttpConnectionAcquireTimeout(), 
cloudProperties.isS3EnableCrtClient());
+                cloudProperties.getRequestsHttpConnectionAcquireTimeout(), 
cloudProperties.getS3ReadTimeoutInSeconds(),
+                
S3ParallelDownloaderClientType.valueOf(cloudProperties.getS3ParallelDownloaderClientType()));
+    }
+
+    public enum S3ParallelDownloaderClientType {
+        CRT,
+        ASYNC,
+        SYNC;
+
+        public static boolean validate(String clientType) {
+            if (clientType == null || clientType.isEmpty()) {
+                return false;
+            }
+            for (S3ParallelDownloaderClientType type : values()) {
+                if (type.name().equalsIgnoreCase(clientType)) {
+                    return true;
+                }
+            }
+            return false;
+        }
     }

     public static S3ClientConfig of(Map<String, String> configuration, int 
writeBufferSize) {
@@ -101,7 +124,8 @@
         String prefix = "";
         boolean anonymousAuth = false;

-        return new S3ClientConfig(region, endPoint, prefix, anonymousAuth, 
profilerLogInterval, writeBufferSize, false);
+        return new S3ClientConfig(region, endPoint, prefix, anonymousAuth, 
profilerLogInterval, writeBufferSize,
+                S3ParallelDownloaderClientType.ASYNC);
     }

     public String getRegion() {
@@ -169,8 +193,11 @@
         return storageListEventuallyConsistent;
     }
 
-    public boolean isCrtClientEnabled() {
-        return enableCrtClient;
+    public S3ParallelDownloaderClientType getParallelDownloaderClientType() {
+        return parallelDownloaderClientType;
     }

+    public int getS3ReadTimeoutInSeconds() {
+        return s3ReadTimeoutInSeconds;
+    }
 }
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index defffe0..ab27ad4 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -329,7 +329,11 @@

     @Override
     public IParallelDownloader createParallelDownloader(String bucket, 
IOManager ioManager) {
-        return new S3ParallelDownloader(bucket, ioManager, config, profiler);
+        S3ClientConfig.S3ParallelDownloaderClientType 
parallelDownloaderClientType = config.getParallelDownloaderClientType();
+        return switch (parallelDownloaderClientType) {
+            case CRT, ASYNC -> new S3ParallelDownloader(bucket, ioManager, 
config, profiler);
+            case SYNC -> new S3SyncDownloader(bucket, ioManager, config, 
profiler);
+        };
     }

     @Override
@@ -364,7 +368,7 @@
         return new S3BufferedWriter(s3Client, profiler, guardian, bucket, 
config.getPrefix() + path);
     }

-    private static CloseableAwsClients buildClient(S3ClientConfig config) {
+    public static CloseableAwsClients buildClient(S3ClientConfig config) {
         CloseableAwsClients awsClients = new CloseableAwsClients();
         S3ClientBuilder builder = S3Client.builder();
         AwsCredentialsProvider credentialsProvider = 
config.createCredentialsProvider();
@@ -385,6 +389,10 @@
             
customHttpConfigBuilder.put(SdkHttpConfigurationOption.CONNECTION_ACQUIRE_TIMEOUT,
                     
Duration.ofSeconds(config.getRequestsHttpConnectionAcquireTimeout()));
         }
+        if (config.getS3ReadTimeoutInSeconds() > 0) {
+            
customHttpConfigBuilder.put(SdkHttpConfigurationOption.READ_TIMEOUT,
+                    Duration.ofSeconds(config.getS3ReadTimeoutInSeconds()));
+        }
         if (config.getEndpoint() != null && !config.getEndpoint().isEmpty()) {
             builder.endpointOverride(URI.create(config.getEndpoint()));
         }
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
index 50d05c6..0321a35 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
@@ -181,7 +181,9 @@

     private static S3AsyncClient createAsyncClient(S3ClientConfig config) {
         // CRT client is not supported by all local S3 providers, but provides 
a better performance with AWS S3
-        if (config.isCrtClientEnabled()) {
+        S3ClientConfig.S3ParallelDownloaderClientType 
parallelDownloaderClientType =
+                config.getParallelDownloaderClientType();
+        if (parallelDownloaderClientType == 
S3ClientConfig.S3ParallelDownloaderClientType.CRT) {
             return createS3CrtAsyncClient(config);
         }
         return createS3AsyncClient(config);
@@ -211,6 +213,10 @@
             
customHttpConfigBuilder.put(SdkHttpConfigurationOption.CONNECTION_ACQUIRE_TIMEOUT,
                     
Duration.ofSeconds(config.getRequestsHttpConnectionAcquireTimeout()));
         }
+        if (config.getS3ReadTimeoutInSeconds() > 0) {
+            
customHttpConfigBuilder.put(SdkHttpConfigurationOption.READ_TIMEOUT,
+                    Duration.ofSeconds(config.getS3ReadTimeoutInSeconds()));
+        }
         SdkAsyncHttpClient nettyHttpClient =
                 
NettyNioAsyncHttpClient.builder().buildWithDefaults(customHttpConfigBuilder.build());
         builder.httpClient(nettyHttpClient);
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java
new file mode 100644
index 0000000..53e3dad
--- /dev/null
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients.aws.s3;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.asterix.cloud.clients.IParallelDownloader;
+import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import software.amazon.awssdk.core.ResponseInputStream;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+@ThreadSafe
+public class S3SyncDownloader implements IParallelDownloader {
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    private final String bucket;
+    private final IOManager ioManager;
+    private final S3Client s3Client;
+    private final S3ClientConfig config;
+    private final IRequestProfilerLimiter profiler;
+    private final ExecutorService executorService;
+
+    S3SyncDownloader(String bucket, IOManager ioManager, S3ClientConfig 
config, IRequestProfilerLimiter profiler) {
+        this.bucket = bucket;
+        this.ioManager = ioManager;
+        this.config = config;
+        this.profiler = profiler;
+        this.s3Client = (S3Client) 
S3CloudClient.buildClient(config).getConsumingClient();
+        this.executorService = Executors.newCachedThreadPool();
+    }
+
+    @Override
+    public void downloadFiles(Collection<FileReference> toDownload) throws 
HyracksDataException {
+        try {
+            downloadFilesAndWait(toDownload);
+        } catch (IOException | InterruptedException | ExecutionException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    private void downloadFilesAndWait(Collection<FileReference> toDownload)
+            throws IOException, ExecutionException, InterruptedException {
+        List<Future<?>> downloads = new ArrayList<>();
+        int maxPending = config.getRequestsMaxPendingHttpConnections();
+        for (FileReference fileReference : toDownload) {
+            profiler.objectGet();
+            FileUtils.createParentDirectories(fileReference.getFile());
+            Future<?> future = executorService.submit(() -> {
+                try {
+                    downloadFile(fileReference);
+                } catch (HyracksDataException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+            downloads.add(future);
+
+            if (maxPending > 0 && downloads.size() >= maxPending) {
+                waitForFileDownloads(downloads);
+                downloads.clear();
+            }
+        }
+        if (!downloads.isEmpty()) {
+            waitForFileDownloads(downloads);
+        }
+    }
+
+    private void waitForFileDownloads(List<Future<?>> downloads) throws 
ExecutionException, InterruptedException {
+        for (Future<?> download : downloads) {
+            download.get();
+        }
+    }
+
+    private void downloadFile(FileReference fileReference) throws 
HyracksDataException {
+        GetObjectRequest request = GetObjectRequest.builder().bucket(bucket)
+                .key(config.getPrefix() + 
fileReference.getRelativePath()).build();
+
+        Path targetPath = fileReference.getFile().toPath();
+        try (ResponseInputStream<GetObjectResponse> response = 
s3Client.getObject(request);
+                OutputStream outputStream = Files.newOutputStream(targetPath, 
StandardOpenOption.CREATE,
+                        StandardOpenOption.WRITE, 
StandardOpenOption.TRUNCATE_EXISTING)) {
+
+            response.transferTo(outputStream);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public Collection<FileReference> 
downloadDirectories(Collection<FileReference> toDownload)
+            throws HyracksDataException {
+        Set<FileReference> failedFiles;
+        try {
+            failedFiles = downloadDirectoriesAndWait(toDownload);
+        } catch (IOException | InterruptedException | ExecutionException e) {
+            throw HyracksDataException.create(e);
+        }
+        return failedFiles;
+    }
+
+    private Set<FileReference> 
downloadDirectoriesAndWait(Collection<FileReference> toDownload)
+            throws IOException, ExecutionException, InterruptedException {
+        Set<FileReference> failedFiles = ConcurrentHashMap.newKeySet();
+        List<Future<?>> downloads = new ArrayList<>();
+
+        int maxPending = config.getRequestsMaxPendingHttpConnections();
+        List<S3Object> downloadObjects = new ArrayList<>();
+        for (FileReference fileReference : toDownload) {
+            profiler.objectMultipartDownload();
+            String prefix = config.getPrefix() + 
fileReference.getRelativePath();
+            List<S3Object> objects = S3ClientUtils.listS3Objects(s3Client, 
bucket, prefix);
+            downloadObjects.addAll(objects);
+        }
+
+        for (S3Object s3Object : downloadObjects) {
+            String key = createDiskSubPath(s3Object.key());
+            FileReference targetFile = ioManager.resolve(key);
+
+            FileUtils.createParentDirectories(targetFile.getFile());
+
+            Future<Void> future = executorService.submit(() -> {
+                try {
+                    profiler.objectGet();
+                    downloadFile(targetFile);
+                } catch (IOException e) {
+                    // Record failed file
+                    failedFiles.add(targetFile);
+                    LOGGER.debug("Failed to download file using sync client: 
file {} having s3Key: {}", targetFile,
+                            s3Object.key(), e);
+                }
+                return null;
+            });
+            downloads.add(future);
+
+            if (maxPending > 0 && downloads.size() >= maxPending) {
+                waitForDirectoryFileDownloads(downloads);
+                downloads.clear();
+            }
+        }
+
+        if (!downloads.isEmpty()) {
+            waitForDirectoryFileDownloads(downloads);
+        }
+
+        return failedFiles;
+    }
+
+    private void waitForDirectoryFileDownloads(List<Future<?>> downloads)
+            throws ExecutionException, InterruptedException {
+        for (Future<?> download : downloads) {
+            download.get();
+        }
+    }
+
+    private String createDiskSubPath(String objectName) {
+        if (!objectName.startsWith(STORAGE_SUB_DIR)) {
+            objectName = 
objectName.substring(objectName.indexOf(STORAGE_SUB_DIR));
+        }
+        return objectName;
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        s3Client.close();
+    }
+}
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
index 25bc217..364fb2a 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
@@ -45,7 +45,6 @@
 import reactor.core.publisher.Mono;

 public class AzureParallelDownloader implements IParallelDownloader {
-    public static final String STORAGE_SUB_DIR = "storage";
     private final IOManager ioManager;
     private final BlobContainerAsyncClient blobContainerAsyncClient;
     private final IRequestProfilerLimiter profiler;
diff --git 
a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
 
b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
index 1bb0f74..5f8ee8c 100644
--- 
a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
+++ 
b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
@@ -67,8 +67,8 @@
         
client.createBucket(CreateBucketRequest.builder().bucket(PLAYGROUND_CONTAINER).build());
         LOGGER.info("Client created successfully");
         int writeBufferSize = StorageUtil.getIntSizeInBytes(5, 
StorageUtil.StorageUnit.MEGABYTE);
-        S3ClientConfig config =
-                new S3ClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, 
"", true, 0, writeBufferSize, false);
+        S3ClientConfig config = new S3ClientConfig(MOCK_SERVER_REGION, 
MOCK_SERVER_HOSTNAME, "", true, 0,
+                writeBufferSize, 
S3ClientConfig.S3ParallelDownloaderClientType.ASYNC);
         CLOUD_CLIENT = new S3CloudClient(config, 
ICloudGuardian.NoOpCloudGuardian.INSTANCE);
     }

diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
index 4b6c928..50ec1f0 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
@@ -20,6 +20,7 @@

 import static org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN;
 import static org.apache.hyracks.control.common.config.OptionTypes.DOUBLE;
+import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER;
 import static 
org.apache.hyracks.control.common.config.OptionTypes.LONG_BYTE_UNIT;
 import static 
org.apache.hyracks.control.common.config.OptionTypes.NONNEGATIVE_INTEGER;
 import static 
org.apache.hyracks.control.common.config.OptionTypes.POSITIVE_INTEGER;
@@ -77,9 +78,10 @@
         CLOUD_STORAGE_FORCE_PATH_STYLE(BOOLEAN, false),
         CLOUD_STORAGE_DISABLE_SSL_VERIFY(BOOLEAN, false),
         CLOUD_STORAGE_LIST_EVENTUALLY_CONSISTENT(BOOLEAN, false),
-        CLOUD_STORAGE_S3_ENABLE_CRT_CLIENT(BOOLEAN, 
(Function<IApplicationConfig, Boolean>) app -> {
+        CLOUD_STORAGE_S3_CLIENT_READ_TIMEOUT(INTEGER, -1),
+        CLOUD_STORAGE_S3_PARALLEL_DOWNLOADER_CLIENT_TYPE(STRING, 
(Function<IApplicationConfig, String>) app -> {
             String endpoint = app.getString(CLOUD_STORAGE_ENDPOINT);
-            return endpoint == null || endpoint.isEmpty();
+            return (endpoint == null || endpoint.isEmpty()) ? "crt" : "async";
         });

         private final IOptionType interpreter;
@@ -196,8 +198,10 @@
                 case CLOUD_STORAGE_LIST_EVENTUALLY_CONSISTENT:
                     return "Indicates whether or not deleted objects may be 
contained in list operations for some time"
                             + "after they are deleted";
-                case CLOUD_STORAGE_S3_ENABLE_CRT_CLIENT:
-                    return "Indicates whether or not to use the AWS CRT S3 
client for async requests";
+                case CLOUD_STORAGE_S3_CLIENT_READ_TIMEOUT:
+                    return "The read timeout (in seconds) for S3 sync client 
(-1 means SDK default)";
+                case CLOUD_STORAGE_S3_PARALLEL_DOWNLOADER_CLIENT_TYPE:
+                    return "The S3 client to use for parallel downloads (crt, 
async or sync)";
                 default:
                     throw new IllegalStateException("NYI: " + this);
             }
@@ -215,8 +219,8 @@

         @Override
         public String usageDefaultOverride(IApplicationConfig accessor, 
Function<IOption, String> optionPrinter) {
-            if (this == CLOUD_STORAGE_S3_ENABLE_CRT_CLIENT) {
-                return "true when no custom endpoint is set, otherwise false";
+            if (this == CLOUD_STORAGE_S3_PARALLEL_DOWNLOADER_CLIENT_TYPE) {
+                return "crt if no custom endpoint is set; async otherwise";
             }
             return IOption.super.usageDefaultOverride(accessor, optionPrinter);
         }
@@ -325,7 +329,11 @@
         return 
accessor.getBoolean(Option.CLOUD_STORAGE_LIST_EVENTUALLY_CONSISTENT);
     }

-    public boolean isS3EnableCrtClient() {
-        return accessor.getBoolean(Option.CLOUD_STORAGE_S3_ENABLE_CRT_CLIENT);
+    public String getS3ParallelDownloaderClientType() {
+        return 
accessor.getString(Option.CLOUD_STORAGE_S3_PARALLEL_DOWNLOADER_CLIENT_TYPE).toUpperCase();
+    }
+
+    public int getS3ReadTimeoutInSeconds() {
+        return accessor.getInt(Option.CLOUD_STORAGE_S3_CLIENT_READ_TIMEOUT);
     }
 }

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20549?usp=email
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings?usp=email

Gerrit-MessageType: merged
Gerrit-Project: asterixdb
Gerrit-Branch: phoenix
Gerrit-Change-Id: Id16c530916e7e223201e2395d7aa38a4640367b6
Gerrit-Change-Number: 20549
Gerrit-PatchSet: 8
Gerrit-Owner: Ritik Raj <[email protected]>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Murtadha Hubail <[email protected]>
Gerrit-Reviewer: Ritik Raj <[email protected]>
Gerrit-CC: Michael Blow <[email protected]>

Reply via email to