From Murtadha Hubail <[email protected]>: Murtadha Hubail has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18770 )
Change subject: ASTERIXDB-3493: The storage engine should use the cloud storage prefix whilst writing objects to Google Cloud Storage (GCS) ...................................................................... ASTERIXDB-3493: The storage engine should use the cloud storage prefix whilst writing objects to Google Cloud Storage (GCS) - user model changes: no - storage format changes: no - interface changes: no Ext-ref: MB-63369 Details: When the object store used at the backend is Google Cloud Storage, the current storage engine implementation does not use the cloud storage prefix; as a result, the database objects are written directly under the bucket. After this change, the database objects are expected to be written under the prefix directory. Change-Id: I7b84bf98272581bc96851855d4bd8663780ab611 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18770 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> --- M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java M asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java 5 files changed, 75 insertions(+), 28 deletions(-) Approvals: Murtadha Hubail: Looks good to me, approved Jenkins: Verified; Verified Anon. E. 
Moose #1000171: diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java index fe5dd4d..e4e471d 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java @@ -19,6 +19,7 @@ package org.apache.asterix.cloud.clients.google.gcs; import static org.apache.asterix.external.util.google.gcs.GCSConstants.ENDPOINT_FIELD_NAME; +import static org.apache.asterix.external.util.google.gcs.GCSConstants.STORAGE_PREFIX; import java.io.IOException; import java.util.Map; @@ -42,10 +43,11 @@ private final int readMaxRequestsPerSeconds; private final int writeMaxRequestsPerSeconds; private final int writeBufferSize; + private final String prefix; private GCSClientConfig(String region, String endpoint, boolean anonymousAuth, long profilerLogInterval, long tokenAcquireTimeout, int writeMaxRequestsPerSeconds, int readMaxRequestsPerSeconds, - int writeBufferSize) { + int writeBufferSize, String prefix) { this.region = region; this.endpoint = endpoint; this.anonymousAuth = anonymousAuth; @@ -54,18 +56,20 @@ this.writeMaxRequestsPerSeconds = writeMaxRequestsPerSeconds; this.readMaxRequestsPerSeconds = readMaxRequestsPerSeconds; this.writeBufferSize = writeBufferSize; + this.prefix = prefix; } public GCSClientConfig(String region, String endpoint, boolean anonymousAuth, long profilerLogInterval, - int writeBufferSize) { - this(region, endpoint, anonymousAuth, profilerLogInterval, 1, 0, 0, writeBufferSize); + int writeBufferSize, String prefix) { + this(region, endpoint, anonymousAuth, profilerLogInterval, 1, 0, 0, writeBufferSize, prefix); } public static GCSClientConfig of(CloudProperties cloudProperties) { return new GCSClientConfig(cloudProperties.getStorageRegion(), 
cloudProperties.getStorageEndpoint(), cloudProperties.isStorageAnonymousAuth(), cloudProperties.getProfilerLogInterval(), cloudProperties.getTokenAcquireTimeout(), cloudProperties.getWriteMaxRequestsPerSecond(), - cloudProperties.getReadMaxRequestsPerSecond(), cloudProperties.getWriteBufferSize()); + cloudProperties.getReadMaxRequestsPerSecond(), cloudProperties.getWriteBufferSize(), + cloudProperties.getStoragePrefix()); } public static GCSClientConfig of(Map<String, String> configuration, int writeBufferSize) { @@ -73,10 +77,10 @@ long profilerLogInterval = 0; String region = ""; - String prefix = ""; + String prefix = configuration.getOrDefault(STORAGE_PREFIX, ""); boolean anonymousAuth = false; - return new GCSClientConfig(region, endPoint, anonymousAuth, profilerLogInterval, writeBufferSize); + return new GCSClientConfig(region, endPoint, anonymousAuth, profilerLogInterval, writeBufferSize, prefix); } public String getRegion() { @@ -118,4 +122,8 @@ public int getWriteBufferSize() { return writeBufferSize; } + + public String getPrefix() { + return prefix; + } } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java index 010a6bb..a74e7d3 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java @@ -100,20 +100,20 @@ @Override public ICloudWriter createWriter(String bucket, String path, IWriteBufferProvider bufferProvider) { - return new GCSWriter(bucket, path, gcsClient, profilerLimiter, guardian, writeBufferSize); + return new GCSWriter(bucket, config.getPrefix() + path, gcsClient, profilerLimiter, guardian, writeBufferSize); } @Override public Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter) { 
guardian.checkReadAccess(bucket, path); profilerLimiter.objectsList(); - Page<Blob> blobs = - gcsClient.list(bucket, BlobListOption.prefix(path), BlobListOption.fields(Storage.BlobField.SIZE)); + Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + path), + BlobListOption.fields(Storage.BlobField.SIZE)); Set<CloudFile> files = new HashSet<>(); for (Blob blob : blobs.iterateAll()) { if (filter.accept(null, IoUtil.getFileNameFromPath(blob.getName()))) { - files.add(CloudFile.of(blob.getName(), blob.getSize())); + files.add(CloudFile.of(stripCloudPrefix(blob.getName()), blob.getSize())); } } return files; @@ -123,7 +123,7 @@ public int read(String bucket, String path, long offset, ByteBuffer buffer) throws HyracksDataException { guardian.checkReadAccess(bucket, path); profilerLimiter.objectGet(); - BlobId blobId = BlobId.of(bucket, path); + BlobId blobId = BlobId.of(bucket, config.getPrefix() + path); long readTo = offset + buffer.remaining(); int totalRead = 0; try (ReadChannel from = gcsClient.reader(blobId).limit(readTo)) { @@ -145,7 +145,7 @@ public byte[] readAllBytes(String bucket, String path) { guardian.checkReadAccess(bucket, path); profilerLimiter.objectGet(); - BlobId blobId = BlobId.of(bucket, path); + BlobId blobId = BlobId.of(bucket, config.getPrefix() + path); try { return gcsClient.readAllBytes(blobId); } catch (StorageException e) { @@ -157,7 +157,7 @@ public InputStream getObjectStream(String bucket, String path, long offset, long length) { guardian.checkReadAccess(bucket, path); profilerLimiter.objectGet(); - try (ReadChannel reader = gcsClient.reader(bucket, path).limit(offset + length)) { + try (ReadChannel reader = gcsClient.reader(bucket, config.getPrefix() + path).limit(offset + length)) { reader.seek(offset); return Channels.newInputStream(reader); } catch (StorageException | IOException e) { @@ -169,7 +169,7 @@ public void write(String bucket, String path, byte[] data) { guardian.checkWriteAccess(bucket, path); 
profilerLimiter.objectWrite(); - BlobInfo blobInfo = BlobInfo.newBuilder(bucket, path).build(); + BlobInfo blobInfo = BlobInfo.newBuilder(bucket, config.getPrefix() + path).build(); gcsClient.create(blobInfo, data); } @@ -177,7 +177,7 @@ public void copy(String bucket, String srcPath, FileReference destPath) { guardian.checkReadAccess(bucket, srcPath); profilerLimiter.objectsList(); - Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(srcPath)); + Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + srcPath)); for (Blob blob : blobs.iterateAll()) { profilerLimiter.objectCopy(); BlobId source = blob.getBlobId(); @@ -200,7 +200,7 @@ while (pathIter.hasNext()) { batchRequest = gcsClient.batch(); for (int i = 0; pathIter.hasNext() && i < DELETE_BATCH_SIZE; i++) { - BlobId blobId = BlobId.of(bucket, pathIter.next()); + BlobId blobId = BlobId.of(bucket, config.getPrefix() + pathIter.next()); guardian.checkWriteAccess(bucket, blobId.getName()); batchRequest.delete(blobId); } @@ -214,7 +214,8 @@ public long getObjectSize(String bucket, String path) { guardian.checkReadAccess(bucket, path); profilerLimiter.objectGet(); - Blob blob = gcsClient.get(bucket, path, Storage.BlobGetOption.fields(Storage.BlobField.SIZE)); + Blob blob = + gcsClient.get(bucket, config.getPrefix() + path, Storage.BlobGetOption.fields(Storage.BlobField.SIZE)); if (blob == null) { return 0; } @@ -225,7 +226,8 @@ public boolean exists(String bucket, String path) { guardian.checkReadAccess(bucket, path); profilerLimiter.objectGet(); - Blob blob = gcsClient.get(bucket, path, Storage.BlobGetOption.fields(Storage.BlobField.values())); + Blob blob = gcsClient.get(bucket, config.getPrefix() + path, + Storage.BlobGetOption.fields(Storage.BlobField.values())); return blob != null && blob.exists(); } @@ -233,7 +235,7 @@ public boolean isEmptyPrefix(String bucket, String path) { guardian.checkReadAccess(bucket, path); profilerLimiter.objectsList(); - Page<Blob> blobs = 
gcsClient.list(bucket, BlobListOption.prefix(path)); + Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + path)); return !blobs.hasNextPage(); } @@ -278,4 +280,8 @@ } return builder.build().getService(); } + + private String stripCloudPrefix(String objectName) { + return objectName.substring(config.getPrefix().length()); + } } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java index 0994cea..0d30120 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java @@ -56,6 +56,7 @@ private final Storage gcsClient; private final TransferManager transferManager; private final IRequestProfilerLimiter profiler; + private final GCSClientConfig config; public GCSParallelDownloader(String bucket, IOManager ioManager, GCSClientConfig config, IRequestProfilerLimiter profiler) throws HyracksDataException { @@ -70,18 +71,21 @@ this.gcsClient = builder.build().getService(); this.transferManager = TransferManagerConfig.newBuilder().setStorageOptions(builder.build()).build().getService(); + this.config = config; } @Override public void downloadFiles(Collection<FileReference> toDownload) throws HyracksDataException { - ParallelDownloadConfig.Builder config = ParallelDownloadConfig.newBuilder().setBucketName(bucket); + ParallelDownloadConfig.Builder downConfig = + ParallelDownloadConfig.newBuilder().setBucketName(bucket).setStripPrefix(this.config.getPrefix()); + Map<Path, List<BlobInfo>> pathListMap = new HashMap<>(); try { for (FileReference fileReference : toDownload) { profiler.objectGet(); FileUtils.createParentDirectories(fileReference.getFile()); - addToMap(pathListMap, 
fileReference.getDeviceHandle().getMount().toPath(), - BlobInfo.newBuilder(BlobId.of(bucket, fileReference.getRelativePath())).build()); + addToMap(pathListMap, fileReference.getDeviceHandle().getMount().toPath(), BlobInfo + .newBuilder(BlobId.of(bucket, config.getPrefix() + fileReference.getRelativePath())).build()); } } catch (IOException e) { throw HyracksDataException.create(e); @@ -89,7 +93,7 @@ List<DownloadJob> downloadJobs = new ArrayList<>(pathListMap.size()); for (Map.Entry<Path, List<BlobInfo>> entry : pathListMap.entrySet()) { downloadJobs.add(transferManager.downloadBlobs(entry.getValue(), - config.setDownloadDirectory(entry.getKey()).build())); + downConfig.setDownloadDirectory(entry.getKey()).build())); } downloadJobs.forEach(DownloadJob::getDownloadResults); } @@ -98,20 +102,22 @@ public Collection<FileReference> downloadDirectories(Collection<FileReference> toDownload) throws HyracksDataException { Set<FileReference> failedFiles = new HashSet<>(); - ParallelDownloadConfig.Builder config = ParallelDownloadConfig.newBuilder().setBucketName(bucket); + ParallelDownloadConfig.Builder config = + ParallelDownloadConfig.newBuilder().setBucketName(bucket).setStripPrefix(this.config.getPrefix()); Map<Path, List<BlobInfo>> pathListMap = new HashMap<>(); for (FileReference fileReference : toDownload) { profiler.objectMultipartDownload(); - Page<Blob> blobs = gcsClient.list(bucket, Storage.BlobListOption.prefix(fileReference.getRelativePath())); + Page<Blob> blobs = gcsClient.list(bucket, + Storage.BlobListOption.prefix(this.config.getPrefix() + fileReference.getRelativePath())); for (Blob blob : blobs.iterateAll()) { addToMap(pathListMap, fileReference.getDeviceHandle().getMount().toPath(), blob.asBlobInfo()); } } List<DownloadJob> downloadJobs = new ArrayList<>(pathListMap.size()); for (Map.Entry<Path, List<BlobInfo>> entry : pathListMap.entrySet()) { - downloadJobs.add(transferManager.downloadBlobs(entry.getValue(), - 
config.setDownloadDirectory(entry.getKey()).build())); + ParallelDownloadConfig parallelDownloadConfig = config.setDownloadDirectory(entry.getKey()).build(); + downloadJobs.add(transferManager.downloadBlobs(entry.getValue(), parallelDownloadConfig)); } List<DownloadResult> results; for (DownloadJob job : downloadJobs) { diff --git a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java index 09cc3f6..08864ac 100644 --- a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java +++ b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java @@ -51,7 +51,7 @@ LOGGER.info("Client created successfully"); int writeBufferSize = StorageUtil.getIntSizeInBytes(5, StorageUtil.StorageUnit.MEGABYTE); GCSClientConfig config = - new GCSClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, true, 0, writeBufferSize); + new GCSClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, true, 0, writeBufferSize, ""); CLOUD_CLIENT = new GCSCloudClient(config, ICloudGuardian.NoOpCloudGuardian.INSTANCE); } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java index f2dbde7..6314ce8 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java @@ -26,6 +26,7 @@ public static final String APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME = "applicationDefaultCredentials"; public static final String JSON_CREDENTIALS_FIELD_NAME = "jsonCredentials"; public static final String ENDPOINT_FIELD_NAME = "endpoint"; + public static final String STORAGE_PREFIX = "prefix"; /* * Hadoop internal 
configuration -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18770 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Change-Id: I7b84bf98272581bc96851855d4bd8663780ab611 Gerrit-Change-Number: 18770 Gerrit-PatchSet: 6 Gerrit-Owner: Mohammad Nawazish Khan <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Hussain Towaileb <[email protected]> Gerrit-Reviewer: Hussain Towaileb <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-MessageType: merged
