>From Ritik Raj <[email protected]>:
Ritik Raj has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20513?usp=email )
Change subject: [NO ISSUE][CLOUD] Async parallel downloader APIs
......................................................................
[NO ISSUE][CLOUD] Async parallel downloader APIs
Change-Id: Iaab692c8d70019175738fdb39699796cf9ef72b8
---
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
M
asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/azure/LSMAzBlobStorageTest.java
4 files changed, 137 insertions(+), 59 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/13/20513/1
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
index 31b160d..37a8fd2 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
@@ -44,15 +44,21 @@
private final int writeMaxRequestsPerSeconds;
private final int readMaxRequestsPerSeconds;
private final boolean storageDisableSSLVerify;
+    private final int requestsMaxHttpConnections;
+    private final int requestsMaxPendingHttpConnections;
+    private final int requestsHttpConnectionAcquireTimeout;
+
+    // Defaults for the HTTP connection settings applied by the short constructor below, named here so the
+    // values do not appear as bare literals in the constructor chain.
+    private static final int DEFAULT_REQUESTS_MAX_HTTP_CONNECTIONS = 1000;
+    private static final int DEFAULT_REQUESTS_MAX_PENDING_HTTP_CONNECTIONS = 10000;
+    // NOTE(review): the unit of this timeout is not stated in this change (presumably seconds) - confirm.
+    private static final int DEFAULT_REQUESTS_HTTP_CONNECTION_ACQUIRE_TIMEOUT = 120;

