From Ritik Raj <[email protected]>: Ritik Raj has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20507?usp=email )
Change subject: [NO ISSUE][CLOUD] Updating deleteObjects for AzureBlob ...................................................................... [NO ISSUE][CLOUD] Updating deleteObjects for AzureBlob - user model changes: no - storage format changes: no - interface changes: no Details: Replacing the expensive LIST call while bulk deleting with the async delete calls Ext-ref: MB-68999 Change-Id: Ie66fe1c9d95684af0c79b9387194f4bba01891a6 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20507 Tested-by: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> --- M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java 2 files changed, 35 insertions(+), 66 deletions(-) Approvals: Murtadha Hubail: Looks good to me, approved Jenkins: Verified diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java index 889a6a4..31b160d 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java @@ -30,7 +30,7 @@ public class AzBlobStorageClientConfig { // Ref: https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id - static final int DELETE_BATCH_SIZE = 256; + static final int MAX_CONCURRENT_REQUESTS = 20; private final int writeBufferSize; private final String region; diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java index 4a61f1c..0bd16a8 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java @@ -19,7 +19,7 @@ package org.apache.asterix.cloud.clients.azure.blobstorage; -import static org.apache.asterix.cloud.clients.azure.blobstorage.AzBlobStorageClientConfig.DELETE_BATCH_SIZE; +import static org.apache.asterix.cloud.clients.azure.blobstorage.AzBlobStorageClientConfig.MAX_CONCURRENT_REQUESTS; import java.io.ByteArrayOutputStream; import java.io.FilenameFilter; @@ -31,7 +31,6 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collection; -import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.function.Predicate; @@ -60,14 +59,14 @@ import com.azure.core.http.netty.NettyAsyncHttpClientBuilder; import com.azure.core.http.rest.PagedIterable; -import com.azure.core.http.rest.Response; import com.azure.core.util.BinaryData; +import com.azure.storage.blob.BlobAsyncClient; import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.BlobContainerAsyncClient; import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceAsyncClient; import com.azure.storage.blob.BlobServiceClient; import com.azure.storage.blob.BlobServiceClientBuilder; -import com.azure.storage.blob.batch.BlobBatchClient; -import com.azure.storage.blob.batch.BlobBatchClientBuilder; import com.azure.storage.blob.models.BlobErrorCode; import com.azure.storage.blob.models.BlobItem; import com.azure.storage.blob.models.BlobListDetails; @@ -83,6 +82,8 @@ import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import 
reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClient; public class AzBlobStorageCloudClient implements ICloudClient { @@ -91,12 +92,11 @@ private static final String AZURITE_ACCOUNT_NAME = "devstoreaccount1"; private static final String AZURITE_ACCOUNT_KEY = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="; - private static final int SUCCESS_RESPONSE_CODE = 202; private final ICloudGuardian guardian; private final BlobContainerClient blobContainerClient; + private final BlobContainerAsyncClient blobContainerAsyncClient; private final AzBlobStorageClientConfig config; private final IRequestProfilerLimiter profiler; - private final BlobBatchClient blobBatchClient; private static final Logger LOGGER = LogManager.getLogger(); public AzBlobStorageCloudClient(AzBlobStorageClientConfig config, ICloudGuardian guardian) { @@ -106,6 +106,7 @@ public AzBlobStorageCloudClient(AzBlobStorageClientConfig config, BlobServiceClient blobServiceClient, ICloudGuardian guardian) { this.blobContainerClient = blobServiceClient.getBlobContainerClient(config.getBucket()); + this.blobContainerAsyncClient = buildAsyncClient(config).getBlobContainerAsyncClient(config.getBucket()); this.config = config; this.guardian = guardian; long profilerInterval = config.getProfilerLogInterval(); @@ -116,7 +117,6 @@ profiler = new RequestLimiterNoOpProfiler(limiter); } guardian.setCloudClient(this); - blobBatchClient = new BlobBatchClientBuilder(blobContainerClient.getServiceClient()).buildClient(); } @Override @@ -277,59 +277,25 @@ public void deleteObjects(String bucket, Collection<String> paths) throws HyracksDataException { if (paths.isEmpty()) return; - Set<BlobItem> blobsToDelete = getBlobsMatchingThesePaths(paths); - List<String> blobURLs = getBlobURLs(blobsToDelete); - if (blobURLs.isEmpty()) + + List<Mono<Boolean>> deleteMonos = new ArrayList<>(); + for (String path : paths) { + if (path != null && 
!path.isEmpty()) { + BlobAsyncClient blobAsyncClient = + blobContainerAsyncClient.getBlobAsyncClient(config.getPrefix() + path); + deleteMonos.add(blobAsyncClient.deleteIfExists()); + } + } + + if (deleteMonos.isEmpty()) { return; - Collection<List<String>> batchedBlobURLs = getBatchedBlobURLs(blobURLs); - for (List<String> batch : batchedBlobURLs) { - PagedIterable<Response<Void>> responses = blobBatchClient.deleteBlobs(batch, null); - Iterator<String> deletePathIter = paths.iterator(); - String deletedPath; - String failedDeletedPath = null; - for (Response<Void> response : responses) { - deletedPath = deletePathIter.next(); - // The response.getStatusCode() method returns: - // - 202 (Accepted) if the delete operation is successful - // - exception if the delete operation fails - int statusCode = response.getStatusCode(); - if (statusCode != SUCCESS_RESPONSE_CODE) { - LOGGER.warn("Failed to delete blob: {} with status code: {} while deleting {}", deletedPath, - statusCode, paths.toString()); - if (failedDeletedPath == null) { - failedDeletedPath = deletedPath; - } - } - } - if (failedDeletedPath != null) { - throw new RuntimeDataException(ErrorCode.CLOUD_IO_FAILURE, "DELETE", failedDeletedPath, - paths.toString()); - } } - } - private Collection<List<String>> getBatchedBlobURLs(List<String> blobURLs) { - int startIdx = 0; - Collection<List<String>> batchedBLOBURLs = new ArrayList<>(); - Iterator<String> iterator = blobURLs.iterator(); - while (iterator.hasNext()) { - List<String> batch = new ArrayList<>(); - while (startIdx < DELETE_BATCH_SIZE && iterator.hasNext()) { - batch.add(iterator.next()); - startIdx++; - } - batchedBLOBURLs.add(batch); - startIdx = 0; + try { + Flux.fromIterable(deleteMonos).flatMap(mono -> mono, MAX_CONCURRENT_REQUESTS).then().block(); + } catch (Exception ex) { + throw new RuntimeDataException(ErrorCode.CLOUD_IO_FAILURE, "DELETE", ex, paths.toString()); } - return batchedBLOBURLs; - } - - private Set<BlobItem> 
getBlobsMatchingThesePaths(Collection<String> paths) { - List<String> pathWithPrefix = - paths.stream().map(path -> config.getPrefix() + path).collect(Collectors.toList()); - PagedIterable<BlobItem> blobItems = blobContainerClient.listBlobs(); - return blobItems.stream().filter(blobItem -> pathWithPrefix.contains(blobItem.getName())) - .collect(Collectors.toSet()); } @Override @@ -423,6 +389,16 @@ } private static BlobServiceClient buildClient(AzBlobStorageClientConfig config) { + BlobServiceClientBuilder blobServiceClientBuilder = getBlobServiceClientBuilder(config); + return blobServiceClientBuilder.buildClient(); + } + + private static BlobServiceAsyncClient buildAsyncClient(AzBlobStorageClientConfig config) { + BlobServiceClientBuilder blobServiceClientBuilder = getBlobServiceClientBuilder(config); + return blobServiceClientBuilder.buildAsyncClient(); + } + + private static BlobServiceClientBuilder getBlobServiceClientBuilder(AzBlobStorageClientConfig config) { BlobServiceClientBuilder blobServiceClientBuilder = new BlobServiceClientBuilder(); blobServiceClientBuilder.endpoint(getEndpoint(config)); blobServiceClientBuilder.httpLogOptions(AzureConstants.HTTP_LOG_OPTIONS); @@ -444,8 +420,7 @@ throw new RuntimeException("Failed to disable SSL verification", e); } } - - return blobServiceClientBuilder.buildClient(); + return blobServiceClientBuilder; } private static void configCredentialsToAzClient(BlobServiceClientBuilder builder, @@ -468,10 +443,4 @@ return config.isAnonymousAuth() ? 
AZURITE_ENDPOINT + config.getBucket() : config.getEndpoint() + "/" + config.getBucket(); } - - private List<String> getBlobURLs(Set<BlobItem> blobs) { - final String blobURLPrefix = blobContainerClient.getBlobContainerUrl() + "/"; - return blobs.stream().map(BlobItem::getName).map(blobName -> blobURLPrefix + blobName) - .collect(Collectors.toList()); - } } -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20507?usp=email To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings?usp=email Gerrit-MessageType: merged Gerrit-Project: asterixdb Gerrit-Branch: phoenix Gerrit-Change-Id: Ie66fe1c9d95684af0c79b9387194f4bba01891a6 Gerrit-Change-Number: 20507 Gerrit-PatchSet: 2 Gerrit-Owner: Ritik Raj <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Ritik Raj <[email protected]>
