>From Ritik Raj <[email protected]>: Ritik Raj has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20549?usp=email )
Change subject: [ASTERIXDB-3669][CLOUD] Introduced S3 sync client for parallel downloader ...................................................................... [ASTERIXDB-3669][CLOUD] Introduced S3 sync client for parallel downloader - user model changes: no - storage format changes: no - interface changes: no Details: Some S3-compatible object storage systems do not support the S3 CRT or Async clients. This patch introduces a synchronous S3 client implementation for the parallel downloader, which serves as a fallback in such cases to ensure compatibility and reliability. Additionally, a new configuration option has been added to allow selection of the desired parallel downloader client type (CRT, Async, or Sync). Also, Introduced s3 client read timeout config. Ext-ref: MB-69226 Change-Id: Id16c530916e7e223201e2395d7aa38a4640367b6 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20549 Reviewed-by: Murtadha Hubail <[email protected]> Integration-Tests: Jenkins <[email protected]> Tested-by: Ritik Raj <[email protected]> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/config/ConfigValidator.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java A asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java M asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java 9 files changed, 282 insertions(+), 23 deletions(-) Approvals: Ritik Raj: Verified Murtadha Hubail: Looks good to me, approved Jenkins: Verified diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/config/ConfigValidator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/config/ConfigValidator.java index 138a2b4..b77db22 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/config/ConfigValidator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/config/ConfigValidator.java @@ -18,7 +18,9 @@ */ package org.apache.asterix.app.config; +import org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig; import org.apache.asterix.common.api.IConfigValidator; +import org.apache.asterix.common.config.CloudProperties; import org.apache.asterix.common.config.StorageProperties; import org.apache.asterix.runtime.compression.CompressionManager; import org.apache.hyracks.api.config.IOption; @@ -30,7 +32,10 @@ boolean valid = true; if (option == StorageProperties.Option.STORAGE_COMPRESSION_BLOCK) { valid = CompressionManager.isRegisteredScheme((String) value); + } else if (option == CloudProperties.Option.CLOUD_STORAGE_S3_PARALLEL_DOWNLOADER_CLIENT_TYPE) { + valid = S3ClientConfig.S3ParallelDownloaderClientType.validate((String) value); } + if (!valid) { throw new IllegalArgumentException("Invalid value " + value + " for option " + option.name()); } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java index 22de1e9..6f1c453 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java @@ -24,6 +24,7 @@ import org.apache.hyracks.api.io.FileReference; public interface IParallelDownloader extends AutoCloseable { + String STORAGE_SUB_DIR = "storage"; /** * Downloads files in all partitions diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java index b6683ad..a75eb4b 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java @@ -47,19 +47,22 @@ private final boolean forcePathStyle; private final boolean disableSslVerify; private final boolean storageListEventuallyConsistent; - private final boolean enableCrtClient; + private final int s3ReadTimeoutInSeconds; + private final S3ParallelDownloaderClientType parallelDownloaderClientType; public S3ClientConfig(String region, String endpoint, String prefix, boolean anonymousAuth, - long profilerLogInterval, int writeBufferSize, boolean enableCrtClient) { + long profilerLogInterval, int writeBufferSize, + S3ParallelDownloaderClientType parallelDownloaderClientType) { this(region, endpoint, prefix, anonymousAuth, profilerLogInterval, writeBufferSize, 1, 0, 0, 0, false, false, - false, 0, 0, enableCrtClient); + false, 0, 0, -1, parallelDownloaderClientType); } private S3ClientConfig(String region, String endpoint, String prefix, boolean anonymousAuth, long profilerLogInterval, int writeBufferSize, long tokenAcquireTimeout, int writeMaxRequestsPerSeconds, int readMaxRequestsPerSeconds, int requestsMaxHttpConnections, boolean forcePathStyle, boolean disableSslVerify, boolean storageListEventuallyConsistent, int requestsMaxPendingHttpConnections, - int requestsHttpConnectionAcquireTimeout, boolean enableCrtClient) { + int requestsHttpConnectionAcquireTimeout, int s3ReadTimeoutInSeconds, + S3ParallelDownloaderClientType parallelDownloaderClientType) { this.region = Objects.requireNonNull(region, "region"); this.endpoint = endpoint; this.prefix = Objects.requireNonNull(prefix, "prefix"); @@ -75,7 +78,8 @@ this.forcePathStyle = forcePathStyle; this.disableSslVerify = disableSslVerify; this.storageListEventuallyConsistent = storageListEventuallyConsistent; - this.enableCrtClient = enableCrtClient; + this.s3ReadTimeoutInSeconds = s3ReadTimeoutInSeconds; + this.parallelDownloaderClientType = parallelDownloaderClientType; } public static S3ClientConfig of(CloudProperties cloudProperties) { @@ -87,7 +91,26 @@ cloudProperties.isStorageForcePathStyle(), cloudProperties.isStorageDisableSSLVerify(), cloudProperties.isStorageListEventuallyConsistent(), cloudProperties.getRequestsMaxPendingHttpConnections(), - cloudProperties.getRequestsHttpConnectionAcquireTimeout(), cloudProperties.isS3EnableCrtClient()); + cloudProperties.getRequestsHttpConnectionAcquireTimeout(), cloudProperties.getS3ReadTimeoutInSeconds(), + S3ParallelDownloaderClientType.valueOf(cloudProperties.getS3ParallelDownloaderClientType())); + } + + public enum S3ParallelDownloaderClientType { + CRT, + ASYNC, + SYNC; + + public static boolean validate(String clientType) { + if (clientType == null || clientType.isEmpty()) { + return false; + } + for (S3ParallelDownloaderClientType type : values()) { + if (type.name().equalsIgnoreCase(clientType)) { + return true; + } + } + return false; + } } public static S3ClientConfig of(Map<String, String> configuration, int writeBufferSize) { @@ -101,7 +124,8 @@ String prefix = ""; boolean anonymousAuth = false; - return new S3ClientConfig(region, endPoint, prefix, anonymousAuth, profilerLogInterval, writeBufferSize, false); + return new S3ClientConfig(region, endPoint, prefix, anonymousAuth, profilerLogInterval, writeBufferSize, + S3ParallelDownloaderClientType.ASYNC); } public String getRegion() { @@ -169,8 +193,11 @@ return storageListEventuallyConsistent; } - public boolean isCrtClientEnabled() { - return enableCrtClient; + public S3ParallelDownloaderClientType getParallelDownloaderClientType() { + return parallelDownloaderClientType; } + public int getS3ReadTimeoutInSeconds() { + return s3ReadTimeoutInSeconds; + } } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java index defffe0..ab27ad4 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java @@ -329,7 +329,11 @@ @Override public IParallelDownloader createParallelDownloader(String bucket, IOManager ioManager) { - return new S3ParallelDownloader(bucket, ioManager, config, profiler); + S3ClientConfig.S3ParallelDownloaderClientType parallelDownloaderClientType = config.getParallelDownloaderClientType(); + return switch (parallelDownloaderClientType) { + case CRT, ASYNC -> new S3ParallelDownloader(bucket, ioManager, config, profiler); + case SYNC -> new S3SyncDownloader(bucket, ioManager, config, profiler); + }; } @Override @@ -364,7 +368,7 @@ return new S3BufferedWriter(s3Client, profiler, guardian, bucket, config.getPrefix() + path); } - private static CloseableAwsClients buildClient(S3ClientConfig config) { + public static CloseableAwsClients buildClient(S3ClientConfig config) { CloseableAwsClients awsClients = new CloseableAwsClients(); S3ClientBuilder builder = S3Client.builder(); AwsCredentialsProvider credentialsProvider = config.createCredentialsProvider(); @@ -385,6 +389,10 @@ customHttpConfigBuilder.put(SdkHttpConfigurationOption.CONNECTION_ACQUIRE_TIMEOUT, Duration.ofSeconds(config.getRequestsHttpConnectionAcquireTimeout())); } + if (config.getS3ReadTimeoutInSeconds() > 0) { + customHttpConfigBuilder.put(SdkHttpConfigurationOption.READ_TIMEOUT, + Duration.ofSeconds(config.getS3ReadTimeoutInSeconds())); + } if (config.getEndpoint() != null && !config.getEndpoint().isEmpty()) { builder.endpointOverride(URI.create(config.getEndpoint())); } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java index 50d05c6..0321a35 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java @@ -181,7 +181,9 @@ private static S3AsyncClient createAsyncClient(S3ClientConfig config) { // CRT client is not supported by all local S3 providers, but provides a better performance with AWS S3 - if (config.isCrtClientEnabled()) { + S3ClientConfig.S3ParallelDownloaderClientType parallelDownloaderClientType = + config.getParallelDownloaderClientType(); + if (parallelDownloaderClientType == S3ClientConfig.S3ParallelDownloaderClientType.CRT) { return createS3CrtAsyncClient(config); } return createS3AsyncClient(config); @@ -211,6 +213,10 @@ customHttpConfigBuilder.put(SdkHttpConfigurationOption.CONNECTION_ACQUIRE_TIMEOUT, Duration.ofSeconds(config.getRequestsHttpConnectionAcquireTimeout())); } + if (config.getS3ReadTimeoutInSeconds() > 0) { + customHttpConfigBuilder.put(SdkHttpConfigurationOption.READ_TIMEOUT, + Duration.ofSeconds(config.getS3ReadTimeoutInSeconds())); + } SdkAsyncHttpClient nettyHttpClient = NettyNioAsyncHttpClient.builder().buildWithDefaults(customHttpConfigBuilder.build()); builder.httpClient(nettyHttpClient); diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java new file mode 100644 index 0000000..53e3dad --- /dev/null +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.cloud.clients.aws.s3; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.asterix.cloud.clients.IParallelDownloader; +import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter; +import org.apache.commons.io.FileUtils; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.control.nc.io.IOManager; +import org.apache.hyracks.util.annotations.ThreadSafe; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import software.amazon.awssdk.core.ResponseInputStream; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.S3Object; + +@ThreadSafe +public class S3SyncDownloader implements IParallelDownloader { + private static final Logger LOGGER = LogManager.getLogger(); + + private final String bucket; + private final IOManager ioManager; + private final S3Client s3Client; + private final S3ClientConfig config; + private final IRequestProfilerLimiter profiler; + private final ExecutorService executorService; + + S3SyncDownloader(String bucket, IOManager ioManager, S3ClientConfig config, IRequestProfilerLimiter profiler) { + this.bucket = bucket; + this.ioManager = ioManager; + this.config = config; + this.profiler = profiler; + this.s3Client = (S3Client) S3CloudClient.buildClient(config).getConsumingClient(); + this.executorService = Executors.newCachedThreadPool(); + } + + @Override + public void downloadFiles(Collection<FileReference> toDownload) throws HyracksDataException { + try { + downloadFilesAndWait(toDownload); + } catch (IOException | InterruptedException | ExecutionException e) { + throw HyracksDataException.create(e); + } + } + + private void downloadFilesAndWait(Collection<FileReference> toDownload) + throws IOException, ExecutionException, InterruptedException { + List<Future<?>> downloads = new ArrayList<>(); + int maxPending = config.getRequestsMaxPendingHttpConnections(); + for (FileReference fileReference : toDownload) { + profiler.objectGet(); + FileUtils.createParentDirectories(fileReference.getFile()); + Future<?> future = executorService.submit(() -> { + try { + downloadFile(fileReference); + } catch (HyracksDataException e) { + throw new RuntimeException(e); + } + }); + downloads.add(future); + + if (maxPending > 0 && downloads.size() >= maxPending) { + waitForFileDownloads(downloads); + downloads.clear(); + } + } + if (!downloads.isEmpty()) { + waitForFileDownloads(downloads); + } + } + + private void waitForFileDownloads(List<Future<?>> downloads) throws ExecutionException, InterruptedException { + for (Future<?> download : downloads) { + download.get(); + } + } + + private void downloadFile(FileReference fileReference) throws HyracksDataException { + GetObjectRequest request = GetObjectRequest.builder().bucket(bucket) + .key(config.getPrefix() + fileReference.getRelativePath()).build(); + + Path targetPath = fileReference.getFile().toPath(); + try (ResponseInputStream<GetObjectResponse> response = s3Client.getObject(request); + OutputStream outputStream = Files.newOutputStream(targetPath, StandardOpenOption.CREATE, + StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) { + + response.transferTo(outputStream); + } catch (IOException e) { + throw HyracksDataException.create(e); + } + } + + @Override + public Collection<FileReference> downloadDirectories(Collection<FileReference> toDownload) + throws HyracksDataException { + Set<FileReference> failedFiles; + try { + failedFiles = downloadDirectoriesAndWait(toDownload); + } catch (IOException | InterruptedException | ExecutionException e) { + throw HyracksDataException.create(e); + } + return failedFiles; + } + + private Set<FileReference> downloadDirectoriesAndWait(Collection<FileReference> toDownload) + throws IOException, ExecutionException, InterruptedException { + Set<FileReference> failedFiles = ConcurrentHashMap.newKeySet(); + List<Future<?>> downloads = new ArrayList<>(); + + int maxPending = config.getRequestsMaxPendingHttpConnections(); + List<S3Object> downloadObjects = new ArrayList<>(); + for (FileReference fileReference : toDownload) { + profiler.objectMultipartDownload(); + String prefix = config.getPrefix() + fileReference.getRelativePath(); + List<S3Object> objects = S3ClientUtils.listS3Objects(s3Client, bucket, prefix); + downloadObjects.addAll(objects); + } + + for (S3Object s3Object : downloadObjects) { + String key = createDiskSubPath(s3Object.key()); + FileReference targetFile = ioManager.resolve(key); + + FileUtils.createParentDirectories(targetFile.getFile()); + + Future<Void> future = executorService.submit(() -> { + try { + profiler.objectGet(); + downloadFile(targetFile); + } catch (IOException e) { + // Record failed file + failedFiles.add(targetFile); + LOGGER.debug("Failed to download file using sync client: file {} having s3Key: {}", targetFile, + s3Object.key(), e); + } + return null; + }); + downloads.add(future); + + if (maxPending > 0 && downloads.size() >= maxPending) { + waitForDirectoryFileDownloads(downloads); + downloads.clear(); + } + } + + if (!downloads.isEmpty()) { + waitForDirectoryFileDownloads(downloads); + } + + return failedFiles; + } + + private void waitForDirectoryFileDownloads(List<Future<?>> downloads) + throws ExecutionException, InterruptedException { + for (Future<?> download : downloads) { + download.get(); + } + } + + private String createDiskSubPath(String objectName) { + if (!objectName.startsWith(STORAGE_SUB_DIR)) { + objectName = objectName.substring(objectName.indexOf(STORAGE_SUB_DIR)); + } + return objectName; + } + + @Override + public void close() throws HyracksDataException { + s3Client.close(); + } +} diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java index 25bc217..364fb2a 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java @@ -45,7 +45,6 @@ import reactor.core.publisher.Mono; public class AzureParallelDownloader implements IParallelDownloader { - public static final String STORAGE_SUB_DIR = "storage"; private final IOManager ioManager; private final BlobContainerAsyncClient blobContainerAsyncClient; private final IRequestProfilerLimiter profiler; diff --git a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java index 1bb0f74..5f8ee8c 100644 --- a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java +++ b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java @@ -67,8 +67,8 @@ client.createBucket(CreateBucketRequest.builder().bucket(PLAYGROUND_CONTAINER).build()); LOGGER.info("Client created successfully"); int writeBufferSize = StorageUtil.getIntSizeInBytes(5, StorageUtil.StorageUnit.MEGABYTE); - S3ClientConfig config = - new S3ClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, "", true, 0, writeBufferSize, false); + S3ClientConfig config = new S3ClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, "", true, 0, + writeBufferSize, S3ClientConfig.S3ParallelDownloaderClientType.ASYNC); CLOUD_CLIENT = new S3CloudClient(config, ICloudGuardian.NoOpCloudGuardian.INSTANCE); } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java index 4b6c928..50ec1f0 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java @@ -20,6 +20,7 @@ import static org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN; import static org.apache.hyracks.control.common.config.OptionTypes.DOUBLE; +import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER; import static org.apache.hyracks.control.common.config.OptionTypes.LONG_BYTE_UNIT; import static org.apache.hyracks.control.common.config.OptionTypes.NONNEGATIVE_INTEGER; import static org.apache.hyracks.control.common.config.OptionTypes.POSITIVE_INTEGER; @@ -77,9 +78,10 @@ CLOUD_STORAGE_FORCE_PATH_STYLE(BOOLEAN, false), CLOUD_STORAGE_DISABLE_SSL_VERIFY(BOOLEAN, false), CLOUD_STORAGE_LIST_EVENTUALLY_CONSISTENT(BOOLEAN, false), - CLOUD_STORAGE_S3_ENABLE_CRT_CLIENT(BOOLEAN, (Function<IApplicationConfig, Boolean>) app -> { + CLOUD_STORAGE_S3_CLIENT_READ_TIMEOUT(INTEGER, -1), + CLOUD_STORAGE_S3_PARALLEL_DOWNLOADER_CLIENT_TYPE(STRING, (Function<IApplicationConfig, String>) app -> { String endpoint = app.getString(CLOUD_STORAGE_ENDPOINT); - return endpoint == null || endpoint.isEmpty(); + return (endpoint == null || endpoint.isEmpty()) ? "crt" : "async"; }); private final IOptionType interpreter; @@ -196,8 +198,10 @@ case CLOUD_STORAGE_LIST_EVENTUALLY_CONSISTENT: return "Indicates whether or not deleted objects may be contained in list operations for some time" + "after they are deleted"; - case CLOUD_STORAGE_S3_ENABLE_CRT_CLIENT: - return "Indicates whether or not to use the AWS CRT S3 client for async requests"; + case CLOUD_STORAGE_S3_CLIENT_READ_TIMEOUT: + return "The read timeout (in seconds) for S3 sync client (-1 means SDK default)"; + case CLOUD_STORAGE_S3_PARALLEL_DOWNLOADER_CLIENT_TYPE: + return "The S3 client to use for parallel downloads (crt, async or sync)"; default: throw new IllegalStateException("NYI: " + this); } @@ -215,8 +219,8 @@ @Override public String usageDefaultOverride(IApplicationConfig accessor, Function<IOption, String> optionPrinter) { - if (this == CLOUD_STORAGE_S3_ENABLE_CRT_CLIENT) { - return "true when no custom endpoint is set, otherwise false"; + if (this == CLOUD_STORAGE_S3_PARALLEL_DOWNLOADER_CLIENT_TYPE) { + return "crt if no custom endpoint is set; async otherwise"; } return IOption.super.usageDefaultOverride(accessor, optionPrinter); } @@ -325,7 +329,11 @@ return accessor.getBoolean(Option.CLOUD_STORAGE_LIST_EVENTUALLY_CONSISTENT); } - public boolean isS3EnableCrtClient() { - return accessor.getBoolean(Option.CLOUD_STORAGE_S3_ENABLE_CRT_CLIENT); + public String getS3ParallelDownloaderClientType() { + return accessor.getString(Option.CLOUD_STORAGE_S3_PARALLEL_DOWNLOADER_CLIENT_TYPE).toUpperCase(); + } + + public int getS3ReadTimeoutInSeconds() { + return accessor.getInt(Option.CLOUD_STORAGE_S3_CLIENT_READ_TIMEOUT); } } -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20549?usp=email To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings?usp=email Gerrit-MessageType: merged Gerrit-Project: asterixdb Gerrit-Branch: phoenix Gerrit-Change-Id: Id16c530916e7e223201e2395d7aa38a4640367b6 Gerrit-Change-Number: 20549 Gerrit-PatchSet: 8 Gerrit-Owner: Ritik Raj <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Ritik Raj <[email protected]> Gerrit-CC: Michael Blow <[email protected]>
