>From Murtadha Hubail <[email protected]>:

Murtadha Hubail has submitted this change. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18770 )

Change subject: ASTERIXDB-3493: The storage engine should use the cloud storage 
prefix whilst writing objects to Google Cloud Storage (GCS)
......................................................................

ASTERIXDB-3493: The storage engine should use the cloud storage prefix whilst 
writing objects to Google Cloud Storage (GCS)

- user model changes: no
- storage format changes: no
- interface changes: no

Ext-ref: MB-63369

Details:

When the object store used at the backend is Google Cloud Storage, the current 
storage engine implementation does not use
the cloud storage prefix, as a result the database objects are written directly 
under the bucket.

After the implementation the expectation is that the database objects would be 
written under the prefix directory.

Change-Id: I7b84bf98272581bc96851855d4bd8663780ab611
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18770
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
---
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
M 
asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
5 files changed, 75 insertions(+), 28 deletions(-)

Approvals:
  Murtadha Hubail: Looks good to me, approved
  Jenkins: Verified; Verified
  Anon. E. Moose #1000171:




diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
index fe5dd4d..e4e471d 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.cloud.clients.google.gcs;

 import static 
org.apache.asterix.external.util.google.gcs.GCSConstants.ENDPOINT_FIELD_NAME;
+import static 
org.apache.asterix.external.util.google.gcs.GCSConstants.STORAGE_PREFIX;

 import java.io.IOException;
 import java.util.Map;
@@ -42,10 +43,11 @@
     private final int readMaxRequestsPerSeconds;
     private final int writeMaxRequestsPerSeconds;
     private final int writeBufferSize;
