Repository: james-project
Updated Branches:
  refs/heads/master 829074f99 -> ab4171cfa


JAMES-2630 change blobs-api to use Mono


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/ba4d902e
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/ba4d902e
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/ba4d902e

Branch: refs/heads/master
Commit: ba4d902ee3ff8b53999597067541cf8324030964
Parents: e745d95
Author: Matthieu Baechler <[email protected]>
Authored: Mon Jan 28 17:18:14 2019 +0100
Committer: Matthieu Baechler <[email protected]>
Committed: Thu Jan 31 18:33:30 2019 +0100

----------------------------------------------------------------------
 .../mail/CassandraAttachmentMapper.java         |   8 +-
 .../cassandra/mail/CassandraMessageDAO.java     |   6 +-
 .../mail/migration/AttachmentV2Migration.java   |   2 +-
 .../mail/CassandraAttachmentFallbackTest.java   |   6 +-
 .../migration/AttachmentV2MigrationTest.java    |  14 +-
 metrics/metrics-api/pom.xml                     |   7 +
 .../apache/james/metrics/api/MetricFactory.java |   7 +
 .../org/apache/james/blob/api/BlobStore.java    |   9 +-
 .../james/blob/api/MetricableBlobStore.java     |   9 +-
 .../java/org/apache/james/blob/api/Store.java   |  47 ++++---
 .../james/blob/api/BlobStoreContract.java       |  38 +++---
 .../blob/api/MetricableBlobStoreContract.java   |  18 +--
 .../james/blob/cassandra/CassandraBlobsDAO.java |  29 ++--
 .../blob/cassandra/CassandraBlobsDAOTest.java   |   6 +-
 .../james/blob/memory/MemoryBlobStore.java      |  17 +--
 server/blob/blob-objectstorage/pom.xml          |   4 +
 .../objectstorage/ObjectStorageBlobsDAO.java    |  49 +++----
 .../ObjectStorageBlobsDAOContract.java          |   4 +-
 .../ObjectStorageBlobsDAOTest.java              |  40 +++---
 .../apache/james/blob/union/UnionBlobStore.java |  55 +++-----
 .../james/blob/union/UnionBlobStoreTest.java    | 131 ++++++++++---------
 .../james/blob/mail/MimeMessageStoreTest.java   |  14 +-
 .../ObjectStorageDependenciesModule.java        |   4 +-
 .../ObjectStorageBlobStoreModuleTest.java       |   2 +-
 .../cassandra/CassandraMailRepository.java      |   2 +
 ...ilRepositoryWithFakeImplementationsTest.java |  12 +-
 .../apache/james/queue/rabbitmq/Enqueuer.java   |   2 +-
 .../apache/james/queue/rabbitmq/MailLoader.java |   2 +-
 .../cassandra/CassandraMailQueueBrowser.java    |   2 +-
 .../cassandra/CassandraMailQueueMailDelete.java |   2 +-
 .../view/cassandra/CassandraMailQueueView.java  |   2 +-
 31 files changed, 264 insertions(+), 286 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
