This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit d674a1edb2c75bd983f8c1c010e0d89602707c1b Author: Rene Cordier <[email protected]> AuthorDate: Mon Oct 5 14:05:13 2020 +0700 JAMES-3028 Implement bucket name resolution in the S3BlobStoreDAO --- .../blob/objectstorage/aws/S3BlobStoreDAO.java | 56 +++++++++++++++------- .../blob/objectstorage/aws/S3BlobStoreDAOTest.java | 9 +++- .../aws/S3DeDuplicationBlobStoreTest.java | 10 +++- ...oughBlobStoreTest.java => S3NamespaceTest.java} | 20 +++++--- .../aws/S3PassThroughBlobStoreTest.java | 10 +++- ...toreTest.java => S3PrefixAndNamespaceTest.java} | 21 +++++--- ...ThroughBlobStoreTest.java => S3PrefixTest.java} | 17 ++++--- 7 files changed, 101 insertions(+), 42 deletions(-) diff --git a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java index 1cbcceb..1f73847 100644 --- a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java +++ b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java @@ -87,21 +87,24 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable { private static final int MAX_RETRIES = 5; private final InstrumentedPool<S3AsyncClient> clientPool; + private final BucketNameResolver bucketNameResolver; @Inject - S3BlobStoreDAO(AwsS3AuthConfiguration configuration, Region region) { + S3BlobStoreDAO(S3BlobStoreConfiguration configuration) { + AwsS3AuthConfiguration authConfiguration = configuration.getSpecificAuthConfiguration(); + S3Configuration pathStyleAccess = S3Configuration.builder() .pathStyleAccessEnabled(true) .build(); Callable<S3AsyncClient> clientCreator = () -> S3AsyncClient.builder() .credentialsProvider(StaticCredentialsProvider.create( - AwsBasicCredentials.create(configuration.getAccessKeyId(), configuration.getSecretKey()))) + AwsBasicCredentials.create(authConfiguration.getAccessKeyId(), authConfiguration.getSecretKey()))) .httpClientBuilder(NettyNioAsyncHttpClient.builder() .maxConcurrency(100) .maxPendingConnectionAcquires(10_000)) - .endpointOverride(configuration.getEndpoint()) - .region(region.asAws()) + .endpointOverride(authConfiguration.getEndpoint()) + .region(configuration.getRegion().asAws()) .serviceConfiguration(pathStyleAccess) .build(); @@ -111,6 +114,11 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable { .maxPendingAcquireUnbounded() .sizeUnbounded() .fifo(); + + bucketNameResolver = BucketNameResolver.builder() + .prefix(configuration.getBucketPrefix()) + .namespace(configuration.getNamespace()) + .build(); } public void start() { @@ -125,10 +133,12 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable { @Override public InputStream read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException { - return getObject(bucketName, blobId) + BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName); + + return getObject(resolvedBucketName, blobId) .map(response -> ReactorUtils.toInputStream(response.flux)) - .onErrorMap(NoSuchBucketException.class, e -> new ObjectNotFoundException("Bucket not found " + bucketName, e)) - .onErrorMap(NoSuchKeyException.class, e -> new ObjectNotFoundException("Blob not found " + bucketName, e)) + .onErrorMap(NoSuchBucketException.class, e -> new ObjectNotFoundException("Bucket not found " + resolvedBucketName.asString(), e)) + .onErrorMap(NoSuchKeyException.class, e -> new ObjectNotFoundException("Blob not found " + resolvedBucketName.asString(), e)) .block(); } @@ -174,24 +184,28 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable { @Override public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) { + BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName); + return clientPool.withPoolable(client -> Mono.fromFuture(() -> client.getObject( - builder -> builder.bucket(bucketName.asString()).key(blobId.asString()), + builder -> builder.bucket(resolvedBucketName.asString()).key(blobId.asString()), AsyncResponseTransformer.toBytes()))) .next() - .onErrorMap(NoSuchBucketException.class, e -> new ObjectNotFoundException("Bucket not found " + bucketName, e)) - .onErrorMap(NoSuchKeyException.class, e -> new ObjectNotFoundException("Blob not found " + bucketName, e)) + .onErrorMap(NoSuchBucketException.class, e -> new ObjectNotFoundException("Bucket not found " + resolvedBucketName.asString(), e)) + .onErrorMap(NoSuchKeyException.class, e -> new ObjectNotFoundException("Blob not found " + resolvedBucketName.asString(), e)) .map(BytesWrapper::asByteArray); } @Override public Mono<Void> save(BucketName bucketName, BlobId blobId, byte[] data) { + BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName); + return clientPool.withPoolable(client -> Mono.fromFuture(() -> client.putObject( - builder -> builder.bucket(bucketName.asString()).key(blobId.asString()).contentLength((long) data.length), + builder -> builder.bucket(resolvedBucketName.asString()).key(blobId.asString()).contentLength((long) data.length), AsyncRequestBody.fromBytes(data)))) .next() - .retryWhen(createBucketOnRetry(bucketName)) + .retryWhen(createBucketOnRetry(resolvedBucketName)) .then(); } @@ -215,18 +229,20 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable { @Override public Mono<Void> save(BucketName bucketName, BlobId blobId, ByteSource content) { + BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName); + return Mono.using(content::openStream, stream -> clientPool.withPoolable(client -> Mono.fromFuture(() -> client.putObject( Throwing.<PutObjectRequest.Builder>consumer( - builder -> builder.bucket(bucketName.asString()).contentLength(content.size()).key(blobId.asString())) + builder -> builder.bucket(resolvedBucketName.asString()).contentLength(content.size()).key(blobId.asString())) .sneakyThrow(), AsyncRequestBody.fromPublisher( DataChunker.chunkStream(stream, CHUNK_SIZE))))).next(), Throwing.consumer(InputStream::close), LAZY) - .retryWhen(createBucketOnRetry(bucketName)) + .retryWhen(createBucketOnRetry(resolvedBucketName)) .onErrorMap(IOException.class, e -> new ObjectStoreIOException("Error saving blob", e)) .onErrorMap(SdkClientException.class, e -> new ObjectStoreIOException("Error saving blob", e)) .then(); @@ -244,8 +260,10 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable { @Override public Mono<Void> delete(BucketName bucketName, BlobId blobId) { + BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName); + return clientPool.withPoolable(client -> Mono.fromFuture(() -> - client.deleteObject(delete -> delete.bucket(bucketName.asString()).key(blobId.asString())))) + client.deleteObject(delete -> delete.bucket(resolvedBucketName.asString()).key(blobId.asString())))) .next() .then() .onErrorResume(NoSuchBucketException.class, e -> Mono.empty()); @@ -253,6 +271,12 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable { @Override public Mono<Void> deleteBucket(BucketName bucketName) { + BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName); + + return deleteResolvedBucket(resolvedBucketName); + } + + private Mono<Void> deleteResolvedBucket(BucketName bucketName) { return emptyBucket(bucketName) .onErrorResume(t -> Mono.just(bucketName)) .flatMap(ignore -> clientPool.withPoolable(client -> Mono.fromFuture(() -> @@ -287,7 +311,7 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable { public Mono<Void> deleteAllBuckets() { return clientPool.withPoolable(client -> Mono.fromFuture(client::listBuckets) .flatMapIterable(ListBucketsResponse::buckets) - .flatMap(bucket -> deleteBucket(BucketName.of(bucket.name())))) + .flatMap(bucket -> deleteResolvedBucket(BucketName.of(bucket.name())))) .then(); } } diff --git a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAOTest.java b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAOTest.java index 5d41e9f..e426926 100644 --- a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAOTest.java +++ b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAOTest.java @@ -31,13 +31,18 @@ public class S3BlobStoreDAOTest implements BlobStoreDAOContract { @BeforeAll static void setUp(DockerAwsS3Container dockerAwsS3) { - AwsS3AuthConfiguration configuration = AwsS3AuthConfiguration.builder() + AwsS3AuthConfiguration authConfiguration = AwsS3AuthConfiguration.builder() .endpoint(dockerAwsS3.getEndpoint()) .accessKeyId(DockerAwsS3Container.ACCESS_KEY_ID) .secretKey(DockerAwsS3Container.SECRET_ACCESS_KEY) .build(); - testee = new S3BlobStoreDAO(configuration, dockerAwsS3.dockerAwsS3().region()); + S3BlobStoreConfiguration s3Configuration = S3BlobStoreConfiguration.builder() + .authConfiguration(authConfiguration) + .region(dockerAwsS3.dockerAwsS3().region()) + .build(); + + testee = new S3BlobStoreDAO(s3Configuration); } @AfterEach diff --git a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3DeDuplicationBlobStoreTest.java b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3DeDuplicationBlobStoreTest.java index f2915aa..2dabaa9 100644 --- a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3DeDuplicationBlobStoreTest.java +++ b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3DeDuplicationBlobStoreTest.java @@ -37,13 +37,19 @@ class S3DeDuplicationBlobStoreTest implements BlobStoreContract { @BeforeAll static void setUpClass(DockerAwsS3Container dockerAwsS3) { - AwsS3AuthConfiguration configuration = AwsS3AuthConfiguration.builder() + AwsS3AuthConfiguration authConfiguration = AwsS3AuthConfiguration.builder() .endpoint(dockerAwsS3.getEndpoint()) .accessKeyId(DockerAwsS3Container.ACCESS_KEY_ID) .secretKey(DockerAwsS3Container.SECRET_ACCESS_KEY) .build(); - s3BlobStoreDAO = new S3BlobStoreDAO(configuration, dockerAwsS3.dockerAwsS3().region()); + S3BlobStoreConfiguration s3Configuration = S3BlobStoreConfiguration.builder() + .authConfiguration(authConfiguration) + .region(dockerAwsS3.dockerAwsS3().region()) + .build(); + + s3BlobStoreDAO = new S3BlobStoreDAO(s3Configuration); + testee = BlobStoreFactory.builder() .blobStoreDAO(s3BlobStoreDAO) .blobIdFactory(new HashBlobId.Factory()) diff --git a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3PassThroughBlobStoreTest.java b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3NamespaceTest.java similarity index 82% copy from server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3PassThroughBlobStoreTest.java copy to server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3NamespaceTest.java index 6cd960b..3fdc0b3 100644 --- a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3PassThroughBlobStoreTest.java +++ b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3NamespaceTest.java @@ -22,6 +22,7 @@ package org.apache.james.blob.objectstorage.aws; import org.apache.james.blob.api.BlobId; import org.apache.james.blob.api.BlobStore; import org.apache.james.blob.api.BlobStoreContract; +import org.apache.james.blob.api.BucketName; import org.apache.james.blob.api.HashBlobId; import org.apache.james.server.blob.deduplication.BlobStoreFactory; import org.junit.jupiter.api.AfterAll; @@ -30,24 +31,30 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.extension.ExtendWith; @ExtendWith(DockerAwsS3Extension.class) -class S3PassThroughBlobStoreTest implements BlobStoreContract { - +class S3NamespaceTest implements BlobStoreContract { private static BlobStore testee; private static S3BlobStoreDAO s3BlobStoreDAO; @BeforeAll static void setUpClass(DockerAwsS3Container dockerAwsS3) { - AwsS3AuthConfiguration configuration = AwsS3AuthConfiguration.builder() + AwsS3AuthConfiguration authConfiguration = AwsS3AuthConfiguration.builder() .endpoint(dockerAwsS3.getEndpoint()) .accessKeyId(DockerAwsS3Container.ACCESS_KEY_ID) .secretKey(DockerAwsS3Container.SECRET_ACCESS_KEY) .build(); - s3BlobStoreDAO = new S3BlobStoreDAO(configuration, dockerAwsS3.dockerAwsS3().region()); + S3BlobStoreConfiguration s3Configuration = S3BlobStoreConfiguration.builder() + .authConfiguration(authConfiguration) + .region(dockerAwsS3.dockerAwsS3().region()) + .defaultBucketName(BucketName.of("namespace")) + .build(); + + s3BlobStoreDAO = new S3BlobStoreDAO(s3Configuration); + testee = BlobStoreFactory.builder() .blobStoreDAO(s3BlobStoreDAO) .blobIdFactory(new HashBlobId.Factory()) - .defaultBucketName() + .bucket(BucketName.of("namespace")) .passthrough(); } @@ -70,5 +77,4 @@ class S3PassThroughBlobStoreTest implements BlobStoreContract { public BlobId.Factory blobIdFactory() { return new HashBlobId.Factory(); } - -} \ No newline at end of file +} diff --git a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3PassThroughBlobStoreTest.java b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3PassThroughBlobStoreTest.java index 6cd960b..f535e44 100644 --- a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3PassThroughBlobStoreTest.java +++ b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3PassThroughBlobStoreTest.java @@ -37,13 +37,19 @@ class S3PassThroughBlobStoreTest implements BlobStoreContract { @BeforeAll static void setUpClass(DockerAwsS3Container dockerAwsS3) { - AwsS3AuthConfiguration configuration = AwsS3AuthConfiguration.builder() + AwsS3AuthConfiguration authConfiguration = AwsS3AuthConfiguration.builder() .endpoint(dockerAwsS3.getEndpoint()) .accessKeyId(DockerAwsS3Container.ACCESS_KEY_ID) .secretKey(DockerAwsS3Container.SECRET_ACCESS_KEY) .build(); - s3BlobStoreDAO = new S3BlobStoreDAO(configuration, dockerAwsS3.dockerAwsS3().region()); + S3BlobStoreConfiguration s3Configuration = S3BlobStoreConfiguration.builder() + .authConfiguration(authConfiguration) + .region(dockerAwsS3.dockerAwsS3().region()) + .build(); + + s3BlobStoreDAO = new S3BlobStoreDAO(s3Configuration); + testee = BlobStoreFactory.builder() .blobStoreDAO(s3BlobStoreDAO) .blobIdFactory(new HashBlobId.Factory()) diff --git a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3DeDuplicationBlobStoreTest.java b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3PrefixAndNamespaceTest.java similarity index 80% copy from server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3DeDuplicationBlobStoreTest.java copy to server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3PrefixAndNamespaceTest.java index f2915aa..bde3309 100644 --- a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3DeDuplicationBlobStoreTest.java +++ b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3PrefixAndNamespaceTest.java @@ -22,6 +22,7 @@ package org.apache.james.blob.objectstorage.aws; import org.apache.james.blob.api.BlobId; import org.apache.james.blob.api.BlobStore; import org.apache.james.blob.api.BlobStoreContract; +import org.apache.james.blob.api.BucketName; import org.apache.james.blob.api.HashBlobId; import org.apache.james.server.blob.deduplication.BlobStoreFactory; import org.junit.jupiter.api.AfterAll; @@ -30,24 +31,31 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.extension.ExtendWith; @ExtendWith(DockerAwsS3Extension.class) -class S3DeDuplicationBlobStoreTest implements BlobStoreContract { - +class S3PrefixAndNamespaceTest implements BlobStoreContract { private static BlobStore testee; private static S3BlobStoreDAO s3BlobStoreDAO; @BeforeAll static void setUpClass(DockerAwsS3Container dockerAwsS3) { - AwsS3AuthConfiguration configuration = AwsS3AuthConfiguration.builder() + AwsS3AuthConfiguration authConfiguration = AwsS3AuthConfiguration.builder() .endpoint(dockerAwsS3.getEndpoint()) .accessKeyId(DockerAwsS3Container.ACCESS_KEY_ID) .secretKey(DockerAwsS3Container.SECRET_ACCESS_KEY) .build(); - s3BlobStoreDAO = new S3BlobStoreDAO(configuration, dockerAwsS3.dockerAwsS3().region()); + S3BlobStoreConfiguration s3Configuration = S3BlobStoreConfiguration.builder() + .authConfiguration(authConfiguration) + .region(dockerAwsS3.dockerAwsS3().region()) + .defaultBucketName(BucketName.of("namespace")) + .bucketPrefix("prefix") + .build(); + + s3BlobStoreDAO = new S3BlobStoreDAO(s3Configuration); + testee = BlobStoreFactory.builder() .blobStoreDAO(s3BlobStoreDAO) .blobIdFactory(new HashBlobId.Factory()) - .defaultBucketName() + .bucket(BucketName.of("namespace")) .deduplication(); } @@ -70,5 +78,4 @@ class S3DeDuplicationBlobStoreTest implements BlobStoreContract { public BlobId.Factory blobIdFactory() { return new HashBlobId.Factory(); } - -} \ No newline at end of file +} diff --git a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3PassThroughBlobStoreTest.java b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3PrefixTest.java similarity index 85% copy from server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3PassThroughBlobStoreTest.java copy to server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3PrefixTest.java index 6cd960b..b8bb257 100644 --- a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3PassThroughBlobStoreTest.java +++ b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3PrefixTest.java @@ -30,20 +30,26 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.extension.ExtendWith; @ExtendWith(DockerAwsS3Extension.class) -class S3PassThroughBlobStoreTest implements BlobStoreContract { - +class S3PrefixTest implements BlobStoreContract { private static BlobStore testee; private static S3BlobStoreDAO s3BlobStoreDAO; @BeforeAll static void setUpClass(DockerAwsS3Container dockerAwsS3) { - AwsS3AuthConfiguration configuration = AwsS3AuthConfiguration.builder() + AwsS3AuthConfiguration authConfiguration = AwsS3AuthConfiguration.builder() .endpoint(dockerAwsS3.getEndpoint()) .accessKeyId(DockerAwsS3Container.ACCESS_KEY_ID) .secretKey(DockerAwsS3Container.SECRET_ACCESS_KEY) .build(); - s3BlobStoreDAO = new S3BlobStoreDAO(configuration, dockerAwsS3.dockerAwsS3().region()); + S3BlobStoreConfiguration s3Configuration = S3BlobStoreConfiguration.builder() + .authConfiguration(authConfiguration) + .region(dockerAwsS3.dockerAwsS3().region()) + .bucketPrefix("prefix") + .build(); + + s3BlobStoreDAO = new S3BlobStoreDAO(s3Configuration); + testee = BlobStoreFactory.builder() .blobStoreDAO(s3BlobStoreDAO) .blobIdFactory(new HashBlobId.Factory()) @@ -70,5 +76,4 @@ class S3PassThroughBlobStoreTest implements BlobStoreContract { public BlobId.Factory blobIdFactory() { return new HashBlobId.Factory(); } - -} \ No newline at end of file +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
