This is an automated email from the ASF dual-hosted git repository.

matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 1c91118d92d8ae677aa301ec90af28f777146e3b
Author: Matthieu Baechler <matth...@apache.org>
AuthorDate: Fri Jan 10 17:44:55 2020 +0100

    JAMES-3028 rewrite CassandraBlobStore using CassandraDumpBlobStore
---
 .../mail/CassandraAttachmentFallbackTest.java      |   2 +-
 .../cassandra/mail/CassandraMessageDAOTest.java    |   2 +-
 .../migration/AttachmentMessageIdCreationTest.java |   2 +-
 .../mail/migration/AttachmentV2MigrationTest.java  |   2 +-
 .../james/blob/cassandra/CassandraBlobStore.java   | 192 ++++++---------------
 .../blob/cassandra/CassandraDumbBlobStore.java     |   3 -
 .../blob/cassandra/CassandraBlobStoreTest.java     |  15 +-
 .../blob/cassandra/CassandraDumbBlobStoreTest.java |   1 -
 .../cassandra/CassandraMailRepositoryTest.java     |   2 +-
 ...aMailRepositoryWithFakeImplementationsTest.java |   4 +-
 .../RabbitMQMailQueueConfigurationChangeTest.java  |   2 +-
 .../queue/rabbitmq/RabbitMQMailQueueTest.java      |   2 +-
 12 files changed, 69 insertions(+), 160 deletions(-)

diff --git 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentFallbackTest.java
 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentFallbackTest.java
