From Murtadha Hubail <[email protected]>:
Murtadha Hubail has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20514?usp=email )
Change subject: [NO ISSUE][STO] Set storage tier on azure uploads
......................................................................
[NO ISSUE][STO] Set storage tier on azure uploads
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- When uploading files to Azure, set the storage tier to hot (AccessTier.HOT)
for internal storage files.
Ext-ref: MB-69062
Change-Id: If255019dbbd68d5b931248083b70f98e73dc3292
---
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageBufferedWriter.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java
M
asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/azure/LSMAzBlobStorageTest.java
4 files changed, 41 insertions(+), 16 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/14/20514/1
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageBufferedWriter.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageBufferedWriter.java
index 2a79c86..d2054c6d 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageBufferedWriter.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageBufferedWriter.java
@@ -20,6 +20,7 @@
package org.apache.asterix.cloud.clients.azure.blobstorage;
import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
@@ -37,11 +38,15 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import com.azure.core.util.BinaryData;
+import com.azure.core.util.Context;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.models.AccessTier;
+import com.azure.storage.blob.models.BlobRequestConditions;
import com.azure.storage.blob.models.BlobStorageException;
+import com.azure.storage.blob.options.BlobParallelUploadOptions;
import com.azure.storage.blob.specialized.BlockBlobClient;
+import com.azure.storage.common.implementation.Constants;
public class AzBlobStorageBufferedWriter implements ICloudBufferedWriter {
private static final String PUT_UPLOAD_ID = "putUploadId";
@@ -49,8 +54,8 @@
private static final Logger LOGGER = LogManager.getLogger();
private final List<String> blockIDArrayList;
private final ICloudGuardian guardian;
- private int blockNumber;
private final String path;
+ private final AccessTier accessTier;
private String uploadID;
private final BlobContainerClient blobContainerClient;
@@ -60,13 +65,14 @@
private final String bucket;
public AzBlobStorageBufferedWriter(BlobContainerClient
blobContainerClient, IRequestProfilerLimiter profiler,
- ICloudGuardian guardian, String bucket, String path) {
+ ICloudGuardian guardian, String bucket, String path, AccessTier
accessTier) {
this.blobContainerClient = blobContainerClient;
this.profiler = profiler;
this.guardian = guardian;
this.bucket = bucket;
this.path = path;
this.blockIDArrayList = new ArrayList<>();
+ this.accessTier = accessTier;
}
@Override
@@ -90,13 +96,11 @@
LOGGER.error("Error while uploading blocks of data: {}",
e.getMessage());
throw new RuntimeException(e);
}
- blockNumber++;
}
private void initBlockBlobUploads(String blockID) {
if (this.uploadID == null) {
this.uploadID = blockID;
- this.blockNumber = 1;
}
}
@@ -105,8 +109,10 @@
if (uploadID == null) {
profiler.objectWrite();
BlobClient blobClient = blobContainerClient.getBlobClient(path);
- BinaryData binaryData =
BinaryData.fromBytes(getDataFromBuffer(buffer));
- blobClient.upload(binaryData);
+ BlobParallelUploadOptions options =
+ new BlobParallelUploadOptions(new
ByteArrayInputStream(getDataFromBuffer(buffer)))
+ .setTier(accessTier);
+ blobClient.uploadWithResponse(options, null, null);
uploadID = PUT_UPLOAD_ID; // uploadID should be updated if the
put-object operation succeeds
} else {
upload(stream, buffer.limit());
@@ -137,7 +143,9 @@
try {
guardian.checkWriteAccess(bucket, path);
profiler.objectMultipartUpload();
- blockBlobClient.commitBlockList(blockIDArrayList);
+ blockBlobClient.commitBlockListWithResponse(blockIDArrayList,
null, null, accessTier,
+ new
BlobRequestConditions().setIfNoneMatch(Constants.HeaderConstants.ETAG_WILDCARD),
null,
+ Context.NONE);
break;
} catch (BlobStorageException e) {
currRetryAttempt++;
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
index 31b160d..0cf292c 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
@@ -27,11 +27,13 @@
import com.azure.identity.DefaultAzureCredential;
import com.azure.identity.DefaultAzureCredentialBuilder;
+import com.azure.storage.blob.models.AccessTier;
public class AzBlobStorageClientConfig {
// Ref:
https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id
static final int MAX_CONCURRENT_REQUESTS = 20;
+ private static final AccessTier INTERNAL_STORAGE_DEFAULT_ACCESS_TIER =
AccessTier.HOT;
private final int writeBufferSize;
private final String region;
private final String endpoint;
@@ -44,15 +46,18 @@
private final int writeMaxRequestsPerSeconds;
private final int readMaxRequestsPerSeconds;
private final boolean storageDisableSSLVerify;
+ private final AccessTier accessTier;
public AzBlobStorageClientConfig(String region, String endpoint, String
prefix, boolean anonymousAuth,
long profilerLogInterval, String bucket, int writeBufferSize) {
- this(region, endpoint, prefix, anonymousAuth, profilerLogInterval,
bucket, 1, 0, 0, writeBufferSize, false);
+ this(region, endpoint, prefix, anonymousAuth, profilerLogInterval,
bucket, 1, 0, 0, writeBufferSize, false,
+ null);
}
public AzBlobStorageClientConfig(String region, String endpoint, String
prefix, boolean anonymousAuth,
long profilerLogInterval, String bucket, long tokenAcquireTimeout,
int writeMaxRequestsPerSeconds,
- int readMaxRequestsPerSeconds, int writeBufferSize, boolean
storageDisableSSLVerify) {
+ int readMaxRequestsPerSeconds, int writeBufferSize, boolean
storageDisableSSLVerify,
+ AccessTier accessTier) {
this.region = Objects.requireNonNull(region, "region");
this.endpoint = endpoint;
this.prefix = Objects.requireNonNull(prefix, "prefix");
@@ -64,6 +69,7 @@
this.readMaxRequestsPerSeconds = readMaxRequestsPerSeconds;
this.writeBufferSize = writeBufferSize;
this.storageDisableSSLVerify = storageDisableSSLVerify;
+ this.accessTier = accessTier;
}
public static AzBlobStorageClientConfig of(CloudProperties
cloudProperties) {
@@ -72,7 +78,7 @@
cloudProperties.getProfilerLogInterval(),
cloudProperties.getStorageBucket(),
cloudProperties.getTokenAcquireTimeout(),
cloudProperties.getWriteMaxRequestsPerSecond(),
cloudProperties.getReadMaxRequestsPerSecond(),
cloudProperties.getWriteBufferSize(),
- cloudProperties.isStorageDisableSSLVerify());
+ cloudProperties.isStorageDisableSSLVerify(),
INTERNAL_STORAGE_DEFAULT_ACCESS_TIER);
}
public static AzBlobStorageClientConfig of(Map<String, String>
configuration, int writeBufferSize) {
@@ -138,4 +144,8 @@
public int getWriteBufferSize() {
return writeBufferSize;
}
+
+ public AccessTier getAccessTier() {
+ return accessTier;
+ }
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java
index 1cd0556..a5b0ff5 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java
@@ -21,6 +21,7 @@
import static
org.apache.asterix.cloud.clients.azure.blobstorage.AzBlobStorageClientConfig.MAX_CONCURRENT_REQUESTS;
+import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
@@ -67,12 +68,14 @@
import com.azure.storage.blob.BlobServiceAsyncClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.blob.models.AccessTier;
import com.azure.storage.blob.models.BlobErrorCode;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.BlobListDetails;
import com.azure.storage.blob.models.BlobRange;
import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.blob.models.ListBlobsOptions;
+import com.azure.storage.blob.options.BlobParallelUploadOptions;
import com.azure.storage.common.StorageSharedKeyCredential;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -87,6 +90,7 @@
import reactor.netty.http.client.HttpClient;
public class AzBlobStorageCloudClient implements ICloudClient {
+
private static final String BUCKET_ROOT_PATH = "";
public static final String AZURITE_ENDPOINT =
"http://127.0.0.1:15055/devstoreaccount1/";
private static final String AZURITE_ACCOUNT_NAME = "devstoreaccount1";
@@ -98,6 +102,7 @@
private final AzBlobStorageClientConfig config;
private final IRequestProfilerLimiter profiler;
private static final Logger LOGGER = LogManager.getLogger();
+ private final AccessTier accessTier;
public AzBlobStorageCloudClient(AzBlobStorageClientConfig config,
ICloudGuardian guardian) {
this(config, buildClient(config), buildAsyncClient(config), guardian);
@@ -109,6 +114,7 @@
this.blobContainerAsyncClient =
asyncBlobServiceClient.getBlobContainerAsyncClient(config.getBucket());
this.config = config;
this.guardian = guardian;
+ this.accessTier = config.getAccessTier();
long profilerInterval = config.getProfilerLogInterval();
AzureRequestRateLimiter limiter = new AzureRequestRateLimiter(config);
if (profilerInterval > 0) {
@@ -132,7 +138,7 @@
@Override
public ICloudWriter createWriter(String bucket, String path,
IWriteBufferProvider bufferProvider) {
ICloudBufferedWriter bufferedWriter = new
AzBlobStorageBufferedWriter(blobContainerClient, profiler, guardian,
- bucket, config.getPrefix() + path);
+ bucket, config.getPrefix() + path, config.getAccessTier());
return new CloudResettableInputStream(bufferedWriter, bufferProvider);
}
@@ -241,9 +247,10 @@
public void write(String bucket, String path, byte[] data) {
guardian.checkWriteAccess(bucket, path);
profiler.objectWrite();
- BinaryData binaryData = BinaryData.fromBytes(data);
BlobClient blobClient =
blobContainerClient.getBlobClient(config.getPrefix() + path);
- blobClient.upload(binaryData, true);
+ BlobParallelUploadOptions options =
+ new BlobParallelUploadOptions(new
ByteArrayInputStream(data)).setTier(accessTier);
+ blobClient.uploadWithResponse(options, null, null);
}
@Override
@@ -255,7 +262,7 @@
profiler.objectCopy();
guardian.checkWriteAccess(bucket, destPath.getRelativePath());
BlobClient destBlobClient =
blobContainerClient.getBlobClient(destPath.getFile().getPath());
- destBlobClient.beginCopy(srcBlobUrl, null);
+ destBlobClient.beginCopy(srcBlobUrl, null, null, null, null, null,
null);
}
@Override
diff --git
a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/azure/LSMAzBlobStorageTest.java
b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/azure/LSMAzBlobStorageTest.java
index 44b4856..d9424a8 100644
---
a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/azure/LSMAzBlobStorageTest.java
+++
b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/azure/LSMAzBlobStorageTest.java
@@ -117,7 +117,7 @@
URI blobStore = URI.create(blobServiceClient.getAccountUrl());
String endpoint = blobStore.getScheme() + "://" +
blobStore.getAuthority() + "/devstoreaccount1";
AzBlobStorageClientConfig config = new
AzBlobStorageClientConfig(MOCK_SERVER_REGION, endpoint, "", false, 0,
- PLAYGROUND_CONTAINER, 1, 0, 0, writeBufferSize, true);
+ PLAYGROUND_CONTAINER, 1, 0, 0, writeBufferSize, true, null);
CLOUD_CLIENT = new AzBlobStorageCloudClient(config,
ICloudGuardian.NoOpCloudGuardian.INSTANCE);
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20514?usp=email
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings?usp=email
Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: phoenix
Gerrit-Change-Id: If255019dbbd68d5b931248083b70f98e73dc3292
Gerrit-Change-Number: 20514
Gerrit-PatchSet: 1
Gerrit-Owner: Murtadha Hubail <[email protected]>