index a7d9dfc..8606b06 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
@@ -78,7 +78,7 @@ public class CassandraAttachmentMapper implements AttachmentMapper {
     }
 
     private Mono<Attachment> retrievePayload(DAOAttachment daoAttachment) {
-        return Mono.fromCompletionStage(blobStore.readBytes(daoAttachment.getBlobId())
+        return Mono.fromCompletionStage(blobStore.readBytes(daoAttachment.getBlobId()).toFuture()
             .thenApply(daoAttachment::toAttachment));
     }
 
@@ -109,7 +109,7 @@ public class CassandraAttachmentMapper implements AttachmentMapper {
     @Override
     public void storeAttachmentForOwner(Attachment attachment, Username owner) throws MailboxException {
         ownerDAO.addOwner(attachment.getAttachmentId(), owner)
-            .then(Mono.fromFuture(blobStore.save(attachment.getBytes())))
+            .then(blobStore.save(attachment.getBytes()))
             .map(blobId -> CassandraAttachmentDAOV2.from(attachment, blobId))
             .flatMap(attachmentDAOV2::storeAttachment)
             .block();
@@ -135,8 +135,8 @@ public class CassandraAttachmentMapper implements AttachmentMapper {
     }
 
     public Mono<Void> storeAttachmentAsync(Attachment attachment, MessageId ownerMessageId) {
-        return Mono.fromFuture(blobStore.save(attachment.getBytes())
-            .thenApply(blobId -> CassandraAttachmentDAOV2.from(attachment, blobId)))
+        return blobStore.save(attachment.getBytes())
+            .map(blobId -> CassandraAttachmentDAOV2.from(attachment, blobId))
             .flatMap(daoAttachment -> storeAttachmentWithIndex(daoAttachment, ownerMessageId));
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
index dab5496..ab06089 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
@@ -184,8 +184,8 @@ public class CassandraMessageDAO {
             byte[] headerContent = IOUtils.toByteArray(message.getHeaderContent());
             byte[] bodyContent = IOUtils.toByteArray(message.getBodyContent());
 
-            CompletableFuture<BlobId> bodyFuture = blobStore.save(bodyContent);
-            CompletableFuture<BlobId> headerFuture = blobStore.save(headerContent);
+            CompletableFuture<BlobId> bodyFuture = blobStore.save(bodyContent).toFuture();
+            CompletableFuture<BlobId> headerFuture = blobStore.save(headerContent).toFuture();
 
             return headerFuture.thenCombine(bodyFuture, Pair::of);
         } catch (IOException e) {
@@ -364,7 +364,7 @@ public class CassandraMessageDAO {
     }
 
     private Mono<byte[]> getFieldContent(String field, Row row) {
-        return Mono.fromFuture(blobStore.readBytes(blobIdFactory.from(row.getString(field))));
+        return blobStore.readBytes(blobIdFactory.from(row.getString(field)));
     }
 
     public static MessageResult notFound(ComposedMessageIdWithMetaData id) {

http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java
index bebf83d..6398342 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java
@@ -59,7 +59,7 @@ public class AttachmentV2Migration implements Migration {
 
     private Result migrateAttachment(Attachment attachment) {
         try {
-            blobStore.save(attachment.getBytes())
+            blobStore.save(attachment.getBytes()).toFuture()
                 .thenApply(blobId -> CassandraAttachmentDAOV2.from(attachment, blobId))
                 .thenCompose(daoAttachement -> attachmentDAOV2.storeAttachment(daoAttachement).toFuture())
                 .thenCompose(any -> attachmentDAOV1.deleteAttachment(attachment.getAttachmentId()))

http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentFallbackTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentFallbackTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentFallbackTest.java
index 10492c6..1b98b9c 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentFallbackTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentFallbackTest.java
@@ -100,7 +100,7 @@ class CassandraAttachmentFallbackTest {
             .bytes("{\"property\":`\"different\"}".getBytes(StandardCharsets.UTF_8))
             .build();
 
-        BlobId blobId = blobsDAO.save(attachment.getBytes()).join();
+        BlobId blobId = blobsDAO.save(attachment.getBytes()).block();
         attachmentDAOV2.storeAttachment(CassandraAttachmentDAOV2.from(attachment, blobId)).block();
         attachmentDAO.storeAttachment(otherAttachment).join();
 
@@ -135,7 +135,7 @@ class CassandraAttachmentFallbackTest {
             .bytes("{\"property\":`\"different\"}".getBytes(StandardCharsets.UTF_8))
             .build();
 
-        BlobId blobId = blobsDAO.save(attachment.getBytes()).join();
+        BlobId blobId = blobsDAO.save(attachment.getBytes()).block();
         attachmentDAOV2.storeAttachment(CassandraAttachmentDAOV2.from(attachment, blobId)).block();
         attachmentDAO.storeAttachment(otherAttachment).join();
 
@@ -170,7 +170,7 @@ class CassandraAttachmentFallbackTest {
             .bytes("{\"property\":`\"different\"}".getBytes(StandardCharsets.UTF_8))
             .build();
 
-        BlobId blobId = blobsDAO.save(attachment.getBytes()).join();
+        BlobId blobId = blobsDAO.save(attachment.getBytes()).block();
         attachmentDAOV2.storeAttachment(CassandraAttachmentDAOV2.from(attachment, blobId)).block();
         attachmentDAO.storeAttachment(otherAttachment).join();
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java
index 85616b1..9ca7bff 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java
@@ -113,9 +113,9 @@ class AttachmentV2MigrationTest {
             .contains(CassandraAttachmentDAOV2.from(attachment1, BLOB_ID_FACTORY.forPayload(attachment1.getBytes())));
         assertThat(attachmentDAOV2.getAttachment(ATTACHMENT_ID_2).blockOptional())
             .contains(CassandraAttachmentDAOV2.from(attachment2, BLOB_ID_FACTORY.forPayload(attachment2.getBytes())));
-        assertThat(blobsDAO.readBytes(BLOB_ID_FACTORY.forPayload(attachment1.getBytes())).join())
+        assertThat(blobsDAO.readBytes(BLOB_ID_FACTORY.forPayload(attachment1.getBytes())).block())
             .isEqualTo(attachment1.getBytes());
-        assertThat(blobsDAO.readBytes(BLOB_ID_FACTORY.forPayload(attachment2.getBytes())).join())
+        assertThat(blobsDAO.readBytes(BLOB_ID_FACTORY.forPayload(attachment2.getBytes())).block())
             .isEqualTo(attachment2.getBytes());
     }
 
@@ -170,9 +170,9 @@ class AttachmentV2MigrationTest {
             attachment1,
             attachment2));
         when(blobsDAO.save(attachment1.getBytes()))
-            .thenReturn(CompletableFuture.completedFuture(BLOB_ID_FACTORY.forPayload(attachment1.getBytes())));
+            .thenReturn(Mono.just(BLOB_ID_FACTORY.forPayload(attachment1.getBytes())));
         when(blobsDAO.save(attachment2.getBytes()))
-            .thenReturn(CompletableFuture.completedFuture(BLOB_ID_FACTORY.forPayload(attachment2.getBytes())));
+            .thenReturn(Mono.just(BLOB_ID_FACTORY.forPayload(attachment2.getBytes())));
         when(attachmentDAOV2.storeAttachment(any())).thenThrow(new RuntimeException());
 
         assertThat(migration.run()).isEqualTo(Migration.Result.PARTIAL);
@@ -189,9 +189,9 @@ class AttachmentV2MigrationTest {
             attachment1,
             attachment2));
         when(blobsDAO.save(attachment1.getBytes()))
-            .thenReturn(CompletableFuture.completedFuture(BLOB_ID_FACTORY.forPayload(attachment1.getBytes())));
+            .thenReturn(Mono.just(BLOB_ID_FACTORY.forPayload(attachment1.getBytes())));
         when(blobsDAO.save(attachment2.getBytes()))
-            .thenReturn(CompletableFuture.completedFuture(BLOB_ID_FACTORY.forPayload(attachment2.getBytes())));
+            .thenReturn(Mono.just(BLOB_ID_FACTORY.forPayload(attachment2.getBytes())));
         when(attachmentDAOV2.storeAttachment(any())).thenReturn(Mono.empty());
         when(attachmentDAO.deleteAttachment(any())).thenThrow(new RuntimeException());
 
@@ -209,7 +209,7 @@ class AttachmentV2MigrationTest {
             attachment1,
             attachment2));
         when(blobsDAO.save(attachment1.getBytes()))
-            .thenReturn(CompletableFuture.completedFuture(BLOB_ID_FACTORY.forPayload(attachment1.getBytes())));
+            .thenReturn(Mono.just(BLOB_ID_FACTORY.forPayload(attachment1.getBytes())));
         when(blobsDAO.save(attachment2.getBytes()))
             .thenThrow(new RuntimeException());
         when(attachmentDAOV2.storeAttachment(any())).thenReturn(Mono.empty());

http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/metrics/metrics-api/pom.xml
----------------------------------------------------------------------
diff --git a/metrics/metrics-api/pom.xml b/metrics/metrics-api/pom.xml
index e78daa0..314ec3d 100644
--- a/metrics/metrics-api/pom.xml
+++ b/metrics/metrics-api/pom.xml
@@ -29,4 +29,11 @@
 
     <name>Apache James :: Metrics :: API</name>
 
+    <dependencies>
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-core</artifactId>
+        </dependency>
+    </dependencies>
+
 </project>

http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/MetricFactory.java
----------------------------------------------------------------------
diff --git a/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/MetricFactory.java b/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/MetricFactory.java
index e61064e..0b02371 100644
--- a/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/MetricFactory.java
+++ b/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/MetricFactory.java
@@ -22,6 +22,8 @@ package org.apache.james.metrics.api;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
 
+import reactor.core.publisher.Mono;
+
 public interface MetricFactory {
 
     Metric generate(String name);
@@ -37,6 +39,11 @@ public interface MetricFactory {
         }
     }
 
+    default <T> Mono<T> runPublishingTimerMetric(String name, Mono<T> mono) {
+        TimeMetric timer = timer(name);
+        return mono.doOnNext(ignored -> timer.stopAndPublish());
+    }
+
     default void runPublishingTimerMetric(String name, Runnable runnable) {
         runPublishingTimerMetric(name, () -> {
             runnable.run();

http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
----------------------------------------------------------------------
diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
index 55527da..8b68d93 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
@@ -19,15 +19,16 @@
 package org.apache.james.blob.api;
 
 import java.io.InputStream;
-import java.util.concurrent.CompletableFuture;
+
+import reactor.core.publisher.Mono;
 
 public interface BlobStore {
 
-    CompletableFuture<BlobId> save(byte[] data);
+    Mono<BlobId> save(byte[] data);
 
-    CompletableFuture<BlobId> save(InputStream data);
+    Mono<BlobId> save(InputStream data);
 
-    CompletableFuture<byte[]> readBytes(BlobId blobId);
+    Mono<byte[]> readBytes(BlobId blobId);
 
     InputStream read(BlobId blobId);
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java
----------------------------------------------------------------------
diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java
index 5e76835..4ed7b17 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java
@@ -19,13 +19,14 @@
 package org.apache.james.blob.api;
 
 import java.io.InputStream;
-import java.util.concurrent.CompletableFuture;
 
 import javax.inject.Inject;
 import javax.inject.Named;
 
 import org.apache.james.metrics.api.MetricFactory;
 
+import reactor.core.publisher.Mono;
+
 public class MetricableBlobStore implements BlobStore {
 
     public static final String BLOB_STORE_IMPLEMENTATION = "blobStoreImplementation";
@@ -47,19 +48,19 @@ public class MetricableBlobStore implements BlobStore {
     }
 
     @Override
-    public CompletableFuture<BlobId> save(byte[] data) {
+    public Mono<BlobId> save(byte[] data) {
         return metricFactory
             .runPublishingTimerMetric(SAVE_BYTES_TIMER_NAME, blobStoreImpl.save(data));
     }
 
     @Override
-    public CompletableFuture<BlobId> save(InputStream data) {
+    public Mono<BlobId> save(InputStream data) {
         return metricFactory
             .runPublishingTimerMetric(SAVE_INPUT_STREAM_TIMER_NAME, blobStoreImpl.save(data));
     }
 
     @Override
-    public CompletableFuture<byte[]> readBytes(BlobId blobId) {
+    public Mono<byte[]> readBytes(BlobId blobId) {
         return metricFactory
             .runPublishingTimerMetric(READ_BYTES_TIMER_NAME, blobStoreImpl.readBytes(blobId));
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java
----------------------------------------------------------------------
diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java
index c270a9d..07a5611 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java
@@ -20,20 +20,21 @@
 package org.apache.james.blob.api;
 
 import java.io.InputStream;
+import java.util.Collection;
 import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
 import java.util.stream.Stream;
 
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.james.util.FluentFutureStream;
 
-import com.google.common.collect.ImmutableMap;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
 
 public interface Store<T, I> {
 
-    CompletableFuture<I> save(T t);
+    Mono<I> save(T t);
 
-    CompletableFuture<T> read(I blobIds);
+    Mono<T> read(I blobIds);
 
     class BlobType {
         private final String name;
@@ -85,30 +86,28 @@ public interface Store<T, I> {
         }
 
         @Override
-        public CompletableFuture<I> save(T t) {
-            return FluentFutureStream.of(
-                encoder.encode(t)
-                    .map(this::saveEntry))
-                .completableFuture()
-                .thenApply(pairStream -> pairStream.collect(ImmutableMap.toImmutableMap(Pair::getKey, Pair::getValue)))
-                .thenApply(idFactory::generate);
+        public Mono<I> save(T t) {
+            return Flux.fromStream(encoder.encode(t))
+                .flatMapSequential(this::saveEntry)
+                .collectMap(Tuple2::getT1, Tuple2::getT2)
+                .map(idFactory::generate);
         }
 
-        private CompletableFuture<Pair<BlobType, BlobId>> saveEntry(Pair<BlobType, InputStream> entry) {
-            return blobStore.save(entry.getRight())
-                .thenApply(blobId -> Pair.of(entry.getLeft(), blobId));
+        private Mono<Tuple2<BlobType, BlobId>> saveEntry(Pair<BlobType, InputStream> entry) {
+            return Mono.just(entry.getLeft())
+                .zipWith(blobStore.save(entry.getRight()));
         }
 
         @Override
-        public CompletableFuture<T> read(I blobIds) {
-            CompletableFuture<Stream<Pair<BlobType, byte[]>>> binaries = FluentFutureStream.of(blobIds.asMap()
-                .entrySet()
-                .stream()
-                .map(entry -> blobStore.readBytes(entry.getValue())
-                    .thenApply(bytes -> Pair.of(entry.getKey(), bytes))))
-                .completableFuture();
-
-            return binaries.thenApply(decoder::decode);
+        public Mono<T> read(I blobIds) {
+            return Flux.fromIterable(blobIds.asMap().entrySet())
+                .flatMapSequential(
+                    entry -> blobStore.readBytes(entry.getValue())
+                        .zipWith(Mono.just(entry.getKey())))
+                .map(entry -> Pair.of(entry.getT2(), entry.getT1()))
+                .collectList()
+                .map(Collection::stream)
+                .map(decoder::decode);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreContract.java
----------------------------------------------------------------------
diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreContract.java
index cfc441d..2164c74 100644
--- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreContract.java
+++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreContract.java
@@ -43,37 +43,37 @@ public interface BlobStoreContract {
 
     @Test
     default void saveShouldThrowWhenNullData() {
-        assertThatThrownBy(() -> testee().save((byte[]) null))
+        assertThatThrownBy(() -> testee().save((byte[]) null).block())
             .isInstanceOf(NullPointerException.class);
     }
 
     @Test
     default void saveShouldThrowWhenNullInputStream() {
-        assertThatThrownBy(() -> testee().save((InputStream) null))
+        assertThatThrownBy(() -> testee().save((InputStream) null).block())
             .isInstanceOf(NullPointerException.class);
     }
 
     @Test
     default void saveShouldSaveEmptyData() {
-        BlobId blobId = testee().save(EMPTY_BYTEARRAY).join();
+        BlobId blobId = testee().save(EMPTY_BYTEARRAY).block();
 
-        byte[] bytes = testee().readBytes(blobId).join();
+        byte[] bytes = testee().readBytes(blobId).block();
 
         assertThat(new String(bytes, StandardCharsets.UTF_8)).isEmpty();
     }
 
     @Test
     default void saveShouldSaveEmptyInputStream() {
-        BlobId blobId = testee().save(new ByteArrayInputStream(EMPTY_BYTEARRAY)).join();
+        BlobId blobId = testee().save(new ByteArrayInputStream(EMPTY_BYTEARRAY)).block();
 
-        byte[] bytes = testee().readBytes(blobId).join();
+        byte[] bytes = testee().readBytes(blobId).block();
 
         assertThat(new String(bytes, StandardCharsets.UTF_8)).isEmpty();
     }
 
     @Test
     default void saveShouldReturnBlobId() {
-        BlobId blobId = testee().save(SHORT_BYTEARRAY).join();
+        BlobId blobId = testee().save(SHORT_BYTEARRAY).block();
 
         assertThat(blobId).isEqualTo(blobIdFactory().from("31f7a65e315586ac198bd798b6629ce4903d0899476d5741a9f32e2e521b6a66"));
     }
@@ -81,40 +81,40 @@ public interface BlobStoreContract {
     @Test
     default void saveShouldReturnBlobIdOfInputStream() {
         BlobId blobId =
-            testee().save(new ByteArrayInputStream(SHORT_BYTEARRAY)).join();
+            testee().save(new ByteArrayInputStream(SHORT_BYTEARRAY)).block();
 
         assertThat(blobId).isEqualTo(blobIdFactory().from("31f7a65e315586ac198bd798b6629ce4903d0899476d5741a9f32e2e521b6a66"));
     }
 
     @Test
     default void readBytesShouldThrowWhenNoExisting() {
-        assertThatThrownBy(() -> testee().readBytes(blobIdFactory().from("unknown")).join())
-            .hasCauseInstanceOf(ObjectStoreException.class);
+        assertThatThrownBy(() -> testee().readBytes(blobIdFactory().from("unknown")).block())
+            .isExactlyInstanceOf(ObjectStoreException.class);
     }
 
     @Test
     default void readBytesShouldReturnSavedData() {
-        BlobId blobId = testee().save(SHORT_BYTEARRAY).join();
+        BlobId blobId = testee().save(SHORT_BYTEARRAY).block();
 
-        byte[] bytes = testee().readBytes(blobId).join();
+        byte[] bytes = testee().readBytes(blobId).block();
 
         assertThat(bytes).isEqualTo(SHORT_BYTEARRAY);
     }
 
     @Test
     default void readBytesShouldReturnLongSavedData() {
-        BlobId blobId = testee().save(ELEVEN_KILOBYTES).join();
+        BlobId blobId = testee().save(ELEVEN_KILOBYTES).block();
 
-        byte[] bytes = testee().readBytes(blobId).join();
+        byte[] bytes = testee().readBytes(blobId).block();
 
         assertThat(bytes).isEqualTo(ELEVEN_KILOBYTES);
     }
 
     @Test
     default void readBytesShouldReturnBigSavedData() {
-        BlobId blobId = testee().save(TWELVE_MEGABYTES).join();
+        BlobId blobId = testee().save(TWELVE_MEGABYTES).block();
 
-        byte[] bytes = testee().readBytes(blobId).join();
+        byte[] bytes = testee().readBytes(blobId).block();
 
         assertThat(bytes).isEqualTo(TWELVE_MEGABYTES);
     }
@@ -127,7 +127,7 @@ public interface BlobStoreContract {
 
     @Test
     default void readShouldReturnSavedData() {
-        BlobId blobId = testee().save(SHORT_BYTEARRAY).join();
+        BlobId blobId = testee().save(SHORT_BYTEARRAY).block();
 
         InputStream read = testee().read(blobId);
 
@@ -136,7 +136,7 @@ public interface BlobStoreContract {
 
     @Test
     default void readShouldReturnLongSavedData() {
-        BlobId blobId = testee().save(ELEVEN_KILOBYTES).join();
+        BlobId blobId = testee().save(ELEVEN_KILOBYTES).block();
 
         InputStream read = testee().read(blobId);
 
@@ -146,7 +146,7 @@ public interface BlobStoreContract {
     @Test
     default void readShouldReturnBigSavedData() {
         // 12 MB of text
-        BlobId blobId = testee().save(TWELVE_MEGABYTES).join();
+        BlobId blobId = testee().save(TWELVE_MEGABYTES).block();
 
         InputStream read = testee().read(blobId);
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java
----------------------------------------------------------------------
diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java
index f645589..c568764 100644
--- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java
+++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java
@@ -79,30 +79,30 @@ public interface MetricableBlobStoreContract extends BlobStoreContract {
 
     @Test
     default void saveBytesShouldPublishSaveBytesTimerMetrics() {
-        testee().save(BYTES_CONTENT).join();
-        testee().save(BYTES_CONTENT).join();
+        testee().save(BYTES_CONTENT).block();
+        testee().save(BYTES_CONTENT).block();
         verify(metricsTestExtension.saveBytesTimeMetric, times(2)).stopAndPublish();
     }
 
     @Test
     default void saveInputStreamShouldPublishSaveInputStreamTimerMetrics() {
-        testee().save(new ByteArrayInputStream(BYTES_CONTENT)).join();
-        testee().save(new ByteArrayInputStream(BYTES_CONTENT)).join();
-        testee().save(new ByteArrayInputStream(BYTES_CONTENT)).join();
+        testee().save(new ByteArrayInputStream(BYTES_CONTENT)).block();
+        testee().save(new ByteArrayInputStream(BYTES_CONTENT)).block();
+        testee().save(new ByteArrayInputStream(BYTES_CONTENT)).block();
         verify(metricsTestExtension.saveInputStreamTimeMetric, times(3)).stopAndPublish();
     }
 
     @Test
     default void readBytesShouldPublishReadBytesTimerMetrics() {
-        BlobId blobId = testee().save(BYTES_CONTENT).join();
-        testee().readBytes(blobId).join();
-        testee().readBytes(blobId).join();
+        BlobId blobId = testee().save(BYTES_CONTENT).block();
+        testee().readBytes(blobId).block();
+        testee().readBytes(blobId).block();
         verify(metricsTestExtension.readBytesTimeMetric, times(2)).stopAndPublish();
     }
 
     @Test
     default void readShouldPublishReadTimerMetrics() {
-        BlobId blobId = testee().save(BYTES_CONTENT).join();
+        BlobId blobId = testee().save(BYTES_CONTENT).block();
         testee().read(blobId);
         testee().read(blobId);
         verify(metricsTestExtension.readTimeMetric, times(2)).stopAndPublish();

http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java
----------------------------------------------------------------------
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java
index 01c6e8d..2dfc808 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java
@@ -28,7 +28,6 @@ import java.io.InputStream;
 import java.io.PipedInputStream;
 import java.nio.ByteBuffer;
 import java.util.Comparator;
-import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -49,7 +48,6 @@ import org.apache.james.blob.cassandra.utils.PipedStreamSubscriber;
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
-import com.github.fge.lambdas.Throwing;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.Bytes;
@@ -115,10 +113,10 @@ public class CassandraBlobsDAO implements BlobStore {
     }
 
     @Override
-    public CompletableFuture<BlobId> save(byte[] data) {
+    public Mono<BlobId> save(byte[] data) {
         Preconditions.checkNotNull(data);
 
-        return saveAsMono(data).toFuture();
+        return saveAsMono(data);
     }
 
     private Mono<BlobId> saveAsMono(byte[] data) {
@@ -164,17 +162,10 @@ public class CassandraBlobsDAO implements BlobStore {
     }
 
     @Override
-    public CompletableFuture<byte[]> readBytes(BlobId blobId) {
-        try {
-            return readBlobParts(blobId)
-                .collectList()
-                .map(parts -> Bytes.concat(parts.toArray(new byte[0][])))
-                .toFuture();
-        } catch (ObjectStoreException e) {
-            CompletableFuture<byte[]> error = new CompletableFuture<>();
-            error.completeExceptionally(e);
-            return error;
-        }
+    public Mono<byte[]> readBytes(BlobId blobId) {
+        return readBlobParts(blobId)
+            .collectList()
+            .map(parts -> Bytes.concat(parts.toArray(new byte[0][])));
     }
 
     private Mono<Integer> selectRowCount(BlobId blobId) {
@@ -220,11 +211,9 @@ public class CassandraBlobsDAO implements BlobStore {
     }
 
     @Override
-    public CompletableFuture<BlobId> save(InputStream data) {
+    public Mono<BlobId> save(InputStream data) {
         Preconditions.checkNotNull(data);
-        return Mono.fromSupplier(Throwing.supplier(() -> 
IOUtils.toByteArray(data)).sneakyThrow())
-            .publishOn(Schedulers.elastic())
-            .flatMap(this::saveAsMono)
-            .toFuture();
+        return Mono.fromCallable(() -> IOUtils.toByteArray(data))
+            .flatMap(this::saveAsMono);
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java
----------------------------------------------------------------------
diff --git 
a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java
 
b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java
index 86eeb86..51a9933 100644
--- 
a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java
+++ 
b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java
@@ -72,16 +72,16 @@ public class CassandraBlobsDAOTest implements 
MetricableBlobStoreContract {
     @Test
     void readBytesShouldReturnSplitSavedDataByChunk() {
         String longString = Strings.repeat("0123456789\n", 
MULTIPLE_CHUNK_SIZE);
-        BlobId blobId = 
testee.save(longString.getBytes(StandardCharsets.UTF_8)).join();
+        BlobId blobId = 
testee.save(longString.getBytes(StandardCharsets.UTF_8)).block();
 
-        byte[] bytes = testee.readBytes(blobId).join();
+        byte[] bytes = testee.readBytes(blobId).block();
 
         assertThat(new String(bytes, 
StandardCharsets.UTF_8)).isEqualTo(longString);
     }
 
     @Test
     void blobStoreShouldSupport100MBBlob() {
-        BlobId blobId = testee.save(new ZeroedInputStream(100_000_000)).join();
+        BlobId blobId = testee.save(new 
ZeroedInputStream(100_000_000)).block();
         InputStream bytes = testee.read(blobId);
         assertThat(bytes).hasSameContentAs(new ZeroedInputStream(100_000_000));
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStore.java
----------------------------------------------------------------------
diff --git 
a/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStore.java
 
b/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStore.java
index c76b975..9f72fc4 100644
--- 
a/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStore.java
+++ 
b/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStore.java
@@ -23,16 +23,15 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.BlobStore;
 import org.apache.james.blob.api.ObjectStoreException;
-import org.apache.james.util.CompletableFutureUtil;
 
 import com.google.common.base.Preconditions;
+import reactor.core.publisher.Mono;
 
 public class MemoryBlobStore implements BlobStore {
     private final ConcurrentHashMap<BlobId, byte[]> blobs;
@@ -44,17 +43,17 @@ public class MemoryBlobStore implements BlobStore {
     }
 
     @Override
-    public CompletableFuture<BlobId> save(byte[] data) {
+    public Mono<BlobId> save(byte[] data) {
         Preconditions.checkNotNull(data);
         BlobId blobId = factory.forPayload(data);
 
         blobs.put(blobId, data);
 
-        return CompletableFuture.completedFuture(blobId);
+        return Mono.just(blobId);
     }
 
     @Override
-    public CompletableFuture<BlobId> save(InputStream data) {
+    public Mono<BlobId> save(InputStream data) {
         Preconditions.checkNotNull(data);
         try {
             byte[] bytes = IOUtils.toByteArray(data);
@@ -65,12 +64,8 @@ public class MemoryBlobStore implements BlobStore {
     }
 
     @Override
-    public CompletableFuture<byte[]> readBytes(BlobId blobId) {
-        try {
-            return 
CompletableFuture.completedFuture(retrieveStoredValue(blobId));
-        } catch (ObjectStoreException e) {
-            return CompletableFutureUtil.exceptionallyFuture(e);
-        }
+    public Mono<byte[]> readBytes(BlobId blobId) {
+        return Mono.fromCallable(() -> retrieveStoredValue(blobId));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/blob/blob-objectstorage/pom.xml
----------------------------------------------------------------------
diff --git a/server/blob/blob-objectstorage/pom.xml 
b/server/blob/blob-objectstorage/pom.xml
index 6a35af0..a27c88d 100644
--- a/server/blob/blob-objectstorage/pom.xml
+++ b/server/blob/blob-objectstorage/pom.xml
@@ -115,6 +115,10 @@
             <artifactId>mockito-core</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java
----------------------------------------------------------------------
diff --git 
a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java
 
b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java
index b51f431..7b02932 100644
--- 
a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java
+++ 
b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java
@@ -22,9 +22,6 @@ package org.apache.james.blob.objectstorage;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.james.blob.api.BlobId;
@@ -33,7 +30,6 @@ import org.apache.james.blob.api.ObjectStoreException;
 import org.apache.james.blob.objectstorage.swift.SwiftKeystone2ObjectStorage;
 import org.apache.james.blob.objectstorage.swift.SwiftKeystone3ObjectStorage;
 import org.apache.james.blob.objectstorage.swift.SwiftTempAuthObjectStorage;
-import org.apache.james.util.concurrent.NamedThreadFactory;
 import org.jclouds.blobstore.domain.Blob;
 import org.jclouds.blobstore.options.CopyOptions;
 import org.jclouds.domain.Location;
@@ -41,10 +37,10 @@ import org.jclouds.io.Payload;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.github.fge.lambdas.Throwing;
 import com.google.common.base.Preconditions;
 import com.google.common.hash.Hashing;
 import com.google.common.hash.HashingInputStream;
+import reactor.core.publisher.Mono;
 
 public class ObjectStorageBlobsDAO implements BlobStore {
     private static final Location DEFAULT_LOCATION = null;
@@ -56,7 +52,6 @@ public class ObjectStorageBlobsDAO implements BlobStore {
     private final ContainerName containerName;
     private final org.jclouds.blobstore.BlobStore blobStore;
     private final PayloadCodec payloadCodec;
-    private final Executor executor;
 
     ObjectStorageBlobsDAO(ContainerName containerName, BlobId.Factory 
blobIdFactory,
                           org.jclouds.blobstore.BlobStore blobStore, 
PayloadCodec payloadCodec) {
@@ -64,7 +59,6 @@ public class ObjectStorageBlobsDAO implements BlobStore {
         this.containerName = containerName;
         this.blobStore = blobStore;
         this.payloadCodec = payloadCodec;
-        this.executor = 
Executors.newCachedThreadPool(NamedThreadFactory.withClassName(getClass()));
     }
 
     public static ObjectStorageBlobsDAOBuilder.RequireContainerName 
builder(SwiftTempAuthObjectStorage.Configuration testConfig) {
@@ -79,53 +73,48 @@ public class ObjectStorageBlobsDAO implements BlobStore {
         return SwiftKeystone3ObjectStorage.daoBuilder(testConfig);
     }
 
-    public CompletableFuture<ContainerName> createContainer(ContainerName 
name) {
-        return CompletableFuture.supplyAsync(() -> 
blobStore.createContainerInLocation(DEFAULT_LOCATION, name.value()))
-            .thenApply(created -> {
-                if (!created) {
-                    LOGGER.debug("{} already existed", name);
-                }
-                return name;
-            });
+    public Mono<ContainerName> createContainer(ContainerName name) {
+        return Mono.fromCallable(() -> 
blobStore.createContainerInLocation(DEFAULT_LOCATION, name.value()))
+            .filter(created -> created == false)
+            .doOnNext(ignored -> LOGGER.debug("{} already existed", name))
+            .thenReturn(name);
     }
 
     @Override
-    public CompletableFuture<BlobId> save(byte[] data) {
+    public Mono<BlobId> save(byte[] data) {
         return save(new ByteArrayInputStream(data));
     }
 
     @Override
-    public CompletableFuture<BlobId> save(InputStream data) {
+    public Mono<BlobId> save(InputStream data) {
         Preconditions.checkNotNull(data);
 
         BlobId tmpId = blobIdFactory.randomId();
         return save(data, tmpId)
-            .thenCompose(id -> updateBlobId(tmpId, id));
+            .flatMap(id -> updateBlobId(tmpId, id));
     }
 
-    private CompletableFuture<BlobId> updateBlobId(BlobId from, BlobId to) {
+    private Mono<BlobId> updateBlobId(BlobId from, BlobId to) {
         String containerName = this.containerName.value();
-        return CompletableFuture
-            .supplyAsync(() -> blobStore.copyBlob(containerName, 
from.asString(), containerName, to.asString(), CopyOptions.NONE), executor)
-            .thenAcceptAsync(any -> blobStore.removeBlob(containerName, 
from.asString()))
-            .thenApply(any -> to);
+        return Mono
+            .fromCallable(() -> blobStore.copyBlob(containerName, 
from.asString(), containerName, to.asString(), CopyOptions.NONE))
+            .then(Mono.fromRunnable(() -> blobStore.removeBlob(containerName, 
from.asString())))
+            .thenReturn(to);
     }
 
-    private CompletableFuture<BlobId> save(InputStream data, BlobId id) {
+    private Mono<BlobId> save(InputStream data, BlobId id) {
         String containerName = this.containerName.value();
         HashingInputStream hashingInputStream = new 
HashingInputStream(Hashing.sha256(), data);
         Payload payload = payloadCodec.write(hashingInputStream);
         Blob blob = 
blobStore.blobBuilder(id.asString()).payload(payload).build();
 
-        return CompletableFuture
-            .supplyAsync(() -> blobStore.putBlob(containerName, blob), 
executor)
-            .thenApply(any -> 
blobIdFactory.from(hashingInputStream.hash().toString()));
+        return Mono.fromCallable(() -> blobStore.putBlob(containerName, blob))
+            .then(Mono.fromCallable(() -> 
blobIdFactory.from(hashingInputStream.hash().toString())));
     }
 
     @Override
-    public CompletableFuture<byte[]> readBytes(BlobId blobId) {
-        return CompletableFuture
-            .supplyAsync(Throwing.supplier(() -> 
IOUtils.toByteArray(read(blobId))).sneakyThrow(), executor);
+    public Mono<byte[]> readBytes(BlobId blobId) {
+        return Mono.fromCallable(() -> IOUtils.toByteArray(read(blobId)));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOContract.java
----------------------------------------------------------------------
diff --git 
a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOContract.java
 
b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOContract.java
index e113cb5..409272d 100644
--- 
a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOContract.java
+++ 
b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOContract.java
@@ -36,9 +36,9 @@ public interface ObjectStorageBlobsDAOContract {
 
     default void 
assertBlobsDAOCanStoreAndRetrieve(ObjectStorageBlobsDAOBuilder.ReadyToBuild 
builder) {
         ObjectStorageBlobsDAO dao = builder.build();
-        dao.createContainer(containerName());
+        dao.createContainer(containerName()).block();
 
-        BlobId blobId = dao.save(BYTES).join();
+        BlobId blobId = dao.save(BYTES).block();
 
         InputStream inputStream = dao.read(blobId);
         assertThat(inputStream).hasSameContentAs(new 
ByteArrayInputStream(BYTES));

http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOTest.java
----------------------------------------------------------------------
diff --git 
a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOTest.java
 
b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOTest.java
index 78c6a8a..95c1fec 100644
--- 
a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOTest.java
+++ 
b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOTest.java
@@ -26,7 +26,6 @@ import java.io.ByteArrayInputStream;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.james.blob.api.BlobId;
@@ -48,6 +47,8 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 import com.google.common.base.Strings;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 @ExtendWith(DockerSwiftExtension.class)
 public class ObjectStorageBlobsDAOTest implements MetricableBlobStoreContract {
@@ -85,7 +86,7 @@ public class ObjectStorageBlobsDAOTest implements 
MetricableBlobStoreContract {
             .blobIdFactory(blobIdFactory);
         blobStore = daoBuilder.getSupplier().get();
         objectStorageBlobsDAO = daoBuilder.build();
-        objectStorageBlobsDAO.createContainer(containerName);
+        objectStorageBlobsDAO.createContainer(containerName).block();
         testee = new 
MetricableBlobStore(metricsTestExtension.getMetricFactory(), 
objectStorageBlobsDAO);
     }
 
@@ -106,18 +107,18 @@ public class ObjectStorageBlobsDAOTest implements 
MetricableBlobStoreContract {
     }
 
     @Test
-    void createContainerShouldMakeTheContainerToExist() throws Exception {
+    void createContainerShouldMakeTheContainerToExist() {
         ContainerName containerName = 
ContainerName.of(UUID.randomUUID().toString());
-        objectStorageBlobsDAO.createContainer(containerName).get();
+        objectStorageBlobsDAO.createContainer(containerName).block();
         assertThat(blobStore.containerExists(containerName.value())).isTrue();
     }
 
     @Test
-    void 
createContainerShouldNotFailWithRuntimeExceptionWhenCreateContainerTwice() 
throws Exception {
+    void 
createContainerShouldNotFailWithRuntimeExceptionWhenCreateContainerTwice() {
         ContainerName containerName = 
ContainerName.of(UUID.randomUUID().toString());
 
-        objectStorageBlobsDAO.createContainer(containerName).get();
-        assertThatCode(() -> 
objectStorageBlobsDAO.createContainer(containerName).get())
+        objectStorageBlobsDAO.createContainer(containerName).block();
+        assertThatCode(() -> 
objectStorageBlobsDAO.createContainer(containerName).block())
             .doesNotThrowAnyException();
     }
 
@@ -130,7 +131,7 @@ public class ObjectStorageBlobsDAOTest implements 
MetricableBlobStoreContract {
             .payloadCodec(new AESPayloadCodec(CRYPTO_CONFIG))
             .build();
         byte[] bytes = "James is the best!".getBytes(StandardCharsets.UTF_8);
-        BlobId blobId = encryptedDao.save(bytes).join();
+        BlobId blobId = encryptedDao.save(bytes).block();
 
         InputStream read = encryptedDao.read(blobId);
         assertThat(read).hasSameContentAs(new ByteArrayInputStream(bytes));
@@ -145,7 +146,7 @@ public class ObjectStorageBlobsDAOTest implements 
MetricableBlobStoreContract {
             .payloadCodec(new AESPayloadCodec(CRYPTO_CONFIG))
             .build();
         byte[] bytes = "James is the best!".getBytes(StandardCharsets.UTF_8);
-        BlobId blobId = encryptedDao.save(bytes).join();
+        BlobId blobId = encryptedDao.save(bytes).block();
 
         InputStream encryptedIs = testee.read(blobId);
         assertThat(encryptedIs).isNotNull();
@@ -166,24 +167,25 @@ public class ObjectStorageBlobsDAOTest implements 
MetricableBlobStoreContract {
     @Test
     void saveBytesShouldNotCompleteWhenDoesNotAwait() {
         // String need to be big enough to get async thread busy hence could 
not return result instantly
-        CompletableFuture<BlobId> blobIdFuture = 
testee.save(BIG_STRING.getBytes(StandardCharsets.UTF_8));
-        assertThat(blobIdFuture)
-            .isNotCompleted();
+        Mono<BlobId> blobIdFuture = testee
+            .save(BIG_STRING.getBytes(StandardCharsets.UTF_8))
+            .subscribeOn(Schedulers.elastic());
+        assertThat(blobIdFuture.toFuture()).isNotCompleted();
     }
 
     @Test
     void saveInputStreamShouldNotCompleteWhenDoesNotAwait() {
-        CompletableFuture<BlobId> blobIdFuture = testee.save(new 
ByteArrayInputStream(BIG_STRING.getBytes(StandardCharsets.UTF_8)));
-        assertThat(blobIdFuture)
-            .isNotCompleted();
+        Mono<BlobId> blobIdFuture = testee
+            .save(new 
ByteArrayInputStream(BIG_STRING.getBytes(StandardCharsets.UTF_8)))
+            .subscribeOn(Schedulers.elastic());
+        assertThat(blobIdFuture.toFuture()).isNotCompleted();
     }
 
     @Test
     void readBytesShouldNotCompleteWhenDoesNotAwait() {
-        BlobId blobId = 
testee().save(BIG_STRING.getBytes(StandardCharsets.UTF_8)).join();
-        CompletableFuture<byte[]> resultFuture = testee.readBytes(blobId);
-        assertThat(resultFuture)
-            .isNotCompleted();
+        BlobId blobId = 
testee().save(BIG_STRING.getBytes(StandardCharsets.UTF_8)).block();
+        Mono<byte[]> resultFuture = 
testee.readBytes(blobId).subscribeOn(Schedulers.elastic());
+        assertThat(resultFuture.toFuture()).isNotCompleted();
     }
 }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/blob/blob-union/src/main/java/org/apache/james/blob/union/UnionBlobStore.java
----------------------------------------------------------------------
diff --git 
a/server/blob/blob-union/src/main/java/org/apache/james/blob/union/UnionBlobStore.java
 
b/server/blob/blob-union/src/main/java/org/apache/james/blob/union/UnionBlobStore.java
index c69367f..950fcf7 100644
--- 
a/server/blob/blob-union/src/main/java/org/apache/james/blob/union/UnionBlobStore.java
+++ 
b/server/blob/blob-union/src/main/java/org/apache/james/blob/union/UnionBlobStore.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.PushbackInputStream;
 import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
@@ -35,6 +34,7 @@ import org.slf4j.LoggerFactory;
 import com.github.fge.lambdas.Throwing;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
+import reactor.core.publisher.Mono;
 
 public class UnionBlobStore implements BlobStore {
 
@@ -80,11 +80,11 @@ public class UnionBlobStore implements BlobStore {
     }
 
     @Override
-    public CompletableFuture<BlobId> save(byte[] data) {
+    public Mono<BlobId> save(byte[] data) {
         try {
             return saveToCurrentFallbackIfFails(
-                currentBlobStore.save(data),
-                () -> legacyBlobStore.save(data));
+                Mono.defer(() -> currentBlobStore.save(data)),
+                () -> Mono.defer(() -> legacyBlobStore.save(data)));
         } catch (Exception e) {
             LOGGER.error("exception directly happens while saving bytes data, 
fall back to legacy blob store", e);
             return legacyBlobStore.save(data);
@@ -92,11 +92,11 @@ public class UnionBlobStore implements BlobStore {
     }
 
     @Override
-    public CompletableFuture<BlobId> save(InputStream data) {
+    public Mono<BlobId> save(InputStream data) {
         try {
             return saveToCurrentFallbackIfFails(
-                currentBlobStore.save(data),
-                () -> legacyBlobStore.save(data));
+                Mono.defer(() -> currentBlobStore.save(data)),
+                () -> Mono.defer(() -> legacyBlobStore.save(data)));
         } catch (Exception e) {
             LOGGER.error("exception directly happens while saving InputStream 
data, fall back to legacy blob store", e);
             return legacyBlobStore.save(data);
@@ -104,12 +104,12 @@ public class UnionBlobStore implements BlobStore {
     }
 
     @Override
-    public CompletableFuture<byte[]> readBytes(BlobId blobId) {
+    public Mono<byte[]> readBytes(BlobId blobId) {
         try {
             return readBytesFallBackIfFailsOrEmptyResult(blobId);
         } catch (Exception e) {
             LOGGER.error("exception directly happens while readBytes, fall 
back to legacy blob store", e);
-            return legacyBlobStore.readBytes(blobId);
+            return Mono.defer(() -> legacyBlobStore.readBytes(blobId));
         }
     }
 
@@ -141,39 +141,24 @@ public class UnionBlobStore implements BlobStore {
         return false;
     }
 
-    private CompletableFuture<byte[]> 
readBytesFallBackIfFailsOrEmptyResult(BlobId blobId) {
-        return currentBlobStore.readBytes(blobId)
-            .thenApply(Optional::ofNullable)
-            .exceptionally(this::logAndReturnEmptyOptional)
-            .thenCompose(maybeBytes -> readFromLegacyIfNeeded(maybeBytes, 
blobId));
+    private Mono<byte[]> readBytesFallBackIfFailsOrEmptyResult(BlobId blobId) {
+        return Mono.defer(() -> currentBlobStore.readBytes(blobId))
+            .onErrorResume(this::logAndReturnEmpty)
+            .switchIfEmpty(legacyBlobStore.readBytes(blobId));
     }
 
-    private CompletableFuture<BlobId> saveToCurrentFallbackIfFails(
-        CompletableFuture<BlobId> currentSavingOperation,
-        Supplier<CompletableFuture<BlobId>> fallbackSavingOperationSupplier) {
+    private Mono<BlobId> saveToCurrentFallbackIfFails(
+        Mono<BlobId> currentSavingOperation,
+        Supplier<Mono<BlobId>> fallbackSavingOperationSupplier) {
 
         return currentSavingOperation
-            .thenApply(Optional::ofNullable)
-            .exceptionally(this::logAndReturnEmptyOptional)
-            .thenCompose(maybeBlobId -> saveToLegacyIfNeeded(maybeBlobId, 
fallbackSavingOperationSupplier));
+            .onErrorResume(this::logAndReturnEmpty)
+            .switchIfEmpty(fallbackSavingOperationSupplier.get());
     }
 
-    private <T> Optional<T> logAndReturnEmptyOptional(Throwable throwable) {
+    private <T> Mono<T> logAndReturnEmpty(Throwable throwable) {
         LOGGER.error("error happens from current blob store, fall back to 
legacy blob store", throwable);
-        return Optional.empty();
-    }
-
-    private CompletableFuture<BlobId> saveToLegacyIfNeeded(Optional<BlobId> 
maybeBlobId,
-                                                           
Supplier<CompletableFuture<BlobId>> saveToLegacySupplier) {
-        return maybeBlobId
-            .map(CompletableFuture::completedFuture)
-            .orElseGet(saveToLegacySupplier);
-    }
-
-    private CompletableFuture<byte[]> readFromLegacyIfNeeded(Optional<byte[]> 
readFromCurrentResult, BlobId blodId) {
-        return readFromCurrentResult
-            .map(CompletableFuture::completedFuture)
-            .orElseGet(() -> legacyBlobStore.readBytes(blodId));
+        return Mono.empty();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/blob/blob-union/src/test/java/org/apache/james/blob/union/UnionBlobStoreTest.java
----------------------------------------------------------------------
diff --git 
a/server/blob/blob-union/src/test/java/org/apache/james/blob/union/UnionBlobStoreTest.java
 
b/server/blob/blob-union/src/test/java/org/apache/james/blob/union/UnionBlobStoreTest.java
index b614ed7..db1a5b5 100644
--- 
a/server/blob/blob-union/src/test/java/org/apache/james/blob/union/UnionBlobStoreTest.java
+++ 
b/server/blob/blob-union/src/test/java/org/apache/james/blob/union/UnionBlobStoreTest.java
@@ -37,7 +37,6 @@ import org.apache.james.blob.api.BlobStoreContract;
 import org.apache.james.blob.api.HashBlobId;
 import org.apache.james.blob.api.ObjectStoreException;
 import org.apache.james.blob.memory.MemoryBlobStore;
-import org.apache.james.util.CompletableFutureUtil;
 import org.apache.james.util.StreamUtils;
 import org.assertj.core.api.SoftAssertions;
 import org.junit.jupiter.api.BeforeEach;
@@ -50,23 +49,25 @@ import org.junit.jupiter.params.provider.MethodSource;
 import org.testcontainers.shaded.com.google.common.base.MoreObjects;
 import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
 
+import reactor.core.publisher.Mono;
+
 class UnionBlobStoreTest implements BlobStoreContract {
 
-    private static class FutureThrowingBlobStore implements BlobStore {
+    private static class FailingBlobStore implements BlobStore {
 
         @Override
-        public CompletableFuture<BlobId> save(byte[] data) {
-            return CompletableFutureUtil.exceptionallyFuture(new 
RuntimeException("broken everywhere"));
+        public Mono<BlobId> save(byte[] data) {
+            return Mono.error(new RuntimeException("broken everywhere"));
         }
 
         @Override
-        public CompletableFuture<BlobId> save(InputStream data) {
-            return CompletableFutureUtil.exceptionallyFuture(new 
RuntimeException("broken everywhere"));
+        public Mono<BlobId> save(InputStream data) {
+            return Mono.error(new RuntimeException("broken everywhere"));
         }
 
         @Override
-        public CompletableFuture<byte[]> readBytes(BlobId blobId) {
-            return CompletableFutureUtil.exceptionallyFuture(new 
RuntimeException("broken everywhere"));
+        public Mono<byte[]> readBytes(BlobId blobId) {
+            return Mono.error(new RuntimeException("broken everywhere"));
         }
 
         @Override
@@ -84,17 +85,17 @@ class UnionBlobStoreTest implements BlobStoreContract {
     private static class ThrowingBlobStore implements BlobStore {
 
         @Override
-        public CompletableFuture<BlobId> save(byte[] data) {
+        public Mono<BlobId> save(byte[] data) {
             throw new RuntimeException("broken everywhere");
         }
 
         @Override
-        public CompletableFuture<BlobId> save(InputStream data) {
+        public Mono<BlobId> save(InputStream data) {
             throw new RuntimeException("broken everywhere");
         }
 
         @Override
-        public CompletableFuture<byte[]> readBytes(BlobId blobId) {
+        public Mono<byte[]> readBytes(BlobId blobId) {
             throw new RuntimeException("broken everywhere");
         }
 
@@ -141,13 +142,13 @@ class UnionBlobStoreTest implements BlobStoreContract {
     class CurrentSaveThrowsExceptionDirectly {
 
         @Test
-        void saveShouldFallBackToLegacyWhenCurrentGotException() throws 
Exception {
+        void saveShouldFallBackToLegacyWhenCurrentGotException() {
             MemoryBlobStore legacyBlobStore = new 
MemoryBlobStore(BLOB_ID_FACTORY);
             UnionBlobStore unionBlobStore = UnionBlobStore.builder()
                 .current(new ThrowingBlobStore())
                 .legacy(legacyBlobStore)
                 .build();
-            BlobId blobId = unionBlobStore.save(BLOB_CONTENT).get();
+            BlobId blobId = unionBlobStore.save(BLOB_CONTENT).block();
 
             SoftAssertions.assertSoftly(softly -> {
                 softly.assertThat(unionBlobStore.read(blobId))
@@ -158,13 +159,13 @@ class UnionBlobStoreTest implements BlobStoreContract {
         }
 
         @Test
-        void saveInputStreamShouldFallBackToLegacyWhenCurrentGotException() 
throws Exception {
+        void saveInputStreamShouldFallBackToLegacyWhenCurrentGotException() {
             MemoryBlobStore legacyBlobStore = new 
MemoryBlobStore(BLOB_ID_FACTORY);
             UnionBlobStore unionBlobStore = UnionBlobStore.builder()
                 .current(new ThrowingBlobStore())
                 .legacy(legacyBlobStore)
                 .build();
-            BlobId blobId = unionBlobStore.save(new 
ByteArrayInputStream(BLOB_CONTENT)).get();
+            BlobId blobId = unionBlobStore.save(new 
ByteArrayInputStream(BLOB_CONTENT)).block();
 
             SoftAssertions.assertSoftly(softly -> {
                 softly.assertThat(unionBlobStore.read(blobId))
@@ -179,13 +180,13 @@ class UnionBlobStoreTest implements BlobStoreContract {
     class CurrentSaveCompletesExceptionally {
 
         @Test
-        void saveShouldFallBackToLegacyWhenCurrentCompletedExceptionally() 
throws Exception {
+        void saveShouldFallBackToLegacyWhenCurrentCompletedExceptionally() {
             MemoryBlobStore legacyBlobStore = new 
MemoryBlobStore(BLOB_ID_FACTORY);
             UnionBlobStore unionBlobStore = UnionBlobStore.builder()
-                .current(new FutureThrowingBlobStore())
+                .current(new FailingBlobStore())
                 .legacy(legacyBlobStore)
                 .build();
-            BlobId blobId = unionBlobStore.save(BLOB_CONTENT).get();
+            BlobId blobId = unionBlobStore.save(BLOB_CONTENT).block();
 
             SoftAssertions.assertSoftly(softly -> {
                 softly.assertThat(unionBlobStore.read(blobId))
@@ -196,13 +197,13 @@ class UnionBlobStoreTest implements BlobStoreContract {
         }
 
         @Test
-        void saveInputStreamShouldFallBackToLegacyWhenCurrentCompletedExceptionally() throws Exception {
+        void saveInputStreamShouldFallBackToLegacyWhenCurrentCompletedExceptionally() {
             MemoryBlobStore legacyBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY);
             UnionBlobStore unionBlobStore = UnionBlobStore.builder()
-                .current(new FutureThrowingBlobStore())
+                .current(new FailingBlobStore())
                 .legacy(legacyBlobStore)
                 .build();
-            BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).get();
+            BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).block();
 
             SoftAssertions.assertSoftly(softly -> {
                 softly.assertThat(unionBlobStore.read(blobId))
@@ -218,29 +219,29 @@ class UnionBlobStoreTest implements BlobStoreContract {
     class CurrentReadThrowsExceptionDirectly {
 
         @Test
-        void readShouldReturnFallbackToLegacyWhenCurrentGotException() throws Exception {
+        void readShouldReturnFallbackToLegacyWhenCurrentGotException() {
             MemoryBlobStore legacyBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY);
             UnionBlobStore unionBlobStore = UnionBlobStore.builder()
                 .current(new ThrowingBlobStore())
                 .legacy(legacyBlobStore)
                 .build();
-            BlobId blobId = legacyBlobStore.save(BLOB_CONTENT).get();
+            BlobId blobId = legacyBlobStore.save(BLOB_CONTENT).block();
 
             assertThat(unionBlobStore.read(blobId))
                 .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT));
         }
 
         @Test
-        void readBytesShouldReturnFallbackToLegacyWhenCurrentGotException() throws Exception {
+        void readBytesShouldReturnFallbackToLegacyWhenCurrentGotException() {
             MemoryBlobStore legacyBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY);
 
             UnionBlobStore unionBlobStore = UnionBlobStore.builder()
                 .current(new ThrowingBlobStore())
                 .legacy(legacyBlobStore)
                 .build();
-            BlobId blobId = legacyBlobStore.save(BLOB_CONTENT).get();
+            BlobId blobId = legacyBlobStore.save(BLOB_CONTENT).block();
 
-            assertThat(unionBlobStore.readBytes(blobId).get())
+            assertThat(unionBlobStore.readBytes(blobId).block())
                 .isEqualTo(BLOB_CONTENT);
         }
 
@@ -250,28 +251,28 @@ class UnionBlobStoreTest implements BlobStoreContract {
     class CurrentReadCompletesExceptionally {
 
         @Test
-        void readShouldReturnFallbackToLegacyWhenCurrentCompletedExceptionally() throws Exception {
+        void readShouldReturnFallbackToLegacyWhenCurrentCompletedExceptionally() {
             MemoryBlobStore legacyBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY);
             UnionBlobStore unionBlobStore = UnionBlobStore.builder()
-                .current(new FutureThrowingBlobStore())
+                .current(new FailingBlobStore())
                 .legacy(legacyBlobStore)
                 .build();
-            BlobId blobId = legacyBlobStore.save(BLOB_CONTENT).get();
+            BlobId blobId = legacyBlobStore.save(BLOB_CONTENT).block();
 
             assertThat(unionBlobStore.read(blobId))
                 .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT));
         }
 
         @Test
-        void readBytesShouldReturnFallbackToLegacyWhenCurrentCompletedExceptionally() throws Exception {
+        void readBytesShouldReturnFallbackToLegacyWhenCurrentCompletedExceptionally() {
             MemoryBlobStore legacyBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY);
             UnionBlobStore unionBlobStore = UnionBlobStore.builder()
-                .current(new FutureThrowingBlobStore())
+                .current(new FailingBlobStore())
                 .legacy(legacyBlobStore)
                 .build();
-            BlobId blobId = legacyBlobStore.save(BLOB_CONTENT).get();
+            BlobId blobId = legacyBlobStore.save(BLOB_CONTENT).block();
 
-            assertThat(unionBlobStore.readBytes(blobId).get())
+            assertThat(unionBlobStore.readBytes(blobId).block())
                 .isEqualTo(BLOB_CONTENT);
         }
     }
@@ -281,7 +282,7 @@ class UnionBlobStoreTest implements BlobStoreContract {
     class CurrentAndLegacyCouldNotComplete {
 
 
-        Stream<Function<UnionBlobStore, CompletableFuture<?>>> blobStoreOperationsReturnFutures() {
+        Stream<Function<UnionBlobStore, Mono<?>>> blobStoreOperationsReturnFutures() {
             return Stream.of(
                 blobStore -> blobStore.save(BLOB_CONTENT),
                 blobStore -> blobStore.save(new ByteArrayInputStream(BLOB_CONTENT)),
@@ -297,15 +298,15 @@ class UnionBlobStoreTest implements BlobStoreContract {
             List<UnionBlobStore> futureThrowingUnionBlobStores = ImmutableList.of(
                 UnionBlobStore.builder()
                     .current(new ThrowingBlobStore())
-                    .legacy(new FutureThrowingBlobStore())
+                    .legacy(new FailingBlobStore())
                     .build(),
                 UnionBlobStore.builder()
-                    .current(new FutureThrowingBlobStore())
+                    .current(new FailingBlobStore())
                     .legacy(new ThrowingBlobStore())
                     .build(),
                 UnionBlobStore.builder()
-                    .current(new FutureThrowingBlobStore())
-                    .legacy(new FutureThrowingBlobStore())
+                    .current(new FailingBlobStore())
+                    .legacy(new FailingBlobStore())
                     .build());
 
             return blobStoreOperationsReturnFutures()
@@ -338,74 +339,74 @@ class UnionBlobStoreTest implements BlobStoreContract {
         @ParameterizedTest
         @MethodSource("blobStoresCauseReturnExceptionallyFutures")
         void operationShouldReturnExceptionallyFuture(UnionBlobStore blobStoreReturnsExceptionallyFuture,
-                                                      Function<UnionBlobStore, CompletableFuture<?>> blobStoreOperation) {
-            assertThat(blobStoreOperation.apply(blobStoreReturnsExceptionallyFuture))
-                .isCompletedExceptionally();
+                                                      Function<UnionBlobStore, Mono<?>> blobStoreOperation) {
+            Mono<?> mono = blobStoreOperation.apply(blobStoreReturnsExceptionallyFuture);
+            assertThatThrownBy(mono::block).isInstanceOf(RuntimeException.class);
         }
     }
 
     @Test
-    void readShouldReturnFromCurrentWhenAvailable() throws Exception {
-        BlobId blobId = currentBlobStore.save(BLOB_CONTENT).get();
+    void readShouldReturnFromCurrentWhenAvailable() {
+        BlobId blobId = currentBlobStore.save(BLOB_CONTENT).block();
 
         assertThat(unionBlobStore.read(blobId))
             .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT));
     }
 
     @Test
-    void readShouldReturnFromLegacyWhenCurrentNotAvailable() throws Exception {
-        BlobId blobId = legacyBlobStore.save(BLOB_CONTENT).get();
+    void readShouldReturnFromLegacyWhenCurrentNotAvailable() {
+        BlobId blobId = legacyBlobStore.save(BLOB_CONTENT).block();
 
         assertThat(unionBlobStore.read(blobId))
             .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT));
     }
 
     @Test
-    void readBytesShouldReturnFromCurrentWhenAvailable() throws Exception {
-        BlobId blobId = currentBlobStore.save(BLOB_CONTENT).get();
+    void readBytesShouldReturnFromCurrentWhenAvailable() {
+        BlobId blobId = currentBlobStore.save(BLOB_CONTENT).block();
 
-        assertThat(unionBlobStore.readBytes(blobId).get())
+        assertThat(unionBlobStore.readBytes(blobId).block())
             .isEqualTo(BLOB_CONTENT);
     }
 
     @Test
-    void readBytesShouldReturnFromLegacyWhenCurrentNotAvailable() throws Exception {
-        BlobId blobId = legacyBlobStore.save(BLOB_CONTENT).get();
+    void readBytesShouldReturnFromLegacyWhenCurrentNotAvailable() {
+        BlobId blobId = legacyBlobStore.save(BLOB_CONTENT).block();
 
-        assertThat(unionBlobStore.readBytes(blobId).get())
+        assertThat(unionBlobStore.readBytes(blobId).block())
             .isEqualTo(BLOB_CONTENT);
     }
 
     @Test
-    void saveShouldWriteToCurrent() throws Exception {
-        BlobId blobId = unionBlobStore.save(BLOB_CONTENT).get();
+    void saveShouldWriteToCurrent() {
+        BlobId blobId = unionBlobStore.save(BLOB_CONTENT).block();
 
-        assertThat(currentBlobStore.readBytes(blobId).get())
+        assertThat(currentBlobStore.readBytes(blobId).block())
             .isEqualTo(BLOB_CONTENT);
     }
 
     @Test
-    void saveShouldNotWriteToLegacy() throws Exception {
-        BlobId blobId = unionBlobStore.save(BLOB_CONTENT).get();
+    void saveShouldNotWriteToLegacy() {
+        BlobId blobId = unionBlobStore.save(BLOB_CONTENT).block();
 
-        assertThatThrownBy(() -> legacyBlobStore.readBytes(blobId).join())
-            .hasCauseInstanceOf(ObjectStoreException.class);
+        assertThatThrownBy(() -> legacyBlobStore.readBytes(blobId).block())
+            .isInstanceOf(ObjectStoreException.class);
     }
 
     @Test
-    void saveInputStreamShouldWriteToCurrent() throws Exception {
-        BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).get();
+    void saveInputStreamShouldWriteToCurrent() {
+        BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).block();
 
-        assertThat(currentBlobStore.readBytes(blobId).get())
+        assertThat(currentBlobStore.readBytes(blobId).block())
             .isEqualTo(BLOB_CONTENT);
     }
 
     @Test
-    void saveInputStreamShouldNotWriteToLegacy() throws Exception {
-        BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).get();
+    void saveInputStreamShouldNotWriteToLegacy() {
+        BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).block();
 
-        assertThatThrownBy(() -> legacyBlobStore.readBytes(blobId).join())
-            .isNotInstanceOf(ObjectStoreException.class);
+        assertThatThrownBy(() -> legacyBlobStore.readBytes(blobId).block())
+            .isInstanceOf(ObjectStoreException.class);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/blob/mail-store/src/test/java/org/apache/james/blob/mail/MimeMessageStoreTest.java
----------------------------------------------------------------------
diff --git a/server/blob/mail-store/src/test/java/org/apache/james/blob/mail/MimeMessageStoreTest.java b/server/blob/mail-store/src/test/java/org/apache/james/blob/mail/MimeMessageStoreTest.java
index d8de475..4bec4a4 100644
--- a/server/blob/mail-store/src/test/java/org/apache/james/blob/mail/MimeMessageStoreTest.java
+++ b/server/blob/mail-store/src/test/java/org/apache/james/blob/mail/MimeMessageStoreTest.java
@@ -70,9 +70,9 @@ class MimeMessageStoreTest {
             .setText("Important mail content")
             .build();
 
-        MimeMessagePartsId parts = testee.save(message).join();
+        MimeMessagePartsId parts = testee.save(message).block();
 
-        MimeMessage retrievedMessage = testee.read(parts).join();
+        MimeMessage retrievedMessage = testee.read(parts).block();
 
         assertThat(MimeMessageUtil.asString(retrievedMessage))
             .isEqualTo(MimeMessageUtil.asString(message));
@@ -86,9 +86,9 @@ class MimeMessageStoreTest {
             .setSubject("Important Mail")
             .build();
 
-        MimeMessagePartsId parts = testee.save(message).join();
+        MimeMessagePartsId parts = testee.save(message).block();
 
-        MimeMessage retrievedMessage = testee.read(parts).join();
+        MimeMessage retrievedMessage = testee.read(parts).block();
 
         assertThat(MimeMessageUtil.asString(retrievedMessage))
             .isEqualTo(MimeMessageUtil.asString(message));
@@ -105,14 +105,14 @@ class MimeMessageStoreTest {
             .setText("Important mail content")
             .build();
 
-        MimeMessagePartsId parts = testee.save(message).join();
+        MimeMessagePartsId parts = testee.save(message).block();
 
         SoftAssertions.assertSoftly(
             softly -> {
                 BlobId headerBlobId = parts.getHeaderBlobId();
                 BlobId bodyBlobId = parts.getBodyBlobId();
 
-                softly.assertThat(new String(blobStore.readBytes(headerBlobId).join(), StandardCharsets.UTF_8))
+                softly.assertThat(new String(blobStore.readBytes(headerBlobId).block(), StandardCharsets.UTF_8))
                     .isEqualTo("Date: Thu, 6 Sep 2018 13:29:13 +0700 (ICT)\r\n" +
                         "From: [email protected]\r\n" +
                         "To: [email protected]\r\n" +
@@ -121,7 +121,7 @@ class MimeMessageStoreTest {
                         "MIME-Version: 1.0\r\n" +
                         "Content-Type: text/plain; charset=UTF-8\r\n" +
                         "Content-Transfer-Encoding: 7bit\r\n\r\n");
-                softly.assertThat(new String(blobStore.readBytes(bodyBlobId).join(), StandardCharsets.UTF_8))
+                softly.assertThat(new String(blobStore.readBytes(bodyBlobId).block(), StandardCharsets.UTF_8))
                     .isEqualTo("Important mail content");
             });
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/container/guice/blob-objectstorage-guice/src/main/java/org/apache/james/modules/objectstorage/ObjectStorageDependenciesModule.java
----------------------------------------------------------------------
diff --git a/server/container/guice/blob-objectstorage-guice/src/main/java/org/apache/james/modules/objectstorage/ObjectStorageDependenciesModule.java b/server/container/guice/blob-objectstorage-guice/src/main/java/org/apache/james/modules/objectstorage/ObjectStorageDependenciesModule.java
index 0aeb26d..8cb907a 100644
--- a/server/container/guice/blob-objectstorage-guice/src/main/java/org/apache/james/modules/objectstorage/ObjectStorageDependenciesModule.java
+++ b/server/container/guice/blob-objectstorage-guice/src/main/java/org/apache/james/modules/objectstorage/ObjectStorageDependenciesModule.java
@@ -20,8 +20,8 @@
 package org.apache.james.modules.objectstorage;
 
 import java.io.FileNotFoundException;
+import java.time.Duration;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import javax.inject.Singleton;
@@ -70,7 +70,7 @@ public class ObjectStorageDependenciesModule extends AbstractModule {
             .blobIdFactory(blobIdFactory)
             .payloadCodec(configuration.getPayloadCodec())
             .build();
-        dao.createContainer(configuration.getNamespace()).get(1, TimeUnit.MINUTES);
+        dao.createContainer(configuration.getNamespace()).block(Duration.ofMinutes(1));
         return dao;
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/container/guice/blob-objectstorage-guice/src/test/java/org/apache/james/modules/objectstorage/ObjectStorageBlobStoreModuleTest.java
----------------------------------------------------------------------
diff --git a/server/container/guice/blob-objectstorage-guice/src/test/java/org/apache/james/modules/objectstorage/ObjectStorageBlobStoreModuleTest.java b/server/container/guice/blob-objectstorage-guice/src/test/java/org/apache/james/modules/objectstorage/ObjectStorageBlobStoreModuleTest.java
index 692c2b7..3678f22 100644
--- a/server/container/guice/blob-objectstorage-guice/src/test/java/org/apache/james/modules/objectstorage/ObjectStorageBlobStoreModuleTest.java
+++ b/server/container/guice/blob-objectstorage-guice/src/test/java/org/apache/james/modules/objectstorage/ObjectStorageBlobStoreModuleTest.java
@@ -123,7 +123,7 @@ class ObjectStorageBlobStoreModuleTest {
                 .with(binder -> binder.bind(ObjectStorageBlobConfiguration.class).toInstance(configuration)));
 
         ObjectStorageBlobsDAO dao = injector.getInstance(ObjectStorageBlobsDAO.class);
-        dao.createContainer(configuration.getNamespace());
+        dao.createContainer(configuration.getNamespace()).block();
 
         BlobStore blobStore = injector.getInstance(Key.get(BlobStore.class, Names.named(MetricableBlobStore.BLOB_STORE_IMPLEMENTATION)));
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
index 6f005e8..893035a 100644
--- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
+++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
@@ -61,6 +61,7 @@ public class CassandraMailRepository implements MailRepository {
         MailKey mailKey = MailKey.forMail(mail);
 
         Mono.fromFuture(mimeMessageStore.save(mail.getMessage())
+            .toFuture()
             .thenCompose(Throwing.function(parts -> mailDAO.store(url, mail,
                 parts.getHeaderBlobId(),
                 parts.getBodyBlobId()))))
@@ -101,6 +102,7 @@ public class CassandraMailRepository implements 
MailRepository {
             .build();
 
         return mimeMessageStore.read(parts)
+            .toFuture()
             .thenApply(mimeMessage -> mailDTO.getMailBuilder()
                 .mimeMessage(mimeMessage)
                 .build());

http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java
index b39ed2d..683919a 100644
--- a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java
+++ b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java
@@ -95,17 +95,13 @@ class CassandraMailRepositoryWithFakeImplementationsTest {
         class FailingStore implements Store<MimeMessage, MimeMessagePartsId> {
 
             @Override
-            public CompletableFuture<MimeMessagePartsId> save(MimeMessage mimeMessage) {
-                return CompletableFuture.supplyAsync(() -> {
-                    throw new RuntimeException("Expected failure while saving");
-                });
+            public Mono<MimeMessagePartsId> save(MimeMessage mimeMessage) {
+                return Mono.error(new RuntimeException("Expected failure while saving"));
             }
 
             @Override
-            public CompletableFuture<MimeMessage> read(MimeMessagePartsId blobIds) {
-                return CompletableFuture.supplyAsync(() -> {
-                    throw new RuntimeException("Expected failure while reading");
-                });
+            public Mono<MimeMessage> read(MimeMessagePartsId blobIds) {
+                return Mono.error(new RuntimeException("Expected failure while reading"));
             }
         }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
index 7a93fe0..349c878 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
@@ -69,7 +69,7 @@ class Enqueuer {
 
     private CompletableFuture<MimeMessagePartsId> saveMail(Mail mail) throws MailQueue.MailQueueException {
         try {
-            return mimeMessageStore.save(mail.getMessage());
+            return mimeMessageStore.save(mail.getMessage()).toFuture();
         } catch (MessagingException e) {
             throw new MailQueue.MailQueueException("Error while saving blob", e);
         }

http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailLoader.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailLoader.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailLoader.java
index 0016702..f69c533 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailLoader.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailLoader.java
@@ -46,7 +46,7 @@ class MailLoader {
                     .headerBlobId(blobIdFactory.from(dto.getHeaderBlobId()))
                     .bodyBlobId(blobIdFactory.from(dto.getBodyBlobId()))
                     .build())
-                .join();
+                .block();
 
             return dto.toMailWithMimeMessage(mimeMessage);
         } catch (AddressException e) {

http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
index 4279c15..29da254 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
@@ -115,7 +115,7 @@ public class CassandraMailQueueBrowser {
 
     private Mono<Mail> toMailFuture(EnqueuedItemWithSlicingContext enqueuedItemWithSlicingContext) {
         EnqueuedItem enqueuedItem = enqueuedItemWithSlicingContext.getEnqueuedItem();
-        return Mono.fromCompletionStage(mimeMessageStore.read(enqueuedItem.getPartsId()))
+        return mimeMessageStore.read(enqueuedItem.getPartsId())
             .map(mimeMessage -> toMail(enqueuedItem, mimeMessage));
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
index 57732cb..a97999a 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
@@ -59,7 +59,7 @@ public class CassandraMailQueueMailDelete {
     Mono<Void> considerDeleted(MailKey mailKey, MailQueueName mailQueueName) {
         return deletedMailsDao
             .markAsDeleted(mailQueueName, mailKey)
-            .doOnTerminate(() -> maybeUpdateBrowseStart(mailQueueName));
+            .doOnNext(ignored -> maybeUpdateBrowseStart(mailQueueName));
     }
 
     Mono<Boolean> isDeleted(Mail mail, MailQueueName mailQueueName) {

http://git-wip-us.apache.org/repos/asf/james-project/blob/ba4d902e/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
index 211917a..f318e5e 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
@@ -117,7 +117,7 @@ public class CassandraMailQueueView implements MailQueueView {
             .filter(mailReference -> deleteCondition.shouldBeDeleted(mailReference.getMail()))
             .map(mailReference -> cassandraMailQueueMailDelete.considerDeleted(mailReference.getMail(), mailQueueName))
             .count()
-            .doOnTerminate(() -> cassandraMailQueueMailDelete.updateBrowseStart(mailQueueName))
+            .doOnNext(ignored -> cassandraMailQueueMailDelete.updateBrowseStart(mailQueueName))
             .block();
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to