From Wail Alkowaileet <[email protected]>: Wail Alkowaileet has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18645 )
Change subject: [NO ISSUE]: Add Guardian to GCSWriter, Request Limits for GCS ...................................................................... [NO ISSUE]: Add Guardian to GCSWriter, Request Limits for GCS Ext-ref: MB-63055 Change-Id: Id639afcadb1d88b4630e12dde40dbaae94e15f23 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18645 Integration-Tests: Jenkins <[email protected]> Tested-by: Wail Alkowaileet <[email protected]> Reviewed-by: Wail Alkowaileet <[email protected]> --- M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java A asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSRequestRateLimiter.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java M asterixdb/asterix-app/src/test/resources/cc-cloud-storage-gcs.conf M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java M asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java 8 files changed, 141 insertions(+), 29 deletions(-) Approvals: Wail Alkowaileet: Looks good to me, approved; Verified Jenkins: Verified Objections: Anon. E. 
Moose #1000171: Violations found diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java index 3a03445..89a4781 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java @@ -36,7 +36,6 @@ import org.junit.Assume; import org.junit.BeforeClass; import org.junit.FixMethodOrder; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.MethodSorters; @@ -55,7 +54,6 @@ */ @RunWith(Parameterized.class) @FixMethodOrder(MethodSorters.NAME_ASCENDING) -@Ignore public class CloudStorageGCSTest { private static final Logger LOGGER = LogManager.getLogger(); diff --git a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-gcs.conf b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-gcs.conf index 3c883a8..0046644 100644 --- a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-gcs.conf +++ b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-gcs.conf @@ -70,3 +70,6 @@ cloud.storage.endpoint=http://127.0.0.1:4443 cloud.storage.anonymous.auth=true cloud.storage.cache.policy=selective +cloud.max.write.requests.per.second=1000 +cloud.max.read.requests.per.second=5000 +cloud.write.buffer.size=5 diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java index 4edb7a7..fe5dd4d 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java @@ -25,38 +25,50 @@ import 
org.apache.asterix.common.config.CloudProperties; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.util.StorageUtil; import com.google.auth.oauth2.GoogleCredentials; import com.google.auth.oauth2.OAuth2Credentials; import com.google.cloud.NoCredentials; public class GCSClientConfig { - public static final int WRITE_BUFFER_SIZE = StorageUtil.getIntSizeInBytes(1, StorageUtil.StorageUnit.MEGABYTE); + // The maximum number of files that can be deleted (GCS restriction): https://cloud.google.com/storage/quotas#json-requests static final int DELETE_BATCH_SIZE = 100; private final String region; private final String endpoint; - private final String prefix; private final boolean anonymousAuth; private final long profilerLogInterval; + private final long tokenAcquireTimeout; + private final int readMaxRequestsPerSeconds; + private final int writeMaxRequestsPerSeconds; + private final int writeBufferSize; - public GCSClientConfig(String region, String endpoint, String prefix, boolean anonymousAuth, - long profilerLogInterval) { + private GCSClientConfig(String region, String endpoint, boolean anonymousAuth, long profilerLogInterval, + long tokenAcquireTimeout, int writeMaxRequestsPerSeconds, int readMaxRequestsPerSeconds, + int writeBufferSize) { this.region = region; this.endpoint = endpoint; - this.prefix = prefix; this.anonymousAuth = anonymousAuth; this.profilerLogInterval = profilerLogInterval; + this.tokenAcquireTimeout = tokenAcquireTimeout; + this.writeMaxRequestsPerSeconds = writeMaxRequestsPerSeconds; + this.readMaxRequestsPerSeconds = readMaxRequestsPerSeconds; + this.writeBufferSize = writeBufferSize; + } + + public GCSClientConfig(String region, String endpoint, boolean anonymousAuth, long profilerLogInterval, + int writeBufferSize) { + this(region, endpoint, anonymousAuth, profilerLogInterval, 1, 0, 0, writeBufferSize); } public static GCSClientConfig of(CloudProperties cloudProperties) { return new 
GCSClientConfig(cloudProperties.getStorageRegion(), cloudProperties.getStorageEndpoint(), - cloudProperties.getStoragePrefix(), cloudProperties.isStorageAnonymousAuth(), - cloudProperties.getProfilerLogInterval()); + cloudProperties.isStorageAnonymousAuth(), cloudProperties.getProfilerLogInterval(), + cloudProperties.getTokenAcquireTimeout(), cloudProperties.getWriteMaxRequestsPerSecond(), + cloudProperties.getReadMaxRequestsPerSecond(), cloudProperties.getWriteBufferSize()); } - public static GCSClientConfig of(Map<String, String> configuration) { + public static GCSClientConfig of(Map<String, String> configuration, int writeBufferSize) { String endPoint = configuration.getOrDefault(ENDPOINT_FIELD_NAME, ""); long profilerLogInterval = 0; @@ -64,7 +76,7 @@ String prefix = ""; boolean anonymousAuth = false; - return new GCSClientConfig(region, endPoint, prefix, anonymousAuth, profilerLogInterval); + return new GCSClientConfig(region, endPoint, anonymousAuth, profilerLogInterval, writeBufferSize); } public String getRegion() { @@ -75,10 +87,6 @@ return endpoint; } - public String getPrefix() { - return prefix; - } - public long getProfilerLogInterval() { return profilerLogInterval; } @@ -94,4 +102,20 @@ throw HyracksDataException.create(e); } } + + public long getTokenAcquireTimeout() { + return tokenAcquireTimeout; + } + + public int getWriteMaxRequestsPerSeconds() { + return writeMaxRequestsPerSeconds; + } + + public int getReadMaxRequestsPerSeconds() { + return readMaxRequestsPerSeconds; + } + + public int getWriteBufferSize() { + return writeBufferSize; + } } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java index de242bd..010a6bb 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java +++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java @@ -40,8 +40,7 @@ import org.apache.asterix.cloud.clients.IParallelDownloader; import org.apache.asterix.cloud.clients.profiler.CountRequestProfilerLimiter; import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter; -import org.apache.asterix.cloud.clients.profiler.NoOpRequestProfilerLimiter; -import org.apache.asterix.cloud.clients.profiler.limiter.NoOpRequestLimiter; +import org.apache.asterix.cloud.clients.profiler.RequestLimiterNoOpProfiler; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.util.IoUtil; @@ -68,16 +67,19 @@ private final GCSClientConfig config; private final ICloudGuardian guardian; private final IRequestProfilerLimiter profilerLimiter; + private final int writeBufferSize; public GCSCloudClient(GCSClientConfig config, Storage gcsClient, ICloudGuardian guardian) { this.gcsClient = gcsClient; this.config = config; this.guardian = guardian; + this.writeBufferSize = config.getWriteBufferSize(); long profilerInterval = config.getProfilerLogInterval(); + GCSRequestRateLimiter limiter = new GCSRequestRateLimiter(config); if (profilerInterval > 0) { - profilerLimiter = new CountRequestProfilerLimiter(profilerInterval, NoOpRequestLimiter.INSTANCE); + profilerLimiter = new CountRequestProfilerLimiter(profilerInterval, limiter); } else { - profilerLimiter = NoOpRequestProfilerLimiter.INSTANCE; + profilerLimiter = new RequestLimiterNoOpProfiler(limiter); } guardian.setCloudClient(this); } @@ -88,7 +90,7 @@ @Override public int getWriteBufferSize() { - return GCSClientConfig.WRITE_BUFFER_SIZE; + return writeBufferSize; } @Override @@ -98,7 +100,7 @@ @Override public ICloudWriter createWriter(String bucket, String path, IWriteBufferProvider bufferProvider) { - return new GCSWriter(bucket, path, gcsClient, profilerLimiter); + return new GCSWriter(bucket, 
path, gcsClient, profilerLimiter, guardian, writeBufferSize); } @Override @@ -119,6 +121,7 @@ @Override public int read(String bucket, String path, long offset, ByteBuffer buffer) throws HyracksDataException { + guardian.checkReadAccess(bucket, path); profilerLimiter.objectGet(); BlobId blobId = BlobId.of(bucket, path); long readTo = offset + buffer.remaining(); @@ -140,6 +143,7 @@ @Override public byte[] readAllBytes(String bucket, String path) { + guardian.checkReadAccess(bucket, path); profilerLimiter.objectGet(); BlobId blobId = BlobId.of(bucket, path); try { @@ -151,6 +155,7 @@ @Override public InputStream getObjectStream(String bucket, String path, long offset, long length) { + guardian.checkReadAccess(bucket, path); profilerLimiter.objectGet(); try (ReadChannel reader = gcsClient.reader(bucket, path).limit(offset + length)) { reader.seek(offset); @@ -170,8 +175,9 @@ @Override public void copy(String bucket, String srcPath, FileReference destPath) { - Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(srcPath)); + guardian.checkReadAccess(bucket, srcPath); profilerLimiter.objectsList(); + Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(srcPath)); for (Blob blob : blobs.iterateAll()) { profilerLimiter.objectCopy(); BlobId source = blob.getBlobId(); diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSRequestRateLimiter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSRequestRateLimiter.java new file mode 100644 index 0000000..71f6b8c --- /dev/null +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSRequestRateLimiter.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.cloud.clients.google.gcs; + +import org.apache.asterix.cloud.clients.profiler.limiter.IRateLimiter; +import org.apache.asterix.cloud.clients.profiler.limiter.IRequestRateLimiter; +import org.apache.asterix.cloud.clients.profiler.limiter.NoOpRateLimiter; +import org.apache.asterix.cloud.clients.profiler.limiter.TokenBasedRateLimiter; + +public class GCSRequestRateLimiter implements IRequestRateLimiter { + private final IRateLimiter writeLimiter; + private final IRateLimiter readLimiter; + + public GCSRequestRateLimiter(GCSClientConfig config) { + long tokenAcquireTimeout = config.getTokenAcquireTimeout(); + this.writeLimiter = createLimiter(config.getWriteMaxRequestsPerSeconds(), tokenAcquireTimeout); + this.readLimiter = createLimiter(config.getReadMaxRequestsPerSeconds(), tokenAcquireTimeout); + } + + @Override + public void writeRequest() { + writeLimiter.acquire(); + } + + @Override + public void readRequest() { + readLimiter.acquire(); + } + + @Override + public void listRequest() { + readLimiter.acquire(); + } + + private static IRateLimiter createLimiter(int maxRequestsPerSecond, long tokeAcquireTimeout) { + if (maxRequestsPerSecond > 0) { + return new TokenBasedRateLimiter(maxRequestsPerSecond, tokeAcquireTimeout); + } + return NoOpRateLimiter.INSTANCE; + } +} diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java index 41d1a71..8d68f01 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java @@ -18,11 +18,10 @@ */ package org.apache.asterix.cloud.clients.google.gcs; -import static org.apache.asterix.cloud.clients.google.gcs.GCSClientConfig.WRITE_BUFFER_SIZE; - import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.asterix.cloud.clients.ICloudGuardian; import org.apache.asterix.cloud.clients.ICloudWriter; import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -40,14 +39,20 @@ private final String path; private final IRequestProfilerLimiter profiler; private final Storage gcsClient; + private final ICloudGuardian guardian; + private final int writeBufferSize; + private WriteChannel writer = null; private long writtenBytes; - public GCSWriter(String bucket, String path, Storage gcsClient, IRequestProfilerLimiter profiler) { + public GCSWriter(String bucket, String path, Storage gcsClient, IRequestProfilerLimiter profiler, + ICloudGuardian guardian, int writeBufferSize) { this.bucket = bucket; this.path = path; this.profiler = profiler; this.gcsClient = gcsClient; + this.guardian = guardian; + this.writeBufferSize = writeBufferSize; writtenBytes = 0; } @@ -58,6 +63,7 @@ @Override public int write(ByteBuffer page) throws HyracksDataException { + guardian.checkIsolatedWriteAccess(bucket, path); profiler.objectMultipartUpload(); setUploadId(); int written = 0; @@ -93,6 +99,7 @@ @Override public void finish() throws HyracksDataException { + guardian.checkWriteAccess(bucket, path); setUploadId(); profiler.objectMultipartUpload(); 
try { @@ -115,7 +122,7 @@ private void setUploadId() { if (writer == null) { writer = gcsClient.writer(BlobInfo.newBuilder(BlobId.of(bucket, path)).build()); - writer.setChunkSize(WRITE_BUFFER_SIZE); + writer.setChunkSize(writeBufferSize); writtenBytes = 0; log("STARTED"); } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java index 9e9c003..886f20d 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java @@ -62,7 +62,7 @@ @Override ICloudClient createCloudClient() throws CompilationException { - GCSClientConfig config = GCSClientConfig.of(configuration); + GCSClientConfig config = GCSClientConfig.of(configuration, writeBufferSize); return new GCSCloudClient(config, GCSUtils.buildClient(configuration), ICloudGuardian.NoOpCloudGuardian.INSTANCE); } diff --git a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java index 3c62cce..09cc3f6 100644 --- a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java +++ b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java @@ -22,6 +22,7 @@ import org.apache.asterix.cloud.clients.ICloudGuardian; import org.apache.asterix.cloud.clients.google.gcs.GCSClientConfig; import org.apache.asterix.cloud.clients.google.gcs.GCSCloudClient; +import org.apache.hyracks.util.StorageUtil; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -48,7 +49,9 @@ client.create(BucketInfo.newBuilder(PLAYGROUND_CONTAINER).setStorageClass(StorageClass.STANDARD) .setLocation(MOCK_SERVER_REGION).build()); LOGGER.info("Client created 
successfully"); - GCSClientConfig config = new GCSClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, "", true, 0); + int writeBufferSize = StorageUtil.getIntSizeInBytes(5, StorageUtil.StorageUnit.MEGABYTE); + GCSClientConfig config = + new GCSClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, true, 0, writeBufferSize); CLOUD_CLIENT = new GCSCloudClient(config, ICloudGuardian.NoOpCloudGuardian.INSTANCE); } -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18645 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Change-Id: Id639afcadb1d88b4630e12dde40dbaae94e15f23 Gerrit-Change-Number: 18645 Gerrit-PatchSet: 5 Gerrit-Owner: Savyasach Reddy <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Wail Alkowaileet <[email protected]> Gerrit-MessageType: merged