public AzBlobStorageClientConfig(String region, String endpoint, String prefix, boolean anonymousAuth,
long profilerLogInterval, String bucket, int writeBufferSize) {
-        this(region, endpoint, prefix, anonymousAuth, profilerLogInterval, bucket, 1, 0, 0, writeBufferSize, false);
+        this(region, endpoint, prefix, anonymousAuth, profilerLogInterval, bucket, 1, 0, 0, writeBufferSize, false,
+                DEFAULT_REQUESTS_MAX_HTTP_CONNECTIONS, DEFAULT_REQUESTS_MAX_PENDING_HTTP_CONNECTIONS,
+                DEFAULT_REQUESTS_HTTP_CONNECTION_ACQUIRE_TIMEOUT);
}
public AzBlobStorageClientConfig(String region, String endpoint, String
prefix, boolean anonymousAuth,
long profilerLogInterval, String bucket, long tokenAcquireTimeout,
int writeMaxRequestsPerSeconds,
- int readMaxRequestsPerSeconds, int writeBufferSize, boolean
storageDisableSSLVerify) {
+ int readMaxRequestsPerSeconds, int writeBufferSize, boolean
storageDisableSSLVerify,
+ int requestsMaxHttpConnections, int
requestsMaxPendingHttpConnections,
+ int requestsHttpConnectionAcquireTimeout) {
this.region = Objects.requireNonNull(region, "region");
this.endpoint = endpoint;
this.prefix = Objects.requireNonNull(prefix, "prefix");
@@ -64,6 +70,9 @@
this.readMaxRequestsPerSeconds = readMaxRequestsPerSeconds;
this.writeBufferSize = writeBufferSize;
this.storageDisableSSLVerify = storageDisableSSLVerify;
+ this.requestsMaxHttpConnections = requestsMaxHttpConnections;
+ this.requestsMaxPendingHttpConnections =
requestsMaxPendingHttpConnections;
+ this.requestsHttpConnectionAcquireTimeout =
requestsHttpConnectionAcquireTimeout;
}
public static AzBlobStorageClientConfig of(CloudProperties
cloudProperties) {
@@ -72,7 +81,9 @@
cloudProperties.getProfilerLogInterval(),
cloudProperties.getStorageBucket(),
cloudProperties.getTokenAcquireTimeout(),
cloudProperties.getWriteMaxRequestsPerSecond(),
cloudProperties.getReadMaxRequestsPerSecond(),
cloudProperties.getWriteBufferSize(),
- cloudProperties.isStorageDisableSSLVerify());
+ cloudProperties.isStorageDisableSSLVerify(),
cloudProperties.getRequestsMaxHttpConnections(),
+ cloudProperties.getRequestsMaxPendingHttpConnections(),
+ cloudProperties.getRequestsHttpConnectionAcquireTimeout());
}
public static AzBlobStorageClientConfig of(Map<String, String>
configuration, int writeBufferSize) {
@@ -138,4 +149,16 @@
public int getWriteBufferSize() {
return writeBufferSize;
}
+
+ /**
+ * Returns the configured maximum number of HTTP connections for requests
+ * (from CloudProperties#getRequestsMaxHttpConnections(), or 1000 via the short constructor).
+ */
+ public int getRequestsMaxHttpConnections() {
+ return requestsMaxHttpConnections;
+ }
+
+ /**
+ * Returns the configured limit on pending HTTP connections
+ * (from CloudProperties#getRequestsMaxPendingHttpConnections(), or 10000 via the short constructor).
+ * AzureParallelDownloader also uses this value to bound its concurrent downloads.
+ */
+ public int getRequestsMaxPendingHttpConnections() {
+ return requestsMaxPendingHttpConnections;
+ }
+
+ /**
+ * Returns the configured HTTP connection acquire timeout
+ * (from CloudProperties#getRequestsHttpConnectionAcquireTimeout(), or 120 via the short constructor).
+ * NOTE(review): the unit is not stated here (presumably seconds) - confirm.
+ */
+ public int getRequestsHttpConnectionAcquireTimeout() {
+ return requestsHttpConnectionAcquireTimeout;
+ }
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java
index 1cd0556..9c7497f 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java
@@ -347,7 +347,7 @@
@Override
public IParallelDownloader createParallelDownloader(String bucket,
IOManager ioManager) {
- return new AzureParallelDownloader(ioManager, blobContainerClient,
profiler, config);
+ // The downloader issues its requests through the async container client so that they can run concurrently.
+ // NOTE(review): the 'bucket' argument is not used here - confirm that is intended.
+ return new AzureParallelDownloader(ioManager,
blobContainerAsyncClient, profiler, config);
}
@Override
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
index 4980587..db66e79 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
@@ -19,16 +19,16 @@
package org.apache.asterix.cloud.clients.azure.blobstorage;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.InvalidPathException;
import java.nio.file.Path;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import org.apache.asterix.cloud.clients.IParallelDownloader;
import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
@@ -38,85 +38,148 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import com.azure.core.http.rest.PagedIterable;
-import com.azure.storage.blob.BlobClient;
-import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobAsyncClient;
+import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.ListBlobsOptions;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+ /**
+ * {@link IParallelDownloader} that copies blobs from an Azure Blob Storage container onto local disk.
+ * Requests are issued through the asynchronous {@link BlobContainerAsyncClient} and awaited with Reactor's block().
+ */
public class AzureParallelDownloader implements IParallelDownloader {
public static final String STORAGE_SUB_DIR = "storage";
private final IOManager ioManager;
- private final BlobContainerClient blobContainerClient;
+ // Async client, so that several downloads can be in flight and awaited together.
+ private final BlobContainerAsyncClient blobContainerAsyncClient;
private final IRequestProfilerLimiter profiler;
private final AzBlobStorageClientConfig config;
private static final Logger LOGGER = LogManager.getLogger();
+ /**
+ * @param ioManager resolves local {@link FileReference}s for downloaded blobs
+ * @param blobContainerAsyncClient client of the container the blobs are read from
+ * @param profiler notified through objectGet() for every blob download that is requested
+ * @param config supplies the blob name prefix and the concurrency limit for downloads
+ */
- public AzureParallelDownloader(IOManager ioManager, BlobContainerClient
blobContainerClient,
+ public AzureParallelDownloader(IOManager ioManager,
BlobContainerAsyncClient blobContainerAsyncClient,
IRequestProfilerLimiter profiler, AzBlobStorageClientConfig
config) {
this.ioManager = ioManager;
- this.blobContainerClient = blobContainerClient;
+ this.blobContainerAsyncClient = blobContainerAsyncClient;
this.profiler = profiler;
this.config = config;
}
@Override
public void downloadFiles(Collection<FileReference> toDownload) throws HyracksDataException {
-        for (FileReference fileReference : toDownload) {
-            BlobClient blobClient =
-                    blobContainerClient.getBlobClient(config.getPrefix() + fileReference.getRelativePath());
-            Path absPath = Path.of(fileReference.getAbsolutePath());
-            Path parentPath = absPath.getParent();
-            OutputStream fileOutputStream = null;
-            try {
-                createDirectories(parentPath);
-                fileOutputStream = Files.newOutputStream(absPath);
-                blobClient.downloadStream(fileOutputStream);
-                fileOutputStream.close();
-            } catch (IOException e) {
-                throw HyracksDataException.create(e);
-            } finally {
-                closeOutputStream(fileOutputStream);
-            }
-        }
-    }
-
-    private static void closeOutputStream(OutputStream fileOutputStream) throws HyracksDataException {
-        if (fileOutputStream != null) {
-            try {
-                fileOutputStream.close();
-            } catch (IOException e) {
-                throw HyracksDataException.create(e);
-            }
-        }
-    }
+        try {
+            downloadFilesAndWait(toDownload);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag that was consumed while blocking on the downloads.
+            Thread.currentThread().interrupt();
+            throw HyracksDataException.create(e);
+        } catch (IOException | ExecutionException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    /**
+     * Downloads {@code toDownload} in batches of at most getRequestsMaxPendingHttpConnections() files (a single
+     * batch for everything when that value is not positive), blocking until each batch completes before the
+     * next one is prepared. profiler.objectGet() and parent-directory creation happen as each file is queued.
+     */
+    private void downloadFilesAndWait(Collection<FileReference> toDownload)
+            throws IOException, ExecutionException, InterruptedException {
+        int maxConcurrent = config.getRequestsMaxPendingHttpConnections();
+        List<Mono<Void>> batch = new ArrayList<>();
+        for (FileReference fileReference : toDownload) {
+            profiler.objectGet();
+
+            // The asynchronous download writes straight into the file, so its directory must exist first
+            Path absPath = Path.of(fileReference.getAbsolutePath());
+            createDirectories(absPath.getParent());
+
+            BlobAsyncClient blobAsyncClient =
+                    blobContainerAsyncClient.getBlobAsyncClient(config.getPrefix() + fileReference.getRelativePath());
+            batch.add(downloadFileAsync(blobAsyncClient, absPath));
+
+            if (maxConcurrent > 0 && batch.size() >= maxConcurrent) {
+                waitForFileDownloads(batch);
+                batch.clear();
+            }
+        }
+        if (!batch.isEmpty()) {
+            waitForFileDownloads(batch);
+        }
+    }
+
+    /**
+     * Returns a (not yet subscribed) download of the blob into {@code destinationPath}. An existing local file is
+     * overwritten, as the former stream-based implementation did; the single-argument downloadToFile(String)
+     * would fail with FileAlreadyExistsException instead.
+     */
+    private Mono<Void> downloadFileAsync(BlobAsyncClient blobAsyncClient, Path destinationPath) {
+        return blobAsyncClient.downloadToFile(destinationPath.toString(), true)
+                .doOnError(error -> LOGGER.error("Failed to download file: {}", destinationPath, error)).then();
+    }
+
+    /**
+     * Subscribes to every download of the batch at once and blocks until all complete. A batch never exceeds the
+     * configured limit, so no further concurrency bound is needed; the first failure cancels the rest.
+     *
+     * @throws InterruptedException if the waiting thread is interrupted
+     * @throws ExecutionException if any download fails
+     */
+    private void waitForFileDownloads(List<Mono<Void>> downloads) throws ExecutionException, InterruptedException {
+        try {
+            Mono.when(downloads).block();
+        } catch (Exception e) {
+            if (e.getCause() instanceof InterruptedException) {
+                // Reactor's block() reports interruption as a RuntimeException wrapping the InterruptedException
+                throw (InterruptedException) e.getCause();
+            }
+            throw new ExecutionException("Error waiting for file downloads", e);
+        }
+    }
@Override
public Collection<FileReference> downloadDirectories(Collection<FileReference> directories)
throws HyracksDataException {
-        Set<FileReference> failedFiles = new HashSet<>();
-        for (FileReference directory : directories) {
-            PagedIterable<BlobItem> blobsInDir = getBlobItems(directory);
-            for (BlobItem blobItem : blobsInDir) {
-                profiler.objectGet();
-                download(blobItem, failedFiles);
-            }
-        }
-        return failedFiles;
-    }
-
-    private void download(BlobItem blobItem, Set<FileReference> failedFiles) throws HyracksDataException {
-        BlobClient blobClient = blobContainerClient.getBlobClient(blobItem.getName());
-        FileReference diskDestFile = ioManager.resolve(createDiskSubPath(blobItem.getName()));
-        Path absDiskBlobPath = getDiskDestPath(diskDestFile);
-        Path parentDiskPath = absDiskBlobPath.getParent();
-        createDirectories(parentDiskPath);
-        FileOutputStream outputStreamToDest = getOutputStreamToDest(diskDestFile);
-        try {
-            blobClient.downloadStream(outputStreamToDest);
-        } catch (Exception e) {
-            FileReference failedFile = ioManager.resolve(blobItem.getName());
-            failedFiles.add(failedFile);
-        }
-    }
+        // Failures are recorded from the threads that complete the asynchronous requests, possibly several at a
+        // time, so the working set must tolerate concurrent updates.
+        Set<FileReference> failedFiles = ConcurrentHashMap.newKeySet();
+        int configuredLimit = config.getRequestsMaxPendingHttpConnections();
+        // flatMap rejects a non-positive concurrency; treat such a setting as "no limit".
+        int maxConcurrent = configuredLimit > 0 ? configuredLimit : Integer.MAX_VALUE;
+        try {
+            // Directories are listed one after another and all their blobs feed ONE bounded flatMap, so the limit
+            // caps the total number of in-flight downloads. (A bounded flatMap per directory nested inside another
+            // bounded flatMap would allow up to limit * limit concurrent requests.)
+            // NOTE(review): the mapper runs on the thread emitting listing results; if profiler.objectGet() or
+            // createDirectories() block for long, consider publishOn(boundedElastic) - confirm.
+            Flux.fromIterable(directories)
+                    .concatMap(directory -> listBlobsOrRecordFailure(directory, failedFiles))
+                    .flatMap(blobItem -> {
+                        profiler.objectGet();
+                        return downloadBlobAsync(blobItem, failedFiles);
+                    }, maxConcurrent)
+                    .then()
+                    .block();
+        } catch (Exception e) {
+            if (e.getCause() instanceof InterruptedException) {
+                // Reactor's block() swallows the interrupt flag while wrapping the InterruptedException
+                Thread.currentThread().interrupt();
+            }
+            throw HyracksDataException.create(e);
+        }
+        // Hand back a plain snapshot rather than the concurrent working set.
+        return new HashSet<>(failedFiles);
+    }
+
+    /**
+     * Lists the blobs of {@code directory}. A listing failure is logged, {@code directory} is added to
+     * {@code failedFiles}, and the stream ends normally so other directories are still processed.
+     */
+    private Flux<BlobItem> listBlobsOrRecordFailure(FileReference directory, Set<FileReference> failedFiles) {
+        return getBlobItems(directory).onErrorResume(error -> {
+            LOGGER.error("Error listing directory: {}", directory.getRelativePath(), error);
+            failedFiles.add(directory);
+            return Flux.empty();
+        });
+    }
+
+    /**
+     * Returns a download of {@code blobItem} into its local storage path, overwriting any existing local copy
+     * (as the former FileOutputStream-based code did). A failure of this blob is recorded in {@code failedFiles}
+     * and does not cancel the downloads of other blobs.
+     */
+    private Mono<Void> downloadBlobAsync(BlobItem blobItem, Set<FileReference> failedFiles) {
+        String blobName = blobItem.getName();
+        Path absDiskBlobPath;
+        try {
+            FileReference diskDestFile = ioManager.resolve(createDiskSubPath(blobName));
+            absDiskBlobPath = getDiskDestPath(diskDestFile);
+            createDirectories(absDiskBlobPath.getParent());
+        } catch (Exception e) {
+            return recordFailedBlob(blobName, e, failedFiles);
+        }
+        BlobAsyncClient blobAsyncClient = blobContainerAsyncClient.getBlobAsyncClient(blobName);
+        return blobAsyncClient.downloadToFile(absDiskBlobPath.toString(), true).then()
+                .onErrorResume(error -> recordFailedBlob(blobName, error, failedFiles));
+    }
+
+    /**
+     * Logs {@code error} and adds the local reference of {@code blobName} to {@code failedFiles}. If that reference
+     * cannot be resolved, the resolution error is propagated (as in the original synchronous code) instead of
+     * recording a null entry.
+     */
+    private Mono<Void> recordFailedBlob(String blobName, Throwable error, Set<FileReference> failedFiles) {
+        LOGGER.error("Error downloading blob: {}", blobName, error);
+        try {
+            failedFiles.add(ioManager.resolve(blobName));
+            return Mono.empty();
+        } catch (HyracksDataException e) {
+            e.addSuppressed(error);
+            return Mono.error(e);
+        }
+    }
@@ -127,14 +190,6 @@
return blobName;
}
- private FileOutputStream getOutputStreamToDest(FileReference destFile)
throws HyracksDataException {
- try {
- return new FileOutputStream(destFile.getAbsolutePath());
- } catch (FileNotFoundException ex) {
- throw HyracksDataException.create(ex);
- }
- }
-
private void createDirectories(Path parentPath) throws
HyracksDataException {
if (Files.notExists(parentPath))
try {
@@ -152,10 +207,10 @@
}
}
+ /**
+ * Lists the blobs whose names start with the configured prefix followed by the directory's relative path.
+ */
- private PagedIterable<BlobItem> getBlobItems(FileReference
directoryToDownload) {
+ private Flux<BlobItem> getBlobItems(FileReference directoryToDownload) {
ListBlobsOptions listBlobsOptions =
new ListBlobsOptions().setPrefix(config.getPrefix() +
directoryToDownload.getRelativePath());
- return blobContainerClient.listBlobs(listBlobsOptions, null);
+ return blobContainerAsyncClient.listBlobs(listBlobsOptions);
}
@Override
diff --git
a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/azure/LSMAzBlobStorageTest.java
b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/azure/LSMAzBlobStorageTest.java
index 44b4856..ec76b8f 100644
---
a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/azure/LSMAzBlobStorageTest.java
+++
b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/azure/LSMAzBlobStorageTest.java
@@ -117,7 +117,7 @@
URI blobStore = URI.create(blobServiceClient.getAccountUrl());
String endpoint = blobStore.getScheme() + "://" +
blobStore.getAuthority() + "/devstoreaccount1";
AzBlobStorageClientConfig config = new
AzBlobStorageClientConfig(MOCK_SERVER_REGION, endpoint, "", false, 0,
- PLAYGROUND_CONTAINER, 1, 0, 0, writeBufferSize, true);
+ PLAYGROUND_CONTAINER, 1, 0, 0, writeBufferSize, true, 1000,
10000, 120);
CLOUD_CLIENT = new AzBlobStorageCloudClient(config,
ICloudGuardian.NoOpCloudGuardian.INSTANCE);
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20513?usp=email
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings?usp=email
Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: phoenix
Gerrit-Change-Id: Iaab692c8d70019175738fdb39699796cf9ef72b8
Gerrit-Change-Number: 20513
Gerrit-PatchSet: 1
Gerrit-Owner: Ritik Raj <[email protected]>