+    private final String prefix;

     private GCSClientConfig(String region, String endpoint, boolean 
anonymousAuth, long profilerLogInterval,
             long tokenAcquireTimeout, int writeMaxRequestsPerSeconds, int 
readMaxRequestsPerSeconds,
-            int writeBufferSize) {
+            int writeBufferSize, String prefix) {
         this.region = region;
         this.endpoint = endpoint;
         this.anonymousAuth = anonymousAuth;
@@ -54,18 +56,20 @@
         this.writeMaxRequestsPerSeconds = writeMaxRequestsPerSeconds;
         this.readMaxRequestsPerSeconds = readMaxRequestsPerSeconds;
         this.writeBufferSize = writeBufferSize;
+        this.prefix = prefix;
     }
 
     public GCSClientConfig(String region, String endpoint, boolean 
anonymousAuth, long profilerLogInterval,
-            int writeBufferSize) {
-        this(region, endpoint, anonymousAuth, profilerLogInterval, 1, 0, 0, 
writeBufferSize);
+            int writeBufferSize, String prefix) {
+        this(region, endpoint, anonymousAuth, profilerLogInterval, 1, 0, 0, 
writeBufferSize, prefix);
     }

     public static GCSClientConfig of(CloudProperties cloudProperties) {
         return new GCSClientConfig(cloudProperties.getStorageRegion(), 
cloudProperties.getStorageEndpoint(),
                 cloudProperties.isStorageAnonymousAuth(), 
cloudProperties.getProfilerLogInterval(),
                 cloudProperties.getTokenAcquireTimeout(), 
cloudProperties.getWriteMaxRequestsPerSecond(),
-                cloudProperties.getReadMaxRequestsPerSecond(), 
cloudProperties.getWriteBufferSize());
+                cloudProperties.getReadMaxRequestsPerSecond(), 
cloudProperties.getWriteBufferSize(),
+                cloudProperties.getStoragePrefix());
     }

     public static GCSClientConfig of(Map<String, String> configuration, int 
writeBufferSize) {
@@ -73,10 +77,10 @@
         long profilerLogInterval = 0;

         String region = "";
-        String prefix = "";
+        String prefix = configuration.getOrDefault(STORAGE_PREFIX, "");
         boolean anonymousAuth = false;

-        return new GCSClientConfig(region, endPoint, anonymousAuth, 
profilerLogInterval, writeBufferSize);
+        return new GCSClientConfig(region, endPoint, anonymousAuth, 
profilerLogInterval, writeBufferSize, prefix);
     }

     public String getRegion() {
@@ -118,4 +122,8 @@
     public int getWriteBufferSize() {
         return writeBufferSize;
     }
+
+    public String getPrefix() {
+        return prefix;
+    }
 }
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
index 010a6bb..a74e7d3 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
@@ -100,20 +100,20 @@

     @Override
     public ICloudWriter createWriter(String bucket, String path, 
IWriteBufferProvider bufferProvider) {
-        return new GCSWriter(bucket, path, gcsClient, profilerLimiter, 
guardian, writeBufferSize);
+        return new GCSWriter(bucket, config.getPrefix() + path, gcsClient, 
profilerLimiter, guardian, writeBufferSize);
     }

     @Override
     public Set<CloudFile> listObjects(String bucket, String path, 
FilenameFilter filter) {
         guardian.checkReadAccess(bucket, path);
         profilerLimiter.objectsList();
-        Page<Blob> blobs =
-                gcsClient.list(bucket, BlobListOption.prefix(path), 
BlobListOption.fields(Storage.BlobField.SIZE));
+        Page<Blob> blobs = gcsClient.list(bucket, 
BlobListOption.prefix(config.getPrefix() + path),
+                BlobListOption.fields(Storage.BlobField.SIZE));

         Set<CloudFile> files = new HashSet<>();
         for (Blob blob : blobs.iterateAll()) {
             if (filter.accept(null, 
IoUtil.getFileNameFromPath(blob.getName()))) {
-                files.add(CloudFile.of(blob.getName(), blob.getSize()));
+                files.add(CloudFile.of(stripCloudPrefix(blob.getName()), 
blob.getSize()));
             }
         }
         return files;
@@ -123,7 +123,7 @@
     public int read(String bucket, String path, long offset, ByteBuffer 
buffer) throws HyracksDataException {
         guardian.checkReadAccess(bucket, path);
         profilerLimiter.objectGet();
-        BlobId blobId = BlobId.of(bucket, path);
+        BlobId blobId = BlobId.of(bucket, config.getPrefix() + path);
         long readTo = offset + buffer.remaining();
         int totalRead = 0;
         try (ReadChannel from = gcsClient.reader(blobId).limit(readTo)) {
@@ -145,7 +145,7 @@
     public byte[] readAllBytes(String bucket, String path) {
         guardian.checkReadAccess(bucket, path);
         profilerLimiter.objectGet();
-        BlobId blobId = BlobId.of(bucket, path);
+        BlobId blobId = BlobId.of(bucket, config.getPrefix() + path);
         try {
             return gcsClient.readAllBytes(blobId);
         } catch (StorageException e) {
@@ -157,7 +157,7 @@
     public InputStream getObjectStream(String bucket, String path, long 
offset, long length) {
         guardian.checkReadAccess(bucket, path);
         profilerLimiter.objectGet();
-        try (ReadChannel reader = gcsClient.reader(bucket, path).limit(offset 
+ length)) {
+        try (ReadChannel reader = gcsClient.reader(bucket, config.getPrefix() 
+ path).limit(offset + length)) {
             reader.seek(offset);
             return Channels.newInputStream(reader);
         } catch (StorageException | IOException e) {
@@ -169,7 +169,7 @@
     public void write(String bucket, String path, byte[] data) {
         guardian.checkWriteAccess(bucket, path);
         profilerLimiter.objectWrite();
-        BlobInfo blobInfo = BlobInfo.newBuilder(bucket, path).build();
+        BlobInfo blobInfo = BlobInfo.newBuilder(bucket, config.getPrefix() + 
path).build();
         gcsClient.create(blobInfo, data);
     }

@@ -177,7 +177,7 @@
     public void copy(String bucket, String srcPath, FileReference destPath) {
         guardian.checkReadAccess(bucket, srcPath);
         profilerLimiter.objectsList();
-        Page<Blob> blobs = gcsClient.list(bucket, 
BlobListOption.prefix(srcPath));
+        Page<Blob> blobs = gcsClient.list(bucket, 
BlobListOption.prefix(config.getPrefix() + srcPath));
         for (Blob blob : blobs.iterateAll()) {
             profilerLimiter.objectCopy();
             BlobId source = blob.getBlobId();
@@ -200,7 +200,7 @@
         while (pathIter.hasNext()) {
             batchRequest = gcsClient.batch();
             for (int i = 0; pathIter.hasNext() && i < DELETE_BATCH_SIZE; i++) {
-                BlobId blobId = BlobId.of(bucket, pathIter.next());
+                BlobId blobId = BlobId.of(bucket, config.getPrefix() + 
pathIter.next());
                 guardian.checkWriteAccess(bucket, blobId.getName());
                 batchRequest.delete(blobId);
             }
@@ -214,7 +214,8 @@
     public long getObjectSize(String bucket, String path) {
         guardian.checkReadAccess(bucket, path);
         profilerLimiter.objectGet();
-        Blob blob = gcsClient.get(bucket, path, 
Storage.BlobGetOption.fields(Storage.BlobField.SIZE));
+        Blob blob =
+                gcsClient.get(bucket, config.getPrefix() + path, 
Storage.BlobGetOption.fields(Storage.BlobField.SIZE));
         if (blob == null) {
             return 0;
         }
@@ -225,7 +226,8 @@
     public boolean exists(String bucket, String path) {
         guardian.checkReadAccess(bucket, path);
         profilerLimiter.objectGet();
-        Blob blob = gcsClient.get(bucket, path, 
Storage.BlobGetOption.fields(Storage.BlobField.values()));
+        Blob blob = gcsClient.get(bucket, config.getPrefix() + path,
+                Storage.BlobGetOption.fields(Storage.BlobField.values()));
         return blob != null && blob.exists();
     }

@@ -233,7 +235,7 @@
     public boolean isEmptyPrefix(String bucket, String path) {
         guardian.checkReadAccess(bucket, path);
         profilerLimiter.objectsList();
-        Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(path));
+        Page<Blob> blobs = gcsClient.list(bucket, 
BlobListOption.prefix(config.getPrefix() + path));
         return !blobs.hasNextPage();
     }

@@ -278,4 +280,8 @@
         }
         return builder.build().getService();
     }
+
+    private String stripCloudPrefix(String objectName) {
+        return objectName.substring(config.getPrefix().length());
+    }
 }
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
index 0994cea..0d30120 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
@@ -56,6 +56,7 @@
     private final Storage gcsClient;
     private final TransferManager transferManager;
     private final IRequestProfilerLimiter profiler;
+    private final GCSClientConfig config;

     public GCSParallelDownloader(String bucket, IOManager ioManager, 
GCSClientConfig config,
             IRequestProfilerLimiter profiler) throws HyracksDataException {
@@ -70,18 +71,21 @@
         this.gcsClient = builder.build().getService();
         this.transferManager =
                 
TransferManagerConfig.newBuilder().setStorageOptions(builder.build()).build().getService();
+        this.config = config;
     }

     @Override
     public void downloadFiles(Collection<FileReference> toDownload) throws 
HyracksDataException {
-        ParallelDownloadConfig.Builder config = 
ParallelDownloadConfig.newBuilder().setBucketName(bucket);
+        ParallelDownloadConfig.Builder downConfig =
+                
ParallelDownloadConfig.newBuilder().setBucketName(bucket).setStripPrefix(this.config.getPrefix());
+
         Map<Path, List<BlobInfo>> pathListMap = new HashMap<>();
         try {
             for (FileReference fileReference : toDownload) {
                 profiler.objectGet();
                 FileUtils.createParentDirectories(fileReference.getFile());
-                addToMap(pathListMap, 
fileReference.getDeviceHandle().getMount().toPath(),
-                        BlobInfo.newBuilder(BlobId.of(bucket, 
fileReference.getRelativePath())).build());
+                addToMap(pathListMap, 
fileReference.getDeviceHandle().getMount().toPath(), BlobInfo
+                        .newBuilder(BlobId.of(bucket, config.getPrefix() + 
fileReference.getRelativePath())).build());
             }
         } catch (IOException e) {
             throw HyracksDataException.create(e);
@@ -89,7 +93,7 @@
         List<DownloadJob> downloadJobs = new ArrayList<>(pathListMap.size());
         for (Map.Entry<Path, List<BlobInfo>> entry : pathListMap.entrySet()) {
             downloadJobs.add(transferManager.downloadBlobs(entry.getValue(),
-                    config.setDownloadDirectory(entry.getKey()).build()));
+                    downConfig.setDownloadDirectory(entry.getKey()).build()));
         }
         downloadJobs.forEach(DownloadJob::getDownloadResults);
     }
@@ -98,20 +102,22 @@
     public Collection<FileReference> 
downloadDirectories(Collection<FileReference> toDownload)
             throws HyracksDataException {
         Set<FileReference> failedFiles = new HashSet<>();
-        ParallelDownloadConfig.Builder config = 
ParallelDownloadConfig.newBuilder().setBucketName(bucket);
+        ParallelDownloadConfig.Builder config =
+                
ParallelDownloadConfig.newBuilder().setBucketName(bucket).setStripPrefix(this.config.getPrefix());

         Map<Path, List<BlobInfo>> pathListMap = new HashMap<>();
         for (FileReference fileReference : toDownload) {
             profiler.objectMultipartDownload();
-            Page<Blob> blobs = gcsClient.list(bucket, 
Storage.BlobListOption.prefix(fileReference.getRelativePath()));
+            Page<Blob> blobs = gcsClient.list(bucket,
+                    Storage.BlobListOption.prefix(this.config.getPrefix() + 
fileReference.getRelativePath()));
             for (Blob blob : blobs.iterateAll()) {
                 addToMap(pathListMap, 
fileReference.getDeviceHandle().getMount().toPath(), blob.asBlobInfo());
             }
         }
         List<DownloadJob> downloadJobs = new ArrayList<>(pathListMap.size());
         for (Map.Entry<Path, List<BlobInfo>> entry : pathListMap.entrySet()) {
-            downloadJobs.add(transferManager.downloadBlobs(entry.getValue(),
-                    config.setDownloadDirectory(entry.getKey()).build()));
+            ParallelDownloadConfig parallelDownloadConfig = 
config.setDownloadDirectory(entry.getKey()).build();
+            downloadJobs.add(transferManager.downloadBlobs(entry.getValue(), 
parallelDownloadConfig));
         }
         List<DownloadResult> results;
         for (DownloadJob job : downloadJobs) {
diff --git 
a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
 
b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
index 09cc3f6..08864ac 100644
--- 
a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
+++ 
b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
@@ -51,7 +51,7 @@
         LOGGER.info("Client created successfully");
         int writeBufferSize = StorageUtil.getIntSizeInBytes(5, 
StorageUtil.StorageUnit.MEGABYTE);
         GCSClientConfig config =
-                new GCSClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, 
true, 0, writeBufferSize);
+                new GCSClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, 
true, 0, writeBufferSize, "");
         CLOUD_CLIENT = new GCSCloudClient(config, 
ICloudGuardian.NoOpCloudGuardian.INSTANCE);
     }

diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
index f2dbde7..6314ce8 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
@@ -26,6 +26,7 @@
     public static final String APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME = 
"applicationDefaultCredentials";
     public static final String JSON_CREDENTIALS_FIELD_NAME = "jsonCredentials";
     public static final String ENDPOINT_FIELD_NAME = "endpoint";
+    public static final String STORAGE_PREFIX = "prefix";

     /*
      * Hadoop internal configuration

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18770
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I7b84bf98272581bc96851855d4bd8663780ab611
Gerrit-Change-Number: 18770
Gerrit-PatchSet: 6
Gerrit-Owner: Mohammad Nawazish Khan <[email protected]>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Hussain Towaileb <[email protected]>
Gerrit-Reviewer: Hussain Towaileb <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Michael Blow <[email protected]>
Gerrit-Reviewer: Murtadha Hubail <[email protected]>
Gerrit-MessageType: merged

Reply via email to