>From Mohammad Nawazish Khan <[email protected]>:
Mohammad Nawazish Khan has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18235 )
Change subject: ASTERIXDB-3379 Provide Azure Blob Storage support for the Engine
......................................................................
ASTERIXDB-3379 Provide Azure Blob Storage support for the Engine
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- In order to provide compute/storage separation, the engine is to support
reading its data from, and writing its data to, Azure Blob Storage.
Change-Id: Ie9deece8ba98a5b8be7c817baea286cd50d88e68
---
A
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientProvider.java
A
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzParallelDownloader.java
A
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageGateway.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
A
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
A
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClient.java
A
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageBufferedWriter.java
7 files changed, 550 insertions(+), 4 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/35/18235/1
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
index 21450c4..015eebc 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
@@ -18,8 +18,7 @@
*/
package org.apache.asterix.cloud.clients;
-import org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig;
-import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient;
+import org.apache.asterix.cloud.clients.azure.blobstorage.AzBlobStorageClient;
import org.apache.asterix.common.config.CloudProperties;
public class CloudClientProvider {
@@ -31,8 +30,10 @@
public static ICloudClient getClient(CloudProperties cloudProperties) {
String storageScheme = cloudProperties.getStorageScheme();
if ("s3".equalsIgnoreCase(storageScheme)) {
- S3ClientConfig config = S3ClientConfig.of(cloudProperties);
- return new S3CloudClient(config);
+ /*S3ClientConfig config = S3ClientConfig.of(cloudProperties);
+ return new S3CloudClient(config);*/
+ System.out.println("Retrieving Azure Blob Storage Client");
+ return new AzBlobStorageClient("devstoreaccount1",
"cloud-storage-container");
}
throw new IllegalStateException("unsupported cloud storage scheme: " +
storageScheme);
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageBufferedWriter.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageBufferedWriter.java
new file mode 100644
index 0000000..0ac5ad4
--- /dev/null
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageBufferedWriter.java
@@ -0,0 +1,70 @@
+package org.apache.asterix.cloud.clients.azure.blobstorage;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+import com.azure.storage.blob.specialized.BlockBlobClient;
+
+/**
+ * {@link ICloudBufferedWriter} backed by an Azure block blob.
+ * <p>
+ * Each {@link #upload(InputStream, int)} call stages one block under a freshly generated block ID;
+ * {@link #finish()} commits the staged blocks, in upload order, as the content of the blob at {@code path}.
+ */
+public class AzBlobStorageBufferedWriter implements ICloudBufferedWriter {
+    private final AzBlobStorageClientProvider azBlobStorageClientProvider;
+    /** IDs of the blocks staged so far, in the order in which they are committed. */
+    private final List<String> blockIDArrayList;
+    private final String path;
+    /** Number of blocks staged so far; the value before the increment is what upload returns. */
+    private int blockNumber;
+    /** ID of the most recently staged block; {@code null} until the first successful upload. */
+    private String uploadID;
+
+    /**
+     * @param azBlobStorageClientProvider source of the blob client used for staging and committing
+     * @param blockIDArrayList list that receives the ID of every staged block
+     * @param path name of the blob being written
+     */
+    public AzBlobStorageBufferedWriter(AzBlobStorageClientProvider azBlobStorageClientProvider,
+            List<String> blockIDArrayList, String path) {
+        this.azBlobStorageClientProvider = azBlobStorageClientProvider;
+        this.blockIDArrayList = blockIDArrayList;
+        this.path = path;
+    }
+
+    /**
+     * Reads up to {@code length} bytes from {@code stream} and stages them as one block.
+     *
+     * @return the zero-based sequence number of the staged block
+     * @throws java.io.UncheckedIOException if the stream cannot be read; previously this was only printed and
+     *         the call still reported success, silently dropping the block
+     */
+    @Override
+    public int upload(InputStream stream, int length) {
+        byte[] blockData;
+        try {
+            blockData = stream.readNBytes(length);
+        } catch (IOException e) {
+            throw new java.io.UncheckedIOException(
+                    "Error while reading block " + blockNumber + " of data for " + path, e);
+        }
+        String blockID =
+                Base64.getEncoder().encodeToString(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8));
+        getBlockBlobClient().stageBlock(blockID, new ByteArrayInputStream(blockData), blockData.length);
+        // Record the block only after it was staged, so a failed stage never ends up in the commit list.
+        blockIDArrayList.add(blockID);
+        this.uploadID = blockID;
+        return blockNumber++;
+    }
+
+    /** @return {@code true} while no block has been staged successfully */
+    @Override
+    public boolean isEmpty() {
+        return this.uploadID == null;
+    }
+
+    /**
+     * Commits all staged blocks as the blob content.
+     *
+     * @throws IllegalStateException if nothing was uploaded
+     */
+    @Override
+    public void finish() throws HyracksDataException {
+        if (this.uploadID == null) {
+            throw new IllegalStateException("Cannot finish without writing any bytes");
+        }
+        // Todo: Add retry as in S3 implementation. client has this feature probably
+        getBlockBlobClient().commitBlockList(blockIDArrayList);
+    }
+
+    /** Currently a no-op: staged but uncommitted blocks are not cleaned up here. */
+    @Override
+    public void abort() throws HyracksDataException {
+        // Todo
+    }
+
+    /** Looks up the block-blob client for this writer's path. */
+    private BlockBlobClient getBlockBlobClient() {
+        return azBlobStorageClientProvider.getBlobClient(this.path).getBlockBlobClient();
+    }
+}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClient.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClient.java
new file mode 100644
index 0000000..5e6bcb2
--- /dev/null
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClient.java
@@ -0,0 +1,219 @@
+package org.apache.asterix.cloud.clients.azure.blobstorage;
+
+import java.io.ByteArrayOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
+import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.cloud.clients.IParallelDownloader;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.control.nc.io.IOManager;
+
+import com.azure.core.http.rest.PagedIterable;
+import com.azure.core.util.BinaryData;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.BlobRange;
+import com.azure.storage.blob.models.ListBlobsOptions;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+// Todo: Profiler not yet implemented which is available in S3 implementation
+/**
+ * {@link ICloudClient} implementation on top of Azure Blob Storage.
+ * <p>
+ * Blob-level operations go through {@link AzBlobStorageClientProvider}; the {@link AzBlobStorageGateway} is
+ * only handed to the parallel downloader. Both are created in every constructor, so no method can hit a
+ * {@code null} collaborator.
+ * <p>
+ * NOTE(review): the provider always returns the same container client regardless of the {@code bucket}
+ * argument passed to the methods below - confirm this is intended.
+ */
+public class AzBlobStorageClient implements ICloudClient {
+    private static final String BUCKET_ROOT_PATH = "";
+    // Todo: @nawazish-couchbase from where to get the service account and the container
+    private static final String DEFAULT_SERVICE_ACCOUNT = "devstoreaccount1";
+    private static final String DEFAULT_CONTAINER = "cloud-storage-container";
+
+    private final AzBlobStorageClientProvider azBlobStorageClientProvider;
+    private final AzBlobStorageGateway azBlobStorageGateway;
+
+    /**
+     * @param bucket currently ignored; the hard-coded defaults are used instead
+     * @param path currently ignored
+     */
+    public AzBlobStorageClient(String bucket, String path) {
+        this.azBlobStorageClientProvider = new AzBlobStorageClientProvider(DEFAULT_SERVICE_ACCOUNT);
+        this.azBlobStorageGateway = new AzBlobStorageGateway(DEFAULT_SERVICE_ACCOUNT, DEFAULT_CONTAINER);
+    }
+
+    /** Creates a client using the hard-coded default account and container. */
+    public AzBlobStorageClient() {
+        this(DEFAULT_SERVICE_ACCOUNT, DEFAULT_CONTAINER);
+    }
+
+    @Override
+    public ICloudBufferedWriter createBufferedWriter(String bucket, String path) {
+        return new AzBlobStorageBufferedWriter(this.azBlobStorageClientProvider, new ArrayList<>(), path);
+    }
+
+    /**
+     * Lists the names of all blobs whose name starts with {@code path}.
+     *
+     * @param filter optional name filter; it is invoked with a {@code null} directory argument
+     */
+    @Override
+    public Set<String> listObjects(String bucket, String path, FilenameFilter filter) {
+        PagedIterable<BlobItem> blobItems = getBlobItems(bucket, path);
+        if (filter == null) {
+            return blobItems.stream().map(BlobItem::getName).collect(Collectors.toSet());
+        }
+        return blobItems.stream().map(BlobItem::getName).filter(blobName -> filter.accept(null, blobName))
+                .collect(Collectors.toSet());
+    }
+
+    /** Lists the blobs under the given prefix. */
+    private PagedIterable<BlobItem> getBlobItems(String bucket, String path) {
+        ListBlobsOptions options = new ListBlobsOptions().setPrefix(path);
+        BlobContainerClient container = this.azBlobStorageClientProvider.getBlobContainerClient(bucket);
+        // The second argument is the optional request timeout; null applies no client-side timeout.
+        return container.listBlobs(options, null);
+    }
+
+    /**
+     * Reads up to {@code buffer.remaining()} bytes of the blob, starting at {@code offset}, into {@code buffer}.
+     *
+     * @return the number of bytes put into the buffer
+     */
+    @Override
+    public int read(String bucket, String path, long offset, ByteBuffer buffer) throws HyracksDataException {
+        int requested = buffer.remaining();
+        if (requested == 0) {
+            // Nothing can be stored, so do not issue a zero-length range request.
+            return 0;
+        }
+        // Closing a ByteArrayOutputStream has no effect, so no close/try-catch is needed.
+        ByteArrayOutputStream downloaded = new ByteArrayOutputStream(requested);
+        BlobRange blobRange = new BlobRange(offset, (long) requested);
+        azBlobStorageClientProvider.getBlobClient(path).downloadStreamWithResponse(downloaded, blobRange, null,
+                null, false, null, null);
+        byte[] bytes = downloaded.toByteArray();
+        // Todo: throws runtime exception overflow and read only buffer exceptions. handle it.
+        buffer.put(bytes);
+        return bytes.length;
+    }
+
+    @Override
+    public byte[] readAllBytes(String bucket, String path) throws HyracksDataException {
+        // Todo: check for any runtime exceptions in these methods.
+        BinaryData binaryData = azBlobStorageClientProvider.getBlobClient(path).downloadContent();
+        return binaryData.toBytes();
+    }
+
+    @Override
+    public InputStream getObjectStream(String bucket, String path) {
+        return azBlobStorageClientProvider.getBlobClient(path).openInputStream();
+    }
+
+    /** Writes {@code data} as the full content of the blob, replacing the blob if it already exists. */
+    @Override
+    public void write(String bucket, String path, byte[] data) {
+        BlobClient blobClient = this.azBlobStorageClientProvider.getBlobClient(path);
+        // overwrite = true: upload(BinaryData) alone fails when the blob is already present.
+        blobClient.upload(BinaryData.fromBytes(data), true);
+    }
+
+    /**
+     * Starts a server-side copy of {@code srcPath} to a blob named after {@code destPath}.
+     * <p>
+     * NOTE(review): the copy is only started, not awaited, and the destination blob name is the local file
+     * path of {@code destPath} - confirm both against the S3 implementation.
+     */
+    @Override
+    public void copy(String bucket, String srcPath, FileReference destPath) {
+        BlobContainerClient container = azBlobStorageClientProvider.getBlobContainerClient(bucket);
+        String srcBlobUrl = azBlobStorageClientProvider.getBlobClient(srcPath).getBlobUrl();
+        BlobClient destBlobClient = container.getBlobClient(destPath.getFile().getPath());
+        destBlobClient.beginCopy(srcBlobUrl, null);
+    }
+
+    /** Deletes every blob in the container whose name is contained in {@code paths}. */
+    @Override
+    public void deleteObjects(String bucket, Collection<String> paths) {
+        BlobContainerClient container = azBlobStorageClientProvider.getBlobContainerClient(bucket);
+        // Hash the requested names once instead of scanning the collection for every listed blob.
+        Set<String> requested = new java.util.HashSet<>(paths);
+        // Collect first, delete afterwards, so the listing is not modified while it is being paged.
+        List<String> toDelete = container.listBlobs().stream().map(BlobItem::getName).filter(requested::contains)
+                .collect(Collectors.toList());
+        for (String blobName : toDelete) {
+            container.getBlobClient(blobName).delete();
+        }
+    }
+
+    @Override
+    public long getObjectSize(String bucket, String path) throws HyracksDataException {
+        // Todo: where is the handling for HyracksDataException
+        return azBlobStorageClientProvider.getBlobClient(path).getProperties().getBlobSize();
+    }
+
+    @Override
+    public boolean exists(String bucket, String path) throws HyracksDataException {
+        // Todo: Handle Hyracks Data Exception thrown in the signature
+        return azBlobStorageClientProvider.getBlobClient(path).exists();
+    }
+
+    /** @return {@code true} if no blob name starts with {@code path} */
+    @Override
+    public boolean isEmptyPrefix(String bucket, String path) throws HyracksDataException {
+        // Uses the same listing helper as listObjects so that both look at the same container.
+        // Todo: handle the Hyracks data exception
+        return getBlobItems(bucket, path).stream().findAny().isEmpty();
+    }
+
+    @Override
+    public IParallelDownloader createParallelDownloader(String bucket, IOManager ioManager) {
+        return new AzParallelDownloader(bucket, ioManager, this.azBlobStorageGateway);
+    }
+
+    /**
+     * Describes every blob as a {@code {"path": ..., "size": ...}} object, ordered case-insensitively by name.
+     */
+    @Override
+    public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) {
+        List<BlobItem> blobs = this.getBlobItems(bucket, BUCKET_ROOT_PATH).stream()
+                .sorted((blob1, blob2) -> String.CASE_INSENSITIVE_ORDER.compare(blob1.getName(), blob2.getName()))
+                .collect(Collectors.toList());
+        ArrayNode objectsInfo = objectMapper.createArrayNode();
+        for (BlobItem blob : blobs) {
+            ObjectNode objectInfo = objectsInfo.addObject();
+            objectInfo.put("path", blob.getName());
+            objectInfo.put("size", blob.getProperties().getContentLength());
+        }
+        return objectsInfo;
+    }
+
+    @Override
+    public void close() {
+        // Closing Azure Blob Clients is not required as the underlying netty connection pool
+        // handles the same for the apps.
+        // Ref: https://github.com/Azure/azure-sdk-for-java/issues/17903
+        // Hence this implementation is a no op.
+    }
+}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
new file mode 100644
index 0000000..c915ce3
--- /dev/null
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
@@ -0,0 +1,4 @@
+package org.apache.asterix.cloud.clients.azure.blobstorage;
+
+/**
+ * Empty placeholder: no fields or methods are declared yet.
+ * NOTE(review): presumably meant to hold the Azure client settings (the name mirrors S3ClientConfig) - confirm.
+ */
+public class AzBlobStorageClientConfig {
+}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientProvider.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientProvider.java
new file mode 100644
index 0000000..f5d6a8f
--- /dev/null
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientProvider.java
@@ -0,0 +1,52 @@
+package org.apache.asterix.cloud.clients.azure.blobstorage;
+
+import com.azure.storage.blob.*;
+import com.azure.storage.common.StorageSharedKeyCredential;
+
+/**
+ * Creates the Azure Blob Storage clients used by the cloud client and hands them out.
+ * <p>
+ * The account, key, endpoint and container are hard-coded development values (they look like the local
+ * storage-emulator defaults).
+ * Todo: take them from configuration.
+ */
+public class AzBlobStorageClientProvider {
+    // Todo: this is already defined as a constant
+    private static final String CONTAINER_NAME = "cloud-storage-container";
+    private static final String ACCOUNT_NAME = "devstoreaccount1";
+    private static final String ACCOUNT_KEY =
+            "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
+    private static final String ACCOUNT_ENDPOINT = "http://127.0.0.1:8001/" + ACCOUNT_NAME;
+
+    private final BlobServiceClient blobServiceClient;
+    private final BlobContainerClient blobContainerClient;
+
+    /**
+     * Builds the service and container clients and creates the container if it does not exist yet.
+     *
+     * @param azServiceAccount currently ignored; the hard-coded account is used
+     */
+    public AzBlobStorageClientProvider(final String azServiceAccount) {
+        StorageSharedKeyCredential creds = new StorageSharedKeyCredential(ACCOUNT_NAME, ACCOUNT_KEY);
+        // Previously this field was never assigned, so getBlobServiceClient() always returned null.
+        this.blobServiceClient =
+                new BlobServiceClientBuilder().endpoint(ACCOUNT_ENDPOINT).credential(creds).buildClient();
+        BlobContainerClient container = new BlobContainerClientBuilder().containerName(CONTAINER_NAME)
+                .credential(creds).endpoint(ACCOUNT_ENDPOINT + "/" + CONTAINER_NAME).buildClient();
+        if (!container.exists()) {
+            container.create();
+        }
+        this.blobContainerClient = container;
+    }
+
+    /** @return the service-level client for the configured account */
+    public BlobServiceClient getBlobServiceClient() {
+        return this.blobServiceClient;
+    }
+
+    /**
+     * @param containerName currently ignored; the single container created by the constructor is returned
+     */
+    public BlobContainerClient getBlobContainerClient(String containerName) {
+        return this.blobContainerClient;
+    }
+
+    /** @return a client for the blob named {@code path} inside the provider's container */
+    public BlobClient getBlobClient(String path) {
+        return this.blobContainerClient.getBlobClient(path);
+    }
+
+    /**
+     * Not implemented yet.
+     *
+     * @throws UnsupportedOperationException always; failing here replaces the former {@code null} return,
+     *         which only surfaced later as a NullPointerException at the call site
+     */
+    public BlobAsyncClient getBlobAsyncClient(String path) {
+        // Todo implement the Async part of this
+        throw new UnsupportedOperationException("Async blob client is not implemented yet");
+    }
+}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageGateway.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageGateway.java
new file mode 100644
index 0000000..3c57ab1
--- /dev/null
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageGateway.java
@@ -0,0 +1,75 @@
+package org.apache.asterix.cloud.clients.azure.blobstorage;
+
+import java.util.Objects;
+
+import com.azure.storage.blob.*;
+import com.azure.storage.common.StorageSharedKeyCredential;
+
+/**
+ * Thin gateway that builds synchronous and asynchronous Azure Blob Storage clients for one bucket (container).
+ * <p>
+ * Account name, key and endpoints are hard-coded development values (they look like the local
+ * storage-emulator defaults).
+ * Todo: switch to the real endpoint and DefaultAzureCredential.
+ */
+public class AzBlobStorageGateway {
+
+    private static final String ACCOUNT_NAME = "devstoreaccount1";
+    private static final String ACCOUNT_KEY =
+            "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
+    // NOTE(review): the trailing container segment looks like a leftover for a service-level endpoint - confirm.
+    private static final String SERVICE_ENDPOINT =
+            "http://127.0.0.1:8001/devstoreaccount1/instance-8ekq10wlu-multinodeapi-bucket";
+    private static final String CONTAINER_ENDPOINT_PREFIX = "http://127.0.0.1:8001/devstoreaccount1/";
+
+    // Validated and stored, but not yet used to build the endpoint.
+    private final String serviceAccount;
+    private final String bucket;
+    private final BlobServiceClient blobServiceClient;
+
+    /**
+     * @param serviceAccount Azure service account; must be non-null and non-empty
+     * @param bucket container served by {@link #getBlobClient(String)} and
+     *        {@link #getBlobAsyncClient(String)}; must be non-null and non-empty
+     * @throws NullPointerException if an argument is {@code null}
+     * @throws IllegalArgumentException if an argument is empty
+     */
+    public AzBlobStorageGateway(final String serviceAccount, final String bucket) {
+        sanitizeInputs(serviceAccount, bucket);
+        this.serviceAccount = serviceAccount;
+        this.bucket = bucket;
+        this.blobServiceClient = buildBlobServiceClient();
+    }
+
+    /** Rejects null or empty constructor arguments. */
+    private static void sanitizeInputs(final String serviceAccount, final String bucket) {
+        Objects.requireNonNull(serviceAccount, "Azure Service Account cannot be null");
+        Objects.requireNonNull(bucket, "Azure Container name cannot be null");
+        if (serviceAccount.isEmpty()) {
+            throw new IllegalArgumentException("Azure Service Account cannot be empty string");
+        }
+        if (bucket.isEmpty()) {
+            throw new IllegalArgumentException("Azure Container name cannot be empty string");
+        }
+    }
+
+    /** Builds the service client from the hard-coded endpoint and shared-key credential. */
+    private BlobServiceClient buildBlobServiceClient() {
+        // Todo: .credential(new DefaultAzureCredentialBuilder().build()) would be used in real scenario and testing
+        return new BlobServiceClientBuilder().endpoint(SERVICE_ENDPOINT)
+                .credential(new StorageSharedKeyCredential(ACCOUNT_NAME, ACCOUNT_KEY)).buildClient();
+    }
+
+    /** @return a synchronous client for the blob {@code path} in this gateway's bucket */
+    public BlobClient getBlobClient(final String path) {
+        return this.getBlobContainerClient(this.bucket).getBlobClient(path);
+    }
+
+    /** @return a synchronous client for the given container */
+    public BlobContainerClient getBlobContainerClient(final String bucket) {
+        return this.blobServiceClient.getBlobContainerClient(bucket);
+    }
+
+    /** @return a newly built asynchronous client for the given container */
+    public BlobContainerAsyncClient getBlobContainerAsyncClient(final String bucket) {
+        StorageSharedKeyCredential creds = new StorageSharedKeyCredential(ACCOUNT_NAME, ACCOUNT_KEY);
+        return new BlobContainerClientBuilder().containerName(bucket).credential(creds)
+                .endpoint(CONTAINER_ENDPOINT_PREFIX + bucket).buildAsyncClient();
+    }
+
+    /**
+     * Asynchronous counterpart of {@link #getBlobClient(String)}; formerly returned {@code null}.
+     *
+     * @return an asynchronous client for the blob {@code path} in this gateway's bucket
+     */
+    public BlobAsyncClient getBlobAsyncClient(final String path) {
+        return this.getBlobContainerAsyncClient(this.bucket).getBlobAsyncClient(path);
+    }
+}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzParallelDownloader.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzParallelDownloader.java
new file mode 100644
index 0000000..2c650cd
--- /dev/null
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzParallelDownloader.java
@@ -0,0 +1,109 @@
+package org.apache.asterix.cloud.clients.azure.blobstorage;
+
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.asterix.cloud.clients.IParallelDownloader;
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.control.nc.io.IOManager;
+
+import com.azure.core.http.rest.PagedIterable;
+import com.azure.storage.blob.BlobAsyncClient;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerAsyncClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.ListBlobsOptions;
+
+/**
+ * {@link IParallelDownloader} that fetches blobs from one Azure container to the local file system, fanning
+ * the requested files/directories out over a parallel stream.
+ */
+public class AzParallelDownloader implements IParallelDownloader {
+    private final String bucket;
+    private final IOManager ioManager;
+    private final AzBlobStorageGateway azBlobStorageGateway;
+
+    /**
+     * @param bucket container to download from
+     * @param ioManager used to resolve the names of blobs that failed to download
+     * @param azBlobStorageGateway source of the container clients
+     */
+    public AzParallelDownloader(String bucket, IOManager ioManager, AzBlobStorageGateway azBlobStorageGateway) {
+        this.bucket = bucket;
+        this.ioManager = ioManager;
+        this.azBlobStorageGateway = azBlobStorageGateway;
+    }
+
+    /**
+     * Downloads every file in {@code toDownload} to its local location.
+     *
+     * @throws HyracksDataException if a file cannot be written locally (formerly a bare RuntimeException)
+     */
+    @Override
+    public void downloadFiles(Collection<FileReference> toDownload) throws HyracksDataException {
+        try {
+            toDownload.parallelStream().forEach(this::downloadFile);
+        } catch (java.io.UncheckedIOException e) {
+            // Todo: what happens to the remaining files both on cloud and downloaded successfully in local NVMe
+            throw HyracksDataException.create(e.getCause());
+        }
+    }
+
+    /**
+     * Streams one blob into the local file of {@code fileReference}.
+     * <p>
+     * All chunks of the download are written; the previous code kept only the last chunk and read it through
+     * {@code ByteBuffer.array()}, which truncated larger blobs and failed on empty ones.
+     * NOTE(review): the blob is looked up by {@code fileReference.getName()} - confirm this is the full blob
+     * path and not just the last path component.
+     */
+    private void downloadFile(FileReference fileReference) {
+        BlobContainerAsyncClient blobContainerAsyncClient =
+                azBlobStorageGateway.getBlobContainerAsyncClient(this.bucket);
+        BlobAsyncClient blobAsyncClient = blobContainerAsyncClient.getBlobAsyncClient(fileReference.getName());
+        try {
+            // The parent directories must exist before the output file can be opened.
+            FileUtils.createParentDirectories(fileReference.getFile());
+            try (FileOutputStream fileOutputStream = new FileOutputStream(fileReference.getFile())) {
+                java.nio.channels.FileChannel channel = fileOutputStream.getChannel();
+                for (ByteBuffer chunk : blobAsyncClient.downloadStream().toIterable()) {
+                    // A channel write may be partial, so drain each chunk completely.
+                    while (chunk.hasRemaining()) {
+                        channel.write(chunk);
+                    }
+                }
+            }
+        } catch (IOException e) {
+            throw new java.io.UncheckedIOException(e);
+        }
+    }
+
+    /**
+     * Downloads every blob under each of the given directories.
+     *
+     * @return the files that could not be written locally; failures are collected instead of aborting the
+     *         whole download, so this collection is actually returned to the caller
+     */
+    @Override
+    public Collection<FileReference> downloadDirectories(Collection<FileReference> toDownload)
+            throws HyracksDataException {
+        // Directories are processed in parallel, so the shared failure set must be thread-safe.
+        Set<FileReference> failedFiles = java.util.Collections.synchronizedSet(new HashSet<>());
+        toDownload.parallelStream().forEach(directory -> downloadDirectory(directory, failedFiles));
+        return failedFiles;
+    }
+
+    /**
+     * Downloads all blobs whose name starts with the directory name, recording local I/O failures.
+     * <p>
+     * NOTE(review): blobs are written to the path given by their blob name, relative to the working
+     * directory, while failures are reported through {@code ioManager.resolve} - confirm the target location.
+     */
+    private void downloadDirectory(FileReference directoryToDownload, Set<FileReference> failedFiles) {
+        BlobContainerClient blobContainerClient = this.azBlobStorageGateway.getBlobContainerClient(this.bucket);
+        ListBlobsOptions listBlobsOptions = new ListBlobsOptions().setPrefix(directoryToDownload.getName());
+        // The second argument is the optional request timeout; null applies no client-side timeout.
+        PagedIterable<BlobItem> blobItems = blobContainerClient.listBlobs(listBlobsOptions, null);
+        for (BlobItem blobItem : blobItems) {
+            String blobName = blobItem.getName();
+            try {
+                // getParent() is null for a name without any directory component.
+                Path parentDirectory = Path.of(blobName).getParent();
+                if (parentDirectory != null) {
+                    Files.createDirectories(parentDirectory);
+                }
+                BlobClient blobClient = blobContainerClient.getBlobClient(blobName);
+                // try-with-resources closes the file even when the download fails.
+                try (FileOutputStream fileOutputStream = new FileOutputStream(blobName)) {
+                    blobClient.downloadStream(fileOutputStream);
+                }
+            } catch (IOException e) {
+                failedFiles.add(resolveFailedFile(blobName));
+            }
+        }
+    }
+
+    /** Resolves a failed blob name to a file reference, rethrowing resolution errors unchecked. */
+    private FileReference resolveFailedFile(String blobName) {
+        try {
+            return ioManager.resolve(blobName);
+        } catch (HyracksDataException e) {
+            throw new IllegalStateException("Cannot resolve failed download " + blobName, e);
+        }
+    }
+
+    @Override
+    public void close() {
+        // Unlike S3 clients, the Azure clients are not required to be explicitly closed.
+        // Hence this would remain to be a no-impl method.
+        // Todo: reverify the above assumption and document it with a proper supportive documentation from Azure.
+    }
+}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18235
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: Ie9deece8ba98a5b8be7c817baea286cd50d88e68
Gerrit-Change-Number: 18235
Gerrit-PatchSet: 1
Gerrit-Owner: Mohammad Nawazish Khan <[email protected]>
Gerrit-MessageType: newchange