From Mohammad Nawazish Khan <[email protected]>:

Mohammad Nawazish Khan has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18235 )


Change subject: ASTERIXDB-3379 Provide Azure Blob Storage support for the Engine
......................................................................

ASTERIXDB-3379 Provide Azure Blob Storage support for the Engine

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
- In order to provide compute/storage separation, the engine is to support 
reading and writing its data from Azure Blob Storage.

Change-Id: Ie9deece8ba98a5b8be7c817baea286cd50d88e68
---
A 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientProvider.java
A 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzParallelDownloader.java
A 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageGateway.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
A 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
A 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClient.java
A 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageBufferedWriter.java
7 files changed, 550 insertions(+), 4 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/35/18235/1

diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
index 21450c4..015eebc 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
@@ -18,8 +18,7 @@
  */
 package org.apache.asterix.cloud.clients;

-import org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig;
-import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient;
+import org.apache.asterix.cloud.clients.azure.blobstorage.AzBlobStorageClient;
 import org.apache.asterix.common.config.CloudProperties;

 public class CloudClientProvider {
@@ -31,8 +30,10 @@
     public static ICloudClient getClient(CloudProperties cloudProperties) {
         String storageScheme = cloudProperties.getStorageScheme();
         if ("s3".equalsIgnoreCase(storageScheme)) {
-            S3ClientConfig config = S3ClientConfig.of(cloudProperties);
-            return new S3CloudClient(config);
+            /*S3ClientConfig config = S3ClientConfig.of(cloudProperties);
+            return new S3CloudClient(config);*/
+            System.out.println("Retrieving Azure Blob Storage Client");
+            return new AzBlobStorageClient("devstoreaccount1", 
"cloud-storage-container");
         }
         throw new IllegalStateException("unsupported cloud storage scheme: " + 
storageScheme);
     }
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageBufferedWriter.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageBufferedWriter.java
new file mode 100644
index 0000000..0ac5ad4
--- /dev/null
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageBufferedWriter.java
@@ -0,0 +1,70 @@
+package org.apache.asterix.cloud.clients.azure.blobstorage;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+import com.azure.storage.blob.specialized.BlockBlobClient;
+
+public class AzBlobStorageBufferedWriter implements ICloudBufferedWriter {
+    private final AzBlobStorageClientProvider azBlobStorageClientProvider;
+    //private final AzBlobStorageGateway azBlobStorageGateway;
+    private final List<String> blockIDArrayList;
+    private int blockNumber;
+    private final String path;
+    private String uploadID;
+
+    public AzBlobStorageBufferedWriter(
+            AzBlobStorageClientProvider azBlobStorageClientProvider 
/*AzBlobStorageGateway azBlobStorageGateway*/,
+            List<String> blockIDArrayList, String path) {
+        this.azBlobStorageClientProvider = azBlobStorageClientProvider;
+        //this.azBlobStorageGateway = azBlobStorageGateway;
+        this.blockIDArrayList = blockIDArrayList;
+        this.path = path;
+    }
+
+    @Override
+    public int upload(InputStream stream, int length) {
+        BlockBlobClient blockBlobClient = 
azBlobStorageClientProvider.getBlobClient(this.path).getBlockBlobClient();
+        //BlockBlobClient blockBlobClient = 
azBlobStorageGateway.getBlobClient(this.path).getBlockBlobClient();
+        try {
+            byte[] blockData = stream.readNBytes(length);
+            ByteArrayInputStream blockStream = new 
ByteArrayInputStream(blockData);
+            String blockID =
+                    
Base64.getEncoder().encodeToString(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8));
+            this.uploadID = blockID;
+            blockIDArrayList.add(blockID);
+            blockBlobClient.stageBlock(blockID, blockStream, blockData.length);
+        } catch (IOException e) {
+            System.out.println("Error while uploading blocks of data: " + 
e.getMessage());
+        }
+        return blockNumber++;
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return this.uploadID == null;
+    }
+
+    @Override
+    public void finish() throws HyracksDataException {
+        if (this.uploadID == null) {
+            throw new IllegalStateException("Cannot finish without writing any 
bytes");
+        }
+        BlockBlobClient blockBlobClient = 
azBlobStorageClientProvider.getBlobClient(this.path).getBlockBlobClient();
+        //BlockBlobClient blockBlobClient = 
azBlobStorageGateway.getBlobClient(this.path).getBlockBlobClient();
+        blockBlobClient.commitBlockList(blockIDArrayList); // Todo: Add retry 
as in S3 implementation. client has this feature probably
+    }
+
+    @Override
+    public void abort() throws HyracksDataException {
+        // Todo
+    }
+}
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClient.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClient.java
new file mode 100644
index 0000000..5e6bcb2
--- /dev/null
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClient.java
@@ -0,0 +1,219 @@
+package org.apache.asterix.cloud.clients.azure.blobstorage;
+
+import java.io.ByteArrayOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
+import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.cloud.clients.IParallelDownloader;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.control.nc.io.IOManager;
+
+import com.azure.core.http.rest.PagedIterable;
+import com.azure.core.util.BinaryData;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.BlobRange;
+import com.azure.storage.blob.models.ListBlobsOptions;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+// Todo: Profiler not yet implemented which is available in S3 implementation
+public class AzBlobStorageClient implements ICloudClient {
+    private static final String BUCKET_ROOT_PATH = "";
+    //private final String azServiceAccount;
+    private BlobClient blobClient;
+    private BlobServiceClient blobServiceClient;
+    private BlobContainerClient blobContainerClient;
+    private /*final*/ AzBlobStorageClientProvider azBlobStorageClientProvider; 
// Todo: This is not abs client provider
+
+    private /*final*/ AzBlobStorageGateway azBlobStorageGateway;
+
+    public AzBlobStorageClient(String bucket, String path) {
+        this.azBlobStorageClientProvider = new 
AzBlobStorageClientProvider("devstoreaccount1"); // Todo: @nawazish-couchbase 
from where to get the service account
+        //this.blobServiceClient = 
azBlobStorageClientProvider.getBlobServiceClient();
+        //this.blobContainerClient = 
azBlobStorageClientProvider.getBlobContainerClient(bucket);
+        //this.blobClient = azBlobStorageClientProvider.getBlobClient(path); 
// The "path" here signifies the virtual directory structure in ABS
+        //this.azBlobStorageGateway = new 
AzBlobStorageGateway("devstoreaccount1", bucket);
+        System.out.println("Az Gateway created successfully...");
+    }
+
+    public AzBlobStorageClient() {
+        //this.azBlobStorageClientProvider = new 
AzBlobStorageClientProvider("2431722c5003454b863ea79d");
+        this.azBlobStorageGateway = new 
AzBlobStorageGateway("devstoreaccount1", "cloud-storage-container");
+    }
+
+    /*public AzBlobStorageClient(final String azServiceAccount, final String 
bucket) {
+       this.azServiceAccount = azServiceAccount;
+       this.azBlobStorageGateway = new AzBlobStorageGateway(azServiceAccount, 
bucket);
+    }*/
+    @Override
+    public ICloudBufferedWriter createBufferedWriter(String bucket, String 
path) {
+        //AzBlobStorageGateway azBlobStorageGateway = new 
AzBlobStorageGateway(this.azServiceAccount, bucket);
+        return new 
AzBlobStorageBufferedWriter(this.azBlobStorageClientProvider 
/*this.azBlobStorageGateway*/,
+                new ArrayList<>(), path);
+    }
+
+    @Override
+    public Set<String> listObjects(String bucket, String path, FilenameFilter 
filter) {
+        PagedIterable<BlobItem> blobItems = getBlobItems(bucket, path);
+        if (filter == null) {
+            return 
blobItems.stream().map(BlobItem::getName).collect(Collectors.toSet());
+        }
+        return blobItems.stream().map(BlobItem::getName).filter(blobItem -> 
filter.accept(null, blobItem))
+                .collect(Collectors.toSet());
+    }
+
+    private PagedIterable<BlobItem> getBlobItems(String bucket, String path) {
+        ListBlobsOptions options = new ListBlobsOptions().setPrefix(path);
+        BlobContainerClient blobContainerClient = 
this.azBlobStorageClientProvider.getBlobContainerClient(bucket);
+        //BlobContainerClient blobContainerClient = 
this.azBlobStorageGateway.getBlobContainerClient(bucket);
+        return blobContainerClient.listBlobs(options, null);
+    }
+
+    @Override
+    public int read(String bucket, String path, long offset, ByteBuffer 
buffer) throws HyracksDataException {
+        BlobContainerClient blobContainerClient = 
azBlobStorageClientProvider.getBlobContainerClient(bucket);
+        BlobClient blobClient = 
azBlobStorageClientProvider.getBlobClient(path);
+        /*BlobContainerClient blobContainerClient = 
this.azBlobStorageGateway.getBlobContainerClient(bucket);
+        BlobClient blobClient = blobContainerClient.getBlobClient(path);*/
+        ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream(buffer.capacity());
+        long rem = buffer.remaining();
+        BlobRange blobRange = new BlobRange(offset, rem);
+        blobClient.downloadStreamWithResponse(byteArrayOutputStream, 
blobRange, null, null, false, null, null);
+        byte[] byteArray = byteArrayOutputStream.toByteArray();
+        buffer.put(byteArray); // Todo: throws runtime exception overflow and 
read only buffer exceptions. handle it.
+        try {
+            byteArrayOutputStream.close(); //Todo: throw relevant 
HyracksDataException
+        } catch (IOException e) {
+            System.out.println("Failed to close o/p stream on which blob range 
was read");
+        }
+        return ((int) rem - buffer.remaining());
+    }
+
+    @Override
+    public byte[] readAllBytes(String bucket, String path) throws 
HyracksDataException {
+        BlobContainerClient blobContainerClient = 
azBlobStorageClientProvider.getBlobContainerClient(bucket);
+        BlobClient blobClient = 
azBlobStorageClientProvider.getBlobClient(path);
+        /*BlobContainerClient blobContainerClient = 
this.azBlobStorageGateway.getBlobContainerClient(bucket);
+        BlobClient blobClient = blobContainerClient.getBlobClient(path);*/
+        BinaryData binaryData = blobClient.downloadContent();
+        return binaryData.toBytes(); // Todo: check for any runtime exceptions 
in these methods.
+    }
+
+    @Override
+    public InputStream getObjectStream(String bucket, String path) {
+        BlobContainerClient blobContainerClient = 
azBlobStorageClientProvider.getBlobContainerClient(bucket);
+        BlobClient blobClient = 
azBlobStorageClientProvider.getBlobClient(path);
+        /*BlobContainerClient blobContainerClient = 
this.azBlobStorageGateway.getBlobContainerClient(bucket);
+        BlobClient blobClient = blobContainerClient.getBlobClient(path);*/
+        return blobClient.openInputStream();
+    }
+
+    @Override
+    public void write(String bucket, String path, byte[] data) {
+        BinaryData binaryData = BinaryData.fromBytes(data);
+        BlobClient blobClient = 
this.azBlobStorageClientProvider.getBlobClient(path);
+        //BlobClient blobClient = 
this.azBlobStorageGateway.getBlobClient(path);
+        blobClient.upload(binaryData);
+    }
+
+    @Override
+    public void copy(String bucket, String srcPath, FileReference destPath) {
+        BlobContainerClient blobContainerClient = 
azBlobStorageClientProvider.getBlobContainerClient(bucket);// Todo might not 
need to create a bucket in case if bucket is not already present.
+        BlobClient srcBlobClient = 
azBlobStorageClientProvider.getBlobClient(srcPath);
+        /*BlobContainerClient blobContainerClient = 
this.azBlobStorageGateway.getBlobContainerClient(bucket);// Todo might not need 
to create a bucket in case if bucket is not already present.
+        BlobClient srcBlobClient = 
blobContainerClient.getBlobClient(srcPath);*/
+        String srcBlobUrl = srcBlobClient.getBlobUrl();
+        BlobClient destBlobClient = 
blobContainerClient.getBlobClient(destPath.getFile().getPath());
+        destBlobClient.beginCopy(srcBlobUrl, null);
+    }
+
+    @Override
+    public void deleteObjects(String bucket, Collection<String> paths) {
+        BlobContainerClient blobContainerClient = 
azBlobStorageClientProvider.getBlobContainerClient(bucket);
+        //BlobContainerClient blobContainerClient = 
this.azBlobStorageGateway.getBlobContainerClient(bucket);
+        PagedIterable<BlobItem> blobItems = blobContainerClient.listBlobs();
+        Set<BlobItem> blobsToDelete =
+                blobItems.stream().filter(blobItem -> 
paths.contains(blobItem.getName())).collect(Collectors.toSet());
+        Set<Object> deletedBlobs = blobsToDelete.stream().map(blobItem -> { // 
Todo: ugly lambda?
+            String name = blobItem.getName();
+            BlobClient blobClient = blobContainerClient.getBlobClient(name);
+            blobClient.delete();
+            return name;
+        }).collect(Collectors.toSet());
+        System.out.println("total number of blobs deleted: " + 
deletedBlobs.size());
+        // Todo this method might need some clean coding. Method references 
might be helpful.
+    }
+
+    @Override
+    public long getObjectSize(String bucket, String path) throws 
HyracksDataException {
+        BlobContainerClient blobContainerClient = 
azBlobStorageClientProvider.getBlobContainerClient(bucket); //Todo: why the 
call in first line; side effect wala code.
+        BlobClient blobClient = 
azBlobStorageClientProvider.getBlobClient(path);
+        /*BlobContainerClient blobContainerClient = 
this.azBlobStorageGateway.getBlobContainerClient(bucket); // Todo: careful with 
the following two lines. The container client fetched for a bucket might not be 
the same when getBlobClient is called in the subsequent line.
+        BlobClient blobClient = blobContainerClient.getBlobClient(path);*/
+        return blobClient.getProperties().getBlobSize();
+        // Todo: where is the handling for HyracksDataException
+    }
+
+    @Override
+    public boolean exists(String bucket, String path) throws 
HyracksDataException {
+        BlobContainerClient blobContainerClient = 
azBlobStorageClientProvider.getBlobContainerClient(bucket);
+        BlobClient blobClient = 
azBlobStorageClientProvider.getBlobClient(path);
+        /*BlobContainerClient blobContainerClient = 
this.azBlobStorageGateway.getBlobContainerClient(bucket);
+        BlobClient blobClient = blobContainerClient.getBlobClient(path);*/
+        return blobClient.exists();
+        // Todo: Handle Hyracks Data Exception thrown in the signature
+    }
+
+    @Override
+    public boolean isEmptyPrefix(String bucket, String path) throws 
HyracksDataException {
+        ListBlobsOptions listBlobsOptions = new 
ListBlobsOptions().setPrefix(path);
+        BlobContainerClient blobContainerClient = 
this.azBlobStorageGateway.getBlobContainerClient(bucket);
+        PagedIterable<BlobItem> blobItems = 
blobContainerClient.listBlobs(listBlobsOptions, null);// Todo: what is this 
null thingy; bad code
+        return blobItems.stream().findAny().isEmpty(); // Todo: handle the 
Hyracks data exception
+    }
+
+    @Override
+    public IParallelDownloader createParallelDownloader(String bucket, 
IOManager ioManager) {
+        return new AzParallelDownloader(bucket, ioManager,
+                this.azBlobStorageGateway/*, 
this.azBlobStorageClientProvider*/);
+    }
+
+    @Override
+    public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) {
+        PagedIterable<BlobItem> blobItems = this.getBlobItems(bucket, 
BUCKET_ROOT_PATH);
+        List<BlobItem> blobs = 
blobItems.stream().distinct().collect(Collectors.toList());
+        ArrayNode objectsInfo = objectMapper.createArrayNode();
+        blobs = blobs.stream()
+                .sorted((blob1, blob2) -> 
String.CASE_INSENSITIVE_ORDER.compare(blob1.getName(), blob2.getName()))
+                .collect(Collectors.toList());
+        for (BlobItem blob : blobs) {
+            ObjectNode objectInfo = objectsInfo.addObject();
+            objectInfo.put("path", blob.getName());
+            objectInfo.put("size", blob.getProperties().getContentLength());
+        }
+        return objectsInfo;
+    }
+
+    @Override
+    public void close() {
+        // Closing Azure Blob Clients is not required as the underlying netty 
connection pool
+        // handles the same for the apps.
+        // Ref: https://github.com/Azure/azure-sdk-for-java/issues/17903
+        // Hence this implementation is a no op.
+    }
+}
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
new file mode 100644
index 0000000..c915ce3
--- /dev/null
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
@@ -0,0 +1,4 @@
+package org.apache.asterix.cloud.clients.azure.blobstorage;
+
+public class AzBlobStorageClientConfig {
+}
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientProvider.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientProvider.java
new file mode 100644
index 0000000..f5d6a8f
--- /dev/null
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientProvider.java
@@ -0,0 +1,52 @@
+package org.apache.asterix.cloud.clients.azure.blobstorage;
+
+import com.azure.storage.blob.*;
+import com.azure.storage.common.StorageSharedKeyCredential;
+
+public class AzBlobStorageClientProvider {
+    private BlobServiceClient blobServiceClient;
+    private BlobContainerClient blobContainerClient;
+
+    private /*final*/ AzBlobStorageGateway azBlobStorageGateway;
+
+    public AzBlobStorageClientProvider(final String azServiceAccount) {
+        final String containerName = "cloud-storage-container"; // Todo: this 
is already defined as a constant
+        //this.azBlobStorageGateway = new 
AzBlobStorageGateway(azServiceAccount, containerName);
+        final String accKey =
+                
"Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
+        final String accName = "devstoreaccount1";
+        StorageSharedKeyCredential creds = new 
StorageSharedKeyCredential(accName, accKey);
+        //String endpointString = 
String.format("https://%s.blob.core.windows.net", azServiceAccount);
+        final String endpointString = 
"http://127.0.0.1:8001/devstoreaccount1/" + containerName;
+        BlobContainerClient b = new 
BlobContainerClientBuilder().containerName(containerName).credential(creds)
+                .endpoint(endpointString).buildClient();
+
+        if (!b.exists())
+            b.create();
+
+        System.out.printf("Is container %s created: %b\n", containerName, 
b.exists());
+        this.blobContainerClient = b;
+        System.out.println("ABS client provider created successfully...");
+    }
+
+    // Todo: @nawazish-couchbase null check against blobServiceClient
+    public BlobServiceClient getBlobServiceClient() {
+        return this.blobServiceClient;
+    }
+
+    // Todo: @nawazish-couchbase null check against blobServiceClient?
+    public BlobContainerClient getBlobContainerClient(String containerName) {
+        //this.blobServiceClient.createBlobContainerIfNotExists(containerName);
+        //this.blobContainerClient = 
this.blobServiceClient.getBlobContainerClient(containerName);
+        return this.blobContainerClient;
+    }
+
+    public BlobClient getBlobClient(String path) {
+        return this.blobContainerClient.getBlobClient(path);
+    }
+
+    public BlobAsyncClient getBlobAsyncClient(String path) {
+        return null;
+        // Todo implement the Async part of this
+    }
+}
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageGateway.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageGateway.java
new file mode 100644
index 0000000..3c57ab1
--- /dev/null
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageGateway.java
@@ -0,0 +1,75 @@
+package org.apache.asterix.cloud.clients.azure.blobstorage;
+
+import java.util.Objects;
+
+import com.azure.storage.blob.*;
+import com.azure.storage.common.StorageSharedKeyCredential;
+
+public class AzBlobStorageGateway {
+
+    private final String serviceAccount;
+    private final String bucket;
+    private final static String AZ_SERVICE_ACCOUNT = 
"2431722c5003454b863ea79d"; // Todo: remove
+    private final static String AZ_SERVICE_ACCOUNT_KEY =
+            
"DgB5H3MH5Vm0UNxh8OhzvYZyBua7qIpFfzG1zwCiSTcsPlKYU7TGmqvEB97DC/kV3piNkwlNaw4m+AStgKxgcg==";
 // Todo: remove
+    private final BlobServiceClient blobServiceClient;
+    //private final BlobContainerClient blobContainerClient;
+
+    public AzBlobStorageGateway(final String serviceAccount, final String 
bucket) {
+        sanitizeInputs(serviceAccount, bucket);
+        this.serviceAccount = serviceAccount;
+        this.bucket = bucket;
+        this.blobServiceClient = buildBlobServiceClient();
+        //this.blobContainerClient = buildBlobContainerClient(bucket);
+    }
+
+    private static void sanitizeInputs(final String serviceAccount, final 
String bucket) {
+        Objects.requireNonNull(serviceAccount, "Azure Service Account cannot 
be null");
+        Objects.requireNonNull(bucket, "Azure Container name cannot be null");
+        if (serviceAccount.isEmpty())
+            throw new IllegalArgumentException("Azure Service Account cannot 
be empty string");
+        if (bucket.isEmpty())
+            throw new IllegalArgumentException("Azure Container name cannot be 
empty string");
+    }
+
+    private BlobServiceClient buildBlobServiceClient() {
+        //String endpointString = 
String.format("https://%s.blob.core.windows.net", this.serviceAccount);
+        final String accKey =
+                
"Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
+        final String accName = "devstoreaccount1";
+        String endpointString = 
"http://127.0.0.1:8001/devstoreaccount1/instance-8ekq10wlu-multinodeapi-bucket";
+        BlobServiceClient client = new 
BlobServiceClientBuilder().endpoint(endpointString)
+                //.credential(new DefaultAzureCredentialBuilder().build()) // 
Todo: Would be used in real scenario and testing
+                .credential(new StorageSharedKeyCredential(accName, 
accKey)).buildClient();
+        return client;
+    }
+
+    /*private BlobContainerClient buildBlobContainerClient(final String 
bucket) {
+        return this.blobServiceClient.getBlobContainerClient(bucket);
+    }*/
+
+    public BlobClient getBlobClient(final String path) {
+        return this.getBlobContainerClient(this.bucket).getBlobClient(path);
+    }
+
+    public BlobContainerClient getBlobContainerClient(final String bucket) {
+        return this.blobServiceClient.getBlobContainerClient(bucket);
+    }
+
+    public BlobContainerAsyncClient getBlobContainerAsyncClient(final String 
bucket) {
+        final String accKey =
+                
"Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
+        final String accName = "devstoreaccount1";
+        //final String endpoint = 
"http://127.0.0.1:10000/devstoreaccount1/"+containerName;
+        final String endpoint = "http://127.0.0.1:8001/devstoreaccount1/" + 
bucket;
+        StorageSharedKeyCredential creds = new 
StorageSharedKeyCredential(accName, accKey);
+        BlobContainerAsyncClient b = new 
BlobContainerClientBuilder().containerName(bucket).credential(creds)
+                .endpoint(endpoint).buildAsyncClient();
+        //b.createIfNotExists();
+        return b;
+    }
+
+    public BlobAsyncClient getBlobAsyncClient(final String path) {
+        return null;
+    }
+}
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzParallelDownloader.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzParallelDownloader.java
new file mode 100644
index 0000000..2c650cd
--- /dev/null
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzParallelDownloader.java
@@ -0,0 +1,109 @@
+package org.apache.asterix.cloud.clients.azure.blobstorage;
+
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.asterix.cloud.clients.IParallelDownloader;
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.control.nc.io.IOManager;
+
+import com.azure.core.http.rest.PagedIterable;
+import com.azure.storage.blob.BlobAsyncClient;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerAsyncClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.ListBlobsOptions;
+
+public class AzParallelDownloader implements IParallelDownloader {
+    private final String bucket; // Todo: never used.
+    private final IOManager ioManager;
+    //private AzBlobStorageClientProvider azBlobStorageClientProvider;
+    private AzBlobStorageGateway azBlobStorageGateway;
+
+    public AzParallelDownloader(String bucket, IOManager ioManager,
+            AzBlobStorageGateway azBlobStorageGateway/*,
+                                                     
AzBlobStorageClientProvider azBlobStorageClientProvider*/) { // Todo: 3 params 
one too many?
+        this.bucket = bucket;
+        this.ioManager = ioManager;
+        this.azBlobStorageGateway = azBlobStorageGateway;
+        //this.azBlobStorageClientProvider = azBlobStorageClientProvider;
+    }
+
+    @Override
+    public void downloadFiles(Collection<FileReference> toDownload) throws 
HyracksDataException {
+        // Todo: handle HyracksDataException
+        toDownload.parallelStream().forEach(fileReference -> {
+            //BlobAsyncClient blobAsyncClient = 
azBlobStorageClientProvider.getBlobAsyncClient(fileReference.getName());
+            BlobContainerAsyncClient blobContainerAsyncClient =
+                    
azBlobStorageGateway.getBlobContainerAsyncClient(this.bucket);
+            BlobAsyncClient blobAsyncClient = 
blobContainerAsyncClient.getBlobAsyncClient(fileReference.getName());
+            ByteBuffer blob = blobAsyncClient.downloadStream().blockLast();
+            //assert blob != null;
+            byte[] data = blob.array(); // Todo
+            try (FileOutputStream fileOutputStream = new 
FileOutputStream(fileReference.getFile())) {
+                FileUtils.createParentDirectories(fileReference.getFile());
+                fileOutputStream.write(data);
+                fileOutputStream.flush();
+            } catch (IOException e) {
+                // Todo: what to do with exceptions,
+                //  what happens to the remaining files both on cloud and 
downloaded successfully in local NVMe
+                throw new RuntimeException(e);
+            }
+        });
+    }
+
+    @Override
+    public Collection<FileReference> 
downloadDirectories(Collection<FileReference> toDownload)
+            throws HyracksDataException {
+        Set<FileReference> failedFiles = new HashSet<>();
+        toDownload.stream().parallel().forEach(directoryToDownload -> {
+
+            //BlobContainerClient blobContainerClient = 
azBlobStorageClientProvider.getBlobContainerClient(""); // Todo: Container name
+            BlobContainerClient blobContainerClient = 
this.azBlobStorageGateway.getBlobContainerClient(this.bucket); // Todo: 
Container name
+            ListBlobsOptions listBlobsOptions = new 
ListBlobsOptions().setPrefix(directoryToDownload.getName());
+            PagedIterable<BlobItem> blobItems = 
blobContainerClient.listBlobs(listBlobsOptions, null); // Todo: what is this 
null?
+
+            blobItems.stream().parallel().forEachOrdered(blobItem -> { // 
Todo: Ugly lambdas; 2 of them
+                Path path = Path.of(blobItem.getName());
+                try {
+                    Path parentDirectory = path.getParent();
+                    if (Files.notExists(parentDirectory))
+                        Files.createDirectories(parentDirectory);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+                BlobClient blobClient = 
blobContainerClient.getBlobClient(blobItem.getName());
+                try {
+                    blobClient.downloadStream(new 
FileOutputStream(blobItem.getName()));
+                } catch (FileNotFoundException e) {
+                    try {
+                        FileReference failedFile = 
ioManager.resolve(blobItem.getName());
+                        failedFiles.add(failedFile);
+                    } catch (HyracksDataException ex) { // Todo: Ugly 
try-catch ladder; declare all exceptions on method; refer S3 implementation.
+                        throw new RuntimeException(ex);
+                    }
+                    throw new RuntimeException(e);
+                }
+            });
+        });
+        return failedFiles;
+    }
+
+    @Override
+    public void close() {
+        // Unlike S3 clients, the Azure clients are not required to be 
explicitly closed.
+        // Hence this would remain to be a no-impl method.
+        // Todo: reverify the above assumption and document it with a proper 
supportive documentation from Azure.
+
+    }
+}

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18235
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: Ie9deece8ba98a5b8be7c817baea286cd50d88e68
Gerrit-Change-Number: 18235
Gerrit-PatchSet: 1
Gerrit-Owner: Mohammad Nawazish Khan <[email protected]>
Gerrit-MessageType: newchange

Reply via email to