This is an automated email from the ASF dual-hosted git repository. matthieu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 1c91118d92d8ae677aa301ec90af28f777146e3b Author: Matthieu Baechler <matth...@apache.org> AuthorDate: Fri Jan 10 17:44:55 2020 +0100 JAMES-3028 rewrite CassandraBlobStore using CassandraDumbBlobStore --- .../mail/CassandraAttachmentFallbackTest.java | 2 +- .../cassandra/mail/CassandraMessageDAOTest.java | 2 +- .../migration/AttachmentMessageIdCreationTest.java | 2 +- .../mail/migration/AttachmentV2MigrationTest.java | 2 +- .../james/blob/cassandra/CassandraBlobStore.java | 192 ++++++--------------- .../blob/cassandra/CassandraDumbBlobStore.java | 3 - .../blob/cassandra/CassandraBlobStoreTest.java | 15 +- .../blob/cassandra/CassandraDumbBlobStoreTest.java | 1 - .../cassandra/CassandraMailRepositoryTest.java | 2 +- ...aMailRepositoryWithFakeImplementationsTest.java | 4 +- .../RabbitMQMailQueueConfigurationChangeTest.java | 2 +- .../queue/rabbitmq/RabbitMQMailQueueTest.java | 2 +- 12 files changed, 69 insertions(+), 160 deletions(-) diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentFallbackTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentFallbackTest.java index 4cf6df1..98aacf0 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentFallbackTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentFallbackTest.java @@ -68,7 +68,7 @@ class CassandraAttachmentFallbackTest { attachmentDAOV2 = new CassandraAttachmentDAOV2(BLOB_ID_FACTORY, cassandra.getConf()); attachmentDAO = new CassandraAttachmentDAO(cassandra.getConf(), CassandraConfiguration.DEFAULT_CONFIGURATION); - blobStore = new CassandraBlobStore(cassandra.getConf()); + blobStore = CassandraBlobStore.forTesting(cassandra.getConf()); attachmentMessageIdDAO = new CassandraAttachmentMessageIdDAO(cassandra.getConf(), new CassandraMessageId.Factory()); CassandraAttachmentOwnerDAO ownerDAO = new 
CassandraAttachmentOwnerDAO(cassandra.getConf()); attachmentMapper = new CassandraAttachmentMapper(attachmentDAO, attachmentDAOV2, blobStore, attachmentMessageIdDAO, ownerDAO); diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java index 9581a15..bec5100 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java @@ -93,7 +93,7 @@ class CassandraMessageDAOTest { void setUp(CassandraCluster cassandra) { messageIdFactory = new CassandraMessageId.Factory(); messageId = messageIdFactory.generate(); - CassandraBlobStore blobStore = new CassandraBlobStore(cassandra.getConf()); + CassandraBlobStore blobStore = CassandraBlobStore.forTesting(cassandra.getConf()); HashBlobId.Factory blobIdFactory = new HashBlobId.Factory(); testee = new CassandraMessageDAO(cassandra.getConf(), cassandra.getTypesProvider(), blobStore, blobIdFactory, new CassandraMessageId.Factory()); diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreationTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreationTest.java index 7317a97..531c719 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreationTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreationTest.java @@ -86,7 +86,7 @@ class AttachmentMessageIdCreationTest { void setUp(CassandraCluster cassandra) { CassandraMessageId.Factory messageIdFactory = new CassandraMessageId.Factory(); - blobStore = new CassandraBlobStore(cassandra.getConf()); + blobStore = 
CassandraBlobStore.forTesting(cassandra.getConf()); cassandraMessageDAO = new CassandraMessageDAO(cassandra.getConf(), cassandra.getTypesProvider(), blobStore, new HashBlobId.Factory(), messageIdFactory); diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java index 1ddaf1b..4e40a47 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java @@ -72,7 +72,7 @@ class AttachmentV2MigrationTest { attachmentDAO = new CassandraAttachmentDAO(cassandra.getConf(), CassandraConfiguration.DEFAULT_CONFIGURATION); attachmentDAOV2 = new CassandraAttachmentDAOV2(BLOB_ID_FACTORY, cassandra.getConf()); - blobsStore = new CassandraBlobStore(cassandra.getConf()); + blobsStore = CassandraBlobStore.forTesting(cassandra.getConf()); migration = new AttachmentV2Migration(attachmentDAO, attachmentDAOV2, blobsStore); attachment1 = Attachment.builder() diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java index 5976354..4a7d876 100644 --- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java +++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java @@ -20,202 +20,114 @@ package org.apache.james.blob.cassandra; import java.io.InputStream; -import java.nio.ByteBuffer; -import java.util.Comparator; -import java.util.List; -import java.util.NoSuchElementException; -import java.util.stream.Collectors; import javax.inject.Inject; import org.apache.commons.io.IOUtils; -import 
org.apache.commons.lang3.tuple.Pair; import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration; import org.apache.james.blob.api.BlobId; import org.apache.james.blob.api.BlobStore; import org.apache.james.blob.api.BucketName; import org.apache.james.blob.api.HashBlobId; -import org.apache.james.blob.api.ObjectNotFoundException; -import org.apache.james.blob.cassandra.utils.DataChunker; -import org.apache.james.util.ReactorUtils; import com.datastax.driver.core.Session; +import com.github.fge.lambdas.Throwing; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import reactor.core.publisher.Flux; +import com.google.common.hash.Hashing; +import com.google.common.hash.HashingInputStream; +import com.google.common.io.FileBackedOutputStream; + import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; -import reactor.util.function.Tuple2; +import reactor.util.function.Tuples; public class CassandraBlobStore implements BlobStore { - private static final int PREFETCH = 16; - private static final int MAX_CONCURRENCY = 1; - private final CassandraDefaultBucketDAO defaultBucketDAO; - private final CassandraBucketDAO bucketDAO; - private final DataChunker dataChunker; - private final CassandraConfiguration configuration; + public static final boolean LAZY_RESSOURCE_CLEANUP = false; + public static final int FILE_THRESHOLD = 10000; private final HashBlobId.Factory blobIdFactory; + private final BucketName defaultBucketName; + private final CassandraDumbBlobStore dumbBlobStore; @Inject - CassandraBlobStore(CassandraDefaultBucketDAO defaultBucketDAO, CassandraBucketDAO bucketDAO, CassandraConfiguration cassandraConfiguration, HashBlobId.Factory blobIdFactory) { - this.defaultBucketDAO = defaultBucketDAO; - this.bucketDAO = bucketDAO; - this.configuration = cassandraConfiguration; + CassandraBlobStore(HashBlobId.Factory blobIdFactory, BucketName defaultBucketName, 
CassandraDumbBlobStore dumbBlobStore) { this.blobIdFactory = blobIdFactory; - this.dataChunker = new DataChunker(); + this.defaultBucketName = defaultBucketName; + this.dumbBlobStore = dumbBlobStore; } @VisibleForTesting - public CassandraBlobStore(Session session) { - this(new CassandraDefaultBucketDAO(session), - new CassandraBucketDAO(new HashBlobId.Factory(), session), - CassandraConfiguration.DEFAULT_CONFIGURATION, - new HashBlobId.Factory()); + public static CassandraBlobStore forTesting(Session session) { + HashBlobId.Factory blobIdFactory = new HashBlobId.Factory(); + CassandraBucketDAO bucketDAO = new CassandraBucketDAO(blobIdFactory, session); + CassandraDefaultBucketDAO defaultBucketDAO = new CassandraDefaultBucketDAO(session); + return new CassandraBlobStore( + blobIdFactory, + BucketName.DEFAULT, + new CassandraDumbBlobStore(defaultBucketDAO, bucketDAO, CassandraConfiguration.DEFAULT_CONFIGURATION, BucketName.DEFAULT)); } @Override public Mono<BlobId> save(BucketName bucketName, byte[] data, StoragePolicy storagePolicy) { + Preconditions.checkNotNull(bucketName); Preconditions.checkNotNull(data); - return saveAsMono(bucketName, data); - } - private Mono<BlobId> saveAsMono(BucketName bucketName, byte[] data) { BlobId blobId = blobIdFactory.forPayload(data); - return Mono.fromCallable(() -> dataChunker.chunk(data, configuration.getBlobPartSize())) - .flatMap(chunks -> saveBlobParts(bucketName, blobId, chunks)) - .flatMap(numberOfChunk -> saveBlobPartReference(bucketName, blobId, numberOfChunk)) - .thenReturn(blobId); - } - private Mono<Integer> saveBlobParts(BucketName bucketName, BlobId blobId, Flux<ByteBuffer> chunksAsFlux) { - return chunksAsFlux - .publishOn(Schedulers.elastic(), PREFETCH) - .index() - .flatMap(pair -> writePart(bucketName, blobId, pair.getT1().intValue(), pair.getT2()) - .then(Mono.just(getChunkNum(pair)))) - .collect(Collectors.maxBy(Comparator.comparingInt(x -> x))) - .<Integer>handle((t, sink) -> t.ifPresent(sink::next)) - 
.map(this::numToCount) - .defaultIfEmpty(0); + return dumbBlobStore.save(bucketName, blobId, data) + .then(Mono.just(blobId)); } - - private int numToCount(int number) { - return number + 1; + @Override + public Mono<BlobId> save(BucketName bucketName, InputStream data, StoragePolicy storagePolicy) { + Preconditions.checkNotNull(bucketName); + Preconditions.checkNotNull(data); + HashingInputStream hashingInputStream = new HashingInputStream(Hashing.sha256(), data); + return Mono.using( + () -> new FileBackedOutputStream(FILE_THRESHOLD), + fileBackedOutputStream -> saveAndGenerateBlobId(bucketName, hashingInputStream, fileBackedOutputStream), + Throwing.consumer(FileBackedOutputStream::reset).sneakyThrow(), + LAZY_RESSOURCE_CLEANUP); } - private Integer getChunkNum(Tuple2<Long, ByteBuffer> pair) { - return pair.getT1().intValue(); + private Mono<BlobId> saveAndGenerateBlobId(BucketName bucketName, HashingInputStream hashingInputStream, FileBackedOutputStream fileBackedOutputStream) { + return Mono.fromCallable(() -> { + IOUtils.copy(hashingInputStream, fileBackedOutputStream); + return Tuples.of(blobIdFactory.from(hashingInputStream.hash().toString()), fileBackedOutputStream.asByteSource()); + }) + .flatMap(tuple -> dumbBlobStore.save(bucketName, tuple.getT1(), tuple.getT2()).thenReturn(tuple.getT1())); } @Override public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) { - return readBlobParts(bucketName, blobId) - .collectList() - .map(this::byteBuffersToBytesArray); + Preconditions.checkNotNull(bucketName); + return dumbBlobStore.readBytes(bucketName, blobId); } @Override public InputStream read(BucketName bucketName, BlobId blobId) { - return ReactorUtils.toInputStream(readBlobParts(bucketName, blobId)); - } - - @Override - public BucketName getDefaultBucketName() { - return BucketName.DEFAULT; - } - - private Flux<ByteBuffer> readBlobParts(BucketName bucketName, BlobId blobId) { - return selectRowCount(bucketName, blobId) - 
.publishOn(Schedulers.elastic()) - .single() - .onErrorResume(NoSuchElementException.class, e -> Mono.error( - new ObjectNotFoundException(String.format("Could not retrieve blob metadata for %s", blobId)))) - .flatMapMany(rowCount -> Flux.range(0, rowCount)) - .publishOn(Schedulers.elastic(), PREFETCH) - .flatMapSequential(partIndex -> readPart(bucketName, blobId, partIndex) - .single() - .onErrorResume(NoSuchElementException.class, e -> Mono.error( - new ObjectNotFoundException(String.format("Missing blob part for blobId %s and position %d", blobId, partIndex)))), - MAX_CONCURRENCY, PREFETCH); - } - - @Override - public Mono<BlobId> save(BucketName bucketName, InputStream data, StoragePolicy storagePolicy) { - Preconditions.checkNotNull(data); - return Mono.fromCallable(() -> IOUtils.toByteArray(data)) - .flatMap(bytes -> saveAsMono(bucketName, bytes)); + Preconditions.checkNotNull(bucketName); + return dumbBlobStore.read(bucketName, blobId); } @Override public Mono<Void> deleteBucket(BucketName bucketName) { Preconditions.checkNotNull(bucketName); - Preconditions.checkArgument(!isDefaultBucket(bucketName), "Deleting the default bucket is forbidden"); - return bucketDAO.listAll() - .filter(bucketNameBlobIdPair -> bucketNameBlobIdPair.getKey().equals(bucketName)) - .map(Pair::getValue) - .flatMap(blobId -> delete(bucketName, blobId)) - .then(); + return dumbBlobStore.deleteBucket(bucketName); } @Override - public Mono<Void> delete(BucketName bucketName, BlobId blobId) { - if (isDefaultBucket(bucketName)) { - return defaultBucketDAO.deletePosition(blobId) - .then(defaultBucketDAO.deleteParts(blobId)); - } else { - return bucketDAO.deletePosition(bucketName, blobId) - .then(bucketDAO.deleteParts(bucketName, blobId)); - } - } - - private Mono<ByteBuffer> readPart(BucketName bucketName, BlobId blobId, Integer partIndex) { - if (isDefaultBucket(bucketName)) { - return defaultBucketDAO.readPart(blobId, partIndex); - } else { - return bucketDAO.readPart(bucketName, 
blobId, partIndex); - } - } - - private Mono<Integer> selectRowCount(BucketName bucketName, BlobId blobId) { - if (isDefaultBucket(bucketName)) { - return defaultBucketDAO.selectRowCount(blobId); - } else { - return bucketDAO.selectRowCount(bucketName, blobId); - } - } - - private Mono<Void> saveBlobPartReference(BucketName bucketName, BlobId blobId, Integer numberOfChunk) { - if (isDefaultBucket(bucketName)) { - return defaultBucketDAO.saveBlobPartsReferences(blobId, numberOfChunk); - } else { - return bucketDAO.saveBlobPartsReferences(bucketName, blobId, numberOfChunk); - } + public BucketName getDefaultBucketName() { + return defaultBucketName; } - private Mono<Void> writePart(BucketName bucketName, BlobId blobId, int position, ByteBuffer data) { - if (isDefaultBucket(bucketName)) { - return defaultBucketDAO.writePart(data, blobId, position); - } else { - return bucketDAO.writePart(data, bucketName, blobId, position); - } - } + @Override + public Mono<Void> delete(BucketName bucketName, BlobId blobId) { + Preconditions.checkNotNull(bucketName); + Preconditions.checkNotNull(blobId); - private boolean isDefaultBucket(BucketName bucketName) { - return bucketName.equals(getDefaultBucketName()); + return dumbBlobStore.delete(bucketName, blobId); } - private byte[] byteBuffersToBytesArray(List<ByteBuffer> byteBuffers) { - int targetSize = byteBuffers - .stream() - .mapToInt(ByteBuffer::remaining) - .sum(); - - return byteBuffers - .stream() - .reduce(ByteBuffer.allocate(targetSize), (accumulator, element) -> accumulator.put(element)) - .array(); - } } diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java index 1ac59f6..581c4f1 100644 --- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java +++ 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java @@ -52,19 +52,16 @@ public class CassandraDumbBlobStore implements DumbBlobStore { private final CassandraBucketDAO bucketDAO; private final DataChunker dataChunker; private final CassandraConfiguration configuration; - private final HashBlobId.Factory blobIdFactory; private final BucketName defaultBucket; @Inject CassandraDumbBlobStore(CassandraDefaultBucketDAO defaultBucketDAO, CassandraBucketDAO bucketDAO, CassandraConfiguration cassandraConfiguration, - HashBlobId.Factory blobIdFactory, BucketName defaultBucket) { this.defaultBucketDAO = defaultBucketDAO; this.bucketDAO = bucketDAO; this.configuration = cassandraConfiguration; - this.blobIdFactory = blobIdFactory; this.defaultBucket = defaultBucket; this.dataChunker = new DataChunker(); } diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java index 63c774e..51f6da8 100644 --- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java +++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java @@ -35,6 +35,7 @@ import org.apache.james.backends.cassandra.CassandraClusterExtension; import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration; import org.apache.james.blob.api.BlobId; import org.apache.james.blob.api.BlobStore; +import org.apache.james.blob.api.BucketName; import org.apache.james.blob.api.HashBlobId; import org.apache.james.blob.api.MetricableBlobStore; import org.apache.james.blob.api.MetricableBlobStoreContract; @@ -47,7 +48,6 @@ import org.junit.jupiter.api.extension.RegisterExtension; import com.google.common.base.Strings; import com.google.common.hash.Hashing; import com.google.common.hash.HashingInputStream; - import 
reactor.core.publisher.Mono; public class CassandraBlobStoreTest implements MetricableBlobStoreContract { @@ -65,14 +65,15 @@ public class CassandraBlobStoreTest implements MetricableBlobStoreContract { HashBlobId.Factory blobIdFactory = new HashBlobId.Factory(); CassandraBucketDAO bucketDAO = new CassandraBucketDAO(blobIdFactory, cassandra.getConf()); defaultBucketDAO = spy(new CassandraDefaultBucketDAO(cassandra.getConf())); + CassandraConfiguration cassandraConfiguration = CassandraConfiguration.builder() + .blobPartSize(CHUNK_SIZE) + .build(); testee = new MetricableBlobStore( metricsTestExtension.getMetricFactory(), - new CassandraBlobStore(defaultBucketDAO, - bucketDAO, - CassandraConfiguration.builder() - .blobPartSize(CHUNK_SIZE) - .build(), - blobIdFactory)); + new CassandraBlobStore( + blobIdFactory, + BucketName.DEFAULT, + new CassandraDumbBlobStore(defaultBucketDAO, bucketDAO, cassandraConfiguration, BucketName.DEFAULT))); } @Override diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraDumbBlobStoreTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraDumbBlobStoreTest.java index 0e81087..42ce749 100644 --- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraDumbBlobStoreTest.java +++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraDumbBlobStoreTest.java @@ -50,7 +50,6 @@ public class CassandraDumbBlobStoreTest implements DumbBlobStoreContract { CassandraConfiguration.builder() .blobPartSize(CHUNK_SIZE) .build(), - blobIdFactory, BucketName.DEFAULT); } diff --git a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryTest.java b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryTest.java index fa39f8c..f329125 100644 --- 
a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryTest.java +++ b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryTest.java @@ -57,7 +57,7 @@ class CassandraMailRepositoryTest implements MailRepositoryContract { CassandraMailRepositoryMailDaoAPI mailDAO = new MergingCassandraMailRepositoryMailDao(v1, v2); CassandraMailRepositoryKeysDAO keysDAO = new CassandraMailRepositoryKeysDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION); CassandraMailRepositoryCountDAO countDAO = new CassandraMailRepositoryCountDAO(cassandra.getConf()); - CassandraBlobStore blobStore = new CassandraBlobStore(cassandra.getConf()); + CassandraBlobStore blobStore = CassandraBlobStore.forTesting(cassandra.getConf()); cassandraMailRepository = new CassandraMailRepository(URL, keysDAO, countDAO, mailDAO, MimeMessageStore.factory(blobStore).mimeMessageStore()); diff --git a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java index b59cabe..203b594 100644 --- a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java +++ b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java @@ -127,7 +127,7 @@ class CassandraMailRepositoryWithFakeImplementationsTest { FailingMailDAO mailDAO = new FailingMailDAO(); keysDAO = new CassandraMailRepositoryKeysDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION); CassandraMailRepositoryCountDAO countDAO = new 
CassandraMailRepositoryCountDAO(cassandra.getConf()); - CassandraBlobStore blobStore = new CassandraBlobStore(cassandra.getConf()); + CassandraBlobStore blobStore = CassandraBlobStore.forTesting(cassandra.getConf()); cassandraMailRepository = new CassandraMailRepository(URL, keysDAO, countDAO, mailDAO, MimeMessageStore.factory(blobStore).mimeMessageStore()); @@ -212,7 +212,7 @@ class CassandraMailRepositoryWithFakeImplementationsTest { CassandraMailRepositoryMailDaoAPI mailDAO = new CassandraMailRepositoryMailDAO(cassandra.getConf(), BLOB_ID_FACTORY, cassandra.getTypesProvider()); FailingKeysDAO keysDAO = new FailingKeysDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION); countDAO = new CassandraMailRepositoryCountDAO(cassandra.getConf()); - CassandraBlobStore blobStore = new CassandraBlobStore(cassandra.getConf()); + CassandraBlobStore blobStore = CassandraBlobStore.forTesting(cassandra.getConf()); cassandraMailRepository = new CassandraMailRepository(URL, keysDAO, countDAO, mailDAO, MimeMessageStore.factory(blobStore).mimeMessageStore()); diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java index 0e599f3..f363509 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java @@ -92,7 +92,7 @@ class RabbitMQMailQueueConfigurationChangeTest { @BeforeEach void setup(CassandraCluster cassandra) throws Exception { - CassandraBlobStore blobsDAO = new CassandraBlobStore(cassandra.getConf()); + CassandraBlobStore blobsDAO = CassandraBlobStore.forTesting(cassandra.getConf()); mimeMessageStoreFactory = MimeMessageStore.factory(blobsDAO); clock = new 
UpdatableTickingClock(IN_SLICE_1); mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI()); diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java index f8f7bd3..a745662 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java @@ -291,7 +291,7 @@ class RabbitMQMailQueueTest { } private void setUp(CassandraCluster cassandra, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem, RabbitMQMailQueueConfiguration configuration) throws Exception { - CassandraBlobStore blobStore = new CassandraBlobStore(cassandra.getConf()); + CassandraBlobStore blobStore = CassandraBlobStore.forTesting(cassandra.getConf()); MimeMessageStore.Factory mimeMessageStoreFactory = MimeMessageStore.factory(blobStore); clock = new UpdatableTickingClock(IN_SLICE_1); --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org