This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit e60349cd55484f181bb4b7382c3c044c9db244c3 Author: Matthieu Baechler <[email protected]> AuthorDate: Mon Jan 20 10:33:29 2020 +0100 JAMES-3028 S3 DumbBlobStore Implementation --- .../objectstorage/aws/DockerAwsS3Container.java | 3 +- server/blob/blob-s3/pom.xml | 95 ++++++++ .../objectstorage/aws/AwsS3AuthConfiguration.java | 123 ++++++++++ .../james/blob/objectstorage/aws/Region.java | 66 +++++ .../james/blob/objectstorage/aws/S3BlobStore.java | 130 ++++++++++ .../blob/objectstorage/aws/S3DumbBlobStore.java | 268 +++++++++++++++++++++ .../aws/AwsS3AuthConfigurationTest.java | 120 +++++++++ .../james/blob/objectstorage/aws/DockerAwsS3.java | 35 +++ .../objectstorage/aws/DockerAwsS3Container.java | 41 ++-- .../objectstorage/aws/DockerAwsS3Extension.java | 53 ++++ .../james/blob/objectstorage/aws/RegionTest.java | 40 +++ .../blob/objectstorage/aws/S3BlobStoreTest.java | 66 +++++ .../objectstorage/aws/S3DumbBlobStoreTest.java | 55 +++++ server/blob/pom.xml | 1 + 14 files changed, 1075 insertions(+), 21 deletions(-) diff --git a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/aws/DockerAwsS3Container.java b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/aws/DockerAwsS3Container.java index e8b1072..edf5ec0 100644 --- a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/aws/DockerAwsS3Container.java +++ b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/aws/DockerAwsS3Container.java @@ -25,10 +25,11 @@ import org.apache.james.blob.objectstorage.DockerAwsS3; import org.apache.james.util.Host; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.images.builder.ImageFromDockerfile; public class DockerAwsS3Container { - private static final String AWS_S3_DOCKER_IMAGE = "linagora/cloudserver:mem-8.1.17"; + private static final String AWS_S3_DOCKER_IMAGE = "zenko/cloudserver:8.2.3"; private static final int AWS_S3_PORT = 8000; private static final int ONE_TIME = 1; diff --git a/server/blob/blob-s3/pom.xml b/server/blob/blob-s3/pom.xml new file mode 100644 index 0000000..6c9dc6d --- /dev/null +++ b/server/blob/blob-s3/pom.xml @@ -0,0 +1,95 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <properties> + <s3-sdk.version>2.10.41</s3-sdk.version> + </properties> + + <parent> + <artifactId>james-server-blob</artifactId> + <groupId>org.apache.james</groupId> + <version>3.5.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>blob-s3</artifactId> + <packaging>jar</packaging> + + <name>Apache James :: Server :: Blob :: S3</name> + + <dependencies> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>blob-api</artifactId> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>blob-api</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>james-core</artifactId> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>james-server-util</artifactId> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>testing-base</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + <dependency> + <groupId>javax.annotation</groupId> + <artifactId>javax.annotation-api</artifactId> + </dependency> + <dependency> + <groupId>io.projectreactor.addons</groupId> + <artifactId>reactor-extra</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>testcontainers</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>software.amazon.awssdk</groupId> + <artifactId>netty-nio-client</artifactId> + <version>${s3-sdk.version}</version> + </dependency> + <dependency> + <groupId>software.amazon.awssdk</groupId> + <artifactId>s3</artifactId> + <version>${s3-sdk.version}</version> + </dependency> + </dependencies> +</project> diff --git a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3AuthConfiguration.java b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3AuthConfiguration.java new file mode 100644 index 0000000..f0a8d06 --- /dev/null +++ b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3AuthConfiguration.java @@ -0,0 +1,123 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.blob.objectstorage.aws; + +import com.google.common.base.MoreObjects; +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; + +public class AwsS3AuthConfiguration { + + public static Builder.RequiredEndpoint builder() { + return endpoint -> accessKeyId -> secretKey -> new Builder.ReadyToBuild(endpoint, accessKeyId, secretKey); + } + + public interface Builder { + + @FunctionalInterface + interface RequiredEndpoint { + RequiredAccessKeyId endpoint(String endpoint); + } + + @FunctionalInterface + interface RequiredAccessKeyId { + RequiredSecretKey accessKeyId(String accessKeyId); + } + + @FunctionalInterface + interface RequiredSecretKey { + ReadyToBuild secretKey(String secretKey); + } + + class ReadyToBuild { + private final String endpoint; + private final String accessKeyId; + private final String secretKey; + + public ReadyToBuild(String endpoint, String accessKeyId, String secretKey) { + this.endpoint = endpoint; + this.accessKeyId = accessKeyId; + this.secretKey = secretKey; + } + + public AwsS3AuthConfiguration build() { + Preconditions.checkNotNull(endpoint, "'endpoint' is mandatory"); + Preconditions.checkArgument(!endpoint.isEmpty(), "'endpoint' is mandatory"); + + Preconditions.checkNotNull(accessKeyId, "'accessKeyId' is mandatory"); + Preconditions.checkArgument(!accessKeyId.isEmpty(), "'accessKeyId' is mandatory"); + + Preconditions.checkNotNull(secretKey, "'secretKey' is mandatory"); + Preconditions.checkArgument(!secretKey.isEmpty(), "'secretKey' is mandatory"); + + return new AwsS3AuthConfiguration(endpoint, accessKeyId, secretKey); + } + } + } + + private final String endpoint; + private final String accessKeyId; + private final String secretKey; + + private AwsS3AuthConfiguration(String endpoint, + String accessKeyId, + String secretKey) { + this.endpoint = endpoint; + this.accessKeyId = accessKeyId; + this.secretKey = secretKey; + } + + public String getEndpoint() { + return endpoint; + } + + public String getAccessKeyId() { + return accessKeyId; + } + + public String getSecretKey() { + return secretKey; + } + + @Override + public final boolean equals(Object o) { + if (o instanceof AwsS3AuthConfiguration) { + AwsS3AuthConfiguration that = (AwsS3AuthConfiguration) o; + return Objects.equal(endpoint, that.endpoint) && + Objects.equal(accessKeyId, that.accessKeyId) && + Objects.equal(secretKey, that.secretKey); + } + return false; + } + + @Override + public final int hashCode() { + return Objects.hashCode(endpoint, accessKeyId, secretKey); + } + + @Override + public final String toString() { + return MoreObjects.toStringHelper(this) + .add("endpoint", endpoint) + .add("accessKeyId", accessKeyId) + .add("secretKey", secretKey) + .toString(); + } +} diff --git a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/Region.java b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/Region.java new file mode 100644 index 0000000..2ce0cc9 --- /dev/null +++ b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/Region.java @@ -0,0 +1,66 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.blob.objectstorage.aws; + +import java.util.Objects; + +import com.google.common.base.MoreObjects; +import com.google.common.base.Preconditions; + +public final class Region { + + private final String region; + + public static Region of(String region) { + Preconditions.checkNotNull(region); + + return new Region(region); + } + + private Region(String region) { + this.region = region; + } + + public software.amazon.awssdk.regions.Region asAws() { + return software.amazon.awssdk.regions.Region.of(region); + } + + + @Override + public boolean equals(Object o) { + if (o instanceof Region) { + Region that = (Region) o; + return Objects.equals(this.region, that.region); + } + return false; + } + + @Override + public int hashCode() { + return Objects.hash(region); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("region", region) + .toString(); + } +} diff --git a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStore.java b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStore.java new file mode 100644 index 0000000..1eba115 --- /dev/null +++ b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStore.java @@ -0,0 +1,130 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.blob.objectstorage.aws; + +import java.io.IOException; +import java.io.InputStream; + +import javax.inject.Inject; + +import org.apache.commons.io.IOUtils; +import org.apache.james.blob.api.BlobId; +import org.apache.james.blob.api.BlobStore; +import org.apache.james.blob.api.BucketName; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.hash.Hashing; +import com.google.common.hash.HashingInputStream; +import com.google.common.io.FileBackedOutputStream; + +import reactor.core.publisher.Mono; +import reactor.util.function.Tuples; + +public class S3BlobStore implements BlobStore { + + private static final boolean LAZY_RESOURCE_CLEANUP = false; + private static final int FILE_THRESHOLD = 1024 * 100; + private final S3DumbBlobStore dumbBlobStore; + private final BlobId.Factory blobIdFactory; + private final BucketName defaultBucket; + + @Inject + @VisibleForTesting + S3BlobStore(S3DumbBlobStore dumbBlobStore, BlobId.Factory blobIdFactory, BucketName defaultBucket) { + this.dumbBlobStore = dumbBlobStore; + this.blobIdFactory = blobIdFactory; + this.defaultBucket = defaultBucket; + } + + @Override + public Mono<BlobId> save(BucketName bucketName, byte[] data, StoragePolicy storagePolicy) { + Preconditions.checkNotNull(bucketName); + Preconditions.checkNotNull(data); + + BlobId blobId = blobIdFactory.forPayload(data); + + return dumbBlobStore.save(bucketName, blobId, data) + .then(Mono.just(blobId)); + } + + @Override + public Mono<BlobId> save(BucketName bucketName, InputStream data, StoragePolicy storagePolicy) { + Preconditions.checkNotNull(bucketName); + Preconditions.checkNotNull(data); + HashingInputStream hashingInputStream = new HashingInputStream(Hashing.sha256(), data); + return Mono.using( + () -> new FileBackedOutputStream(FILE_THRESHOLD), + fileBackedOutputStream -> saveAndGenerateBlobId(bucketName, hashingInputStream, fileBackedOutputStream), + fileBackedOutputStream -> { + try { + fileBackedOutputStream.reset(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }, + LAZY_RESOURCE_CLEANUP); + } + + private Mono<BlobId> saveAndGenerateBlobId(BucketName bucketName, HashingInputStream hashingInputStream, FileBackedOutputStream fileBackedOutputStream) { + return Mono + .fromCallable(() -> { + IOUtils.copy(hashingInputStream, fileBackedOutputStream); + return Tuples.of(blobIdFactory.from(hashingInputStream.hash().toString()), fileBackedOutputStream.asByteSource()); + }) + .flatMap(tuple -> dumbBlobStore.save(bucketName, tuple.getT1(), tuple.getT2()).thenReturn(tuple.getT1())); + } + + + @Override + public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) { + Preconditions.checkNotNull(bucketName); + + return dumbBlobStore.readBytes(bucketName, blobId); + } + + @Override + public InputStream read(BucketName bucketName, BlobId blobId) { + Preconditions.checkNotNull(bucketName); + + return dumbBlobStore.read(bucketName, blobId); + } + + @Override + public Mono<Void> deleteBucket(BucketName bucketName) { + Preconditions.checkNotNull(bucketName); + + return dumbBlobStore.deleteBucket(bucketName); + } + + @Override + public BucketName getDefaultBucketName() { + return defaultBucket; + } + + @Override + public Mono<Void> delete(BucketName bucketName, BlobId blobId) { + Preconditions.checkNotNull(bucketName); + Preconditions.checkNotNull(blobId); + + return dumbBlobStore.delete(bucketName, blobId); + } + +} diff --git a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3DumbBlobStore.java b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3DumbBlobStore.java new file mode 100644 index 0000000..d8e9deb --- /dev/null +++ b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3DumbBlobStore.java @@ -0,0 +1,268 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.blob.objectstorage.aws; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import javax.annotation.PreDestroy; +import javax.inject.Inject; + +import org.apache.commons.io.IOUtils; +import org.apache.james.blob.api.BlobId; +import org.apache.james.blob.api.BucketName; +import org.apache.james.blob.api.DumbBlobStore; +import org.apache.james.blob.api.ObjectNotFoundException; +import org.apache.james.blob.api.ObjectStoreIOException; +import org.apache.james.util.DataChunker; +import org.apache.james.util.ReactorUtils; + +import com.github.fge.lambdas.Throwing; +import com.github.steveash.guavate.Guavate; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.io.ByteSource; +import com.google.common.io.FileBackedOutputStream; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; +import reactor.retry.Retry; +import reactor.retry.RetryWithAsyncCallback; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.BytesWrapper; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.async.SdkPublisher; +import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.BucketAlreadyOwnedByYouException; +import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.ListBucketsResponse; +import software.amazon.awssdk.services.s3.model.ListObjectsResponse; +import software.amazon.awssdk.services.s3.model.NoSuchBucketException; +import software.amazon.awssdk.services.s3.model.NoSuchKeyException; +import software.amazon.awssdk.services.s3.model.ObjectIdentifier; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.S3Object; + +public class S3DumbBlobStore implements DumbBlobStore, Closeable { + + private static final int CHUNK_SIZE = 1024 * 1024; + private static final int EMPTY_BUCKET_BATCH_SIZE = 1000; + private static final int FILE_THRESHOLD = 1024 * 100; + private static final Duration FIRST_BACK_OFF = Duration.ofMillis(100); + private static final Duration FOREVER = Duration.ofMillis(Long.MAX_VALUE); + private static final boolean LAZY = false; + private static final int MAX_RETRIES = 5; + + private final S3AsyncClient client; + + @Inject + S3DumbBlobStore(AwsS3AuthConfiguration configuration, Region region) { + client = S3AsyncClient.builder() + .credentialsProvider(StaticCredentialsProvider.create( + AwsBasicCredentials.create(configuration.getAccessKeyId(), configuration.getSecretKey()))) + .httpClientBuilder(NettyNioAsyncHttpClient.builder() + .maxConcurrency(100) + .maxPendingConnectionAcquires(10_000)) + .endpointOverride(URI.create(configuration.getEndpoint())) + .region(region.asAws()) + .build(); + } + + @Override + @PreDestroy + public void close() { + client.close(); + } + + @Override + public InputStream read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException { + return getObject(bucketName, blobId) + .map(response -> ReactorUtils.toInputStream(response.flux)) + .onErrorMap(NoSuchBucketException.class, e -> new ObjectNotFoundException("Bucket not found " + bucketName, e)) + .onErrorMap(NoSuchKeyException.class, e -> new ObjectNotFoundException("Blob not found " + bucketName, e)) + .block(); + } + + private static class FluxResponse { + final CompletableFuture<FluxResponse> cf = new CompletableFuture<>(); + GetObjectResponse sdkResponse; + Flux<ByteBuffer> flux; + } + + private Mono<FluxResponse> getObject(BucketName bucketName, BlobId blobId) { + return Mono.fromFuture(() -> + client.getObject( + builder -> builder.bucket(bucketName.asString()).key(blobId.asString()), + new AsyncResponseTransformer<GetObjectResponse, FluxResponse>() { + + FluxResponse response; + + @Override + public CompletableFuture<FluxResponse> prepare() { + response = new FluxResponse(); + return response.cf; + } + + @Override + public void onResponse(GetObjectResponse response) { + this.response.sdkResponse = response; + } + + @Override + public void exceptionOccurred(Throwable error) { + this.response.cf.completeExceptionally(error); + } + + @Override + public void onStream(SdkPublisher<ByteBuffer> publisher) { + response.flux = Flux.from(publisher); + response.cf.complete(response); + } + })); + } + + + @Override + public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) { + return Mono.fromFuture(() -> + client.getObject( + builder -> builder.bucket(bucketName.asString()).key(blobId.asString()), + AsyncResponseTransformer.toBytes())) + .onErrorMap(NoSuchBucketException.class, e -> new ObjectNotFoundException("Bucket not found " + bucketName, e)) + .onErrorMap(NoSuchKeyException.class, e -> new ObjectNotFoundException("Blob not found " + bucketName, e)) + .map(BytesWrapper::asByteArray); + } + + @Override + public Mono<Void> save(BucketName bucketName, BlobId blobId, byte[] data) { + return Mono.fromFuture(() -> + client.putObject( + builder -> builder.bucket(bucketName.asString()).key(blobId.asString()).contentLength((long) data.length), + AsyncRequestBody.fromBytes(data))) + .retryWhen(createBucketOnRetry(bucketName)) + .then(); + } + + @Override + public Mono<Void> save(BucketName bucketName, BlobId blobId, InputStream inputStream) { + Preconditions.checkNotNull(inputStream); + + return uploadUsingFile(bucketName, blobId, inputStream); + } + + private Mono<Void> uploadUsingFile(BucketName bucketName, BlobId blobId, InputStream inputStream) { + return Mono.using( + () -> new FileBackedOutputStream(FILE_THRESHOLD), + fileBackedOutputStream -> + Mono.fromCallable(() -> IOUtils.copy(inputStream, fileBackedOutputStream)) + .flatMap(ignore -> save(bucketName, blobId, fileBackedOutputStream.asByteSource())), + Throwing.consumer(FileBackedOutputStream::reset), + LAZY) + .onErrorMap(IOException.class, e -> new ObjectStoreIOException("Error saving blob", e)); + } + + @Override + public Mono<Void> save(BucketName bucketName, BlobId blobId, ByteSource content) { + return Mono.using(content::openStream, + stream -> + Mono.fromFuture(() -> + client.putObject( + Throwing.<PutObjectRequest.Builder>consumer( + builder -> builder.bucket(bucketName.asString()).contentLength(content.size()).key(blobId.asString())) + .sneakyThrow(), + AsyncRequestBody.fromPublisher( + DataChunker.chunkStream(stream, CHUNK_SIZE)))), + Throwing.consumer(InputStream::close), + LAZY) + .retryWhen(createBucketOnRetry(bucketName)) + .onErrorMap(IOException.class, e -> new ObjectStoreIOException("Error saving blob", e)) + .onErrorMap(SdkClientException.class, e -> new ObjectStoreIOException("Error saving blob", e)) + .then(); + } + + private Retry<Object> createBucketOnRetry(BucketName bucketName) { + return RetryWithAsyncCallback.onlyIf(retryContext -> retryContext.exception() instanceof NoSuchBucketException) + .exponentialBackoff(FIRST_BACK_OFF, FOREVER) + .withBackoffScheduler(Schedulers.elastic()) + .retryMax(MAX_RETRIES) + .onRetryWithMono(retryContext -> Mono + .fromFuture(client.createBucket(builder -> builder.bucket(bucketName.asString()))) + .onErrorResume(BucketAlreadyOwnedByYouException.class, e -> Mono.empty())); + } + + @Override + public Mono<Void> delete(BucketName bucketName, BlobId blobId) { + return Mono.fromFuture(() -> + client.deleteObject(delete -> delete.bucket(bucketName.asString()).key(blobId.asString()))) + .then() + .onErrorResume(NoSuchBucketException.class, e -> Mono.empty()); + } + + @Override + public Mono<Void> deleteBucket(BucketName bucketName) { + return emptyBucket(bucketName) + .onErrorResume(t -> Mono.just(bucketName)) + .flatMap(ignore -> Mono.fromFuture(() -> client.deleteBucket(builder -> builder.bucket(bucketName.asString())))) + .onErrorResume(t -> Mono.empty()) + .then(); + } + + private Mono<BucketName> emptyBucket(BucketName bucketName) { + return Mono.fromFuture(() -> client.listObjects(builder -> builder.bucket(bucketName.asString()))) + .flatMapIterable(ListObjectsResponse::contents) + .window(EMPTY_BUCKET_BATCH_SIZE) + .flatMap(this::buildListForBatch) + .flatMap(identifiers -> deleteObjects(bucketName, identifiers)) + .then(Mono.just(bucketName)); + } + + private Mono<List<ObjectIdentifier>> buildListForBatch(Flux<S3Object> batch) { + return batch + .map(element -> ObjectIdentifier.builder().key(element.key()).build()) + .collect(Guavate.toImmutableList()); + } + + private Mono<DeleteObjectsResponse> deleteObjects(BucketName bucketName, List<ObjectIdentifier> identifiers) { + return Mono.fromFuture(() -> client.deleteObjects(builder -> + builder.bucket(bucketName.asString()).delete(delete -> delete.objects(identifiers)))); + } + + @VisibleForTesting + public Mono<Void> deleteAllBuckets() { + return Mono.fromFuture(client::listBuckets) + .flatMapIterable(ListBucketsResponse::buckets) + .flatMap(bucket -> deleteBucket(BucketName.of(bucket.name()))) + .then(); + } + + +} diff --git a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/AwsS3AuthConfigurationTest.java b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/AwsS3AuthConfigurationTest.java new file mode 100644 index 0000000..d4da020 --- /dev/null +++ b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/AwsS3AuthConfigurationTest.java @@ -0,0 +1,120 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.blob.objectstorage.aws; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.SoftAssertions.assertSoftly; + +import org.apache.james.blob.objectstorage.aws.AwsS3AuthConfiguration; +import org.junit.jupiter.api.Test; + +import nl.jqno.equalsverifier.EqualsVerifier; + +public class AwsS3AuthConfigurationTest { + + @Test + public void credentialsShouldRespectBeanContract() { + EqualsVerifier.forClass(AwsS3AuthConfiguration.class).verify(); + } + + @Test + public void builderShouldThrowWhenEndpointIsNull() { + assertThatThrownBy(() -> AwsS3AuthConfiguration.builder() + .endpoint(null) + .accessKeyId("myAccessKeyId") + .secretKey("mySecretKey") + .build()) + .isInstanceOf(NullPointerException.class) + .hasMessage("'endpoint' is mandatory"); + } + + @Test + public void builderShouldThrowWhenEndpointIsEmpty() { + assertThatThrownBy(() -> AwsS3AuthConfiguration.builder() + .endpoint("") + .accessKeyId("myAccessKeyId") + .secretKey("mySecretKey") + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("'endpoint' is mandatory"); + } + + @Test + public void builderShouldThrowWhenAccessKeyIdIsNull() { + assertThatThrownBy(() -> AwsS3AuthConfiguration.builder() + .endpoint("myEndpoint") + .accessKeyId(null) + .secretKey("mySecretKey") + .build()) + .isInstanceOf(NullPointerException.class) + .hasMessage("'accessKeyId' is mandatory"); + } + + @Test + public void builderShouldThrowWhenAccessKeyIdIsEmpty() { + assertThatThrownBy(() -> AwsS3AuthConfiguration.builder() + .endpoint("myEndpoint") + .accessKeyId("") + .secretKey("mySecretKey") + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("'accessKeyId' is mandatory"); + } + + @Test + public void builderShouldThrowWhenSecretKeyIsNull() { + assertThatThrownBy(() -> AwsS3AuthConfiguration.builder() + .endpoint("myEndpoint") + .accessKeyId("myAccessKeyId") + .secretKey(null) + .build()) + .isInstanceOf(NullPointerException.class) + .hasMessage("'secretKey' is mandatory"); + } + + @Test + public void builderShouldThrowWhenSecretKeyIsEmpty() { + assertThatThrownBy(() -> AwsS3AuthConfiguration.builder() + .endpoint("myEndpoint") + .accessKeyId("myAccessKeyId") + .secretKey("") + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("'secretKey' is mandatory"); + } + + @Test + public void builderShouldWork() { + String endpoint = "myEndpoint"; + String accessKeyId = "myAccessKeyId"; + String secretKey = "mySecretKey"; + AwsS3AuthConfiguration configuration = AwsS3AuthConfiguration.builder() + .endpoint(endpoint) + .accessKeyId(accessKeyId) + .secretKey(secretKey) + .build(); + + assertSoftly(softly -> { + softly.assertThat(configuration.getEndpoint()).isEqualTo(endpoint); + softly.assertThat(configuration.getAccessKeyId()).isEqualTo(accessKeyId); + softly.assertThat(configuration.getSecretKey()).isEqualTo(secretKey); + }); + } +} diff --git a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/DockerAwsS3.java b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/DockerAwsS3.java new file mode 100644 index 0000000..98202df --- /dev/null +++ b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/DockerAwsS3.java @@ -0,0 +1,35 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.blob.objectstorage.aws; + +import java.net.URI; + +public class DockerAwsS3 { + private final URI awsS3Endpoint; + + public DockerAwsS3(URI awsS3Endpoint) { + this.awsS3Endpoint = awsS3Endpoint; + } + + public URI awsS3Endpoint() { + return awsS3Endpoint; + } +} + diff --git a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/aws/DockerAwsS3Container.java b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/DockerAwsS3Container.java similarity index 56% copy from server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/aws/DockerAwsS3Container.java copy to server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/DockerAwsS3Container.java index e8b1072..ff566d2 100644 --- a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/aws/DockerAwsS3Container.java +++ b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/DockerAwsS3Container.java @@ -1,34 +1,35 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ package org.apache.james.blob.objectstorage.aws; import java.net.URI; -import org.apache.james.blob.objectstorage.DockerAwsS3; import org.apache.james.util.Host; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.wait.strategy.Wait; public class DockerAwsS3Container { - private static final String AWS_S3_DOCKER_IMAGE = "linagora/cloudserver:mem-8.1.17"; + public static final Region REGION = Region.of(software.amazon.awssdk.regions.Region.EU_WEST_1.id()); + + private static final String AWS_S3_DOCKER_IMAGE = "zenko/cloudserver:8.2.3"; private static final int AWS_S3_PORT = 8000; private static final int ONE_TIME = 1; diff --git a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/DockerAwsS3Extension.java b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/DockerAwsS3Extension.java new file mode 100644 index 0000000..424b718 --- /dev/null +++ b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/DockerAwsS3Extension.java @@ -0,0 +1,53 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.blob.objectstorage.aws; + +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.ParameterContext; +import org.junit.jupiter.api.extension.ParameterResolutionException; +import org.junit.jupiter.api.extension.ParameterResolver; + +public class DockerAwsS3Extension implements ParameterResolver, BeforeAllCallback, AfterAllCallback { + + private DockerAwsS3Container awsS3Container; + + @Override + public void beforeAll(ExtensionContext context) { + awsS3Container = new DockerAwsS3Container(); + awsS3Container.start(); + } + + @Override + public void afterAll(ExtensionContext context) { + awsS3Container.stop(); + } + + @Override + public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { + return (parameterContext.getParameter().getType() == DockerAwsS3Container.class); + } + + @Override + public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { + return awsS3Container; + } +} diff --git a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/RegionTest.java b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/RegionTest.java new file mode 100644 index 0000000..82532de --- /dev/null +++ b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/RegionTest.java @@ -0,0 +1,40 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.blob.objectstorage.aws; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import org.junit.jupiter.api.Test; + +import nl.jqno.equalsverifier.EqualsVerifier; + +class RegionTest { + + @Test + void shouldNotAcceptNullRegion() { + assertThatThrownBy(() -> Region.of(null)).isInstanceOf(NullPointerException.class); + } + + @Test + void shouldRespectBeanContract() { + EqualsVerifier.forClass(Region.class).verify(); + } + +} \ No newline at end of file diff --git a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreTest.java b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreTest.java new file mode 100644 index 0000000..aea81d4 --- /dev/null +++ b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreTest.java @@ -0,0 +1,66 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.blob.objectstorage.aws; + +import org.apache.james.blob.api.BlobId; +import org.apache.james.blob.api.BlobStore; +import org.apache.james.blob.api.BlobStoreContract; +import org.apache.james.blob.api.BucketName; +import org.apache.james.blob.api.HashBlobId; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(DockerAwsS3Extension.class) +class S3BlobStoreTest implements BlobStoreContract { + + private S3BlobStore testee; + private S3DumbBlobStore s3DumbBlobStore; + + @BeforeEach + void setUp(DockerAwsS3Container dockerAwsS3) { + + AwsS3AuthConfiguration configuration = AwsS3AuthConfiguration.builder() + .endpoint(dockerAwsS3.getEndpoint()) + .accessKeyId(DockerAwsS3Container.ACCESS_KEY_ID) + .secretKey(DockerAwsS3Container.SECRET_ACCESS_KEY) + .build(); + + s3DumbBlobStore = new S3DumbBlobStore(configuration, DockerAwsS3Container.REGION); + testee = new S3BlobStore(s3DumbBlobStore, new HashBlobId.Factory(), BucketName.DEFAULT); + } + + @AfterEach + void tearDown() { + s3DumbBlobStore.deleteAllBuckets().block(); + s3DumbBlobStore.close(); + } + + @Override + public BlobStore testee() { + return testee; + } + + @Override + public BlobId.Factory blobIdFactory() { + return new HashBlobId.Factory(); + } + +} \ No newline at end of file diff --git a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3DumbBlobStoreTest.java b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3DumbBlobStoreTest.java new file mode 100644 index 0000000..93ea6af --- /dev/null +++ b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3DumbBlobStoreTest.java @@ -0,0 +1,55 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.blob.objectstorage.aws; + +import org.apache.james.blob.api.DumbBlobStore; +import org.apache.james.blob.api.DumbBlobStoreContract; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(DockerAwsS3Extension.class) +public class S3DumbBlobStoreTest implements DumbBlobStoreContract { + + private S3DumbBlobStore testee; + + @BeforeEach + void setUp(DockerAwsS3Container dockerAwsS3) { + + AwsS3AuthConfiguration configuration = AwsS3AuthConfiguration.builder() + .endpoint(dockerAwsS3.getEndpoint()) + .accessKeyId(DockerAwsS3Container.ACCESS_KEY_ID) + .secretKey(DockerAwsS3Container.SECRET_ACCESS_KEY) + .build(); + + testee = new S3DumbBlobStore(configuration, DockerAwsS3Container.REGION); + } + + @AfterEach + void tearDown() { + testee.deleteAllBuckets().block(); + testee.close(); + } + + @Override + public DumbBlobStore testee() { + return testee; + } + +} diff --git a/server/blob/pom.xml b/server/blob/pom.xml index 14e37f3..d10760c 100644 --- a/server/blob/pom.xml +++ b/server/blob/pom.xml @@ -42,6 +42,7 @@ <module>blob-gc</module> <module>blob-memory</module> <module>blob-objectstorage</module> + <module>blob-s3</module> <module>mail-store</module> </modules> --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
