>From Ritik Raj <[email protected]>:
Ritik Raj has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20507?usp=email )
Change subject: [NO ISSUE][CLOUD] Updating deleteObjects for AzureBlob
......................................................................
[NO ISSUE][CLOUD] Updating deleteObjects for AzureBlob
- user model changes: no
- storage format changes: no
- interface changes: no
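Details:
Replace the expensive LIST of the entire container, which bulk delete
used to find the blobs to remove, with concurrent asynchronous
per-blob deleteIfExists calls. The BlobBatchClient is no longer used.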
Ext-ref: MB-68999
Change-Id: Ie66fe1c9d95684af0c79b9387194f4bba01891a6
---
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java
2 files changed, 35 insertions(+), 66 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/07/20507/1
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
index 889a6a4..31b160d 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
@@ -30,7 +30,8 @@
 public class AzBlobStorageClientConfig {
-    // Ref: https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id
-    static final int DELETE_BATCH_SIZE = 256;
+    // Upper bound on the number of Delete Blob requests AzBlobStorageCloudClient#deleteObjects keeps in flight
+    // Ref: https://learn.microsoft.com/en-us/rest/api/storageservices/delete-blob
+    static final int MAX_CONCURRENT_REQUESTS = 20;
     private final int writeBufferSize;
     private final String region;
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java
index 4a61f1c..0bd16a8 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java
@@ -19,7 +19,7 @@
package org.apache.asterix.cloud.clients.azure.blobstorage;
-import static
org.apache.asterix.cloud.clients.azure.blobstorage.AzBlobStorageClientConfig.DELETE_BATCH_SIZE;
+import static
org.apache.asterix.cloud.clients.azure.blobstorage.AzBlobStorageClientConfig.MAX_CONCURRENT_REQUESTS;
import java.io.ByteArrayOutputStream;
import java.io.FilenameFilter;
@@ -31,7 +31,6 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
@@ -60,14 +59,14 @@
import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
import com.azure.core.http.rest.PagedIterable;
-import com.azure.core.http.rest.Response;
import com.azure.core.util.BinaryData;
+import com.azure.storage.blob.BlobAsyncClient;
import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceAsyncClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
-import com.azure.storage.blob.batch.BlobBatchClient;
-import com.azure.storage.blob.batch.BlobBatchClientBuilder;
import com.azure.storage.blob.models.BlobErrorCode;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.BlobListDetails;
@@ -83,6 +82,8 @@
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
public class AzBlobStorageCloudClient implements ICloudClient {
@@ -91,12 +92,11 @@
private static final String AZURITE_ACCOUNT_NAME = "devstoreaccount1";
private static final String AZURITE_ACCOUNT_KEY =
"Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
- private static final int SUCCESS_RESPONSE_CODE = 202;
private final ICloudGuardian guardian;
private final BlobContainerClient blobContainerClient;
+ private final BlobContainerAsyncClient blobContainerAsyncClient;
private final AzBlobStorageClientConfig config;
private final IRequestProfilerLimiter profiler;
- private final BlobBatchClient blobBatchClient;
private static final Logger LOGGER = LogManager.getLogger();
public AzBlobStorageCloudClient(AzBlobStorageClientConfig config,
ICloudGuardian guardian) {
@@ -106,6 +106,11 @@
     public AzBlobStorageCloudClient(AzBlobStorageClientConfig config, BlobServiceClient blobServiceClient,
             ICloudGuardian guardian) {
         this.blobContainerClient = blobServiceClient.getBlobContainerClient(config.getBucket());
+        // Derive the async client from the injected service client so both use the same endpoint and HTTP pipeline
+        // (credentials, HTTP client), instead of building a second, independently configured client from config.
+        this.blobContainerAsyncClient = new BlobServiceClientBuilder().endpoint(blobServiceClient.getAccountUrl())
+                .pipeline(blobServiceClient.getHttpPipeline()).serviceVersion(blobServiceClient.getServiceVersion())
+                .buildAsyncClient().getBlobContainerAsyncClient(config.getBucket());
         this.config = config;
         this.guardian = guardian;
         long profilerInterval = config.getProfilerLogInterval();
@@ -116,7 +117,6 @@
profiler = new RequestLimiterNoOpProfiler(limiter);
}
guardian.setCloudClient(this);
- blobBatchClient = new
BlobBatchClientBuilder(blobContainerClient.getServiceClient()).buildClient();
}
@Override
@@ -277,59 +277,45 @@
     public void deleteObjects(String bucket, Collection<String> paths) throws HyracksDataException {
         if (paths.isEmpty())
             return;
-        Set<BlobItem> blobsToDelete = getBlobsMatchingThesePaths(paths);
-        List<String> blobURLs = getBlobURLs(blobsToDelete);
-        if (blobURLs.isEmpty())
+
+        // Address every blob directly by name; no LIST call is needed to find out which blobs exist.
+        List<BlobAsyncClient> blobsToDelete = new ArrayList<>(paths.size());
+        for (String path : paths) {
+            if (path != null && !path.isEmpty()) {
+                blobsToDelete.add(blobContainerAsyncClient.getBlobAsyncClient(config.getPrefix() + path));
+            }
+        }
+
+        if (blobsToDelete.isEmpty()) {
             return;
-        Collection<List<String>> batchedBlobURLs = getBatchedBlobURLs(blobURLs);
-        for (List<String> batch : batchedBlobURLs) {
-            PagedIterable<Response<Void>> responses = blobBatchClient.deleteBlobs(batch, null);
-            Iterator<String> deletePathIter = paths.iterator();
-            String deletedPath;
-            String failedDeletedPath = null;
-            for (Response<Void> response : responses) {
-                deletedPath = deletePathIter.next();
-                // The response.getStatusCode() method returns:
-                // - 202 (Accepted) if the delete operation is successful
-                // - exception if the delete operation fails
-                int statusCode = response.getStatusCode();
-                if (statusCode != SUCCESS_RESPONSE_CODE) {
-                    LOGGER.warn("Failed to delete blob: {} with status code: {} while deleting {}", deletedPath,
-                            statusCode, paths.toString());
-                    if (failedDeletedPath == null) {
-                        failedDeletedPath = deletedPath;
-                    }
-                }
-            }
-            if (failedDeletedPath != null) {
-                throw new RuntimeDataException(ErrorCode.CLOUD_IO_FAILURE, "DELETE", failedDeletedPath,
-                        paths.toString());
-            }
         }
-    }
 
-    private Collection<List<String>> getBatchedBlobURLs(List<String> blobURLs) {
-        int startIdx = 0;
-        Collection<List<String>> batchedBLOBURLs = new ArrayList<>();
-        Iterator<String> iterator = blobURLs.iterator();
-        while (iterator.hasNext()) {
-            List<String> batch = new ArrayList<>();
-            while (startIdx < DELETE_BATCH_SIZE && iterator.hasNext()) {
-                batch.add(iterator.next());
-                startIdx++;
-            }
-            batchedBLOBURLs.add(batch);
-            startIdx = 0;
+        String pathsString = paths.toString();
+        List<RuntimeDataException> failures;
+        try {
+            // At most MAX_CONCURRENT_REQUESTS deletes are in flight at once; deleteIfExists() completes normally
+            // when the blob does not exist. A failed delete is logged and emitted as an element instead of an
+            // error signal, so it does not cancel the remaining deletes; failures are reported once all complete.
+            failures = Flux.fromIterable(blobsToDelete)
+                    .flatMap(blob -> blob.deleteIfExists().then(Mono.<RuntimeDataException> empty())
+                            .onErrorResume(e -> {
+                                LOGGER.warn("Failed to delete blob: {} while deleting {}", blob.getBlobName(),
+                                        pathsString, e);
+                                return Mono.just(new RuntimeDataException(ErrorCode.CLOUD_IO_FAILURE, e,
+                                        "DELETE", blob.getBlobName(), pathsString));
+                            }), MAX_CONCURRENT_REQUESTS)
+                    .collectList().block();
+        } catch (RuntimeException ex) {
+            // block() surfaces an interrupt as a RuntimeException caused by InterruptedException; restore the flag
+            if (ex.getCause() instanceof InterruptedException) {
+                Thread.currentThread().interrupt();
+            }
+            throw new RuntimeDataException(ErrorCode.CLOUD_IO_FAILURE, ex, "DELETE", bucket, pathsString);
         }
-        return batchedBLOBURLs;
-    }
-
-    private Set<BlobItem> getBlobsMatchingThesePaths(Collection<String> paths) {
-        List<String> pathWithPrefix =
-                paths.stream().map(path -> config.getPrefix() + path).collect(Collectors.toList());
-        PagedIterable<BlobItem> blobItems = blobContainerClient.listBlobs();
-        return blobItems.stream().filter(blobItem -> pathWithPrefix.contains(blobItem.getName()))
-                .collect(Collectors.toSet());
+        if (failures != null && !failures.isEmpty()) {
+            // Surface the first failure (with its cause); every failure has already been logged above.
+            throw failures.get(0);
+        }
     }
 
     @Override
@@ -423,6 +389,16 @@
}
private static BlobServiceClient buildClient(AzBlobStorageClientConfig
config) {
+ BlobServiceClientBuilder blobServiceClientBuilder =
getBlobServiceClientBuilder(config);
+ return blobServiceClientBuilder.buildClient();
+ }
+
+ private static BlobServiceAsyncClient
buildAsyncClient(AzBlobStorageClientConfig config) {
+ BlobServiceClientBuilder blobServiceClientBuilder =
getBlobServiceClientBuilder(config);
+ return blobServiceClientBuilder.buildAsyncClient();
+ }
+
+ private static BlobServiceClientBuilder
getBlobServiceClientBuilder(AzBlobStorageClientConfig config) {
BlobServiceClientBuilder blobServiceClientBuilder = new
BlobServiceClientBuilder();
blobServiceClientBuilder.endpoint(getEndpoint(config));
blobServiceClientBuilder.httpLogOptions(AzureConstants.HTTP_LOG_OPTIONS);
@@ -444,8 +420,7 @@
throw new RuntimeException("Failed to disable SSL
verification", e);
}
}
-
- return blobServiceClientBuilder.buildClient();
+ return blobServiceClientBuilder;
}
private static void configCredentialsToAzClient(BlobServiceClientBuilder
builder,
@@ -468,10 +443,4 @@
return config.isAnonymousAuth() ? AZURITE_ENDPOINT + config.getBucket()
: config.getEndpoint() + "/" + config.getBucket();
}
-
- private List<String> getBlobURLs(Set<BlobItem> blobs) {
- final String blobURLPrefix = blobContainerClient.getBlobContainerUrl()
+ "/";
- return blobs.stream().map(BlobItem::getName).map(blobName ->
blobURLPrefix + blobName)
- .collect(Collectors.toList());
- }
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20507?usp=email
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings?usp=email
Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: phoenix
Gerrit-Change-Id: Ie66fe1c9d95684af0c79b9387194f4bba01891a6
Gerrit-Change-Number: 20507
Gerrit-PatchSet: 1
Gerrit-Owner: Ritik Raj <[email protected]>