>From Ritik Raj <[email protected]>:

Ritik Raj has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20507?usp=email )


Change subject: [NO ISSUE][CLOUD] Updating deleteObjects for AzureBlob
......................................................................

[NO ISSUE][CLOUD] Updating deleteObjects for AzureBlob

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
Replace the expensive LIST call made during bulk deletes
with concurrent async per-blob delete calls.

Ext-ref: MB-68999
Change-Id: Ie66fe1c9d95684af0c79b9387194f4bba01891a6
---
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java
2 files changed, 35 insertions(+), 66 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/07/20507/1

diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
index 889a6a4..31b160d 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
@@ -30,7 +30,7 @@

 public class AzBlobStorageClientConfig {
     // Ref: 
https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id
-    static final int DELETE_BATCH_SIZE = 256;
+    static final int MAX_CONCURRENT_REQUESTS = 20;

     private final int writeBufferSize;
     private final String region;
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java
index 4a61f1c..0bd16a8 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java
@@ -19,7 +19,7 @@

 package org.apache.asterix.cloud.clients.azure.blobstorage;

-import static 
org.apache.asterix.cloud.clients.azure.blobstorage.AzBlobStorageClientConfig.DELETE_BATCH_SIZE;
+import static 
org.apache.asterix.cloud.clients.azure.blobstorage.AzBlobStorageClientConfig.MAX_CONCURRENT_REQUESTS;

 import java.io.ByteArrayOutputStream;
 import java.io.FilenameFilter;
@@ -31,7 +31,6 @@
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.function.Predicate;
@@ -60,14 +59,14 @@

 import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
 import com.azure.core.http.rest.PagedIterable;
-import com.azure.core.http.rest.Response;
 import com.azure.core.util.BinaryData;
+import com.azure.storage.blob.BlobAsyncClient;
 import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerAsyncClient;
 import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceAsyncClient;
 import com.azure.storage.blob.BlobServiceClient;
 import com.azure.storage.blob.BlobServiceClientBuilder;
-import com.azure.storage.blob.batch.BlobBatchClient;
-import com.azure.storage.blob.batch.BlobBatchClientBuilder;
 import com.azure.storage.blob.models.BlobErrorCode;
 import com.azure.storage.blob.models.BlobItem;
 import com.azure.storage.blob.models.BlobListDetails;
@@ -83,6 +82,8 @@
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslContextBuilder;
 import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 import reactor.netty.http.client.HttpClient;

 public class AzBlobStorageCloudClient implements ICloudClient {
@@ -91,12 +92,11 @@
     private static final String AZURITE_ACCOUNT_NAME = "devstoreaccount1";
     private static final String AZURITE_ACCOUNT_KEY =
             
"Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
-    private static final int SUCCESS_RESPONSE_CODE = 202;
     private final ICloudGuardian guardian;
     private final BlobContainerClient blobContainerClient;
+    private final BlobContainerAsyncClient blobContainerAsyncClient;
     private final AzBlobStorageClientConfig config;
     private final IRequestProfilerLimiter profiler;
-    private final BlobBatchClient blobBatchClient;
     private static final Logger LOGGER = LogManager.getLogger();

     public AzBlobStorageCloudClient(AzBlobStorageClientConfig config, 
ICloudGuardian guardian) {
@@ -106,6 +106,7 @@
     public AzBlobStorageCloudClient(AzBlobStorageClientConfig config, 
BlobServiceClient blobServiceClient,
             ICloudGuardian guardian) {
         this.blobContainerClient = 
blobServiceClient.getBlobContainerClient(config.getBucket());
+        this.blobContainerAsyncClient = 
buildAsyncClient(config).getBlobContainerAsyncClient(config.getBucket());
         this.config = config;
         this.guardian = guardian;
         long profilerInterval = config.getProfilerLogInterval();
@@ -116,7 +117,6 @@
             profiler = new RequestLimiterNoOpProfiler(limiter);
         }
         guardian.setCloudClient(this);
-        blobBatchClient = new 
BlobBatchClientBuilder(blobContainerClient.getServiceClient()).buildClient();
     }

     @Override
@@ -277,59 +277,25 @@
     public void deleteObjects(String bucket, Collection<String> paths) throws 
HyracksDataException {
         if (paths.isEmpty())
             return;
-        Set<BlobItem> blobsToDelete = getBlobsMatchingThesePaths(paths);
-        List<String> blobURLs = getBlobURLs(blobsToDelete);
-        if (blobURLs.isEmpty())
+
+        List<Mono<Boolean>> deleteMonos = new ArrayList<>();
+        for (String path : paths) {
+            if (path != null && !path.isEmpty()) {
+                BlobAsyncClient blobAsyncClient =
+                        
blobContainerAsyncClient.getBlobAsyncClient(config.getPrefix() + path);
+                deleteMonos.add(blobAsyncClient.deleteIfExists());
+            }
+        }
+
+        if (deleteMonos.isEmpty()) {
             return;
-        Collection<List<String>> batchedBlobURLs = 
getBatchedBlobURLs(blobURLs);
-        for (List<String> batch : batchedBlobURLs) {
-            PagedIterable<Response<Void>> responses = 
blobBatchClient.deleteBlobs(batch, null);
-            Iterator<String> deletePathIter = paths.iterator();
-            String deletedPath;
-            String failedDeletedPath = null;
-            for (Response<Void> response : responses) {
-                deletedPath = deletePathIter.next();
-                // The response.getStatusCode() method returns:
-                // - 202 (Accepted) if the delete operation is successful
-                // - exception if the delete operation fails
-                int statusCode = response.getStatusCode();
-                if (statusCode != SUCCESS_RESPONSE_CODE) {
-                    LOGGER.warn("Failed to delete blob: {} with status code: 
{} while deleting {}", deletedPath,
-                            statusCode, paths.toString());
-                    if (failedDeletedPath == null) {
-                        failedDeletedPath = deletedPath;
-                    }
-                }
-            }
-            if (failedDeletedPath != null) {
-                throw new RuntimeDataException(ErrorCode.CLOUD_IO_FAILURE, 
"DELETE", failedDeletedPath,
-                        paths.toString());
-            }
         }
-    }

-    private Collection<List<String>> getBatchedBlobURLs(List<String> blobURLs) 
{
-        int startIdx = 0;
-        Collection<List<String>> batchedBLOBURLs = new ArrayList<>();
-        Iterator<String> iterator = blobURLs.iterator();
-        while (iterator.hasNext()) {
-            List<String> batch = new ArrayList<>();
-            while (startIdx < DELETE_BATCH_SIZE && iterator.hasNext()) {
-                batch.add(iterator.next());
-                startIdx++;
-            }
-            batchedBLOBURLs.add(batch);
-            startIdx = 0;
+        try {
+            Flux.fromIterable(deleteMonos).flatMap(mono -> mono, 
MAX_CONCURRENT_REQUESTS).then().block();
+        } catch (Exception ex) {
+            throw new RuntimeDataException(ErrorCode.CLOUD_IO_FAILURE, 
"DELETE", ex, paths.toString());
         }
-        return batchedBLOBURLs;
-    }
-
-    private Set<BlobItem> getBlobsMatchingThesePaths(Collection<String> paths) 
{
-        List<String> pathWithPrefix =
-                paths.stream().map(path -> config.getPrefix() + 
path).collect(Collectors.toList());
-        PagedIterable<BlobItem> blobItems = blobContainerClient.listBlobs();
-        return blobItems.stream().filter(blobItem -> 
pathWithPrefix.contains(blobItem.getName()))
-                .collect(Collectors.toSet());
     }

     @Override
@@ -423,6 +389,16 @@
     }

     private static BlobServiceClient buildClient(AzBlobStorageClientConfig 
config) {
+        BlobServiceClientBuilder blobServiceClientBuilder = 
getBlobServiceClientBuilder(config);
+        return blobServiceClientBuilder.buildClient();
+    }
+
+    private static BlobServiceAsyncClient 
buildAsyncClient(AzBlobStorageClientConfig config) {
+        BlobServiceClientBuilder blobServiceClientBuilder = 
getBlobServiceClientBuilder(config);
+        return blobServiceClientBuilder.buildAsyncClient();
+    }
+
+    private static BlobServiceClientBuilder 
getBlobServiceClientBuilder(AzBlobStorageClientConfig config) {
         BlobServiceClientBuilder blobServiceClientBuilder = new 
BlobServiceClientBuilder();
         blobServiceClientBuilder.endpoint(getEndpoint(config));
         
blobServiceClientBuilder.httpLogOptions(AzureConstants.HTTP_LOG_OPTIONS);
@@ -444,8 +420,7 @@
                 throw new RuntimeException("Failed to disable SSL 
verification", e);
             }
         }
-
-        return blobServiceClientBuilder.buildClient();
+        return blobServiceClientBuilder;
     }

     private static void configCredentialsToAzClient(BlobServiceClientBuilder 
builder,
@@ -468,10 +443,4 @@
         return config.isAnonymousAuth() ? AZURITE_ENDPOINT + config.getBucket()
                 : config.getEndpoint() + "/" + config.getBucket();
     }
-
-    private List<String> getBlobURLs(Set<BlobItem> blobs) {
-        final String blobURLPrefix = blobContainerClient.getBlobContainerUrl() 
+ "/";
-        return blobs.stream().map(BlobItem::getName).map(blobName -> 
blobURLPrefix + blobName)
-                .collect(Collectors.toList());
-    }
 }

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20507?usp=email
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings?usp=email

Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: phoenix
Gerrit-Change-Id: Ie66fe1c9d95684af0c79b9387194f4bba01891a6
Gerrit-Change-Number: 20507
Gerrit-PatchSet: 1
Gerrit-Owner: Ritik Raj <[email protected]>

Reply via email to