Repository: james-project Updated Branches: refs/heads/master 829074f99 -> ab4171cfa
JAMES-2630 change blobs-api to use Mono Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/ba4d902e Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/ba4d902e Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/ba4d902e Branch: refs/heads/master Commit: ba4d902ee3ff8b53999597067541cf8324030964 Parents: e745d95 Author: Matthieu Baechler <[email protected]> Authored: Mon Jan 28 17:18:14 2019 +0100 Committer: Matthieu Baechler <[email protected]> Committed: Thu Jan 31 18:33:30 2019 +0100 ---------------------------------------------------------------------- .../mail/CassandraAttachmentMapper.java | 8 +- .../cassandra/mail/CassandraMessageDAO.java | 6 +- .../mail/migration/AttachmentV2Migration.java | 2 +- .../mail/CassandraAttachmentFallbackTest.java | 6 +- .../migration/AttachmentV2MigrationTest.java | 14 +- metrics/metrics-api/pom.xml | 7 + .../apache/james/metrics/api/MetricFactory.java | 7 + .../org/apache/james/blob/api/BlobStore.java | 9 +- .../james/blob/api/MetricableBlobStore.java | 9 +- .../java/org/apache/james/blob/api/Store.java | 47 ++++--- .../james/blob/api/BlobStoreContract.java | 38 +++--- .../blob/api/MetricableBlobStoreContract.java | 18 +-- .../james/blob/cassandra/CassandraBlobsDAO.java | 29 ++-- .../blob/cassandra/CassandraBlobsDAOTest.java | 6 +- .../james/blob/memory/MemoryBlobStore.java | 17 +-- server/blob/blob-objectstorage/pom.xml | 4 + .../objectstorage/ObjectStorageBlobsDAO.java | 49 +++---- .../ObjectStorageBlobsDAOContract.java | 4 +- .../ObjectStorageBlobsDAOTest.java | 40 +++--- .../apache/james/blob/union/UnionBlobStore.java | 55 +++----- .../james/blob/union/UnionBlobStoreTest.java | 131 ++++++++++--------- .../james/blob/mail/MimeMessageStoreTest.java | 14 +- .../ObjectStorageDependenciesModule.java | 4 +- .../ObjectStorageBlobStoreModuleTest.java | 2 +- .../cassandra/CassandraMailRepository.java | 2 + ...ilRepositoryWithFakeImplementationsTest.java | 12 +- .../apache/james/queue/rabbitmq/Enqueuer.java | 2 +- .../apache/james/queue/rabbitmq/MailLoader.java | 2 +- .../cassandra/CassandraMailQueueBrowser.java | 2 +- .../cassandra/CassandraMailQueueMailDelete.java | 2 +- .../view/cassandra/CassandraMailQueueView.java | 2 +- 31 files changed, 264 insertions(+), 286 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java index a7d9dfc..8606b06 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java @@ -78,7 +78,7 @@ public class CassandraAttachmentMapper implements AttachmentMapper { } private Mono<Attachment> retrievePayload(DAOAttachment daoAttachment) { - return Mono.fromCompletionStage(blobStore.readBytes(daoAttachment.getBlobId()) + return Mono.fromCompletionStage(blobStore.readBytes(daoAttachment.getBlobId()).toFuture() .thenApply(daoAttachment::toAttachment)); } @@ -109,7 +109,7 @@ public class CassandraAttachmentMapper implements AttachmentMapper { @Override public void storeAttachmentForOwner(Attachment attachment, Username owner) throws MailboxException { ownerDAO.addOwner(attachment.getAttachmentId(), owner) - .then(Mono.fromFuture(blobStore.save(attachment.getBytes()))) + .then(blobStore.save(attachment.getBytes())) .map(blobId -> CassandraAttachmentDAOV2.from(attachment, blobId)) .flatMap(attachmentDAOV2::storeAttachment) .block(); @@ -135,8 +135,8 @@ public class CassandraAttachmentMapper implements AttachmentMapper { } public Mono<Void> storeAttachmentAsync(Attachment attachment, MessageId ownerMessageId) { - return Mono.fromFuture(blobStore.save(attachment.getBytes()) - .thenApply(blobId -> CassandraAttachmentDAOV2.from(attachment, blobId))) + return blobStore.save(attachment.getBytes()) + .map(blobId -> CassandraAttachmentDAOV2.from(attachment, blobId)) .flatMap(daoAttachment -> storeAttachmentWithIndex(daoAttachment, ownerMessageId)); } http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java index dab5496..ab06089 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java @@ -184,8 +184,8 @@ public class CassandraMessageDAO { byte[] headerContent = IOUtils.toByteArray(message.getHeaderContent()); byte[] bodyContent = IOUtils.toByteArray(message.getBodyContent()); - CompletableFuture<BlobId> bodyFuture = blobStore.save(bodyContent); - CompletableFuture<BlobId> headerFuture = blobStore.save(headerContent); + CompletableFuture<BlobId> bodyFuture = blobStore.save(bodyContent).toFuture(); + CompletableFuture<BlobId> headerFuture = blobStore.save(headerContent).toFuture(); return headerFuture.thenCombine(bodyFuture, Pair::of); } catch (IOException e) { @@ -364,7 +364,7 @@ public class CassandraMessageDAO { } private Mono<byte[]> getFieldContent(String field, Row row) { - return Mono.fromFuture(blobStore.readBytes(blobIdFactory.from(row.getString(field)))); + return blobStore.readBytes(blobIdFactory.from(row.getString(field))); } public static MessageResult notFound(ComposedMessageIdWithMetaData id) { http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java index bebf83d..6398342 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java @@ -59,7 +59,7 @@ public class AttachmentV2Migration implements Migration { private Result migrateAttachment(Attachment attachment) { try { - blobStore.save(attachment.getBytes()) + blobStore.save(attachment.getBytes()).toFuture() .thenApply(blobId -> CassandraAttachmentDAOV2.from(attachment, blobId)) .thenCompose(daoAttachement -> attachmentDAOV2.storeAttachment(daoAttachement).toFuture()) .thenCompose(any -> attachmentDAOV1.deleteAttachment(attachment.getAttachmentId())) http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentFallbackTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentFallbackTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentFallbackTest.java index 10492c6..1b98b9c 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentFallbackTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentFallbackTest.java @@ -100,7 +100,7 @@ class CassandraAttachmentFallbackTest { .bytes("{\"property\":`\"different\"}".getBytes(StandardCharsets.UTF_8)) .build(); - BlobId blobId = blobsDAO.save(attachment.getBytes()).join(); + BlobId blobId = blobsDAO.save(attachment.getBytes()).block(); attachmentDAOV2.storeAttachment(CassandraAttachmentDAOV2.from(attachment, blobId)).block(); attachmentDAO.storeAttachment(otherAttachment).join(); @@ -135,7 +135,7 @@ class CassandraAttachmentFallbackTest { .bytes("{\"property\":`\"different\"}".getBytes(StandardCharsets.UTF_8)) .build(); - BlobId blobId = blobsDAO.save(attachment.getBytes()).join(); + BlobId blobId = blobsDAO.save(attachment.getBytes()).block(); attachmentDAOV2.storeAttachment(CassandraAttachmentDAOV2.from(attachment, blobId)).block(); attachmentDAO.storeAttachment(otherAttachment).join(); @@ -170,7 +170,7 @@ class CassandraAttachmentFallbackTest { .bytes("{\"property\":`\"different\"}".getBytes(StandardCharsets.UTF_8)) .build(); - BlobId blobId = blobsDAO.save(attachment.getBytes()).join(); + BlobId blobId = blobsDAO.save(attachment.getBytes()).block(); attachmentDAOV2.storeAttachment(CassandraAttachmentDAOV2.from(attachment, blobId)).block(); attachmentDAO.storeAttachment(otherAttachment).join(); http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java index 85616b1..9ca7bff 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java @@ -113,9 +113,9 @@ class AttachmentV2MigrationTest { .contains(CassandraAttachmentDAOV2.from(attachment1, BLOB_ID_FACTORY.forPayload(attachment1.getBytes()))); assertThat(attachmentDAOV2.getAttachment(ATTACHMENT_ID_2).blockOptional()) .contains(CassandraAttachmentDAOV2.from(attachment2, BLOB_ID_FACTORY.forPayload(attachment2.getBytes()))); - assertThat(blobsDAO.readBytes(BLOB_ID_FACTORY.forPayload(attachment1.getBytes())).join()) + assertThat(blobsDAO.readBytes(BLOB_ID_FACTORY.forPayload(attachment1.getBytes())).block()) .isEqualTo(attachment1.getBytes()); - assertThat(blobsDAO.readBytes(BLOB_ID_FACTORY.forPayload(attachment2.getBytes())).join()) + assertThat(blobsDAO.readBytes(BLOB_ID_FACTORY.forPayload(attachment2.getBytes())).block()) .isEqualTo(attachment2.getBytes()); } @@ -170,9 +170,9 @@ class AttachmentV2MigrationTest { attachment1, attachment2)); when(blobsDAO.save(attachment1.getBytes())) - .thenReturn(CompletableFuture.completedFuture(BLOB_ID_FACTORY.forPayload(attachment1.getBytes()))); + .thenReturn(Mono.just(BLOB_ID_FACTORY.forPayload(attachment1.getBytes()))); when(blobsDAO.save(attachment2.getBytes())) - .thenReturn(CompletableFuture.completedFuture(BLOB_ID_FACTORY.forPayload(attachment2.getBytes()))); + .thenReturn(Mono.just(BLOB_ID_FACTORY.forPayload(attachment2.getBytes()))); when(attachmentDAOV2.storeAttachment(any())).thenThrow(new RuntimeException()); assertThat(migration.run()).isEqualTo(Migration.Result.PARTIAL); @@ -189,9 +189,9 @@ class AttachmentV2MigrationTest { attachment1, attachment2)); when(blobsDAO.save(attachment1.getBytes())) - .thenReturn(CompletableFuture.completedFuture(BLOB_ID_FACTORY.forPayload(attachment1.getBytes()))); + .thenReturn(Mono.just(BLOB_ID_FACTORY.forPayload(attachment1.getBytes()))); when(blobsDAO.save(attachment2.getBytes())) - .thenReturn(CompletableFuture.completedFuture(BLOB_ID_FACTORY.forPayload(attachment2.getBytes()))); + .thenReturn(Mono.just(BLOB_ID_FACTORY.forPayload(attachment2.getBytes()))); when(attachmentDAOV2.storeAttachment(any())).thenReturn(Mono.empty()); when(attachmentDAO.deleteAttachment(any())).thenThrow(new RuntimeException()); @@ -209,7 +209,7 @@ class AttachmentV2MigrationTest { attachment1, attachment2)); when(blobsDAO.save(attachment1.getBytes())) - .thenReturn(CompletableFuture.completedFuture(BLOB_ID_FACTORY.forPayload(attachment1.getBytes()))); + .thenReturn(Mono.just(BLOB_ID_FACTORY.forPayload(attachment1.getBytes()))); when(blobsDAO.save(attachment2.getBytes())) .thenThrow(new RuntimeException()); when(attachmentDAOV2.storeAttachment(any())).thenReturn(Mono.empty()); http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/metrics/metrics-api/pom.xml ---------------------------------------------------------------------- diff --git a/metrics/metrics-api/pom.xml b/metrics/metrics-api/pom.xml index e78daa0..314ec3d 100644 --- a/metrics/metrics-api/pom.xml +++ b/metrics/metrics-api/pom.xml @@ -29,4 +29,11 @@ <name>Apache James :: Metrics :: API</name> + <dependencies> + <dependency> + <groupId>io.projectreactor</groupId> + <artifactId>reactor-core</artifactId> + </dependency> + </dependencies> + </project> http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/MetricFactory.java ---------------------------------------------------------------------- diff --git a/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/MetricFactory.java b/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/MetricFactory.java index e61064e..0b02371 100644 --- a/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/MetricFactory.java +++ b/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/MetricFactory.java @@ -22,6 +22,8 @@ package org.apache.james.metrics.api; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; +import reactor.core.publisher.Mono; + public interface MetricFactory { Metric generate(String name); @@ -37,6 +39,11 @@ public interface MetricFactory { } } + default <T> Mono<T> runPublishingTimerMetric(String name, Mono<T> mono) { + TimeMetric timer = timer(name); + return mono.doOnNext(ignored -> timer.stopAndPublish()); + } + default void runPublishingTimerMetric(String name, Runnable runnable) { runPublishingTimerMetric(name, () -> { runnable.run(); http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java ---------------------------------------------------------------------- diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java index 55527da..8b68d93 100644 --- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java +++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java @@ -19,15 +19,16 @@ package org.apache.james.blob.api; import java.io.InputStream; -import java.util.concurrent.CompletableFuture; + +import reactor.core.publisher.Mono; public interface BlobStore { - CompletableFuture<BlobId> save(byte[] data); + Mono<BlobId> save(byte[] data); - CompletableFuture<BlobId> save(InputStream data); + Mono<BlobId> save(InputStream data); - CompletableFuture<byte[]> readBytes(BlobId blobId); + Mono<byte[]> readBytes(BlobId blobId); InputStream read(BlobId blobId); } http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java ---------------------------------------------------------------------- diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java index 5e76835..4ed7b17 100644 --- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java +++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java @@ -19,13 +19,14 @@ package org.apache.james.blob.api; import java.io.InputStream; -import java.util.concurrent.CompletableFuture; import javax.inject.Inject; import javax.inject.Named; import org.apache.james.metrics.api.MetricFactory; +import reactor.core.publisher.Mono; + public class MetricableBlobStore implements BlobStore { public static final String BLOB_STORE_IMPLEMENTATION = "blobStoreImplementation"; @@ -47,19 +48,19 @@ public class MetricableBlobStore implements BlobStore { } @Override - public CompletableFuture<BlobId> save(byte[] data) { + public Mono<BlobId> save(byte[] data) { return metricFactory .runPublishingTimerMetric(SAVE_BYTES_TIMER_NAME, blobStoreImpl.save(data)); } @Override - public CompletableFuture<BlobId> save(InputStream data) { + public Mono<BlobId> save(InputStream data) { return metricFactory .runPublishingTimerMetric(SAVE_INPUT_STREAM_TIMER_NAME, blobStoreImpl.save(data)); } @Override - public CompletableFuture<byte[]> readBytes(BlobId blobId) { + public Mono<byte[]> readBytes(BlobId blobId) { return metricFactory .runPublishingTimerMetric(READ_BYTES_TIMER_NAME, blobStoreImpl.readBytes(blobId)); } http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java ---------------------------------------------------------------------- diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java index c270a9d..07a5611 100644 --- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java +++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java @@ -20,20 +20,21 @@ package org.apache.james.blob.api; import java.io.InputStream; +import java.util.Collection; import java.util.Objects; -import java.util.concurrent.CompletableFuture; import java.util.stream.Stream; import org.apache.commons.lang3.tuple.Pair; -import org.apache.james.util.FluentFutureStream; -import com.google.common.collect.ImmutableMap; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; public interface Store<T, I> { - CompletableFuture<I> save(T t); + Mono<I> save(T t); - CompletableFuture<T> read(I blobIds); + Mono<T> read(I blobIds); class BlobType { private final String name; @@ -85,30 +86,28 @@ public interface Store<T, I> { } @Override - public CompletableFuture<I> save(T t) { - return FluentFutureStream.of( - encoder.encode(t) - .map(this::saveEntry)) - .completableFuture() - .thenApply(pairStream -> pairStream.collect(ImmutableMap.toImmutableMap(Pair::getKey, Pair::getValue))) - .thenApply(idFactory::generate); + public Mono<I> save(T t) { + return Flux.fromStream(encoder.encode(t)) + .flatMapSequential(this::saveEntry) + .collectMap(Tuple2::getT1, Tuple2::getT2) + .map(idFactory::generate); } - private CompletableFuture<Pair<BlobType, BlobId>> saveEntry(Pair<BlobType, InputStream> entry) { - return blobStore.save(entry.getRight()) - .thenApply(blobId -> Pair.of(entry.getLeft(), blobId)); + private Mono<Tuple2<BlobType, BlobId>> saveEntry(Pair<BlobType, InputStream> entry) { + return Mono.just(entry.getLeft()) + .zipWith(blobStore.save(entry.getRight())); } @Override - public CompletableFuture<T> read(I blobIds) { - CompletableFuture<Stream<Pair<BlobType, byte[]>>> binaries = FluentFutureStream.of(blobIds.asMap() - .entrySet() - .stream() - .map(entry -> blobStore.readBytes(entry.getValue()) - .thenApply(bytes -> Pair.of(entry.getKey(), bytes)))) - .completableFuture(); - - return binaries.thenApply(decoder::decode); + public Mono<T> read(I blobIds) { + return Flux.fromIterable(blobIds.asMap().entrySet()) + .flatMapSequential( + entry -> blobStore.readBytes(entry.getValue()) + .zipWith(Mono.just(entry.getKey()))) + .map(entry -> Pair.of(entry.getT2(), entry.getT1())) + .collectList() + .map(Collection::stream) + .map(decoder::decode); } } } http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreContract.java ---------------------------------------------------------------------- diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreContract.java index cfc441d..2164c74 100644 --- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreContract.java +++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreContract.java @@ -43,37 +43,37 @@ public interface BlobStoreContract { @Test default void saveShouldThrowWhenNullData() { - assertThatThrownBy(() -> testee().save((byte[]) null)) + assertThatThrownBy(() -> testee().save((byte[]) null).block()) .isInstanceOf(NullPointerException.class); } @Test default void saveShouldThrowWhenNullInputStream() { - assertThatThrownBy(() -> testee().save((InputStream) null)) + assertThatThrownBy(() -> testee().save((InputStream) null).block()) .isInstanceOf(NullPointerException.class); } @Test default void saveShouldSaveEmptyData() { - BlobId blobId = testee().save(EMPTY_BYTEARRAY).join(); + BlobId blobId = testee().save(EMPTY_BYTEARRAY).block(); - byte[] bytes = testee().readBytes(blobId).join(); + byte[] bytes = testee().readBytes(blobId).block(); assertThat(new String(bytes, StandardCharsets.UTF_8)).isEmpty(); } @Test default void saveShouldSaveEmptyInputStream() { - BlobId blobId = testee().save(new ByteArrayInputStream(EMPTY_BYTEARRAY)).join(); + BlobId blobId = testee().save(new ByteArrayInputStream(EMPTY_BYTEARRAY)).block(); - byte[] bytes = testee().readBytes(blobId).join(); + byte[] bytes = testee().readBytes(blobId).block(); assertThat(new String(bytes, StandardCharsets.UTF_8)).isEmpty(); } @Test default void saveShouldReturnBlobId() { - BlobId blobId = testee().save(SHORT_BYTEARRAY).join(); + BlobId blobId = testee().save(SHORT_BYTEARRAY).block(); assertThat(blobId).isEqualTo(blobIdFactory().from("31f7a65e315586ac198bd798b6629ce4903d0899476d5741a9f32e2e521b6a66")); } @@ -81,40 +81,40 @@ public interface BlobStoreContract { @Test default void saveShouldReturnBlobIdOfInputStream() { BlobId blobId = - testee().save(new ByteArrayInputStream(SHORT_BYTEARRAY)).join(); + testee().save(new ByteArrayInputStream(SHORT_BYTEARRAY)).block(); assertThat(blobId).isEqualTo(blobIdFactory().from("31f7a65e315586ac198bd798b6629ce4903d0899476d5741a9f32e2e521b6a66")); } @Test default void readBytesShouldThrowWhenNoExisting() { - assertThatThrownBy(() -> testee().readBytes(blobIdFactory().from("unknown")).join()) - .hasCauseInstanceOf(ObjectStoreException.class); + assertThatThrownBy(() -> testee().readBytes(blobIdFactory().from("unknown")).block()) + .isExactlyInstanceOf(ObjectStoreException.class); } @Test default void readBytesShouldReturnSavedData() { - BlobId blobId = testee().save(SHORT_BYTEARRAY).join(); + BlobId blobId = testee().save(SHORT_BYTEARRAY).block(); - byte[] bytes = testee().readBytes(blobId).join(); + byte[] bytes = testee().readBytes(blobId).block(); assertThat(bytes).isEqualTo(SHORT_BYTEARRAY); } @Test default void readBytesShouldReturnLongSavedData() { - BlobId blobId = testee().save(ELEVEN_KILOBYTES).join(); + BlobId blobId = testee().save(ELEVEN_KILOBYTES).block(); - byte[] bytes = testee().readBytes(blobId).join(); + byte[] bytes = testee().readBytes(blobId).block(); assertThat(bytes).isEqualTo(ELEVEN_KILOBYTES); } @Test default void readBytesShouldReturnBigSavedData() { - BlobId blobId = testee().save(TWELVE_MEGABYTES).join(); + BlobId blobId = testee().save(TWELVE_MEGABYTES).block(); - byte[] bytes = testee().readBytes(blobId).join(); + byte[] bytes = testee().readBytes(blobId).block(); assertThat(bytes).isEqualTo(TWELVE_MEGABYTES); } @@ -127,7 +127,7 @@ public interface BlobStoreContract { @Test default void readShouldReturnSavedData() { - BlobId blobId = testee().save(SHORT_BYTEARRAY).join(); + BlobId blobId = testee().save(SHORT_BYTEARRAY).block(); InputStream read = testee().read(blobId); @@ -136,7 +136,7 @@ public interface BlobStoreContract { @Test default void readShouldReturnLongSavedData() { - BlobId blobId = testee().save(ELEVEN_KILOBYTES).join(); + BlobId blobId = testee().save(ELEVEN_KILOBYTES).block(); InputStream read = testee().read(blobId); @@ -146,7 +146,7 @@ public interface BlobStoreContract { @Test default void readShouldReturnBigSavedData() { // 12 MB of text - BlobId blobId = testee().save(TWELVE_MEGABYTES).join(); + BlobId blobId = testee().save(TWELVE_MEGABYTES).block(); InputStream read = testee().read(blobId); http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java ---------------------------------------------------------------------- diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java index f645589..c568764 100644 --- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java +++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java @@ -79,30 +79,30 @@ public interface MetricableBlobStoreContract extends BlobStoreContract { @Test default void saveBytesShouldPublishSaveBytesTimerMetrics() { - testee().save(BYTES_CONTENT).join(); - testee().save(BYTES_CONTENT).join(); + testee().save(BYTES_CONTENT).block(); + testee().save(BYTES_CONTENT).block(); verify(metricsTestExtension.saveBytesTimeMetric, times(2)).stopAndPublish(); } @Test default void saveInputStreamShouldPublishSaveInputStreamTimerMetrics() { - testee().save(new ByteArrayInputStream(BYTES_CONTENT)).join(); - testee().save(new ByteArrayInputStream(BYTES_CONTENT)).join(); - testee().save(new ByteArrayInputStream(BYTES_CONTENT)).join(); + testee().save(new ByteArrayInputStream(BYTES_CONTENT)).block(); + testee().save(new ByteArrayInputStream(BYTES_CONTENT)).block(); + testee().save(new ByteArrayInputStream(BYTES_CONTENT)).block(); verify(metricsTestExtension.saveInputStreamTimeMetric, times(3)).stopAndPublish(); } @Test default void readBytesShouldPublishReadBytesTimerMetrics() { - BlobId blobId = testee().save(BYTES_CONTENT).join(); - testee().readBytes(blobId).join(); - testee().readBytes(blobId).join(); + BlobId blobId = testee().save(BYTES_CONTENT).block(); + testee().readBytes(blobId).block(); + testee().readBytes(blobId).block(); verify(metricsTestExtension.readBytesTimeMetric, times(2)).stopAndPublish(); } @Test default void readShouldPublishReadTimerMetrics() { - BlobId blobId = testee().save(BYTES_CONTENT).join(); + BlobId blobId = testee().save(BYTES_CONTENT).block(); testee().read(blobId); testee().read(blobId); verify(metricsTestExtension.readTimeMetric, times(2)).stopAndPublish(); http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java ---------------------------------------------------------------------- diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java index 01c6e8d..2dfc808 100644 --- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java +++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java @@ -28,7 +28,6 @@ import java.io.InputStream; import java.io.PipedInputStream; import java.nio.ByteBuffer; import java.util.Comparator; -import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -49,7 +48,6 @@ import org.apache.james.blob.cassandra.utils.PipedStreamSubscriber; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; -import com.github.fge.lambdas.Throwing; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.primitives.Bytes; @@ -115,10 +113,10 @@ public class CassandraBlobsDAO implements BlobStore { } @Override - public CompletableFuture<BlobId> save(byte[] data) { + public Mono<BlobId> save(byte[] data) { Preconditions.checkNotNull(data); - return saveAsMono(data).toFuture(); + return saveAsMono(data); } private Mono<BlobId> saveAsMono(byte[] data) { @@ -164,17 +162,10 @@ public class CassandraBlobsDAO implements BlobStore { } @Override - public CompletableFuture<byte[]> readBytes(BlobId blobId) { - try { - return readBlobParts(blobId) - .collectList() - .map(parts -> Bytes.concat(parts.toArray(new byte[0][]))) - .toFuture(); - } catch (ObjectStoreException e) { - CompletableFuture<byte[]> error = new CompletableFuture<>(); - error.completeExceptionally(e); - return error; - } + public Mono<byte[]> readBytes(BlobId blobId) { + return readBlobParts(blobId) + .collectList() + .map(parts -> Bytes.concat(parts.toArray(new byte[0][]))); } private Mono<Integer> selectRowCount(BlobId blobId) { @@ -220,11 +211,9 @@ public class CassandraBlobsDAO implements BlobStore { } @Override - public CompletableFuture<BlobId> save(InputStream data) { + public Mono<BlobId> save(InputStream data) { Preconditions.checkNotNull(data); - return Mono.fromSupplier(Throwing.supplier(() -> IOUtils.toByteArray(data)).sneakyThrow()) - .publishOn(Schedulers.elastic()) - .flatMap(this::saveAsMono) - .toFuture(); + return Mono.fromCallable(() -> IOUtils.toByteArray(data)) + .flatMap(this::saveAsMono); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java ---------------------------------------------------------------------- diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java index 86eeb86..51a9933 100644 --- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java +++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java @@ -72,16 +72,16 @@ public class CassandraBlobsDAOTest implements MetricableBlobStoreContract { @Test void readBytesShouldReturnSplitSavedDataByChunk() { String longString = Strings.repeat("0123456789\n", MULTIPLE_CHUNK_SIZE); - BlobId blobId = testee.save(longString.getBytes(StandardCharsets.UTF_8)).join(); + BlobId blobId = testee.save(longString.getBytes(StandardCharsets.UTF_8)).block(); - byte[] bytes = testee.readBytes(blobId).join(); + byte[] bytes = testee.readBytes(blobId).block(); assertThat(new String(bytes, StandardCharsets.UTF_8)).isEqualTo(longString); } @Test void blobStoreShouldSupport100MBBlob() { - BlobId blobId = testee.save(new ZeroedInputStream(100_000_000)).join(); + BlobId blobId = testee.save(new ZeroedInputStream(100_000_000)).block(); InputStream bytes = testee.read(blobId); assertThat(bytes).hasSameContentAs(new ZeroedInputStream(100_000_000)); } http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStore.java ---------------------------------------------------------------------- diff --git a/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStore.java b/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStore.java index c76b975..9f72fc4 100644 --- a/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStore.java +++ b/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStore.java @@ -23,16 +23,15 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.io.IOUtils; import org.apache.james.blob.api.BlobId; import org.apache.james.blob.api.BlobStore; import org.apache.james.blob.api.ObjectStoreException; -import org.apache.james.util.CompletableFutureUtil; import com.google.common.base.Preconditions; +import reactor.core.publisher.Mono; public class MemoryBlobStore implements BlobStore { private final ConcurrentHashMap<BlobId, byte[]> blobs; @@ -44,17 +43,17 @@ public class MemoryBlobStore implements BlobStore { } @Override - public CompletableFuture<BlobId> save(byte[] data) { + public Mono<BlobId> save(byte[] data) { Preconditions.checkNotNull(data); BlobId blobId = factory.forPayload(data); blobs.put(blobId, data); - return CompletableFuture.completedFuture(blobId); + return Mono.just(blobId); } @Override - public CompletableFuture<BlobId> save(InputStream data) { + public Mono<BlobId> save(InputStream data) { Preconditions.checkNotNull(data); try { byte[] bytes = IOUtils.toByteArray(data); @@ -65,12 +64,8 @@ public class MemoryBlobStore implements BlobStore { } @Override - public CompletableFuture<byte[]> readBytes(BlobId blobId) { - try { - return CompletableFuture.completedFuture(retrieveStoredValue(blobId)); - } catch (ObjectStoreException e) { - return CompletableFutureUtil.exceptionallyFuture(e); - } + public Mono<byte[]> readBytes(BlobId blobId) { + return Mono.fromCallable(() -> retrieveStoredValue(blobId)); } @Override http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/blob/blob-objectstorage/pom.xml ---------------------------------------------------------------------- diff --git a/server/blob/blob-objectstorage/pom.xml b/server/blob/blob-objectstorage/pom.xml index 6a35af0..a27c88d 100644 --- a/server/blob/blob-objectstorage/pom.xml +++ b/server/blob/blob-objectstorage/pom.xml @@ -115,6 +115,10 @@ <artifactId>mockito-core</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java ---------------------------------------------------------------------- diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java index b51f431..7b02932 100644 --- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java +++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java @@ -22,9 +22,6 @@ package org.apache.james.blob.objectstorage; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; import org.apache.commons.io.IOUtils; import org.apache.james.blob.api.BlobId; @@ -33,7 +30,6 @@ import org.apache.james.blob.api.ObjectStoreException; import org.apache.james.blob.objectstorage.swift.SwiftKeystone2ObjectStorage; import org.apache.james.blob.objectstorage.swift.SwiftKeystone3ObjectStorage; import org.apache.james.blob.objectstorage.swift.SwiftTempAuthObjectStorage; -import org.apache.james.util.concurrent.NamedThreadFactory; import org.jclouds.blobstore.domain.Blob; import org.jclouds.blobstore.options.CopyOptions; import org.jclouds.domain.Location; @@ -41,10 +37,10 @@ import org.jclouds.io.Payload; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.github.fge.lambdas.Throwing; import com.google.common.base.Preconditions; import com.google.common.hash.Hashing; import com.google.common.hash.HashingInputStream; +import reactor.core.publisher.Mono; public class ObjectStorageBlobsDAO implements BlobStore { private static final Location DEFAULT_LOCATION = null; @@ -56,7 +52,6 @@ public class ObjectStorageBlobsDAO implements BlobStore { private final ContainerName containerName; private final org.jclouds.blobstore.BlobStore blobStore; private final PayloadCodec payloadCodec; - private final Executor executor; ObjectStorageBlobsDAO(ContainerName containerName, BlobId.Factory blobIdFactory, org.jclouds.blobstore.BlobStore blobStore, PayloadCodec payloadCodec) { @@ -64,7 +59,6 @@ public class ObjectStorageBlobsDAO implements BlobStore { this.containerName = containerName; this.blobStore = blobStore; this.payloadCodec = payloadCodec; - this.executor = Executors.newCachedThreadPool(NamedThreadFactory.withClassName(getClass())); } public static ObjectStorageBlobsDAOBuilder.RequireContainerName builder(SwiftTempAuthObjectStorage.Configuration testConfig) { @@ -79,53 +73,48 @@ public class ObjectStorageBlobsDAO implements BlobStore { return SwiftKeystone3ObjectStorage.daoBuilder(testConfig); } - public CompletableFuture<ContainerName> createContainer(ContainerName name) { - return CompletableFuture.supplyAsync(() -> blobStore.createContainerInLocation(DEFAULT_LOCATION, name.value())) - .thenApply(created -> { - if (!created) { - LOGGER.debug("{} already existed", name); - } - return name; - }); + public Mono<ContainerName> createContainer(ContainerName name) { + return Mono.fromCallable(() -> blobStore.createContainerInLocation(DEFAULT_LOCATION, name.value())) + .filter(created -> created == false) + .doOnNext(ignored -> LOGGER.debug("{} already existed", name)) + .thenReturn(name); } @Override - public CompletableFuture<BlobId> save(byte[] data) { + public Mono<BlobId> save(byte[] data) { return save(new ByteArrayInputStream(data)); } @Override - public CompletableFuture<BlobId> save(InputStream data) { + public Mono<BlobId> save(InputStream data) { Preconditions.checkNotNull(data); BlobId tmpId = blobIdFactory.randomId(); return save(data, tmpId) - .thenCompose(id -> updateBlobId(tmpId, id)); + .flatMap(id -> updateBlobId(tmpId, id)); } - private CompletableFuture<BlobId> updateBlobId(BlobId from, BlobId to) { + private Mono<BlobId> updateBlobId(BlobId from, BlobId to) { String containerName = this.containerName.value(); - return CompletableFuture - .supplyAsync(() -> blobStore.copyBlob(containerName, from.asString(), containerName, to.asString(), CopyOptions.NONE), executor) - .thenAcceptAsync(any -> blobStore.removeBlob(containerName, from.asString())) - .thenApply(any -> to); + return Mono + .fromCallable(() -> blobStore.copyBlob(containerName, from.asString(), containerName, to.asString(), CopyOptions.NONE)) + .then(Mono.fromRunnable(() -> blobStore.removeBlob(containerName, from.asString()))) + .thenReturn(to); } - private CompletableFuture<BlobId> save(InputStream data, BlobId id) { + private Mono<BlobId> save(InputStream data, BlobId id) { String containerName = this.containerName.value(); HashingInputStream hashingInputStream = new HashingInputStream(Hashing.sha256(), data); Payload payload = payloadCodec.write(hashingInputStream); Blob blob = blobStore.blobBuilder(id.asString()).payload(payload).build(); - return CompletableFuture - .supplyAsync(() -> blobStore.putBlob(containerName, blob), executor) - .thenApply(any -> blobIdFactory.from(hashingInputStream.hash().toString())); + return Mono.fromCallable(() -> blobStore.putBlob(containerName, blob)) + .then(Mono.fromCallable(() -> blobIdFactory.from(hashingInputStream.hash().toString()))); } @Override - public CompletableFuture<byte[]> readBytes(BlobId blobId) { - return CompletableFuture - .supplyAsync(Throwing.supplier(() -> IOUtils.toByteArray(read(blobId))).sneakyThrow(), executor); + public Mono<byte[]> readBytes(BlobId blobId) { + return Mono.fromCallable(() -> IOUtils.toByteArray(read(blobId))); } @Override http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOContract.java ---------------------------------------------------------------------- diff --git a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOContract.java b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOContract.java index e113cb5..409272d 100644 --- a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOContract.java +++ b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOContract.java @@ -36,9 +36,9 @@ public interface ObjectStorageBlobsDAOContract { default void assertBlobsDAOCanStoreAndRetrieve(ObjectStorageBlobsDAOBuilder.ReadyToBuild builder) { ObjectStorageBlobsDAO dao = builder.build(); - dao.createContainer(containerName()); + dao.createContainer(containerName()).block(); - BlobId blobId = dao.save(BYTES).join(); + BlobId blobId = dao.save(BYTES).block(); InputStream inputStream = dao.read(blobId); assertThat(inputStream).hasSameContentAs(new ByteArrayInputStream(BYTES)); http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOTest.java ---------------------------------------------------------------------- diff --git a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOTest.java b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOTest.java index 78c6a8a..95c1fec 100644 --- a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOTest.java +++ b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOTest.java @@ -26,7 +26,6 @@ import java.io.ByteArrayInputStream; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.UUID; -import java.util.concurrent.CompletableFuture; import org.apache.commons.io.IOUtils; import org.apache.james.blob.api.BlobId; @@ -48,6 +47,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import com.google.common.base.Strings; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; @ExtendWith(DockerSwiftExtension.class) public class ObjectStorageBlobsDAOTest implements MetricableBlobStoreContract { @@ -85,7 +86,7 @@ public class ObjectStorageBlobsDAOTest implements MetricableBlobStoreContract { .blobIdFactory(blobIdFactory); blobStore = daoBuilder.getSupplier().get(); objectStorageBlobsDAO = daoBuilder.build(); - objectStorageBlobsDAO.createContainer(containerName); + objectStorageBlobsDAO.createContainer(containerName).block(); testee = new MetricableBlobStore(metricsTestExtension.getMetricFactory(), objectStorageBlobsDAO); } @@ -106,18 +107,18 @@ public class ObjectStorageBlobsDAOTest implements MetricableBlobStoreContract { } @Test - void createContainerShouldMakeTheContainerToExist() throws Exception { + void createContainerShouldMakeTheContainerToExist() { ContainerName containerName = ContainerName.of(UUID.randomUUID().toString()); - objectStorageBlobsDAO.createContainer(containerName).get(); + objectStorageBlobsDAO.createContainer(containerName).block(); assertThat(blobStore.containerExists(containerName.value())).isTrue(); } @Test - void createContainerShouldNotFailWithRuntimeExceptionWhenCreateContainerTwice() throws Exception { + void createContainerShouldNotFailWithRuntimeExceptionWhenCreateContainerTwice() { ContainerName containerName = ContainerName.of(UUID.randomUUID().toString()); - objectStorageBlobsDAO.createContainer(containerName).get(); - assertThatCode(() -> objectStorageBlobsDAO.createContainer(containerName).get()) + objectStorageBlobsDAO.createContainer(containerName).block(); + assertThatCode(() -> objectStorageBlobsDAO.createContainer(containerName).block()) .doesNotThrowAnyException(); } @@ -130,7 +131,7 @@ public class ObjectStorageBlobsDAOTest implements MetricableBlobStoreContract { .payloadCodec(new AESPayloadCodec(CRYPTO_CONFIG)) .build(); byte[] bytes = "James is the best!".getBytes(StandardCharsets.UTF_8); - BlobId blobId = encryptedDao.save(bytes).join(); + BlobId blobId = encryptedDao.save(bytes).block(); InputStream read = encryptedDao.read(blobId); assertThat(read).hasSameContentAs(new ByteArrayInputStream(bytes)); @@ -145,7 +146,7 @@ public class ObjectStorageBlobsDAOTest implements MetricableBlobStoreContract { .payloadCodec(new AESPayloadCodec(CRYPTO_CONFIG)) .build(); byte[] bytes = "James is the best!".getBytes(StandardCharsets.UTF_8); - BlobId blobId = encryptedDao.save(bytes).join(); + BlobId blobId = encryptedDao.save(bytes).block(); InputStream encryptedIs = testee.read(blobId); assertThat(encryptedIs).isNotNull(); @@ -166,24 +167,25 @@ public class ObjectStorageBlobsDAOTest implements MetricableBlobStoreContract { @Test void saveBytesShouldNotCompleteWhenDoesNotAwait() { // String need to be big enough to get async thread busy hence could not return result instantly - CompletableFuture<BlobId> blobIdFuture = testee.save(BIG_STRING.getBytes(StandardCharsets.UTF_8)); - assertThat(blobIdFuture) - .isNotCompleted(); + Mono<BlobId> blobIdFuture = testee + .save(BIG_STRING.getBytes(StandardCharsets.UTF_8)) + .subscribeOn(Schedulers.elastic()); + assertThat(blobIdFuture.toFuture()).isNotCompleted(); } @Test void saveInputStreamShouldNotCompleteWhenDoesNotAwait() { - CompletableFuture<BlobId> blobIdFuture = testee.save(new ByteArrayInputStream(BIG_STRING.getBytes(StandardCharsets.UTF_8))); - assertThat(blobIdFuture) - .isNotCompleted(); + Mono<BlobId> blobIdFuture = testee + .save(new ByteArrayInputStream(BIG_STRING.getBytes(StandardCharsets.UTF_8))) + .subscribeOn(Schedulers.elastic()); + assertThat(blobIdFuture.toFuture()).isNotCompleted(); } @Test void readBytesShouldNotCompleteWhenDoesNotAwait() { - BlobId blobId = testee().save(BIG_STRING.getBytes(StandardCharsets.UTF_8)).join(); - CompletableFuture<byte[]> resultFuture = testee.readBytes(blobId); - assertThat(resultFuture) - .isNotCompleted(); + BlobId blobId = testee().save(BIG_STRING.getBytes(StandardCharsets.UTF_8)).block(); + Mono<byte[]> resultFuture = testee.readBytes(blobId).subscribeOn(Schedulers.elastic()); + assertThat(resultFuture.toFuture()).isNotCompleted(); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/blob/blob-union/src/main/java/org/apache/james/blob/union/UnionBlobStore.java ---------------------------------------------------------------------- diff --git a/server/blob/blob-union/src/main/java/org/apache/james/blob/union/UnionBlobStore.java b/server/blob/blob-union/src/main/java/org/apache/james/blob/union/UnionBlobStore.java index c69367f..950fcf7 100644 --- a/server/blob/blob-union/src/main/java/org/apache/james/blob/union/UnionBlobStore.java +++ b/server/blob/blob-union/src/main/java/org/apache/james/blob/union/UnionBlobStore.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.PushbackInputStream; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import java.util.function.Function; import java.util.function.Supplier; @@ -35,6 +34,7 @@ import org.slf4j.LoggerFactory; import com.github.fge.lambdas.Throwing; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; +import reactor.core.publisher.Mono; public class UnionBlobStore implements BlobStore { @@ -80,11 +80,11 @@ public class UnionBlobStore implements BlobStore { } @Override - public CompletableFuture<BlobId> save(byte[] data) { + public Mono<BlobId> save(byte[] data) { try { return saveToCurrentFallbackIfFails( - currentBlobStore.save(data), - () -> legacyBlobStore.save(data)); + Mono.defer(() -> currentBlobStore.save(data)), + () -> Mono.defer(() -> legacyBlobStore.save(data))); } catch (Exception e) { LOGGER.error("exception directly happens while saving bytes data, fall back to legacy blob store", e); return legacyBlobStore.save(data); @@ -92,11 +92,11 @@ public class UnionBlobStore implements BlobStore { } @Override - public CompletableFuture<BlobId> save(InputStream data) { + public Mono<BlobId> save(InputStream data) { try { return saveToCurrentFallbackIfFails( - currentBlobStore.save(data), - () -> legacyBlobStore.save(data)); + Mono.defer(() -> currentBlobStore.save(data)), + () -> Mono.defer(() -> legacyBlobStore.save(data))); } catch (Exception e) { LOGGER.error("exception directly happens while saving InputStream data, fall back to legacy blob store", e); return legacyBlobStore.save(data); @@ -104,12 +104,12 @@ public class UnionBlobStore implements BlobStore { } @Override - public CompletableFuture<byte[]> readBytes(BlobId blobId) { + public Mono<byte[]> readBytes(BlobId blobId) { try { return readBytesFallBackIfFailsOrEmptyResult(blobId); } catch (Exception e) { LOGGER.error("exception directly happens while readBytes, fall back to legacy blob store", e); - return legacyBlobStore.readBytes(blobId); + return Mono.defer(() -> legacyBlobStore.readBytes(blobId)); } } @@ -141,39 +141,24 @@ public class UnionBlobStore implements BlobStore { return false; } - private CompletableFuture<byte[]> readBytesFallBackIfFailsOrEmptyResult(BlobId blobId) { - return currentBlobStore.readBytes(blobId) - .thenApply(Optional::ofNullable) - .exceptionally(this::logAndReturnEmptyOptional) - .thenCompose(maybeBytes -> readFromLegacyIfNeeded(maybeBytes, blobId)); + private Mono<byte[]> readBytesFallBackIfFailsOrEmptyResult(BlobId blobId) { + return Mono.defer(() -> currentBlobStore.readBytes(blobId)) + .onErrorResume(this::logAndReturnEmpty) + .switchIfEmpty(legacyBlobStore.readBytes(blobId)); } - private CompletableFuture<BlobId> saveToCurrentFallbackIfFails( - CompletableFuture<BlobId> currentSavingOperation, - Supplier<CompletableFuture<BlobId>> fallbackSavingOperationSupplier) { + private Mono<BlobId> saveToCurrentFallbackIfFails( + Mono<BlobId> currentSavingOperation, + Supplier<Mono<BlobId>> fallbackSavingOperationSupplier) { return currentSavingOperation - .thenApply(Optional::ofNullable) - .exceptionally(this::logAndReturnEmptyOptional) - .thenCompose(maybeBlobId -> saveToLegacyIfNeeded(maybeBlobId, fallbackSavingOperationSupplier)); + .onErrorResume(this::logAndReturnEmpty) + .switchIfEmpty(fallbackSavingOperationSupplier.get()); } - private <T> Optional<T> logAndReturnEmptyOptional(Throwable throwable) { + private <T> Mono<T> logAndReturnEmpty(Throwable throwable) { LOGGER.error("error happens from current blob store, fall back to legacy blob store", throwable); - return Optional.empty(); - } - - private CompletableFuture<BlobId> saveToLegacyIfNeeded(Optional<BlobId> maybeBlobId, - Supplier<CompletableFuture<BlobId>> saveToLegacySupplier) { - return maybeBlobId - .map(CompletableFuture::completedFuture) - .orElseGet(saveToLegacySupplier); - } - - private CompletableFuture<byte[]> readFromLegacyIfNeeded(Optional<byte[]> readFromCurrentResult, BlobId blodId) { - return readFromCurrentResult - .map(CompletableFuture::completedFuture) - .orElseGet(() -> legacyBlobStore.readBytes(blodId)); + return Mono.empty(); } @Override http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/blob/blob-union/src/test/java/org/apache/james/blob/union/UnionBlobStoreTest.java ---------------------------------------------------------------------- diff --git a/server/blob/blob-union/src/test/java/org/apache/james/blob/union/UnionBlobStoreTest.java b/server/blob/blob-union/src/test/java/org/apache/james/blob/union/UnionBlobStoreTest.java index b614ed7..db1a5b5 100644 --- a/server/blob/blob-union/src/test/java/org/apache/james/blob/union/UnionBlobStoreTest.java +++ b/server/blob/blob-union/src/test/java/org/apache/james/blob/union/UnionBlobStoreTest.java @@ -37,7 +37,6 @@ import org.apache.james.blob.api.BlobStoreContract; import org.apache.james.blob.api.HashBlobId; import org.apache.james.blob.api.ObjectStoreException; import org.apache.james.blob.memory.MemoryBlobStore; -import org.apache.james.util.CompletableFutureUtil; import org.apache.james.util.StreamUtils; import org.assertj.core.api.SoftAssertions; import org.junit.jupiter.api.BeforeEach; @@ -50,23 +49,25 @@ import org.junit.jupiter.params.provider.MethodSource; import org.testcontainers.shaded.com.google.common.base.MoreObjects; import org.testcontainers.shaded.com.google.common.collect.ImmutableList; +import reactor.core.publisher.Mono; + class UnionBlobStoreTest implements BlobStoreContract { - private static class FutureThrowingBlobStore implements BlobStore { + private static class FailingBlobStore implements BlobStore { @Override - public CompletableFuture<BlobId> save(byte[] data) { - return CompletableFutureUtil.exceptionallyFuture(new RuntimeException("broken everywhere")); + public Mono<BlobId> save(byte[] data) { + return Mono.error(new RuntimeException("broken everywhere")); } @Override - public CompletableFuture<BlobId> save(InputStream data) { - return CompletableFutureUtil.exceptionallyFuture(new RuntimeException("broken everywhere")); + public Mono<BlobId> save(InputStream data) { + return Mono.error(new RuntimeException("broken everywhere")); } @Override - public CompletableFuture<byte[]> readBytes(BlobId blobId) { - return CompletableFutureUtil.exceptionallyFuture(new RuntimeException("broken everywhere")); + public Mono<byte[]> readBytes(BlobId blobId) { + return Mono.error(new RuntimeException("broken everywhere")); } @Override @@ -84,17 +85,17 @@ class UnionBlobStoreTest implements BlobStoreContract { private static class ThrowingBlobStore implements BlobStore { @Override - public CompletableFuture<BlobId> save(byte[] data) { + public Mono<BlobId> save(byte[] data) { throw new RuntimeException("broken everywhere"); } @Override - public CompletableFuture<BlobId> save(InputStream data) { + public Mono<BlobId> save(InputStream data) { throw new RuntimeException("broken everywhere"); } @Override - public CompletableFuture<byte[]> readBytes(BlobId blobId) { + public Mono<byte[]> readBytes(BlobId blobId) { throw new RuntimeException("broken everywhere"); } @@ -141,13 +142,13 @@ class UnionBlobStoreTest implements BlobStoreContract { class CurrentSaveThrowsExceptionDirectly { @Test - void saveShouldFallBackToLegacyWhenCurrentGotException() throws Exception { + void saveShouldFallBackToLegacyWhenCurrentGotException() { MemoryBlobStore legacyBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY); UnionBlobStore unionBlobStore = UnionBlobStore.builder() .current(new ThrowingBlobStore()) .legacy(legacyBlobStore) .build(); - BlobId blobId = unionBlobStore.save(BLOB_CONTENT).get(); + BlobId blobId = unionBlobStore.save(BLOB_CONTENT).block(); SoftAssertions.assertSoftly(softly -> { softly.assertThat(unionBlobStore.read(blobId)) @@ -158,13 +159,13 @@ class UnionBlobStoreTest implements BlobStoreContract { } @Test - void saveInputStreamShouldFallBackToLegacyWhenCurrentGotException() throws Exception { + void saveInputStreamShouldFallBackToLegacyWhenCurrentGotException() { MemoryBlobStore legacyBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY); UnionBlobStore unionBlobStore = UnionBlobStore.builder() .current(new ThrowingBlobStore()) .legacy(legacyBlobStore) .build(); - BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).get(); + BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).block(); SoftAssertions.assertSoftly(softly -> { softly.assertThat(unionBlobStore.read(blobId)) @@ -179,13 +180,13 @@ class UnionBlobStoreTest implements BlobStoreContract { class CurrentSaveCompletesExceptionally { @Test - void saveShouldFallBackToLegacyWhenCurrentCompletedExceptionally() throws Exception { + void saveShouldFallBackToLegacyWhenCurrentCompletedExceptionally() { MemoryBlobStore legacyBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY); UnionBlobStore unionBlobStore = UnionBlobStore.builder() - .current(new FutureThrowingBlobStore()) + .current(new FailingBlobStore()) .legacy(legacyBlobStore) .build(); - BlobId blobId = unionBlobStore.save(BLOB_CONTENT).get(); + BlobId blobId = unionBlobStore.save(BLOB_CONTENT).block(); SoftAssertions.assertSoftly(softly -> { softly.assertThat(unionBlobStore.read(blobId)) @@ -196,13 +197,13 @@ class UnionBlobStoreTest implements BlobStoreContract { } @Test - void saveInputStreamShouldFallBackToLegacyWhenCurrentCompletedExceptionally() throws Exception { + void saveInputStreamShouldFallBackToLegacyWhenCurrentCompletedExceptionally() { MemoryBlobStore legacyBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY); UnionBlobStore unionBlobStore = UnionBlobStore.builder() - .current(new FutureThrowingBlobStore()) + .current(new FailingBlobStore()) .legacy(legacyBlobStore) .build(); - BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).get(); + BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).block(); SoftAssertions.assertSoftly(softly -> { softly.assertThat(unionBlobStore.read(blobId)) @@ -218,29 +219,29 @@ class UnionBlobStoreTest implements BlobStoreContract { class CurrentReadThrowsExceptionDirectly { @Test - void readShouldReturnFallbackToLegacyWhenCurrentGotException() throws Exception { + void readShouldReturnFallbackToLegacyWhenCurrentGotException() { MemoryBlobStore legacyBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY); UnionBlobStore unionBlobStore = UnionBlobStore.builder() .current(new ThrowingBlobStore()) .legacy(legacyBlobStore) .build(); - BlobId blobId = legacyBlobStore.save(BLOB_CONTENT).get(); + BlobId blobId = legacyBlobStore.save(BLOB_CONTENT).block(); assertThat(unionBlobStore.read(blobId)) .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); } @Test - void readBytesShouldReturnFallbackToLegacyWhenCurrentGotException() throws Exception { + void readBytesShouldReturnFallbackToLegacyWhenCurrentGotException() { MemoryBlobStore legacyBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY); UnionBlobStore unionBlobStore = UnionBlobStore.builder() .current(new ThrowingBlobStore()) .legacy(legacyBlobStore) .build(); - BlobId blobId = legacyBlobStore.save(BLOB_CONTENT).get(); + BlobId blobId = legacyBlobStore.save(BLOB_CONTENT).block(); - assertThat(unionBlobStore.readBytes(blobId).get()) + assertThat(unionBlobStore.readBytes(blobId).block()) .isEqualTo(BLOB_CONTENT); } @@ -250,28 +251,28 @@ class UnionBlobStoreTest implements BlobStoreContract { class CurrentReadCompletesExceptionally { @Test - void readShouldReturnFallbackToLegacyWhenCurrentCompletedExceptionally() throws Exception { + void readShouldReturnFallbackToLegacyWhenCurrentCompletedExceptionally() { MemoryBlobStore legacyBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY); UnionBlobStore unionBlobStore = UnionBlobStore.builder() - .current(new FutureThrowingBlobStore()) + .current(new FailingBlobStore()) .legacy(legacyBlobStore) .build(); - BlobId blobId = legacyBlobStore.save(BLOB_CONTENT).get(); + BlobId blobId = legacyBlobStore.save(BLOB_CONTENT).block(); assertThat(unionBlobStore.read(blobId)) .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); } @Test - void readBytesShouldReturnFallbackToLegacyWhenCurrentCompletedExceptionally() throws Exception { + void readBytesShouldReturnFallbackToLegacyWhenCurrentCompletedExceptionally() { MemoryBlobStore legacyBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY); UnionBlobStore unionBlobStore = UnionBlobStore.builder() - .current(new FutureThrowingBlobStore()) + .current(new FailingBlobStore()) .legacy(legacyBlobStore) .build(); - BlobId blobId = legacyBlobStore.save(BLOB_CONTENT).get(); + BlobId blobId = legacyBlobStore.save(BLOB_CONTENT).block(); - assertThat(unionBlobStore.readBytes(blobId).get()) + assertThat(unionBlobStore.readBytes(blobId).block()) .isEqualTo(BLOB_CONTENT); } } @@ -281,7 +282,7 @@ class UnionBlobStoreTest implements BlobStoreContract { class CurrentAndLegacyCouldNotComplete { - Stream<Function<UnionBlobStore, CompletableFuture<?>>> blobStoreOperationsReturnFutures() { + Stream<Function<UnionBlobStore, Mono<?>>> blobStoreOperationsReturnFutures() { return Stream.of( blobStore -> blobStore.save(BLOB_CONTENT), blobStore -> blobStore.save(new ByteArrayInputStream(BLOB_CONTENT)), @@ -297,15 +298,15 @@ class UnionBlobStoreTest implements BlobStoreContract { List<UnionBlobStore> futureThrowingUnionBlobStores = ImmutableList.of( UnionBlobStore.builder() .current(new ThrowingBlobStore()) - .legacy(new FutureThrowingBlobStore()) + .legacy(new FailingBlobStore()) .build(), UnionBlobStore.builder() - .current(new FutureThrowingBlobStore()) + .current(new FailingBlobStore()) .legacy(new ThrowingBlobStore()) .build(), UnionBlobStore.builder() - .current(new FutureThrowingBlobStore()) - .legacy(new FutureThrowingBlobStore()) + .current(new FailingBlobStore()) + .legacy(new FailingBlobStore()) .build()); return blobStoreOperationsReturnFutures() @@ -338,74 +339,74 @@ class UnionBlobStoreTest implements BlobStoreContract { @ParameterizedTest @MethodSource("blobStoresCauseReturnExceptionallyFutures") void operationShouldReturnExceptionallyFuture(UnionBlobStore blobStoreReturnsExceptionallyFuture, - Function<UnionBlobStore, CompletableFuture<?>> blobStoreOperation) { - assertThat(blobStoreOperation.apply(blobStoreReturnsExceptionallyFuture)) - .isCompletedExceptionally(); + Function<UnionBlobStore, Mono<?>> blobStoreOperation) { + Mono<?> mono = blobStoreOperation.apply(blobStoreReturnsExceptionallyFuture); + assertThatThrownBy(mono::block).isInstanceOf(RuntimeException.class); } } @Test - void readShouldReturnFromCurrentWhenAvailable() throws Exception { - BlobId blobId = currentBlobStore.save(BLOB_CONTENT).get(); + void readShouldReturnFromCurrentWhenAvailable() { + BlobId blobId = currentBlobStore.save(BLOB_CONTENT).block(); assertThat(unionBlobStore.read(blobId)) .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); } @Test - void readShouldReturnFromLegacyWhenCurrentNotAvailable() throws Exception { - BlobId blobId = legacyBlobStore.save(BLOB_CONTENT).get(); + void readShouldReturnFromLegacyWhenCurrentNotAvailable() { + BlobId blobId = legacyBlobStore.save(BLOB_CONTENT).block(); assertThat(unionBlobStore.read(blobId)) .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); } @Test - void readBytesShouldReturnFromCurrentWhenAvailable() throws Exception { - BlobId blobId = currentBlobStore.save(BLOB_CONTENT).get(); + void readBytesShouldReturnFromCurrentWhenAvailable() { + BlobId blobId = currentBlobStore.save(BLOB_CONTENT).block(); - assertThat(unionBlobStore.readBytes(blobId).get()) + assertThat(unionBlobStore.readBytes(blobId).block()) .isEqualTo(BLOB_CONTENT); } @Test - void readBytesShouldReturnFromLegacyWhenCurrentNotAvailable() throws Exception { - BlobId blobId = legacyBlobStore.save(BLOB_CONTENT).get(); + void readBytesShouldReturnFromLegacyWhenCurrentNotAvailable() { + BlobId blobId = legacyBlobStore.save(BLOB_CONTENT).block(); - assertThat(unionBlobStore.readBytes(blobId).get()) + assertThat(unionBlobStore.readBytes(blobId).block()) .isEqualTo(BLOB_CONTENT); } @Test - void saveShouldWriteToCurrent() throws Exception { - BlobId blobId = unionBlobStore.save(BLOB_CONTENT).get(); + void saveShouldWriteToCurrent() { + BlobId blobId = unionBlobStore.save(BLOB_CONTENT).block(); - assertThat(currentBlobStore.readBytes(blobId).get()) + assertThat(currentBlobStore.readBytes(blobId).block()) .isEqualTo(BLOB_CONTENT); } @Test - void saveShouldNotWriteToLegacy() throws Exception { - BlobId blobId = unionBlobStore.save(BLOB_CONTENT).get(); + void saveShouldNotWriteToLegacy() { + BlobId blobId = unionBlobStore.save(BLOB_CONTENT).block(); - assertThatThrownBy(() -> legacyBlobStore.readBytes(blobId).join()) - .hasCauseInstanceOf(ObjectStoreException.class); + assertThatThrownBy(() -> legacyBlobStore.readBytes(blobId).block()) + .isInstanceOf(ObjectStoreException.class); } @Test - void saveInputStreamShouldWriteToCurrent() throws Exception { - BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).get(); + void saveInputStreamShouldWriteToCurrent() { + BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).block(); - assertThat(currentBlobStore.readBytes(blobId).get()) + assertThat(currentBlobStore.readBytes(blobId).block()) .isEqualTo(BLOB_CONTENT); } @Test - void saveInputStreamShouldNotWriteToLegacy() throws Exception { - BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).get(); + void saveInputStreamShouldNotWriteToLegacy() { + BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).block(); - assertThatThrownBy(() -> legacyBlobStore.readBytes(blobId).join()) - .isNotInstanceOf(ObjectStoreException.class); + assertThatThrownBy(() -> legacyBlobStore.readBytes(blobId).block()) + .isInstanceOf(ObjectStoreException.class); } @Test http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/blob/mail-store/src/test/java/org/apache/james/blob/mail/MimeMessageStoreTest.java ---------------------------------------------------------------------- diff --git a/server/blob/mail-store/src/test/java/org/apache/james/blob/mail/MimeMessageStoreTest.java b/server/blob/mail-store/src/test/java/org/apache/james/blob/mail/MimeMessageStoreTest.java index d8de475..4bec4a4 100644 --- a/server/blob/mail-store/src/test/java/org/apache/james/blob/mail/MimeMessageStoreTest.java +++ b/server/blob/mail-store/src/test/java/org/apache/james/blob/mail/MimeMessageStoreTest.java @@ -70,9 +70,9 @@ class MimeMessageStoreTest { .setText("Important mail content") .build(); - MimeMessagePartsId parts = testee.save(message).join(); + MimeMessagePartsId parts = testee.save(message).block(); - MimeMessage retrievedMessage = testee.read(parts).join(); + MimeMessage retrievedMessage = testee.read(parts).block(); assertThat(MimeMessageUtil.asString(retrievedMessage)) .isEqualTo(MimeMessageUtil.asString(message)); @@ -86,9 +86,9 @@ class MimeMessageStoreTest { .setSubject("Important Mail") .build(); - MimeMessagePartsId parts = testee.save(message).join(); + MimeMessagePartsId parts = testee.save(message).block(); - MimeMessage retrievedMessage = testee.read(parts).join(); + MimeMessage retrievedMessage = testee.read(parts).block(); assertThat(MimeMessageUtil.asString(retrievedMessage)) .isEqualTo(MimeMessageUtil.asString(message)); @@ -105,14 +105,14 @@ class MimeMessageStoreTest { .setText("Important mail content") .build(); - MimeMessagePartsId parts = testee.save(message).join(); + MimeMessagePartsId parts = testee.save(message).block(); SoftAssertions.assertSoftly( softly -> { BlobId headerBlobId = parts.getHeaderBlobId(); BlobId bodyBlobId = parts.getBodyBlobId(); - softly.assertThat(new String(blobStore.readBytes(headerBlobId).join(), StandardCharsets.UTF_8)) + softly.assertThat(new String(blobStore.readBytes(headerBlobId).block(), StandardCharsets.UTF_8)) .isEqualTo("Date: Thu, 6 Sep 2018 13:29:13 +0700 (ICT)\r\n" + "From: [email protected]\r\n" + "To: [email protected]\r\n" + @@ -121,7 +121,7 @@ class MimeMessageStoreTest { "MIME-Version: 1.0\r\n" + "Content-Type: text/plain; charset=UTF-8\r\n" + "Content-Transfer-Encoding: 7bit\r\n\r\n"); - softly.assertThat(new String(blobStore.readBytes(bodyBlobId).join(), StandardCharsets.UTF_8)) + softly.assertThat(new String(blobStore.readBytes(bodyBlobId).block(), StandardCharsets.UTF_8)) .isEqualTo("Important mail content"); }); } http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/container/guice/blob-objectstorage-guice/src/main/java/org/apache/james/modules/objectstorage/ObjectStorageDependenciesModule.java ---------------------------------------------------------------------- diff --git a/server/container/guice/blob-objectstorage-guice/src/main/java/org/apache/james/modules/objectstorage/ObjectStorageDependenciesModule.java b/server/container/guice/blob-objectstorage-guice/src/main/java/org/apache/james/modules/objectstorage/ObjectStorageDependenciesModule.java index 0aeb26d..8cb907a 100644 --- a/server/container/guice/blob-objectstorage-guice/src/main/java/org/apache/james/modules/objectstorage/ObjectStorageDependenciesModule.java +++ b/server/container/guice/blob-objectstorage-guice/src/main/java/org/apache/james/modules/objectstorage/ObjectStorageDependenciesModule.java @@ -20,8 +20,8 @@ package org.apache.james.modules.objectstorage; import java.io.FileNotFoundException; +import java.time.Duration; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import javax.inject.Singleton; @@ -70,7 +70,7 @@ public class ObjectStorageDependenciesModule extends AbstractModule { .blobIdFactory(blobIdFactory) .payloadCodec(configuration.getPayloadCodec()) .build(); - dao.createContainer(configuration.getNamespace()).get(1, TimeUnit.MINUTES); + dao.createContainer(configuration.getNamespace()).block(Duration.ofMinutes(1)); return dao; } http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/container/guice/blob-objectstorage-guice/src/test/java/org/apache/james/modules/objectstorage/ObjectStorageBlobStoreModuleTest.java ---------------------------------------------------------------------- diff --git a/server/container/guice/blob-objectstorage-guice/src/test/java/org/apache/james/modules/objectstorage/ObjectStorageBlobStoreModuleTest.java b/server/container/guice/blob-objectstorage-guice/src/test/java/org/apache/james/modules/objectstorage/ObjectStorageBlobStoreModuleTest.java index 692c2b7..3678f22 100644 --- a/server/container/guice/blob-objectstorage-guice/src/test/java/org/apache/james/modules/objectstorage/ObjectStorageBlobStoreModuleTest.java +++ b/server/container/guice/blob-objectstorage-guice/src/test/java/org/apache/james/modules/objectstorage/ObjectStorageBlobStoreModuleTest.java @@ -123,7 +123,7 @@ class ObjectStorageBlobStoreModuleTest { .with(binder -> binder.bind(ObjectStorageBlobConfiguration.class).toInstance(configuration))); ObjectStorageBlobsDAO dao = injector.getInstance(ObjectStorageBlobsDAO.class); - dao.createContainer(configuration.getNamespace()); + dao.createContainer(configuration.getNamespace()).block(); BlobStore blobStore = injector.getInstance(Key.get(BlobStore.class, Names.named(MetricableBlobStore.BLOB_STORE_IMPLEMENTATION))); http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java index 6f005e8..893035a 100644 --- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java +++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java @@ -61,6 +61,7 @@ public class CassandraMailRepository implements MailRepository { MailKey mailKey = MailKey.forMail(mail); Mono.fromFuture(mimeMessageStore.save(mail.getMessage()) + .toFuture() .thenCompose(Throwing.function(parts -> mailDAO.store(url, mail, parts.getHeaderBlobId(), parts.getBodyBlobId())))) @@ -101,6 +102,7 @@ public class CassandraMailRepository implements MailRepository { .build(); return mimeMessageStore.read(parts) + .toFuture() .thenApply(mimeMessage -> mailDTO.getMailBuilder() .mimeMessage(mimeMessage) .build()); http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java index b39ed2d..683919a 100644 --- a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java +++ b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java @@ -95,17 +95,13 @@ class CassandraMailRepositoryWithFakeImplementationsTest { class FailingStore implements Store<MimeMessage, MimeMessagePartsId> { @Override - public CompletableFuture<MimeMessagePartsId> save(MimeMessage mimeMessage) { - return CompletableFuture.supplyAsync(() -> { - throw new RuntimeException("Expected failure while saving"); - }); + public Mono<MimeMessagePartsId> save(MimeMessage mimeMessage) { + return Mono.error(new RuntimeException("Expected failure while saving")); } @Override - public CompletableFuture<MimeMessage> read(MimeMessagePartsId blobIds) { - return CompletableFuture.supplyAsync(() -> { - throw new RuntimeException("Expected failure while reading"); - }); + public Mono<MimeMessage> read(MimeMessagePartsId blobIds) { + return Mono.error(new RuntimeException("Expected failure while reading")); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java index 7a93fe0..349c878 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java @@ -69,7 +69,7 @@ class Enqueuer { private CompletableFuture<MimeMessagePartsId> saveMail(Mail mail) throws MailQueue.MailQueueException { try { - return mimeMessageStore.save(mail.getMessage()); + return mimeMessageStore.save(mail.getMessage()).toFuture(); } catch (MessagingException e) { throw new MailQueue.MailQueueException("Error while saving blob", e); } http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailLoader.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailLoader.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailLoader.java index 0016702..f69c533 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailLoader.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailLoader.java @@ -46,7 +46,7 @@ class MailLoader { .headerBlobId(blobIdFactory.from(dto.getHeaderBlobId())) .bodyBlobId(blobIdFactory.from(dto.getBodyBlobId())) .build()) - .join(); + .block(); return dto.toMailWithMimeMessage(mimeMessage); } catch (AddressException e) { http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java index 4279c15..29da254 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java @@ -115,7 +115,7 @@ public class CassandraMailQueueBrowser { private Mono<Mail> toMailFuture(EnqueuedItemWithSlicingContext enqueuedItemWithSlicingContext) { EnqueuedItem enqueuedItem = enqueuedItemWithSlicingContext.getEnqueuedItem(); - return Mono.fromCompletionStage(mimeMessageStore.read(enqueuedItem.getPartsId())) + return mimeMessageStore.read(enqueuedItem.getPartsId()) .map(mimeMessage -> toMail(enqueuedItem, mimeMessage)); } http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java index 57732cb..a97999a 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java @@ -59,7 +59,7 @@ public class CassandraMailQueueMailDelete { Mono<Void> considerDeleted(MailKey mailKey, MailQueueName mailQueueName) { return deletedMailsDao .markAsDeleted(mailQueueName, mailKey) - .doOnTerminate(() -> maybeUpdateBrowseStart(mailQueueName)); + .doOnNext(ignored -> maybeUpdateBrowseStart(mailQueueName)); } Mono<Boolean> isDeleted(Mail mail, MailQueueName mailQueueName) { http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java index 211917a..f318e5e 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java @@ -117,7 +117,7 @@ public class CassandraMailQueueView implements MailQueueView { .filter(mailReference -> deleteCondition.shouldBeDeleted(mailReference.getMail())) .map(mailReference -> cassandraMailQueueMailDelete.considerDeleted(mailReference.getMail(), mailQueueName)) .count() - .doOnTerminate(() -> cassandraMailQueueMailDelete.updateBrowseStart(mailQueueName)) + .doOnNext(ignored -> cassandraMailQueueMailDelete.updateBrowseStart(mailQueueName)) .block(); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
