>From Murtadha Hubail <[email protected]>: Murtadha Hubail has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20514?usp=email )
Change subject: [NO ISSUE][STO] Set storage tier on azure uploads ...................................................................... [NO ISSUE][STO] Set storage tier on azure uploads - user model changes: no - storage format changes: no - interface changes: no Details: - When uploading files to azure, set the storage tier to hot for internal storage files. Ext-ref: MB-69062 Change-Id: If255019dbbd68d5b931248083b70f98e73dc3292 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20514 Tested-by: Murtadha Hubail <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> --- M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageBufferedWriter.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java M asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/azure/LSMAzBlobStorageTest.java 4 files changed, 41 insertions(+), 16 deletions(-) Approvals: Jenkins: Verified Ali Alsuliman: Looks good to me, approved Murtadha Hubail: Verified Objections: Anon. E. Moose #1000171: Violations found diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageBufferedWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageBufferedWriter.java index 2a79c86..d2054c6d 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageBufferedWriter.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageBufferedWriter.java @@ -20,6 +20,7 @@ package org.apache.asterix.cloud.clients.azure.blobstorage; import java.io.BufferedInputStream; +import java.io.ByteArrayInputStream; import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -37,11 +38,15 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import com.azure.core.util.BinaryData; +import com.azure.core.util.Context; import com.azure.storage.blob.BlobClient; import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.models.AccessTier; +import com.azure.storage.blob.models.BlobRequestConditions; import com.azure.storage.blob.models.BlobStorageException; +import com.azure.storage.blob.options.BlobParallelUploadOptions; import com.azure.storage.blob.specialized.BlockBlobClient; +import com.azure.storage.common.implementation.Constants; public class AzBlobStorageBufferedWriter implements ICloudBufferedWriter { private static final String PUT_UPLOAD_ID = "putUploadId"; @@ -49,8 +54,8 @@ private static final Logger LOGGER = LogManager.getLogger(); private final List<String> blockIDArrayList; private final ICloudGuardian guardian; - private int blockNumber; private final String path; + private final AccessTier accessTier; private String uploadID; private final BlobContainerClient blobContainerClient; @@ -60,13 +65,14 @@ private final String bucket; public AzBlobStorageBufferedWriter(BlobContainerClient blobContainerClient, IRequestProfilerLimiter profiler, - ICloudGuardian guardian, String bucket, String path) { + ICloudGuardian guardian, String bucket, String path, AccessTier accessTier) { this.blobContainerClient = blobContainerClient; this.profiler = profiler; this.guardian = guardian; this.bucket = bucket; this.path = path; this.blockIDArrayList = new ArrayList<>(); + this.accessTier = accessTier; } @Override @@ -90,13 +96,11 @@ LOGGER.error("Error while uploading blocks of data: {}", e.getMessage()); throw new RuntimeException(e); } - blockNumber++; } private void initBlockBlobUploads(String blockID) { if (this.uploadID == null) { this.uploadID = blockID; - this.blockNumber = 1; } } @@ -105,8 +109,10 @@ if (uploadID == null) { profiler.objectWrite(); BlobClient blobClient = blobContainerClient.getBlobClient(path); - BinaryData binaryData = BinaryData.fromBytes(getDataFromBuffer(buffer)); - blobClient.upload(binaryData); + BlobParallelUploadOptions options = + new BlobParallelUploadOptions(new ByteArrayInputStream(getDataFromBuffer(buffer))) + .setTier(accessTier); + blobClient.uploadWithResponse(options, null, null); uploadID = PUT_UPLOAD_ID; // uploadID should be updated if the put-object operation succeeds } else { upload(stream, buffer.limit()); @@ -137,7 +143,9 @@ try { guardian.checkWriteAccess(bucket, path); profiler.objectMultipartUpload(); - blockBlobClient.commitBlockList(blockIDArrayList); + blockBlobClient.commitBlockListWithResponse(blockIDArrayList, null, null, accessTier, + new BlobRequestConditions().setIfNoneMatch(Constants.HeaderConstants.ETAG_WILDCARD), null, + Context.NONE); break; } catch (BlobStorageException e) { currRetryAttempt++; diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java index 31b160d..9abfbcc 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java @@ -27,11 +27,13 @@ import com.azure.identity.DefaultAzureCredential; import com.azure.identity.DefaultAzureCredentialBuilder; +import com.azure.storage.blob.models.AccessTier; public class AzBlobStorageClientConfig { // Ref: https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id static final int MAX_CONCURRENT_REQUESTS = 20; + private static final AccessTier INTERNAL_STORAGE_ACCESS_TIER = AccessTier.HOT; private final int writeBufferSize; private final String region; private final String endpoint; @@ -44,15 +46,18 @@ private final int writeMaxRequestsPerSeconds; private final int readMaxRequestsPerSeconds; private final boolean storageDisableSSLVerify; + private final AccessTier accessTier; public AzBlobStorageClientConfig(String region, String endpoint, String prefix, boolean anonymousAuth, long profilerLogInterval, String bucket, int writeBufferSize) { - this(region, endpoint, prefix, anonymousAuth, profilerLogInterval, bucket, 1, 0, 0, writeBufferSize, false); + this(region, endpoint, prefix, anonymousAuth, profilerLogInterval, bucket, 1, 0, 0, writeBufferSize, false, + null); } public AzBlobStorageClientConfig(String region, String endpoint, String prefix, boolean anonymousAuth, long profilerLogInterval, String bucket, long tokenAcquireTimeout, int writeMaxRequestsPerSeconds, - int readMaxRequestsPerSeconds, int writeBufferSize, boolean storageDisableSSLVerify) { + int readMaxRequestsPerSeconds, int writeBufferSize, boolean storageDisableSSLVerify, + AccessTier accessTier) { this.region = Objects.requireNonNull(region, "region"); this.endpoint = endpoint; this.prefix = Objects.requireNonNull(prefix, "prefix"); @@ -64,6 +69,7 @@ this.readMaxRequestsPerSeconds = readMaxRequestsPerSeconds; this.writeBufferSize = writeBufferSize; this.storageDisableSSLVerify = storageDisableSSLVerify; + this.accessTier = accessTier; } public static AzBlobStorageClientConfig of(CloudProperties cloudProperties) { @@ -72,7 +78,7 @@ cloudProperties.getProfilerLogInterval(), cloudProperties.getStorageBucket(), cloudProperties.getTokenAcquireTimeout(), cloudProperties.getWriteMaxRequestsPerSecond(), cloudProperties.getReadMaxRequestsPerSecond(), cloudProperties.getWriteBufferSize(), - cloudProperties.isStorageDisableSSLVerify()); + cloudProperties.isStorageDisableSSLVerify(), INTERNAL_STORAGE_ACCESS_TIER); } public static AzBlobStorageClientConfig of(Map<String, String> configuration, int writeBufferSize) { @@ -138,4 +144,8 @@ public int getWriteBufferSize() { return writeBufferSize; } + + public AccessTier getAccessTier() { + return accessTier; + } } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java index 1cd0556..43e5fd2 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java @@ -21,6 +21,7 @@ import static org.apache.asterix.cloud.clients.azure.blobstorage.AzBlobStorageClientConfig.MAX_CONCURRENT_REQUESTS; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.FilenameFilter; import java.io.IOException; @@ -67,12 +68,14 @@ import com.azure.storage.blob.BlobServiceAsyncClient; import com.azure.storage.blob.BlobServiceClient; import com.azure.storage.blob.BlobServiceClientBuilder; +import com.azure.storage.blob.models.AccessTier; import com.azure.storage.blob.models.BlobErrorCode; import com.azure.storage.blob.models.BlobItem; import com.azure.storage.blob.models.BlobListDetails; import com.azure.storage.blob.models.BlobRange; import com.azure.storage.blob.models.BlobStorageException; import com.azure.storage.blob.models.ListBlobsOptions; +import com.azure.storage.blob.options.BlobParallelUploadOptions; import com.azure.storage.common.StorageSharedKeyCredential; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -87,6 +90,7 @@ import reactor.netty.http.client.HttpClient; public class AzBlobStorageCloudClient implements ICloudClient { + private static final String BUCKET_ROOT_PATH = ""; public static final String AZURITE_ENDPOINT = "http://127.0.0.1:15055/devstoreaccount1/"; private static final String AZURITE_ACCOUNT_NAME = "devstoreaccount1"; @@ -98,6 +102,7 @@ private final AzBlobStorageClientConfig config; private final IRequestProfilerLimiter profiler; private static final Logger LOGGER = LogManager.getLogger(); + private final AccessTier accessTier; public AzBlobStorageCloudClient(AzBlobStorageClientConfig config, ICloudGuardian guardian) { this(config, buildClient(config), buildAsyncClient(config), guardian); @@ -109,6 +114,7 @@ this.blobContainerAsyncClient = asyncBlobServiceClient.getBlobContainerAsyncClient(config.getBucket()); this.config = config; this.guardian = guardian; + this.accessTier = config.getAccessTier(); long profilerInterval = config.getProfilerLogInterval(); AzureRequestRateLimiter limiter = new AzureRequestRateLimiter(config); if (profilerInterval > 0) { @@ -132,7 +138,7 @@ @Override public ICloudWriter createWriter(String bucket, String path, IWriteBufferProvider bufferProvider) { ICloudBufferedWriter bufferedWriter = new AzBlobStorageBufferedWriter(blobContainerClient, profiler, guardian, - bucket, config.getPrefix() + path); + bucket, config.getPrefix() + path, config.getAccessTier()); return new CloudResettableInputStream(bufferedWriter, bufferProvider); } @@ -241,9 +247,10 @@ public void write(String bucket, String path, byte[] data) { guardian.checkWriteAccess(bucket, path); profiler.objectWrite(); - BinaryData binaryData = BinaryData.fromBytes(data); BlobClient blobClient = blobContainerClient.getBlobClient(config.getPrefix() + path); - blobClient.upload(binaryData, true); + BlobParallelUploadOptions options = + new BlobParallelUploadOptions(new ByteArrayInputStream(data)).setTier(accessTier); + blobClient.uploadWithResponse(options, null, null); } @Override @@ -255,7 +262,7 @@ profiler.objectCopy(); guardian.checkWriteAccess(bucket, destPath.getRelativePath()); BlobClient destBlobClient = blobContainerClient.getBlobClient(destPath.getFile().getPath()); - destBlobClient.beginCopy(srcBlobUrl, null); + destBlobClient.beginCopy(srcBlobUrl, null, accessTier, null, null, null, null); } @Override diff --git a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/azure/LSMAzBlobStorageTest.java b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/azure/LSMAzBlobStorageTest.java index 44b4856..d9424a8 100644 --- a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/azure/LSMAzBlobStorageTest.java +++ b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/azure/LSMAzBlobStorageTest.java @@ -117,7 +117,7 @@ URI blobStore = URI.create(blobServiceClient.getAccountUrl()); String endpoint = blobStore.getScheme() + "://" + blobStore.getAuthority() + "/devstoreaccount1"; AzBlobStorageClientConfig config = new AzBlobStorageClientConfig(MOCK_SERVER_REGION, endpoint, "", false, 0, - PLAYGROUND_CONTAINER, 1, 0, 0, writeBufferSize, true); + PLAYGROUND_CONTAINER, 1, 0, 0, writeBufferSize, true, null); CLOUD_CLIENT = new AzBlobStorageCloudClient(config, ICloudGuardian.NoOpCloudGuardian.INSTANCE); } -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20514?usp=email To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings?usp=email Gerrit-MessageType: merged Gerrit-Project: asterixdb Gerrit-Branch: phoenix Gerrit-Change-Id: If255019dbbd68d5b931248083b70f98e73dc3292 Gerrit-Change-Number: 20514 Gerrit-PatchSet: 3 Gerrit-Owner: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]>
