From Ritik Raj <[email protected]>:

Ritik Raj has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20513?usp=email )


Change subject: [NO ISSUE][CLOUD] Async parallel downloader APIs
......................................................................

[NO ISSUE][CLOUD] Async parallel downloader APIs

Change-Id: Iaab692c8d70019175738fdb39699796cf9ef72b8
---
M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java
M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
M asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/azure/LSMAzBlobStorageTest.java
4 files changed, 137 insertions(+), 59 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/13/20513/1

diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
index 31b160d..37a8fd2 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
@@ -44,15 +44,21 @@
     private final int writeMaxRequestsPerSeconds;
     private final int readMaxRequestsPerSeconds;
     private final boolean storageDisableSSLVerify;
+    private final int requestsMaxHttpConnections;
+    private final int requestsMaxPendingHttpConnections;
+    private final int requestsHttpConnectionAcquireTimeout;

     public AzBlobStorageClientConfig(String region, String endpoint, String 
prefix, boolean anonymousAuth,
             long profilerLogInterval, String bucket, int writeBufferSize) {
-        this(region, endpoint, prefix, anonymousAuth, profilerLogInterval, 
bucket, 1, 0, 0, writeBufferSize, false);
+        this(region, endpoint, prefix, anonymousAuth, profilerLogInterval, 
bucket, 1, 0, 0, writeBufferSize, false,
+                1000, 10000, 120);
     }

     public AzBlobStorageClientConfig(String region, String endpoint, String 
prefix, boolean anonymousAuth,
             long profilerLogInterval, String bucket, long tokenAcquireTimeout, 
int writeMaxRequestsPerSeconds,
-            int readMaxRequestsPerSeconds, int writeBufferSize, boolean 
storageDisableSSLVerify) {
+            int readMaxRequestsPerSeconds, int writeBufferSize, boolean 
storageDisableSSLVerify,
+            int requestsMaxHttpConnections, int 
requestsMaxPendingHttpConnections,
+            int requestsHttpConnectionAcquireTimeout) {
         this.region = Objects.requireNonNull(region, "region");
         this.endpoint = endpoint;
         this.prefix = Objects.requireNonNull(prefix, "prefix");
@@ -64,6 +70,9 @@
         this.readMaxRequestsPerSeconds = readMaxRequestsPerSeconds;
         this.writeBufferSize = writeBufferSize;
         this.storageDisableSSLVerify = storageDisableSSLVerify;
+        this.requestsMaxHttpConnections = requestsMaxHttpConnections;
+        this.requestsMaxPendingHttpConnections = 
requestsMaxPendingHttpConnections;
+        this.requestsHttpConnectionAcquireTimeout = 
requestsHttpConnectionAcquireTimeout;
     }

     public static AzBlobStorageClientConfig of(CloudProperties 
cloudProperties) {
@@ -72,7 +81,9 @@
                 cloudProperties.getProfilerLogInterval(), 
cloudProperties.getStorageBucket(),
                 cloudProperties.getTokenAcquireTimeout(), 
cloudProperties.getWriteMaxRequestsPerSecond(),
                 cloudProperties.getReadMaxRequestsPerSecond(), 
cloudProperties.getWriteBufferSize(),
-                cloudProperties.isStorageDisableSSLVerify());
+                cloudProperties.isStorageDisableSSLVerify(), 
cloudProperties.getRequestsMaxHttpConnections(),
+                cloudProperties.getRequestsMaxPendingHttpConnections(),
+                cloudProperties.getRequestsHttpConnectionAcquireTimeout());
     }

     public static AzBlobStorageClientConfig of(Map<String, String> 
configuration, int writeBufferSize) {
@@ -138,4 +149,16 @@
     public int getWriteBufferSize() {
         return writeBufferSize;
     }
+
+    public int getRequestsMaxHttpConnections() {
+        return requestsMaxHttpConnections;
+    }
+
+    public int getRequestsMaxPendingHttpConnections() {
+        return requestsMaxPendingHttpConnections;
+    }
+
+    public int getRequestsHttpConnectionAcquireTimeout() {
+        return requestsHttpConnectionAcquireTimeout;
+    }
 }
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java
index 1cd0556..9c7497f 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java
@@ -347,7 +347,7 @@

     @Override
     public IParallelDownloader createParallelDownloader(String bucket, 
IOManager ioManager) {
-        return new AzureParallelDownloader(ioManager, blobContainerClient, 
profiler, config);
+        return new AzureParallelDownloader(ioManager, 
blobContainerAsyncClient, profiler, config);
     }

     @Override
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
index 4980587..db66e79 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
@@ -19,16 +19,16 @@

 package org.apache.asterix.cloud.clients.azure.blobstorage;