index 4cf6df1..98aacf0 100644
--- 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentFallbackTest.java
+++ 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentFallbackTest.java
@@ -68,7 +68,7 @@ class CassandraAttachmentFallbackTest {
         attachmentDAOV2 = new CassandraAttachmentDAOV2(BLOB_ID_FACTORY, 
cassandra.getConf());
         attachmentDAO = new CassandraAttachmentDAO(cassandra.getConf(),
             CassandraConfiguration.DEFAULT_CONFIGURATION);
-        blobStore = new CassandraBlobStore(cassandra.getConf());
+        blobStore = CassandraBlobStore.forTesting(cassandra.getConf());
         attachmentMessageIdDAO = new 
CassandraAttachmentMessageIdDAO(cassandra.getConf(), new 
CassandraMessageId.Factory());
         CassandraAttachmentOwnerDAO ownerDAO = new 
CassandraAttachmentOwnerDAO(cassandra.getConf());
         attachmentMapper = new CassandraAttachmentMapper(attachmentDAO, 
attachmentDAOV2, blobStore, attachmentMessageIdDAO, ownerDAO);
diff --git 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
index 9581a15..bec5100 100644
--- 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
+++ 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
@@ -93,7 +93,7 @@ class CassandraMessageDAOTest {
     void setUp(CassandraCluster cassandra) {
         messageIdFactory = new CassandraMessageId.Factory();
         messageId = messageIdFactory.generate();
-        CassandraBlobStore blobStore = new 
CassandraBlobStore(cassandra.getConf());
+        CassandraBlobStore blobStore = 
CassandraBlobStore.forTesting(cassandra.getConf());
         HashBlobId.Factory blobIdFactory = new HashBlobId.Factory();
         testee = new CassandraMessageDAO(cassandra.getConf(), 
cassandra.getTypesProvider(), blobStore, blobIdFactory,
             new CassandraMessageId.Factory());
diff --git 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreationTest.java
 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreationTest.java
index 7317a97..531c719 100644
--- 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreationTest.java
+++ 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreationTest.java
@@ -86,7 +86,7 @@ class AttachmentMessageIdCreationTest {
     void setUp(CassandraCluster cassandra) {
         CassandraMessageId.Factory messageIdFactory = new 
CassandraMessageId.Factory();
 
-        blobStore = new CassandraBlobStore(cassandra.getConf());
+        blobStore = CassandraBlobStore.forTesting(cassandra.getConf());
         cassandraMessageDAO = new CassandraMessageDAO(cassandra.getConf(), 
cassandra.getTypesProvider(),
             blobStore, new HashBlobId.Factory(), messageIdFactory);
 
diff --git 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java
 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java
index 1ddaf1b..4e40a47 100644
--- 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java
+++ 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java
@@ -72,7 +72,7 @@ class AttachmentV2MigrationTest {
         attachmentDAO = new CassandraAttachmentDAO(cassandra.getConf(),
             CassandraConfiguration.DEFAULT_CONFIGURATION);
         attachmentDAOV2 = new CassandraAttachmentDAOV2(BLOB_ID_FACTORY, 
cassandra.getConf());
-        blobsStore = new CassandraBlobStore(cassandra.getConf());
+        blobsStore = CassandraBlobStore.forTesting(cassandra.getConf());
         migration = new AttachmentV2Migration(attachmentDAO, attachmentDAOV2, 
blobsStore);
 
         attachment1 = Attachment.builder()
diff --git 
a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
index 5976354..4a7d876 100644
--- 
a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
+++ 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
@@ -20,202 +20,114 @@
 package org.apache.james.blob.cassandra;
 
 import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.Comparator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.stream.Collectors;
 
 import javax.inject.Inject;
 
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.tuple.Pair;
 import 
org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.BlobStore;
 import org.apache.james.blob.api.BucketName;
 import org.apache.james.blob.api.HashBlobId;
-import org.apache.james.blob.api.ObjectNotFoundException;
-import org.apache.james.blob.cassandra.utils.DataChunker;
-import org.apache.james.util.ReactorUtils;
 
 import com.datastax.driver.core.Session;
+import com.github.fge.lambdas.Throwing;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import reactor.core.publisher.Flux;
+import com.google.common.hash.Hashing;
+import com.google.common.hash.HashingInputStream;
+import com.google.common.io.FileBackedOutputStream;
+
 import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
-import reactor.util.function.Tuple2;
+import reactor.util.function.Tuples;
 
 public class CassandraBlobStore implements BlobStore {
 
-    private static final int PREFETCH = 16;
-    private static final int MAX_CONCURRENCY = 1;
-    private final CassandraDefaultBucketDAO defaultBucketDAO;
-    private final CassandraBucketDAO bucketDAO;
-    private final DataChunker dataChunker;
-    private final CassandraConfiguration configuration;
+    public static final boolean LAZY_RESSOURCE_CLEANUP = false;
+    public static final int FILE_THRESHOLD = 10000;
     private final HashBlobId.Factory blobIdFactory;
+    private final BucketName defaultBucketName;
+    private final CassandraDumbBlobStore dumbBlobStore;
 
     @Inject
-    CassandraBlobStore(CassandraDefaultBucketDAO defaultBucketDAO, 
CassandraBucketDAO bucketDAO, CassandraConfiguration cassandraConfiguration, 
HashBlobId.Factory blobIdFactory) {
-        this.defaultBucketDAO = defaultBucketDAO;
-        this.bucketDAO = bucketDAO;
-        this.configuration = cassandraConfiguration;
+    CassandraBlobStore(HashBlobId.Factory blobIdFactory, BucketName 
defaultBucketName, CassandraDumbBlobStore dumbBlobStore) {
         this.blobIdFactory = blobIdFactory;
-        this.dataChunker = new DataChunker();
+        this.defaultBucketName = defaultBucketName;
+        this.dumbBlobStore = dumbBlobStore;
     }
 
     @VisibleForTesting
-    public CassandraBlobStore(Session session) {
-        this(new CassandraDefaultBucketDAO(session),
-            new CassandraBucketDAO(new HashBlobId.Factory(), session),
-            CassandraConfiguration.DEFAULT_CONFIGURATION,
-            new HashBlobId.Factory());
+    public static CassandraBlobStore forTesting(Session session) {
+        HashBlobId.Factory blobIdFactory = new HashBlobId.Factory();
+        CassandraBucketDAO bucketDAO = new CassandraBucketDAO(blobIdFactory, 
session);
+        CassandraDefaultBucketDAO defaultBucketDAO = new 
CassandraDefaultBucketDAO(session);
+        return new CassandraBlobStore(
+            blobIdFactory,
+            BucketName.DEFAULT,
+            new CassandraDumbBlobStore(defaultBucketDAO, bucketDAO, 
CassandraConfiguration.DEFAULT_CONFIGURATION, BucketName.DEFAULT));
     }
 
     @Override
     public Mono<BlobId> save(BucketName bucketName, byte[] data, StoragePolicy 
storagePolicy) {
+        Preconditions.checkNotNull(bucketName);
         Preconditions.checkNotNull(data);
-        return saveAsMono(bucketName, data);
-    }
 
-    private Mono<BlobId> saveAsMono(BucketName bucketName, byte[] data) {
         BlobId blobId = blobIdFactory.forPayload(data);
-        return Mono.fromCallable(() -> dataChunker.chunk(data, 
configuration.getBlobPartSize()))
-            .flatMap(chunks -> saveBlobParts(bucketName, blobId, chunks))
-            .flatMap(numberOfChunk -> saveBlobPartReference(bucketName, 
blobId, numberOfChunk))
-            .thenReturn(blobId);
-    }
 
-    private Mono<Integer> saveBlobParts(BucketName bucketName, BlobId blobId, 
Flux<ByteBuffer> chunksAsFlux) {
-        return chunksAsFlux
-            .publishOn(Schedulers.elastic(), PREFETCH)
-            .index()
-            .flatMap(pair -> writePart(bucketName, blobId, 
pair.getT1().intValue(), pair.getT2())
-                .then(Mono.just(getChunkNum(pair))))
-            .collect(Collectors.maxBy(Comparator.comparingInt(x -> x)))
-            .<Integer>handle((t, sink) -> t.ifPresent(sink::next))
-            .map(this::numToCount)
-            .defaultIfEmpty(0);
+        return dumbBlobStore.save(bucketName, blobId, data)
+            .then(Mono.just(blobId));
     }
 
-
-    private int numToCount(int number) {
-        return number + 1;
+    @Override
+    public Mono<BlobId> save(BucketName bucketName, InputStream data, 
StoragePolicy storagePolicy) {
+        Preconditions.checkNotNull(bucketName);
+        Preconditions.checkNotNull(data);
+        HashingInputStream hashingInputStream = new 
HashingInputStream(Hashing.sha256(), data);
+        return Mono.using(
+            () -> new FileBackedOutputStream(FILE_THRESHOLD),
+            fileBackedOutputStream -> saveAndGenerateBlobId(bucketName, 
hashingInputStream, fileBackedOutputStream),
+            Throwing.consumer(FileBackedOutputStream::reset).sneakyThrow(),
+            LAZY_RESSOURCE_CLEANUP);
     }
 
-    private Integer getChunkNum(Tuple2<Long, ByteBuffer> pair) {
-        return pair.getT1().intValue();
+    private Mono<BlobId> saveAndGenerateBlobId(BucketName bucketName, 
HashingInputStream hashingInputStream, FileBackedOutputStream 
fileBackedOutputStream) {
+        return Mono.fromCallable(() -> {
+            IOUtils.copy(hashingInputStream, fileBackedOutputStream);
+            return 
Tuples.of(blobIdFactory.from(hashingInputStream.hash().toString()), 
fileBackedOutputStream.asByteSource());
+        })
+            .flatMap(tuple -> dumbBlobStore.save(bucketName, tuple.getT1(), 
tuple.getT2()).thenReturn(tuple.getT1()));
     }
 
     @Override
     public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
-        return readBlobParts(bucketName, blobId)
-            .collectList()
-            .map(this::byteBuffersToBytesArray);
+        Preconditions.checkNotNull(bucketName);
+        return dumbBlobStore.readBytes(bucketName, blobId);
     }
 
     @Override
     public InputStream read(BucketName bucketName, BlobId blobId) {
-        return ReactorUtils.toInputStream(readBlobParts(bucketName, blobId));
-    }
-
-    @Override
-    public BucketName getDefaultBucketName() {
-        return BucketName.DEFAULT;
-    }
-
-    private Flux<ByteBuffer> readBlobParts(BucketName bucketName, BlobId 
blobId) {
-        return selectRowCount(bucketName, blobId)
-            .publishOn(Schedulers.elastic())
-            .single()
-            .onErrorResume(NoSuchElementException.class, e -> Mono.error(
-                new ObjectNotFoundException(String.format("Could not retrieve 
blob metadata for %s", blobId))))
-            .flatMapMany(rowCount -> Flux.range(0, rowCount))
-            .publishOn(Schedulers.elastic(), PREFETCH)
-            .flatMapSequential(partIndex -> readPart(bucketName, blobId, 
partIndex)
-                .single()
-                .onErrorResume(NoSuchElementException.class, e -> Mono.error(
-                    new ObjectNotFoundException(String.format("Missing blob 
part for blobId %s and position %d", blobId, partIndex)))),
-                MAX_CONCURRENCY, PREFETCH);
-    }
-
-    @Override
-    public Mono<BlobId> save(BucketName bucketName, InputStream data, 
StoragePolicy storagePolicy) {
-        Preconditions.checkNotNull(data);
-        return Mono.fromCallable(() -> IOUtils.toByteArray(data))
-            .flatMap(bytes -> saveAsMono(bucketName, bytes));
+        Preconditions.checkNotNull(bucketName);
+        return dumbBlobStore.read(bucketName, blobId);
     }
 
     @Override
     public Mono<Void> deleteBucket(BucketName bucketName) {
         Preconditions.checkNotNull(bucketName);
-        Preconditions.checkArgument(!isDefaultBucket(bucketName), "Deleting 
the default bucket is forbidden");
 
-        return bucketDAO.listAll()
-            .filter(bucketNameBlobIdPair -> 
bucketNameBlobIdPair.getKey().equals(bucketName))
-            .map(Pair::getValue)
-            .flatMap(blobId -> delete(bucketName, blobId))
-            .then();
+        return dumbBlobStore.deleteBucket(bucketName);
     }
 
     @Override
-    public Mono<Void> delete(BucketName bucketName, BlobId blobId) {
-        if (isDefaultBucket(bucketName)) {
-            return defaultBucketDAO.deletePosition(blobId)
-                .then(defaultBucketDAO.deleteParts(blobId));
-        } else {
-            return bucketDAO.deletePosition(bucketName, blobId)
-                .then(bucketDAO.deleteParts(bucketName, blobId));
-        }
-    }
-
-    private Mono<ByteBuffer> readPart(BucketName bucketName, BlobId blobId, 
Integer partIndex) {
-        if (isDefaultBucket(bucketName)) {
-            return defaultBucketDAO.readPart(blobId, partIndex);
-        } else {
-            return bucketDAO.readPart(bucketName, blobId, partIndex);
-        }
-    }
-
-    private Mono<Integer> selectRowCount(BucketName bucketName, BlobId blobId) 
{
-        if (isDefaultBucket(bucketName)) {
-            return defaultBucketDAO.selectRowCount(blobId);
-        } else {
-            return bucketDAO.selectRowCount(bucketName, blobId);
-        }
-    }
-
-    private Mono<Void> saveBlobPartReference(BucketName bucketName, BlobId 
blobId, Integer numberOfChunk) {
-        if (isDefaultBucket(bucketName)) {
-            return defaultBucketDAO.saveBlobPartsReferences(blobId, 
numberOfChunk);
-        } else {
-            return bucketDAO.saveBlobPartsReferences(bucketName, blobId, 
numberOfChunk);
-        }
+    public BucketName getDefaultBucketName() {
+        return defaultBucketName;
     }
 
-    private Mono<Void> writePart(BucketName bucketName, BlobId blobId, int 
position, ByteBuffer data) {
-        if (isDefaultBucket(bucketName)) {
-            return defaultBucketDAO.writePart(data, blobId, position);
-        } else {
-            return bucketDAO.writePart(data, bucketName, blobId, position);
-        }
-    }
+    @Override
+    public Mono<Void> delete(BucketName bucketName, BlobId blobId) {
+        Preconditions.checkNotNull(bucketName);
+        Preconditions.checkNotNull(blobId);
 
-    private boolean isDefaultBucket(BucketName bucketName) {
-        return bucketName.equals(getDefaultBucketName());
+        return dumbBlobStore.delete(bucketName, blobId);
     }
 
-    private byte[] byteBuffersToBytesArray(List<ByteBuffer> byteBuffers) {
-        int targetSize = byteBuffers
-            .stream()
-            .mapToInt(ByteBuffer::remaining)
-            .sum();
-
-        return byteBuffers
-            .stream()
-            .reduce(ByteBuffer.allocate(targetSize), (accumulator, element) -> 
accumulator.put(element))
-            .array();
-    }
 }
diff --git 
a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java
 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java
index 1ac59f6..581c4f1 100644
--- 
a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java
+++ 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java
@@ -52,19 +52,16 @@ public class CassandraDumbBlobStore implements 
DumbBlobStore {
     private final CassandraBucketDAO bucketDAO;
     private final DataChunker dataChunker;
     private final CassandraConfiguration configuration;
-    private final HashBlobId.Factory blobIdFactory;
     private final BucketName defaultBucket;
 
     @Inject
     CassandraDumbBlobStore(CassandraDefaultBucketDAO defaultBucketDAO,
                            CassandraBucketDAO bucketDAO,
                            CassandraConfiguration cassandraConfiguration,
-                           HashBlobId.Factory blobIdFactory,
                            BucketName defaultBucket) {
         this.defaultBucketDAO = defaultBucketDAO;
         this.bucketDAO = bucketDAO;
         this.configuration = cassandraConfiguration;
-        this.blobIdFactory = blobIdFactory;
         this.defaultBucket = defaultBucket;
         this.dataChunker = new DataChunker();
     }
diff --git 
a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java
 
b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java
index 63c774e..51f6da8 100644
--- 
a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java
+++ 
b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java
@@ -35,6 +35,7 @@ import 
org.apache.james.backends.cassandra.CassandraClusterExtension;
 import 
org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BucketName;
 import org.apache.james.blob.api.HashBlobId;
 import org.apache.james.blob.api.MetricableBlobStore;
 import org.apache.james.blob.api.MetricableBlobStoreContract;
@@ -47,7 +48,6 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import com.google.common.base.Strings;
 import com.google.common.hash.Hashing;
 import com.google.common.hash.HashingInputStream;
-
 import reactor.core.publisher.Mono;
 
 public class CassandraBlobStoreTest implements MetricableBlobStoreContract {
@@ -65,14 +65,15 @@ public class CassandraBlobStoreTest implements 
MetricableBlobStoreContract {
         HashBlobId.Factory blobIdFactory = new HashBlobId.Factory();
         CassandraBucketDAO bucketDAO = new CassandraBucketDAO(blobIdFactory, 
cassandra.getConf());
         defaultBucketDAO = spy(new 
CassandraDefaultBucketDAO(cassandra.getConf()));
+        CassandraConfiguration cassandraConfiguration = 
CassandraConfiguration.builder()
+            .blobPartSize(CHUNK_SIZE)
+            .build();
         testee = new MetricableBlobStore(
             metricsTestExtension.getMetricFactory(),
-            new CassandraBlobStore(defaultBucketDAO,
-                bucketDAO,
-                CassandraConfiguration.builder()
-                    .blobPartSize(CHUNK_SIZE)
-                    .build(),
-                blobIdFactory));
+            new CassandraBlobStore(
+                blobIdFactory,
+                BucketName.DEFAULT,
+                new CassandraDumbBlobStore(defaultBucketDAO, bucketDAO, 
cassandraConfiguration, BucketName.DEFAULT)));
     }
 
     @Override
diff --git 
a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraDumbBlobStoreTest.java
 
b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraDumbBlobStoreTest.java
index 0e81087..42ce749 100644
--- 
a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraDumbBlobStoreTest.java
+++ 
b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraDumbBlobStoreTest.java
@@ -50,7 +50,6 @@ public class CassandraDumbBlobStoreTest implements 
DumbBlobStoreContract {
                 CassandraConfiguration.builder()
                     .blobPartSize(CHUNK_SIZE)
                     .build(),
-                blobIdFactory,
             BucketName.DEFAULT);
     }
 
diff --git 
a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryTest.java
 
b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryTest.java
index fa39f8c..f329125 100644
--- 
a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryTest.java
+++ 
b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryTest.java
@@ -57,7 +57,7 @@ class CassandraMailRepositoryTest implements 
MailRepositoryContract {
         CassandraMailRepositoryMailDaoAPI mailDAO = new 
MergingCassandraMailRepositoryMailDao(v1, v2);
         CassandraMailRepositoryKeysDAO keysDAO = new 
CassandraMailRepositoryKeysDAO(cassandra.getConf(), 
CassandraUtils.WITH_DEFAULT_CONFIGURATION);
         CassandraMailRepositoryCountDAO countDAO = new 
CassandraMailRepositoryCountDAO(cassandra.getConf());
-        CassandraBlobStore blobStore = new 
CassandraBlobStore(cassandra.getConf());
+        CassandraBlobStore blobStore = 
CassandraBlobStore.forTesting(cassandra.getConf());
 
         cassandraMailRepository = new CassandraMailRepository(URL,
             keysDAO, countDAO, mailDAO, 
MimeMessageStore.factory(blobStore).mimeMessageStore());
diff --git 
a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java
 
b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java
index b59cabe..203b594 100644
--- 
a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java
+++ 
b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java
@@ -127,7 +127,7 @@ class CassandraMailRepositoryWithFakeImplementationsTest {
             FailingMailDAO mailDAO = new FailingMailDAO();
             keysDAO = new CassandraMailRepositoryKeysDAO(cassandra.getConf(), 
CassandraUtils.WITH_DEFAULT_CONFIGURATION);
             CassandraMailRepositoryCountDAO countDAO = new 
CassandraMailRepositoryCountDAO(cassandra.getConf());
-            CassandraBlobStore blobStore = new 
CassandraBlobStore(cassandra.getConf());
+            CassandraBlobStore blobStore = 
CassandraBlobStore.forTesting(cassandra.getConf());
 
             cassandraMailRepository = new CassandraMailRepository(URL,
                     keysDAO, countDAO, mailDAO, 
MimeMessageStore.factory(blobStore).mimeMessageStore());
@@ -212,7 +212,7 @@ class CassandraMailRepositoryWithFakeImplementationsTest {
             CassandraMailRepositoryMailDaoAPI mailDAO = new 
CassandraMailRepositoryMailDAO(cassandra.getConf(), BLOB_ID_FACTORY, 
cassandra.getTypesProvider());
             FailingKeysDAO keysDAO = new FailingKeysDAO(cassandra.getConf(), 
CassandraUtils.WITH_DEFAULT_CONFIGURATION);
             countDAO = new 
CassandraMailRepositoryCountDAO(cassandra.getConf());
-            CassandraBlobStore blobStore = new 
CassandraBlobStore(cassandra.getConf());
+            CassandraBlobStore blobStore = 
CassandraBlobStore.forTesting(cassandra.getConf());
 
             cassandraMailRepository = new CassandraMailRepository(URL,
                     keysDAO, countDAO, mailDAO, 
MimeMessageStore.factory(blobStore).mimeMessageStore());
diff --git 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java
 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java
index 0e599f3..f363509 100644
--- 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java
+++ 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java
@@ -92,7 +92,7 @@ class RabbitMQMailQueueConfigurationChangeTest {
 
     @BeforeEach
     void setup(CassandraCluster cassandra) throws Exception {
-        CassandraBlobStore blobsDAO = new 
CassandraBlobStore(cassandra.getConf());
+        CassandraBlobStore blobsDAO = 
CassandraBlobStore.forTesting(cassandra.getConf());
         mimeMessageStoreFactory = MimeMessageStore.factory(blobsDAO);
         clock = new UpdatableTickingClock(IN_SLICE_1);
         mqManagementApi = new 
RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI());
diff --git 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index f8f7bd3..a745662 100644
--- 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -291,7 +291,7 @@ class RabbitMQMailQueueTest {
     }
 
     private void setUp(CassandraCluster cassandra, 
MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem, 
RabbitMQMailQueueConfiguration configuration) throws Exception {
-        CassandraBlobStore blobStore = new 
CassandraBlobStore(cassandra.getConf());
+        CassandraBlobStore blobStore = 
CassandraBlobStore.forTesting(cassandra.getConf());
         MimeMessageStore.Factory mimeMessageStoreFactory = 
MimeMessageStore.factory(blobStore);
         clock = new UpdatableTickingClock(IN_SLICE_1);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to