-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.InvalidPathException;
 import java.nio.file.Path;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;

 import org.apache.asterix.cloud.clients.IParallelDownloader;
 import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
@@ -38,85 +38,148 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

-import com.azure.core.http.rest.PagedIterable;
-import com.azure.storage.blob.BlobClient;
-import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobAsyncClient;
+import com.azure.storage.blob.BlobContainerAsyncClient;
 import com.azure.storage.blob.models.BlobItem;
 import com.azure.storage.blob.models.ListBlobsOptions;

+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
 public class AzureParallelDownloader implements IParallelDownloader {
     public static final String STORAGE_SUB_DIR = "storage";
     private final IOManager ioManager;
-    private final BlobContainerClient blobContainerClient;
+    private final BlobContainerAsyncClient blobContainerAsyncClient;
     private final IRequestProfilerLimiter profiler;
     private final AzBlobStorageClientConfig config;
     private static final Logger LOGGER = LogManager.getLogger();

-    public AzureParallelDownloader(IOManager ioManager, BlobContainerClient 
blobContainerClient,
+    public AzureParallelDownloader(IOManager ioManager, 
BlobContainerAsyncClient blobContainerAsyncClient,
             IRequestProfilerLimiter profiler, AzBlobStorageClientConfig 
config) {
         this.ioManager = ioManager;
-        this.blobContainerClient = blobContainerClient;
+        this.blobContainerAsyncClient = blobContainerAsyncClient;
         this.profiler = profiler;
         this.config = config;
     }

     @Override
     public void downloadFiles(Collection<FileReference> toDownload) throws 
HyracksDataException {
-        for (FileReference fileReference : toDownload) {
-            BlobClient blobClient =
-                    blobContainerClient.getBlobClient(config.getPrefix() + 
fileReference.getRelativePath());
-            Path absPath = Path.of(fileReference.getAbsolutePath());
-            Path parentPath = absPath.getParent();
-            OutputStream fileOutputStream = null;
-            try {
-                createDirectories(parentPath);
-                fileOutputStream = Files.newOutputStream(absPath);
-                blobClient.downloadStream(fileOutputStream);
-                fileOutputStream.close();
-            } catch (IOException e) {
-                throw HyracksDataException.create(e);
-            } finally {
-                closeOutputStream(fileOutputStream);
-            }
+        try {
+            downloadFilesAndWait(toDownload);
+        } catch (IOException | ExecutionException | InterruptedException e) {
+            throw HyracksDataException.create(e);
         }
     }

-    private static void closeOutputStream(OutputStream fileOutputStream) 
throws HyracksDataException {
-        if (fileOutputStream != null) {
-            try {
-                fileOutputStream.close();
-            } catch (IOException e) {
-                throw HyracksDataException.create(e);
+    private void downloadFilesAndWait(Collection<FileReference> toDownload)
+            throws IOException, ExecutionException, InterruptedException {
+        List<Mono<Void>> downloads = new ArrayList<>();
+        int maxConcurrent = config.getRequestsMaxPendingHttpConnections();
+
+        for (FileReference fileReference : toDownload) {
+            profiler.objectGet();
+
+            // Create parent directories
+            Path absPath = Path.of(fileReference.getAbsolutePath());
+            Path parentPath = absPath.getParent();
+            createDirectories(parentPath);
+
+            // Create async download task using Mono
+            BlobAsyncClient blobAsyncClient =
+                    
blobContainerAsyncClient.getBlobAsyncClient(config.getPrefix() + 
fileReference.getRelativePath());
+
+            Mono<Void> downloadTask = downloadFileAsync(blobAsyncClient, 
absPath);
+            downloads.add(downloadTask);
+
+            // Control concurrency with batching
+            if (maxConcurrent > 0 && downloads.size() >= maxConcurrent) {
+                waitForFileDownloads(downloads);
+                downloads.clear();
             }
         }
+
+        // Wait for remaining downloads
+        if (!downloads.isEmpty()) {
+            waitForFileDownloads(downloads);
+        }
+    }
+
+    private Mono<Void> downloadFileAsync(BlobAsyncClient blobAsyncClient, Path 
destinationPath) {
+        return blobAsyncClient.downloadToFile(destinationPath.toString())
+                .doOnError(error -> LOGGER.error("Failed to download file: 
{}", destinationPath, error)).then();
+    }
+
+    private void waitForFileDownloads(List<Mono<Void>> downloads) throws 
ExecutionException, InterruptedException {
+        try {
+            int maxConcurrent = config.getRequestsMaxPendingHttpConnections();
+            Flux.fromIterable(downloads).flatMap(mono -> mono, maxConcurrent > 
0 ? maxConcurrent : downloads.size())
+                    .then().block();
+        } catch (Exception e) {
+            throw new ExecutionException("Error waiting for file downloads", 
e);
+        }
     }

     @Override
     public Collection<FileReference> 
downloadDirectories(Collection<FileReference> directories)
             throws HyracksDataException {
         Set<FileReference> failedFiles = new HashSet<>();
+        List<Mono<Void>> directoryDownloads = new ArrayList<>();
+
         for (FileReference directory : directories) {
-            PagedIterable<BlobItem> blobsInDir = getBlobItems(directory);
-            for (BlobItem blobItem : blobsInDir) {
-                profiler.objectGet();
-                download(blobItem, failedFiles);
-            }
+            Mono<Void> directoryTask = downloadDirectoryAsync(directory, 
failedFiles);
+            directoryDownloads.add(directoryTask);
         }
+
+        try {
+            // Wait for all directory downloads to complete using Flux
+            Flux.fromIterable(directoryDownloads).flatMap(mono -> mono, 
config.getRequestsMaxPendingHttpConnections())
+                    .then().block();
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+
         return failedFiles;
     }

-    private void download(BlobItem blobItem, Set<FileReference> failedFiles) 
throws HyracksDataException {
-        BlobClient blobClient = 
blobContainerClient.getBlobClient(blobItem.getName());
-        FileReference diskDestFile = 
ioManager.resolve(createDiskSubPath(blobItem.getName()));
-        Path absDiskBlobPath = getDiskDestPath(diskDestFile);
-        Path parentDiskPath = absDiskBlobPath.getParent();
-        createDirectories(parentDiskPath);
-        FileOutputStream outputStreamToDest = 
getOutputStreamToDest(diskDestFile);
+    private Mono<Void> downloadDirectoryAsync(FileReference directory, 
Set<FileReference> failedFiles) {
+        return getBlobItems(directory).flatMap(blobItem -> {
+            profiler.objectGet();
+            try {
+                return downloadBlobAsync(blobItem, failedFiles);
+            } catch (HyracksDataException e) {
+                return Mono.error(e);
+            }
+        }, 
config.getRequestsMaxPendingHttpConnections()).then().onErrorResume(error -> {
+            LOGGER.error("Error downloading directory: {}", 
directory.getRelativePath(), error);
+            failedFiles.add(directory);
+            return Mono.empty();
+        });
+    }
+
+    private Mono<Void> downloadBlobAsync(BlobItem blobItem, Set<FileReference> 
failedFiles)
+            throws HyracksDataException {
         try {
-            blobClient.downloadStream(outputStreamToDest);
+            BlobAsyncClient blobAsyncClient = 
blobContainerAsyncClient.getBlobAsyncClient(blobItem.getName());
+            FileReference diskDestFile = 
ioManager.resolve(createDiskSubPath(blobItem.getName()));
+            Path absDiskBlobPath = getDiskDestPath(diskDestFile);
+            Path parentDiskPath = absDiskBlobPath.getParent();
+
+            createDirectories(parentDiskPath);
+            return 
blobAsyncClient.downloadToFile(absDiskBlobPath.toString()).doOnError(error -> {
+                LOGGER.error("Error downloading blob: {}", blobItem.getName(), 
error);
+                FileReference failedFile = null;
+                try {
+                    failedFile = ioManager.resolve(blobItem.getName());
+                } catch (HyracksDataException e) {
+                    // ignore and capture all the failed blobs
+                }
+                failedFiles.add(failedFile);
+            }).then();
         } catch (Exception e) {
+            LOGGER.error("Error downloading blob: {}", blobItem.getName(), e);
             FileReference failedFile = ioManager.resolve(blobItem.getName());
             failedFiles.add(failedFile);
+            return Mono.empty();
         }
     }

@@ -127,14 +190,6 @@
         return blobName;
     }

-    private FileOutputStream getOutputStreamToDest(FileReference destFile) 
throws HyracksDataException {
-        try {
-            return new FileOutputStream(destFile.getAbsolutePath());
-        } catch (FileNotFoundException ex) {
-            throw HyracksDataException.create(ex);
-        }
-    }
-
     private void createDirectories(Path parentPath) throws 
HyracksDataException {
         if (Files.notExists(parentPath))
             try {
@@ -152,10 +207,10 @@
         }
     }

-    private PagedIterable<BlobItem> getBlobItems(FileReference 
directoryToDownload) {
+    private Flux<BlobItem> getBlobItems(FileReference directoryToDownload) {
         ListBlobsOptions listBlobsOptions =
                 new ListBlobsOptions().setPrefix(config.getPrefix() + 
directoryToDownload.getRelativePath());
-        return blobContainerClient.listBlobs(listBlobsOptions, null);
+        return blobContainerAsyncClient.listBlobs(listBlobsOptions);
     }

     @Override
diff --git 
a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/azure/LSMAzBlobStorageTest.java
 
b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/azure/LSMAzBlobStorageTest.java
index 44b4856..ec76b8f 100644
--- 
a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/azure/LSMAzBlobStorageTest.java
+++ 
b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/azure/LSMAzBlobStorageTest.java
@@ -117,7 +117,7 @@
         URI blobStore = URI.create(blobServiceClient.getAccountUrl());
         String endpoint = blobStore.getScheme() + "://" + 
blobStore.getAuthority() + "/devstoreaccount1";
         AzBlobStorageClientConfig config = new 
AzBlobStorageClientConfig(MOCK_SERVER_REGION, endpoint, "", false, 0,
-                PLAYGROUND_CONTAINER, 1, 0, 0, writeBufferSize, true);
+                PLAYGROUND_CONTAINER, 1, 0, 0, writeBufferSize, true, 1000, 
10000, 120);
         CLOUD_CLIENT = new AzBlobStorageCloudClient(config, 
ICloudGuardian.NoOpCloudGuardian.INSTANCE);
     }


--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20513?usp=email
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings?usp=email

Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: phoenix
Gerrit-Change-Id: Iaab692c8d70019175738fdb39699796cf9ef72b8
Gerrit-Change-Number: 20513
Gerrit-PatchSet: 1
Gerrit-Owner: Ritik Raj <[email protected]>

Reply via email to