This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit bea796d50e3311030ef57ff655be8723b769db8f Author: Benoit Tellier <btell...@linagora.com> AuthorDate: Fri Jan 3 17:52:26 2020 +0700 JAMES-2921 BlobStore API should include a StorageStrategy --- .../cassandra/mail/CassandraAttachmentMapper.java | 6 +- .../cassandra/mail/CassandraMessageDAO.java | 6 +- .../mail/migration/AttachmentV2Migration.java | 4 +- .../mail/CassandraAttachmentFallbackTest.java | 7 +- .../mail/migration/AttachmentV2MigrationTest.java | 16 +++-- .../vault/blob/BlobStoreDeletedMessageVault.java | 5 +- .../java/org/apache/james/blob/api/BlobStore.java | 14 ++-- .../apache/james/blob/api/MetricableBlobStore.java | 8 +-- .../main/java/org/apache/james/blob/api/Store.java | 6 +- .../apache/james/blob/api/BlobStoreContract.java | 31 +++++---- .../james/blob/api/BucketBlobStoreContract.java | 27 ++++---- .../james/blob/api/DeleteBlobStoreContract.java | 23 ++++--- .../blob/api/MetricableBlobStoreContract.java | 25 +++---- .../james/blob/cassandra/CassandraBlobStore.java | 4 +- .../blob/cassandra/CassandraBlobStoreTest.java | 9 +-- .../file/LocalFileBlobExportMechanismTest.java | 11 +-- .../apache/james/blob/memory/MemoryBlobStore.java | 6 +- .../blob/objectstorage/ObjectStorageBlobStore.java | 10 +-- .../ObjectStorageBlobStoreContract.java | 3 +- .../objectstorage/ObjectStorageBlobStoreTest.java | 15 ++-- .../apache/james/blob/union/UnionBlobStore.java | 33 +++++---- .../james/blob/union/UnionBlobStoreTest.java | 79 +++++++++++----------- .../apache/james/blob/mail/MimeMessageStore.java | 6 +- .../swift/ObjectStorageBlobStoreModuleTest.java | 3 +- .../james/webadmin/vault/routes/ExportService.java | 4 +- .../linshare/LinshareBlobExportMechanismTest.java | 9 +-- 26 files changed, 205 insertions(+), 165 deletions(-) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java index fddf40f..867a002 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java @@ -19,6 +19,8 @@ package org.apache.james.mailbox.cassandra.mail; +import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST; + import java.util.Collection; import java.util.List; @@ -110,7 +112,7 @@ public class CassandraAttachmentMapper implements AttachmentMapper { @Override public void storeAttachmentForOwner(Attachment attachment, Username owner) throws MailboxException { ownerDAO.addOwner(attachment.getAttachmentId(), owner) - .then(blobStore.save(blobStore.getDefaultBucketName(), attachment.getBytes())) + .then(blobStore.save(blobStore.getDefaultBucketName(), attachment.getBytes(), LOW_COST)) .map(blobId -> CassandraAttachmentDAOV2.from(attachment, blobId)) .flatMap(attachmentDAOV2::storeAttachment) .block(); @@ -137,7 +139,7 @@ public class CassandraAttachmentMapper implements AttachmentMapper { } public Mono<Void> storeAttachmentAsync(Attachment attachment, MessageId ownerMessageId) { - return blobStore.save(blobStore.getDefaultBucketName(), attachment.getBytes()) + return blobStore.save(blobStore.getDefaultBucketName(), attachment.getBytes(), LOW_COST) .map(blobId -> CassandraAttachmentDAOV2.from(attachment, blobId)) .flatMap(daoAttachment -> storeAttachmentWithIndex(daoAttachment, ownerMessageId)); } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java index ac232a4..16f81fd 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java @@ -23,6 +23,8 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker; import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; import static com.datastax.driver.core.querybuilder.QueryBuilder.select; +import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST; +import static org.apache.james.blob.api.BlobStore.StoragePolicy.SIZE_BASED; import static org.apache.james.mailbox.cassandra.table.CassandraMessageIds.MESSAGE_ID; import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.ATTACHMENTS; import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.BODY; @@ -182,8 +184,8 @@ public class CassandraMessageDAO { byte[] headerContent = IOUtils.toByteArray(message.getHeaderContent()); byte[] bodyContent = IOUtils.toByteArray(message.getBodyContent()); - Mono<BlobId> bodyFuture = blobStore.save(blobStore.getDefaultBucketName(), bodyContent); - Mono<BlobId> headerFuture = blobStore.save(blobStore.getDefaultBucketName(), headerContent); + Mono<BlobId> bodyFuture = blobStore.save(blobStore.getDefaultBucketName(), bodyContent, LOW_COST); + Mono<BlobId> headerFuture = blobStore.save(blobStore.getDefaultBucketName(), headerContent, SIZE_BASED); return headerFuture.zipWith(bodyFuture); } catch (IOException e) { diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java index 333224a..25474f5 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java @@ -19,6 +19,8 @@ package org.apache.james.mailbox.cassandra.mail.migration; +import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST; + import javax.inject.Inject; import org.apache.james.backends.cassandra.migration.Migration; @@ -55,7 +57,7 @@ public class AttachmentV2Migration implements Migration { } private Mono<Void> migrateAttachment(Attachment attachment) { - return blobStore.save(blobStore.getDefaultBucketName(), attachment.getBytes()) + return blobStore.save(blobStore.getDefaultBucketName(), attachment.getBytes(), LOW_COST) .map(blobId -> CassandraAttachmentDAOV2.from(attachment, blobId)) .flatMap(attachmentDAOV2::storeAttachment) .then(attachmentDAOV1.deleteAttachment(attachment.getAttachmentId())); diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentFallbackTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentFallbackTest.java index 5c45b28..4cf6df1 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentFallbackTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentFallbackTest.java @@ -19,6 +19,7 @@ package org.apache.james.mailbox.cassandra.mail; +import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -98,7 +99,7 @@ class CassandraAttachmentFallbackTest { .bytes("{\"property\":`\"different\"}".getBytes(StandardCharsets.UTF_8)) .build(); - BlobId blobId = blobStore.save(blobStore.getDefaultBucketName(), attachment.getBytes()).block(); + BlobId blobId = blobStore.save(blobStore.getDefaultBucketName(), attachment.getBytes(), LOW_COST).block(); attachmentDAOV2.storeAttachment(CassandraAttachmentDAOV2.from(attachment, blobId)).block(); attachmentDAO.storeAttachment(otherAttachment).block(); @@ -133,7 +134,7 @@ class CassandraAttachmentFallbackTest { .bytes("{\"property\":`\"different\"}".getBytes(StandardCharsets.UTF_8)) .build(); - BlobId blobId = blobStore.save(blobStore.getDefaultBucketName(), attachment.getBytes()).block(); + BlobId blobId = blobStore.save(blobStore.getDefaultBucketName(), attachment.getBytes(), LOW_COST).block(); attachmentDAOV2.storeAttachment(CassandraAttachmentDAOV2.from(attachment, blobId)).block(); attachmentDAO.storeAttachment(otherAttachment).block(); @@ -168,7 +169,7 @@ class CassandraAttachmentFallbackTest { .bytes("{\"property\":`\"different\"}".getBytes(StandardCharsets.UTF_8)) .build(); - BlobId blobId = blobStore.save(blobStore.getDefaultBucketName(), attachment.getBytes()).block(); + BlobId blobId = blobStore.save(blobStore.getDefaultBucketName(), attachment.getBytes(), LOW_COST).block(); attachmentDAOV2.storeAttachment(CassandraAttachmentDAOV2.from(attachment, blobId)).block(); attachmentDAO.storeAttachment(otherAttachment).block(); diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java index 405ad9c..1ddaf1b 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java @@ -19,8 +19,10 @@ package org.apache.james.mailbox.cassandra.mail.migration; +import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -152,7 +154,7 @@ class AttachmentV2MigrationTest { when(attachmentDAO.retrieveAll()).thenReturn(Flux.just( attachment1, attachment2)); - when(blobsStore.save(any(BucketName.class), any(byte[].class))).thenThrow(new RuntimeException()); + when(blobsStore.save(any(BucketName.class), any(byte[].class), eq(LOW_COST))).thenThrow(new RuntimeException()); assertThat(migration.asTask().run()).isEqualTo(Task.Result.PARTIAL); } @@ -167,9 +169,9 @@ class AttachmentV2MigrationTest { when(attachmentDAO.retrieveAll()).thenReturn(Flux.just( attachment1, attachment2)); - when(blobsStore.save(blobsStore.getDefaultBucketName(), attachment1.getBytes())) + when(blobsStore.save(blobsStore.getDefaultBucketName(), attachment1.getBytes(), LOW_COST)) .thenReturn(Mono.just(BLOB_ID_FACTORY.forPayload(attachment1.getBytes()))); - when(blobsStore.save(blobsStore.getDefaultBucketName(), attachment2.getBytes())) + when(blobsStore.save(blobsStore.getDefaultBucketName(), attachment2.getBytes(), LOW_COST)) .thenReturn(Mono.just(BLOB_ID_FACTORY.forPayload(attachment2.getBytes()))); when(attachmentDAOV2.storeAttachment(any())).thenThrow(new RuntimeException()); @@ -186,9 +188,9 @@ class AttachmentV2MigrationTest { when(attachmentDAO.retrieveAll()).thenReturn(Flux.just( attachment1, attachment2)); - when(blobsStore.save(blobsStore.getDefaultBucketName(), attachment1.getBytes())) + when(blobsStore.save(blobsStore.getDefaultBucketName(), attachment1.getBytes(), LOW_COST)) .thenReturn(Mono.just(BLOB_ID_FACTORY.forPayload(attachment1.getBytes()))); - when(blobsStore.save(blobsStore.getDefaultBucketName(), attachment2.getBytes())) + when(blobsStore.save(blobsStore.getDefaultBucketName(), attachment2.getBytes(), LOW_COST)) .thenReturn(Mono.just(BLOB_ID_FACTORY.forPayload(attachment2.getBytes()))); when(attachmentDAOV2.storeAttachment(any())).thenReturn(Mono.empty()); when(attachmentDAO.deleteAttachment(any())).thenThrow(new RuntimeException()); @@ -206,9 +208,9 @@ class AttachmentV2MigrationTest { when(attachmentDAO.retrieveAll()).thenReturn(Flux.just( attachment1, attachment2)); - when(blobsStore.save(blobsStore.getDefaultBucketName(), attachment1.getBytes())) + when(blobsStore.save(blobsStore.getDefaultBucketName(), attachment1.getBytes(), LOW_COST)) .thenReturn(Mono.just(BLOB_ID_FACTORY.forPayload(attachment1.getBytes()))); - when(blobsStore.save(blobsStore.getDefaultBucketName(), attachment2.getBytes())) + when(blobsStore.save(blobsStore.getDefaultBucketName(), attachment2.getBytes(), LOW_COST)) .thenThrow(new RuntimeException()); when(attachmentDAOV2.storeAttachment(any())).thenReturn(Mono.empty()); when(attachmentDAO.deleteAttachment(any())).thenReturn(Mono.empty()); diff --git a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVault.java b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVault.java index 7cc3905..e545e53 100644 --- a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVault.java +++ b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVault.java @@ -19,6 +19,8 @@ package org.apache.james.vault.blob; +import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST; + import java.io.InputStream; import java.time.Clock; import java.time.ZonedDateTime; @@ -47,6 +49,7 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -95,7 +98,7 @@ public class BlobStoreDeletedMessageVault implements DeletedMessageVault { } private Mono<Void> appendMessage(DeletedMessage deletedMessage, InputStream mimeMessage, BucketName bucketName) { - return blobStore.save(bucketName, mimeMessage) + return blobStore.save(bucketName, mimeMessage, LOW_COST) .map(blobId -> StorageInformation.builder() .bucketName(bucketName) .blobId(blobId)) diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java index 9c4f650..59414d5 100644 --- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java +++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java @@ -25,16 +25,22 @@ import reactor.core.publisher.Mono; public interface BlobStore { - Mono<BlobId> save(BucketName bucketName, byte[] data); + enum StoragePolicy { + SIZE_BASED, + LOW_COST, + HIGH_PERFORMANCE + } + + Mono<BlobId> save(BucketName bucketName, byte[] data, StoragePolicy storagePolicy); - Mono<BlobId> save(BucketName bucketName, InputStream data); + Mono<BlobId> save(BucketName bucketName, InputStream data, StoragePolicy storagePolicy); Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId); InputStream read(BucketName bucketName, BlobId blobId); - default Mono<BlobId> save(BucketName bucketName, String data) { - return save(bucketName, data.getBytes(StandardCharsets.UTF_8)); + default Mono<BlobId> save(BucketName bucketName, String data, StoragePolicy storagePolicy) { + return save(bucketName, data.getBytes(StandardCharsets.UTF_8), storagePolicy); } BucketName getDefaultBucketName(); diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java index f2a6d60..a081cec 100644 --- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java +++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java @@ -50,15 +50,15 @@ public class MetricableBlobStore implements BlobStore { } @Override - public Mono<BlobId> save(BucketName bucketName, byte[] data) { + public Mono<BlobId> save(BucketName bucketName, byte[] data, StoragePolicy storagePolicy) { return metricFactory - .runPublishingTimerMetric(SAVE_BYTES_TIMER_NAME, blobStoreImpl.save(bucketName, data)); + .runPublishingTimerMetric(SAVE_BYTES_TIMER_NAME, blobStoreImpl.save(bucketName, data, storagePolicy)); } @Override - public Mono<BlobId> save(BucketName bucketName, InputStream data) { + public Mono<BlobId> save(BucketName bucketName, InputStream data, StoragePolicy storagePolicy) { return metricFactory - .runPublishingTimerMetric(SAVE_INPUT_STREAM_TIMER_NAME, blobStoreImpl.save(bucketName, data)); + .runPublishingTimerMetric(SAVE_INPUT_STREAM_TIMER_NAME, blobStoreImpl.save(bucketName, data, storagePolicy)); } @Override diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java index 5feef59..f37605c 100644 --- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java +++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java @@ -71,14 +71,16 @@ public interface Store<T, I> { public static class BytesToSave implements ValueToSave { private final byte[] bytes; + private final BlobStore.StoragePolicy storagePolicy; - public BytesToSave(byte[] bytes) { + public BytesToSave(byte[] bytes, BlobStore.StoragePolicy storagePolicy) { this.bytes = bytes; + this.storagePolicy = storagePolicy; } @Override public Mono<BlobId> saveIn(BucketName bucketName, BlobStore blobStore) { - return blobStore.save(bucketName, bytes); + return blobStore.save(bucketName, bytes, storagePolicy); } } diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreContract.java index 004c44f..df822b1 100644 --- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreContract.java +++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreContract.java @@ -19,6 +19,7 @@ package org.apache.james.blob.api; +import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -47,7 +48,7 @@ public interface BlobStoreContract extends DeleteBlobStoreContract, BucketBlobSt BlobStore store = testee(); BucketName defaultBucketName = store.getDefaultBucketName(); - assertThatThrownBy(() -> store.save(defaultBucketName, (byte[]) null).block()) + assertThatThrownBy(() -> store.save(defaultBucketName, (byte[]) null, LOW_COST).block()) .isInstanceOf(NullPointerException.class); } @@ -56,7 +57,7 @@ public interface BlobStoreContract extends DeleteBlobStoreContract, BucketBlobSt BlobStore store = testee(); BucketName defaultBucketName = store.getDefaultBucketName(); - assertThatThrownBy(() -> store.save(defaultBucketName, (String) null).block()) + assertThatThrownBy(() -> store.save(defaultBucketName, (String) null, LOW_COST).block()) .isInstanceOf(NullPointerException.class); } @@ -65,7 +66,7 @@ public interface BlobStoreContract extends DeleteBlobStoreContract, BucketBlobSt BlobStore store = testee(); BucketName defaultBucketName = store.getDefaultBucketName(); - assertThatThrownBy(() -> store.save(defaultBucketName, (InputStream) null).block()) + assertThatThrownBy(() -> store.save(defaultBucketName, (InputStream) null, LOW_COST).block()) .isInstanceOf(NullPointerException.class); } @@ -74,7 +75,7 @@ public interface BlobStoreContract extends DeleteBlobStoreContract, BucketBlobSt BlobStore store = testee(); BucketName defaultBucketName = store.getDefaultBucketName(); - BlobId blobId = store.save(defaultBucketName, EMPTY_BYTEARRAY).block(); + BlobId blobId = store.save(defaultBucketName, EMPTY_BYTEARRAY, LOW_COST).block(); byte[] bytes = store.readBytes(defaultBucketName, blobId).block(); @@ -86,7 +87,7 @@ public interface BlobStoreContract extends DeleteBlobStoreContract, BucketBlobSt BlobStore store = testee(); BucketName defaultBucketName = store.getDefaultBucketName(); - BlobId blobId = store.save(defaultBucketName, new String()).block(); + BlobId blobId = store.save(defaultBucketName, new String(), LOW_COST).block(); byte[] bytes = store.readBytes(defaultBucketName, blobId).block(); @@ -98,7 +99,7 @@ public interface BlobStoreContract extends DeleteBlobStoreContract, BucketBlobSt BlobStore store = testee(); BucketName defaultBucketName = store.getDefaultBucketName(); - BlobId blobId = store.save(defaultBucketName, new ByteArrayInputStream(EMPTY_BYTEARRAY)).block(); + BlobId blobId = store.save(defaultBucketName, new ByteArrayInputStream(EMPTY_BYTEARRAY), LOW_COST).block(); byte[] bytes = store.readBytes(defaultBucketName, blobId).block(); @@ -110,7 +111,7 @@ public interface BlobStoreContract extends DeleteBlobStoreContract, BucketBlobSt BlobStore store = testee(); BucketName defaultBucketName = store.getDefaultBucketName(); - BlobId blobId = store.save(defaultBucketName, SHORT_BYTEARRAY).block(); + BlobId blobId = store.save(defaultBucketName, SHORT_BYTEARRAY, LOW_COST).block(); assertThat(blobId).isEqualTo(blobIdFactory().from("31f7a65e315586ac198bd798b6629ce4903d0899476d5741a9f32e2e521b6a66")); } @@ -120,7 +121,7 @@ public interface BlobStoreContract extends DeleteBlobStoreContract, BucketBlobSt BlobStore store = testee(); BucketName defaultBucketName = store.getDefaultBucketName(); - BlobId blobId = store.save(defaultBucketName, SHORT_STRING).block(); + BlobId blobId = store.save(defaultBucketName, SHORT_STRING, LOW_COST).block(); assertThat(blobId).isEqualTo(blobIdFactory().from("31f7a65e315586ac198bd798b6629ce4903d0899476d5741a9f32e2e521b6a66")); } @@ -130,7 +131,7 @@ public interface BlobStoreContract extends DeleteBlobStoreContract, BucketBlobSt BlobStore store = testee(); BucketName defaultBucketName = store.getDefaultBucketName(); - BlobId blobId = store.save(defaultBucketName, new ByteArrayInputStream(SHORT_BYTEARRAY)).block(); + BlobId blobId = store.save(defaultBucketName, new ByteArrayInputStream(SHORT_BYTEARRAY), LOW_COST).block(); assertThat(blobId).isEqualTo(blobIdFactory().from("31f7a65e315586ac198bd798b6629ce4903d0899476d5741a9f32e2e521b6a66")); } @@ -149,7 +150,7 @@ public interface BlobStoreContract extends DeleteBlobStoreContract, BucketBlobSt BlobStore store = testee(); BucketName defaultBucketName = store.getDefaultBucketName(); - BlobId blobId = store.save(defaultBucketName, SHORT_BYTEARRAY).block(); + BlobId blobId = store.save(defaultBucketName, SHORT_BYTEARRAY, LOW_COST).block(); byte[] bytes = store.readBytes(defaultBucketName, blobId).block(); @@ -161,7 +162,7 @@ public interface BlobStoreContract extends DeleteBlobStoreContract, BucketBlobSt BlobStore store = testee(); BucketName defaultBucketName = store.getDefaultBucketName(); - BlobId blobId = store.save(defaultBucketName, ELEVEN_KILOBYTES).block(); + BlobId blobId = store.save(defaultBucketName, ELEVEN_KILOBYTES, LOW_COST).block(); byte[] bytes = store.readBytes(defaultBucketName, blobId).block(); @@ -173,7 +174,7 @@ public interface BlobStoreContract extends DeleteBlobStoreContract, BucketBlobSt BlobStore store = testee(); BucketName defaultBucketName = store.getDefaultBucketName(); - BlobId blobId = store.save(defaultBucketName, TWELVE_MEGABYTES).block(); + BlobId blobId = store.save(defaultBucketName, TWELVE_MEGABYTES, LOW_COST).block(); byte[] bytes = store.readBytes(defaultBucketName, blobId).block(); @@ -194,7 +195,7 @@ public interface BlobStoreContract extends DeleteBlobStoreContract, BucketBlobSt BlobStore store = testee(); BucketName defaultBucketName = store.getDefaultBucketName(); - BlobId blobId = store.save(defaultBucketName, SHORT_BYTEARRAY).block(); + BlobId blobId = store.save(defaultBucketName, SHORT_BYTEARRAY, LOW_COST).block(); InputStream read = store.read(defaultBucketName, blobId); @@ -206,7 +207,7 @@ public interface BlobStoreContract extends DeleteBlobStoreContract, BucketBlobSt BlobStore store = testee(); BucketName defaultBucketName = store.getDefaultBucketName(); - BlobId blobId = store.save(defaultBucketName, ELEVEN_KILOBYTES).block(); + BlobId blobId = store.save(defaultBucketName, ELEVEN_KILOBYTES, LOW_COST).block(); InputStream read = store.read(defaultBucketName, blobId); @@ -219,7 +220,7 @@ public interface BlobStoreContract extends DeleteBlobStoreContract, BucketBlobSt BucketName defaultBucketName = store.getDefaultBucketName(); // 12 MB of text - BlobId blobId = store.save(defaultBucketName, TWELVE_MEGABYTES).block(); + BlobId blobId = store.save(defaultBucketName, TWELVE_MEGABYTES, LOW_COST).block(); InputStream read = store.read(defaultBucketName, blobId); diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketBlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketBlobStoreContract.java index 5a62c14..a52240e 100644 --- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketBlobStoreContract.java +++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketBlobStoreContract.java @@ -19,6 +19,7 @@ package org.apache.james.blob.api; +import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -51,7 +52,7 @@ public interface BucketBlobStoreContract { default void deleteBucketShouldDeleteExistingBucketWithItsData() { BlobStore store = testee(); - BlobId blobId = store.save(CUSTOM, SHORT_BYTEARRAY).block(); + BlobId blobId = store.save(CUSTOM, SHORT_BYTEARRAY, LOW_COST).block(); store.deleteBucket(CUSTOM).block(); assertThatThrownBy(() -> store.read(CUSTOM, blobId)) @@ -62,7 +63,7 @@ public interface BucketBlobStoreContract { default void deleteBucketShouldBeIdempotent() { BlobStore store = testee(); - store.save(CUSTOM, SHORT_BYTEARRAY).block(); + store.save(CUSTOM, SHORT_BYTEARRAY, LOW_COST).block(); store.deleteBucket(CUSTOM).block(); assertThatCode(() -> store.deleteBucket(CUSTOM).block()) @@ -73,7 +74,7 @@ public interface BucketBlobStoreContract { default void saveBytesShouldThrowWhenNullBucketName() { BlobStore store = testee(); - assertThatThrownBy(() -> store.save(null, SHORT_BYTEARRAY).block()) + assertThatThrownBy(() -> store.save(null, SHORT_BYTEARRAY, LOW_COST).block()) .isInstanceOf(NullPointerException.class); } @@ -81,7 +82,7 @@ public interface BucketBlobStoreContract { default void saveStringShouldThrowWhenNullBucketName() { BlobStore store = testee(); - assertThatThrownBy(() -> store.save(null, SHORT_STRING).block()) + assertThatThrownBy(() -> store.save(null, SHORT_STRING, LOW_COST).block()) .isInstanceOf(NullPointerException.class); } @@ -89,7 +90,7 @@ public interface BucketBlobStoreContract { default void saveInputStreamShouldThrowWhenNullBucketName() { BlobStore store = testee(); - assertThatThrownBy(() -> store.save(null, new ByteArrayInputStream(SHORT_BYTEARRAY)).block()) + assertThatThrownBy(() -> store.save(null, new ByteArrayInputStream(SHORT_BYTEARRAY), LOW_COST).block()) .isInstanceOf(NullPointerException.class); } @@ -97,7 +98,7 @@ public interface BucketBlobStoreContract { default void readShouldThrowWhenNullBucketName() { BlobStore store = testee(); - BlobId blobId = store.save(BucketName.DEFAULT, SHORT_BYTEARRAY).block(); + BlobId blobId = store.save(BucketName.DEFAULT, SHORT_BYTEARRAY, LOW_COST).block(); assertThatThrownBy(() -> store.read(null, blobId)) .isInstanceOf(NullPointerException.class); } @@ -106,7 +107,7 @@ public interface BucketBlobStoreContract { default void readBytesStreamShouldThrowWhenNullBucketName() { BlobStore store = testee(); - BlobId blobId = store.save(BucketName.DEFAULT, SHORT_BYTEARRAY).block(); + BlobId blobId = store.save(BucketName.DEFAULT, SHORT_BYTEARRAY, LOW_COST).block(); assertThatThrownBy(() -> store.readBytes(null, blobId).block()) .isInstanceOf(NullPointerException.class); } @@ -115,7 +116,7 @@ public interface BucketBlobStoreContract { default void readStringShouldThrowWhenBucketDoesNotExist() { BlobStore store = testee(); - BlobId blobId = store.save(BucketName.DEFAULT, SHORT_BYTEARRAY).block(); + BlobId blobId = store.save(BucketName.DEFAULT, SHORT_BYTEARRAY, LOW_COST).block(); assertThatThrownBy(() -> store.read(CUSTOM, blobId)) .isInstanceOf(ObjectStoreException.class); } @@ -124,7 +125,7 @@ public interface BucketBlobStoreContract { default void readBytesStreamShouldThrowWhenBucketDoesNotExist() { BlobStore store = testee(); - BlobId blobId = store.save(BucketName.DEFAULT, SHORT_BYTEARRAY).block(); + BlobId blobId = store.save(BucketName.DEFAULT, SHORT_BYTEARRAY, LOW_COST).block(); assertThatThrownBy(() -> store.readBytes(CUSTOM, blobId).block()) .isInstanceOf(ObjectStoreException.class); } @@ -133,8 +134,8 @@ public interface BucketBlobStoreContract { default void shouldBeAbleToSaveDataInMultipleBuckets() { BlobStore store = testee(); - BlobId blobIdDefault = store.save(BucketName.DEFAULT, SHORT_BYTEARRAY).block(); - BlobId blobIdCustom = store.save(CUSTOM, SHORT_BYTEARRAY).block(); + BlobId blobIdDefault = store.save(BucketName.DEFAULT, SHORT_BYTEARRAY, LOW_COST).block(); + BlobId blobIdCustom = store.save(CUSTOM, SHORT_BYTEARRAY, LOW_COST).block(); byte[] bytesDefault = store.readBytes(BucketName.DEFAULT, blobIdDefault).block(); byte[] bytesCustom = store.readBytes(CUSTOM, blobIdCustom).block(); @@ -147,7 +148,7 @@ public interface BucketBlobStoreContract { BlobStore store = testee(); ConcurrentTestRunner.builder() - .operation(((threadNumber, step) -> store.save(CUSTOM, SHORT_STRING + threadNumber + step).block())) + .operation(((threadNumber, step) -> store.save(CUSTOM, SHORT_STRING + threadNumber + step, LOW_COST).block())) .threadCount(10) .operationCount(10) .runSuccessfullyWithin(Duration.ofMinutes(1)); @@ -157,7 +158,7 @@ public interface BucketBlobStoreContract { default void deleteBucketConcurrentlyShouldNotFail() throws Exception { BlobStore store = testee(); - store.save(CUSTOM, SHORT_BYTEARRAY).block(); + store.save(CUSTOM, SHORT_BYTEARRAY, LOW_COST).block(); ConcurrentTestRunner.builder() .operation(((threadNumber, step) -> store.deleteBucket(CUSTOM).block())) diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteBlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteBlobStoreContract.java index 35f7df9..616f355 100644 --- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteBlobStoreContract.java +++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteBlobStoreContract.java @@ -19,6 +19,7 @@ package org.apache.james.blob.api; +import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -61,7 +62,7 @@ public interface DeleteBlobStoreContract { BlobStore store = testee(); BucketName defaultBucketName = store.getDefaultBucketName(); - BlobId blobId = store.save(defaultBucketName, SHORT_BYTEARRAY).block(); + BlobId blobId = store.save(defaultBucketName, SHORT_BYTEARRAY, LOW_COST).block(); store.delete(defaultBucketName, blobId).block(); assertThatThrownBy(() -> store.read(defaultBucketName, blobId)) @@ -73,7 +74,7 @@ public interface DeleteBlobStoreContract { BlobStore store = testee(); BucketName defaultBucketName = store.getDefaultBucketName(); - BlobId blobId = store.save(defaultBucketName, SHORT_BYTEARRAY).block(); + BlobId blobId = store.save(defaultBucketName, SHORT_BYTEARRAY, LOW_COST).block(); store.delete(defaultBucketName, blobId).block(); assertThatCode(() -> store.delete(defaultBucketName, blobId).block()) @@ -85,8 +86,8 @@ public interface DeleteBlobStoreContract { BlobStore store = testee(); BucketName defaultBucketName = store.getDefaultBucketName(); - BlobId blobIdToDelete = store.save(defaultBucketName, SHORT_BYTEARRAY).block(); - BlobId otherBlobId = store.save(defaultBucketName, ELEVEN_KILOBYTES).block(); + BlobId blobIdToDelete = store.save(defaultBucketName, SHORT_BYTEARRAY, LOW_COST).block(); + BlobId otherBlobId = store.save(defaultBucketName, ELEVEN_KILOBYTES, LOW_COST).block(); store.delete(defaultBucketName, blobIdToDelete).block(); @@ -100,7 +101,7 @@ public interface DeleteBlobStoreContract { BlobStore store = testee(); BucketName defaultBucketName = store.getDefaultBucketName(); - BlobId blobId = store.save(defaultBucketName, TWELVE_MEGABYTES).block(); + BlobId blobId = store.save(defaultBucketName, TWELVE_MEGABYTES, LOW_COST).block(); ConcurrentTestRunner.builder() .operation(((threadNumber, step) -> store.delete(defaultBucketName, blobId).block())) @@ -121,8 +122,8 @@ public interface DeleteBlobStoreContract { BlobStore store = testee(); BucketName defaultBucketName = store.getDefaultBucketName(); - BlobId customBlobId = store.save(CUSTOM, "custom_string").block(); - BlobId defaultBlobId = store.save(defaultBucketName, SHORT_BYTEARRAY).block(); + BlobId customBlobId = store.save(CUSTOM, "custom_string", LOW_COST).block(); + BlobId defaultBlobId = store.save(defaultBucketName, SHORT_BYTEARRAY, LOW_COST).block(); store.delete(CUSTOM, customBlobId).block(); @@ -136,8 +137,8 @@ public interface DeleteBlobStoreContract { BlobStore store = testee(); BucketName defaultBucketName = store.getDefaultBucketName(); - store.save(CUSTOM, SHORT_BYTEARRAY).block(); - BlobId blobId = store.save(defaultBucketName, SHORT_BYTEARRAY).block(); + store.save(CUSTOM, SHORT_BYTEARRAY, LOW_COST).block(); + BlobId blobId = store.save(defaultBucketName, SHORT_BYTEARRAY, LOW_COST).block(); store.delete(defaultBucketName, blobId).block(); @@ -151,7 +152,7 @@ public interface DeleteBlobStoreContract { BlobStore store = testee(); BucketName defaultBucketName = store.getDefaultBucketName(); - BlobId blobId = store.save(defaultBucketName, TWELVE_MEGABYTES).block(); + BlobId blobId = store.save(defaultBucketName, TWELVE_MEGABYTES, LOW_COST).block(); ConcurrentTestRunner.builder() .operation(((threadNumber, step) -> { @@ -178,7 +179,7 @@ public interface DeleteBlobStoreContract { BlobStore store = testee(); BucketName defaultBucketName = store.getDefaultBucketName(); - BlobId blobId = store.save(defaultBucketName, TWELVE_MEGABYTES).block(); + BlobId blobId = store.save(defaultBucketName, TWELVE_MEGABYTES, LOW_COST).block(); ConcurrentTestRunner.builder() .operation(((threadNumber, step) -> { diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java index 6468328..a6aed4a 100644 --- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java +++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java @@ -19,6 +19,7 @@ package org.apache.james.blob.api; +import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST; import static org.apache.james.blob.api.MetricableBlobStore.DELETE_BUCKET_TIMER_NAME; import static org.apache.james.blob.api.MetricableBlobStore.DELETE_TIMER_NAME; import static org.apache.james.blob.api.MetricableBlobStore.READ_BYTES_TIMER_NAME; @@ -61,8 +62,8 @@ public interface MetricableBlobStoreContract extends BlobStoreContract { default void saveBytesShouldPublishSaveBytesTimerMetrics() { BlobStore store = testee(); - store.save(store.getDefaultBucketName(), BYTES_CONTENT).block(); - store.save(store.getDefaultBucketName(), BYTES_CONTENT).block(); + store.save(store.getDefaultBucketName(), BYTES_CONTENT, LOW_COST).block(); + store.save(store.getDefaultBucketName(), BYTES_CONTENT, LOW_COST).block(); assertThat(metricsTestExtension.getMetricFactory().executionTimesFor(SAVE_BYTES_TIMER_NAME)) .hasSize(2); @@ -72,8 +73,8 @@ public interface MetricableBlobStoreContract extends BlobStoreContract { default void saveStringShouldPublishSaveBytesTimerMetrics() { BlobStore store = testee(); - store.save(store.getDefaultBucketName(), STRING_CONTENT).block(); - store.save(store.getDefaultBucketName(), STRING_CONTENT).block(); + store.save(store.getDefaultBucketName(), STRING_CONTENT, LOW_COST).block(); + store.save(store.getDefaultBucketName(), STRING_CONTENT, LOW_COST).block(); assertThat(metricsTestExtension.getMetricFactory().executionTimesFor(SAVE_BYTES_TIMER_NAME)) .hasSize(2); @@ -83,8 +84,8 @@ public interface MetricableBlobStoreContract extends BlobStoreContract { default void saveInputStreamShouldPublishSaveInputStreamTimerMetrics() { BlobStore store = testee(); - store.save(store.getDefaultBucketName(), new ByteArrayInputStream(BYTES_CONTENT)).block(); - store.save(store.getDefaultBucketName(), new ByteArrayInputStream(BYTES_CONTENT)).block(); + store.save(store.getDefaultBucketName(), new ByteArrayInputStream(BYTES_CONTENT), LOW_COST).block(); + store.save(store.getDefaultBucketName(), new ByteArrayInputStream(BYTES_CONTENT), LOW_COST).block(); assertThat(metricsTestExtension.getMetricFactory().executionTimesFor(SAVE_INPUT_STREAM_TIMER_NAME)) .hasSize(2); @@ -94,7 +95,7 @@ public interface MetricableBlobStoreContract extends BlobStoreContract { default void readBytesShouldPublishReadBytesTimerMetrics() { BlobStore store = testee(); - BlobId blobId = store.save(store.getDefaultBucketName(), BYTES_CONTENT).block(); + BlobId blobId = store.save(store.getDefaultBucketName(), BYTES_CONTENT, LOW_COST).block(); store.readBytes(store.getDefaultBucketName(), blobId).block(); store.readBytes(store.getDefaultBucketName(), blobId).block(); @@ -106,7 +107,7 @@ public interface MetricableBlobStoreContract extends BlobStoreContract { default void readShouldPublishReadTimerMetrics() { BlobStore store = testee(); - BlobId blobId = store.save(store.getDefaultBucketName(), BYTES_CONTENT).block(); + BlobId blobId = store.save(store.getDefaultBucketName(), BYTES_CONTENT, LOW_COST).block(); store.read(store.getDefaultBucketName(), blobId); store.read(store.getDefaultBucketName(), blobId); @@ -119,8 +120,8 @@ public interface MetricableBlobStoreContract extends BlobStoreContract { BlobStore store = testee(); BucketName bucketName = BucketName.of("custom"); - store.save(BucketName.DEFAULT, BYTES_CONTENT).block(); - store.save(bucketName, BYTES_CONTENT).block(); + store.save(BucketName.DEFAULT, BYTES_CONTENT, LOW_COST).block(); + store.save(bucketName, BYTES_CONTENT, LOW_COST).block(); store.deleteBucket(bucketName).block(); @@ -132,8 +133,8 @@ public interface MetricableBlobStoreContract extends BlobStoreContract { default void deleteShouldPublishDeleteTimerMetrics() { BlobStore store = testee(); - BlobId blobId1 = store.save(store.getDefaultBucketName(), BYTES_CONTENT).block(); - BlobId blobId2 = store.save(store.getDefaultBucketName(), BYTES_CONTENT).block(); + BlobId blobId1 = store.save(store.getDefaultBucketName(), BYTES_CONTENT, LOW_COST).block(); + BlobId blobId2 = store.save(store.getDefaultBucketName(), BYTES_CONTENT, LOW_COST).block(); store.delete(BucketName.DEFAULT, blobId1).block(); store.delete(BucketName.DEFAULT, blobId2).block(); diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java index 7f7efe3..cd179f4 100644 --- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java +++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java @@ -76,7 +76,7 @@ public class CassandraBlobStore implements BlobStore { } @Override - public Mono<BlobId> save(BucketName bucketName, byte[] data) { + public Mono<BlobId> save(BucketName bucketName, byte[] data, StoragePolicy storagePolicy) { Preconditions.checkNotNull(data); return saveAsMono(bucketName, data); @@ -144,7 +144,7 @@ public class CassandraBlobStore implements BlobStore { } @Override - public Mono<BlobId> save(BucketName bucketName, InputStream data) { + public Mono<BlobId> save(BucketName bucketName, InputStream data, StoragePolicy storagePolicy) { Preconditions.checkNotNull(data); return Mono.fromCallable(() -> IOUtils.toByteArray(data)) .flatMap(bytes -> saveAsMono(bucketName, bytes)); diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java index 86b0c69..63c774e 100644 --- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java +++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java @@ -19,6 +19,7 @@ package org.apache.james.blob.cassandra; +import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.Mockito.spy; @@ -87,7 +88,7 @@ public class CassandraBlobStoreTest implements MetricableBlobStoreContract { @Test void readBytesShouldReturnSplitSavedDataByChunk() { String longString = Strings.repeat("0123456789\n", MULTIPLE_CHUNK_SIZE); - BlobId blobId = testee.save(testee.getDefaultBucketName(), longString).block(); + BlobId blobId = testee.save(testee.getDefaultBucketName(), longString, LOW_COST).block(); byte[] bytes = testee.readBytes(testee.getDefaultBucketName(), blobId).block(); @@ -98,7 +99,7 @@ public class CassandraBlobStoreTest implements MetricableBlobStoreContract { void readBytesShouldNotReturnInvalidResultsWhenPartialDataPresent() { int repeatCount = MULTIPLE_CHUNK_SIZE * CHUNK_SIZE; String longString = Strings.repeat("0123456789\n", repeatCount); - BlobId blobId = testee.save(testee.getDefaultBucketName(), longString).block(); + BlobId blobId = testee.save(testee.getDefaultBucketName(), longString, LOW_COST).block(); when(defaultBucketDAO.readPart(blobId, 1)).thenReturn(Mono.empty()); @@ -111,7 +112,7 @@ public class CassandraBlobStoreTest implements MetricableBlobStoreContract { void readShouldNotReturnInvalidResultsWhenPartialDataPresent() { int repeatCount = MULTIPLE_CHUNK_SIZE * CHUNK_SIZE; String longString = Strings.repeat("0123456789\n", repeatCount); - BlobId blobId = testee.save(testee.getDefaultBucketName(), longString).block(); + BlobId blobId = testee.save(testee.getDefaultBucketName(), longString, LOW_COST).block(); when(defaultBucketDAO.readPart(blobId, 1)).thenReturn(Mono.empty()); @@ -131,7 +132,7 @@ public class CassandraBlobStoreTest implements MetricableBlobStoreContract { void blobStoreShouldSupport100MBBlob() throws IOException { ZeroedInputStream data = new ZeroedInputStream(100_000_000); HashingInputStream writeHash = new HashingInputStream(Hashing.sha256(), data); - BlobId blobId = testee.save(testee.getDefaultBucketName(), writeHash).block(); + BlobId blobId = testee.save(testee.getDefaultBucketName(), writeHash, LOW_COST).block(); InputStream bytes = testee.read(testee.getDefaultBucketName(), blobId); HashingInputStream readHash = new HashingInputStream(Hashing.sha256(), bytes); diff --git a/server/blob/blob-export-file/src/test/java/org/apache/james/blob/export/file/LocalFileBlobExportMechanismTest.java b/server/blob/blob-export-file/src/test/java/org/apache/james/blob/export/file/LocalFileBlobExportMechanismTest.java index ca0cce5..174b2c3 100644 --- a/server/blob/blob-export-file/src/test/java/org/apache/james/blob/export/file/LocalFileBlobExportMechanismTest.java +++ b/server/blob/blob-export-file/src/test/java/org/apache/james/blob/export/file/LocalFileBlobExportMechanismTest.java @@ -19,6 +19,7 @@ package org.apache.james.blob.export.file; +import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.Mockito.mock; @@ -79,7 +80,7 @@ class LocalFileBlobExportMechanismTest { @Test void exportingBlobShouldSendAMail() { - BlobId blobId = blobStore.save(blobStore.getDefaultBucketName(), BLOB_CONTENT).block(); + BlobId blobId = blobStore.save(blobStore.getDefaultBucketName(), BLOB_CONTENT, LOW_COST).block(); String explanation = "The content of a deleted message vault had been shared with you."; testee.blobId(blobId) @@ -112,7 +113,7 @@ class LocalFileBlobExportMechanismTest { @Test void exportingBlobShouldCreateAFileWithTheCorrespondingContent(FileSystem fileSystem) { - BlobId blobId = blobStore.save(blobStore.getDefaultBucketName(), BLOB_CONTENT).block(); + BlobId blobId = blobStore.save(blobStore.getDefaultBucketName(), BLOB_CONTENT, LOW_COST).block(); testee.blobId(blobId) .with(MailAddressFixture.RECIPIENT1) @@ -150,7 +151,7 @@ class LocalFileBlobExportMechanismTest { @Test void exportingBlobShouldCreateAFileWithoutExtensionWhenNotDeclaringExtension() { - BlobId blobId = blobStore.save(blobStore.getDefaultBucketName(), BLOB_CONTENT).block(); + BlobId blobId = blobStore.save(blobStore.getDefaultBucketName(), BLOB_CONTENT, LOW_COST).block(); testee.blobId(blobId) .with(MailAddressFixture.RECIPIENT1) @@ -174,7 +175,7 @@ class LocalFileBlobExportMechanismTest { @Test void exportingBlobShouldCreateAFileWithExtensionWhenDeclaringExtension() { - BlobId blobId = blobStore.save(blobStore.getDefaultBucketName(), BLOB_CONTENT).block(); + BlobId blobId = blobStore.save(blobStore.getDefaultBucketName(), BLOB_CONTENT, LOW_COST).block(); testee.blobId(blobId) .with(MailAddressFixture.RECIPIENT1) @@ -199,7 +200,7 @@ class LocalFileBlobExportMechanismTest { @Test void exportingBlobShouldCreateAFileWithPrefixWhenDeclaringPrefix() { - BlobId blobId = blobStore.save(blobStore.getDefaultBucketName(), BLOB_CONTENT).block(); + BlobId blobId = blobStore.save(blobStore.getDefaultBucketName(), BLOB_CONTENT, LOW_COST).block(); String filePrefix = "deleted-message-of-...@james.org"; testee.blobId(blobId) diff --git a/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStore.java b/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStore.java index 6d107ac..bd2615c 100644 --- a/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStore.java +++ b/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStore.java @@ -57,7 +57,7 @@ public class MemoryBlobStore implements BlobStore { } @Override - public Mono<BlobId> save(BucketName bucketName, byte[] data) { + public Mono<BlobId> save(BucketName bucketName, byte[] data, StoragePolicy storagePolicy) { Preconditions.checkNotNull(bucketName); Preconditions.checkNotNull(data); @@ -72,12 +72,12 @@ public class MemoryBlobStore implements BlobStore { } @Override - public Mono<BlobId> save(BucketName bucketName, InputStream data) { + public Mono<BlobId> save(BucketName bucketName, InputStream data, StoragePolicy storagePolicy) { Preconditions.checkNotNull(bucketName); Preconditions.checkNotNull(data); try { byte[] bytes = IOUtils.toByteArray(data); - return save(bucketName, bytes); + return save(bucketName, bytes, storagePolicy); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStore.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStore.java index fba4fa1..3c45fe8 100644 --- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStore.java +++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStore.java @@ -97,7 +97,7 @@ public class ObjectStorageBlobStore implements BlobStore { } @Override - public Mono<BlobId> save(BucketName bucketName, byte[] data) { + public Mono<BlobId> save(BucketName bucketName, byte[] data, StoragePolicy storagePolicy) { Preconditions.checkNotNull(data); ObjectStorageBucketName resolvedBucketName = bucketNameResolver.resolve(bucketName); @@ -116,19 +116,19 @@ public class ObjectStorageBlobStore implements BlobStore { } @Override - public Mono<BlobId> save(BucketName bucketName, InputStream data) { + public Mono<BlobId> save(BucketName bucketName, InputStream data, StoragePolicy storagePolicy) { Preconditions.checkNotNull(data); - return Mono.defer(() -> savingStrategySelection(bucketName, data)); + return Mono.defer(() -> savingStrategySelection(bucketName, data, storagePolicy)); } - private Mono<BlobId> savingStrategySelection(BucketName bucketName, InputStream data) { + private Mono<BlobId> savingStrategySelection(BucketName bucketName, InputStream data, StoragePolicy storagePolicy) { InputStream bufferedData = new BufferedInputStream(data, BUFFERED_SIZE + 1); try { if (isItABigStream(bufferedData)) { return saveBigStream(bucketName, bufferedData); } else { - return save(bucketName, IOUtils.toByteArray(bufferedData)); + return save(bucketName, IOUtils.toByteArray(bufferedData), storagePolicy); } } catch (IOException e) { throw new RuntimeException(e); diff --git a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreContract.java b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreContract.java index 2e96f52..a5b1fec 100644 --- a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreContract.java +++ b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreContract.java @@ -19,6 +19,7 @@ package org.apache.james.blob.objectstorage; +import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST; import static org.assertj.core.api.Assertions.assertThat; import java.io.InputStream; @@ -38,7 +39,7 @@ public interface ObjectStorageBlobStoreContract { default void assertBlobStoreCanStoreAndRetrieve(ObjectStorageBlobStoreBuilder.ReadyToBuild builder) { ObjectStorageBlobStore blobStore = builder.build(); - BlobId blobId = blobStore.save(blobStore.getDefaultBucketName(), CONTENT).block(); + BlobId blobId = blobStore.save(blobStore.getDefaultBucketName(), CONTENT, LOW_COST).block(); InputStream inputStream = blobStore.read(blobStore.getDefaultBucketName(), blobId); assertThat(inputStream).hasSameContentAs(IOUtils.toInputStream(CONTENT, StandardCharsets.UTF_8)); diff --git a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreTest.java b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreTest.java index e342525..263e105 100644 --- a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreTest.java +++ b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreTest.java @@ -19,6 +19,7 @@ package org.apache.james.blob.objectstorage; +import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST; import static org.assertj.core.api.Assertions.assertThat; import java.io.ByteArrayInputStream; @@ -119,7 +120,7 @@ public class ObjectStorageBlobStoreTest implements MetricableBlobStoreContract { .namespace(defaultBucketName) .build(); String content = "James is the best!"; - BlobId blobId = encryptedBlobStore.save(encryptedBlobStore.getDefaultBucketName(), content).block(); + BlobId blobId = encryptedBlobStore.save(encryptedBlobStore.getDefaultBucketName(), content, LOW_COST).block(); InputStream read = encryptedBlobStore.read(encryptedBlobStore.getDefaultBucketName(), blobId); String expectedContent = IOUtils.toString(read, Charsets.UTF_8); @@ -135,7 +136,7 @@ public class ObjectStorageBlobStoreTest implements MetricableBlobStoreContract { .namespace(defaultBucketName) .build(); String content = "James is the best!"; - BlobId blobId = encryptedBlobStore.save(encryptedBlobStore.getDefaultBucketName(), content).block(); + BlobId blobId = encryptedBlobStore.save(encryptedBlobStore.getDefaultBucketName(), content, LOW_COST).block(); InputStream encryptedIs = testee.read(encryptedBlobStore.getDefaultBucketName(), blobId); assertThat(encryptedIs).isNotNull(); @@ -150,7 +151,7 @@ public class ObjectStorageBlobStoreTest implements MetricableBlobStoreContract { @Test void deleteBucketShouldDeleteSwiftContainer() { BucketName bucketName = BucketName.of("azerty"); - objectStorageBlobStore.save(bucketName, "data").block(); + objectStorageBlobStore.save(bucketName, "data", LOW_COST).block(); objectStorageBlobStore.deleteBucket(bucketName).block(); @@ -177,7 +178,7 @@ public class ObjectStorageBlobStoreTest implements MetricableBlobStoreContract { void saveBytesShouldNotCompleteWhenDoesNotAwait() { // String need to be big enough to get async thread busy hence could not return result instantly Mono<BlobId> blobIdFuture = testee - .save(testee.getDefaultBucketName(), BIG_STRING.getBytes(StandardCharsets.UTF_8)) + .save(testee.getDefaultBucketName(), BIG_STRING.getBytes(StandardCharsets.UTF_8), LOW_COST) .subscribeOn(Schedulers.elastic()); assertThat(blobIdFuture.toFuture()).isNotCompleted(); } @@ -185,7 +186,7 @@ public class ObjectStorageBlobStoreTest implements MetricableBlobStoreContract { @Test void saveStringShouldNotCompleteWhenDoesNotAwait() { Mono<BlobId> blobIdFuture = testee - .save(testee.getDefaultBucketName(), BIG_STRING) + .save(testee.getDefaultBucketName(), BIG_STRING, LOW_COST) .subscribeOn(Schedulers.elastic()); assertThat(blobIdFuture.toFuture()).isNotCompleted(); } @@ -193,14 +194,14 @@ public class ObjectStorageBlobStoreTest implements MetricableBlobStoreContract { @Test void saveInputStreamShouldNotCompleteWhenDoesNotAwait() { Mono<BlobId> blobIdFuture = testee - .save(testee.getDefaultBucketName(), new ByteArrayInputStream(BIG_STRING.getBytes(StandardCharsets.UTF_8))) + .save(testee.getDefaultBucketName(), new ByteArrayInputStream(BIG_STRING.getBytes(StandardCharsets.UTF_8)), LOW_COST) .subscribeOn(Schedulers.elastic()); assertThat(blobIdFuture.toFuture()).isNotCompleted(); } @Test void readBytesShouldNotCompleteWhenDoesNotAwait() { - BlobId blobId = testee().save(testee.getDefaultBucketName(), BIG_STRING).block(); + BlobId blobId = testee().save(testee.getDefaultBucketName(), BIG_STRING, LOW_COST).block(); Mono<byte[]> resultFuture = testee.readBytes(testee.getDefaultBucketName(), blobId).subscribeOn(Schedulers.elastic()); assertThat(resultFuture.toFuture()).isNotCompleted(); } diff --git a/server/blob/blob-union/src/main/java/org/apache/james/blob/union/UnionBlobStore.java b/server/blob/blob-union/src/main/java/org/apache/james/blob/union/UnionBlobStore.java index b822b02..37f7c8a 100644 --- a/server/blob/blob-union/src/main/java/org/apache/james/blob/union/UnionBlobStore.java +++ b/server/blob/blob-union/src/main/java/org/apache/james/blob/union/UnionBlobStore.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.PushbackInputStream; import java.util.Optional; -import java.util.function.BiFunction; import java.util.function.Function; import org.apache.james.blob.api.BlobId; @@ -42,6 +41,11 @@ import reactor.core.publisher.Mono; public class UnionBlobStore implements BlobStore { @FunctionalInterface + public interface StorageOperation<T> { + Mono<BlobId> save(BucketName bucketName, T data, StoragePolicy storagePolicy); + } + + @FunctionalInterface public interface RequireCurrent { RequireLegacy current(BlobStore blobStore); } @@ -83,26 +87,26 @@ public class UnionBlobStore implements BlobStore { } @Override - public Mono<BlobId> save(BucketName bucketName, byte[] data) { + public Mono<BlobId> save(BucketName bucketName, byte[] data, StoragePolicy storagePolicy) { try { - return saveToCurrentFallbackIfFails(bucketName, data, + return saveToCurrentFallbackIfFails(bucketName, data, storagePolicy, currentBlobStore::save, legacyBlobStore::save); } catch (Exception e) { LOGGER.error("exception directly happens while saving bytes data, fall back to legacy blob store", e); - return legacyBlobStore.save(bucketName, data); + return legacyBlobStore.save(bucketName, data, storagePolicy); } } @Override - public Mono<BlobId> save(BucketName bucketName, String data) { + public Mono<BlobId> save(BucketName bucketName, String data, StoragePolicy storagePolicy) { try { - return saveToCurrentFallbackIfFails(bucketName, data, + return saveToCurrentFallbackIfFails(bucketName, data, storagePolicy, currentBlobStore::save, legacyBlobStore::save); } catch (Exception e) { LOGGER.error("exception directly happens while saving String data, fall back to legacy blob store", e); - return legacyBlobStore.save(bucketName, data); + return legacyBlobStore.save(bucketName, data, storagePolicy); } } @@ -118,14 +122,14 @@ public class UnionBlobStore implements BlobStore { } @Override - public Mono<BlobId> save(BucketName bucketName, InputStream data) { + public Mono<BlobId> save(BucketName bucketName, InputStream data, StoragePolicy storagePolicy) { try { - return saveToCurrentFallbackIfFails(bucketName, data, + return saveToCurrentFallbackIfFails(bucketName, data, storagePolicy, currentBlobStore::save, legacyBlobStore::save); } catch (Exception e) { LOGGER.error("exception directly happens while saving InputStream data, fall back to legacy blob store", e); - return legacyBlobStore.save(bucketName, data); + return legacyBlobStore.save(bucketName, data, storagePolicy); } } @@ -190,12 +194,13 @@ public class UnionBlobStore implements BlobStore { private <T> Mono<BlobId> saveToCurrentFallbackIfFails( BucketName bucketName, T data, - BiFunction<BucketName, T, Mono<BlobId>> currentSavingOperation, - BiFunction<BucketName, T, Mono<BlobId>> fallbackSavingOperationSupplier) { + StoragePolicy storagePolicy, + StorageOperation<T> currentSavingOperation, + StorageOperation<T> fallbackSavingOperationSupplier) { - return Mono.defer(() -> currentSavingOperation.apply(bucketName, data)) + return Mono.defer(() -> currentSavingOperation.save(bucketName, data, storagePolicy)) .onErrorResume(this::logAndReturnEmpty) - .switchIfEmpty(Mono.defer(() -> fallbackSavingOperationSupplier.apply(bucketName, data))); + .switchIfEmpty(Mono.defer(() -> fallbackSavingOperationSupplier.save(bucketName, data, storagePolicy))); } private <T> Mono<T> logAndReturnEmpty(Throwable throwable) { diff --git a/server/blob/blob-union/src/test/java/org/apache/james/blob/union/UnionBlobStoreTest.java b/server/blob/blob-union/src/test/java/org/apache/james/blob/union/UnionBlobStoreTest.java index e981edf..e8ccd81 100644 --- a/server/blob/blob-union/src/test/java/org/apache/james/blob/union/UnionBlobStoreTest.java +++ b/server/blob/blob-union/src/test/java/org/apache/james/blob/union/UnionBlobStoreTest.java @@ -19,6 +19,8 @@ package org.apache.james.blob.union; +import static org.apache.james.blob.api.BlobStore.StoragePolicy.LowCost; +import static org.apache.james.blob.api.BlobStore.StoragePolicy.LowCost; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -55,25 +57,24 @@ import reactor.core.publisher.Mono; class UnionBlobStoreTest implements BlobStoreContract { private static class FailingBlobStore implements BlobStore { - @Override - public Mono<BlobId> save(BucketName bucketName, byte[] data) { + public Mono<BlobId> save(BucketName bucketName, InputStream data, StoragePolicy storagePolicy) { return Mono.error(new RuntimeException("broken everywhere")); } @Override - public Mono<BlobId> save(BucketName bucketName, String data) { + public Mono<BlobId> save(BucketName bucketName, byte[] data, StoragePolicy storagePolicy) { return Mono.error(new RuntimeException("broken everywhere")); } @Override - public BucketName getDefaultBucketName() { - return BucketName.DEFAULT; + public Mono<BlobId> save(BucketName bucketName, String data, StoragePolicy storagePolicy) { + return Mono.error(new RuntimeException("broken everywhere")); } @Override - public Mono<BlobId> save(BucketName bucketName, InputStream data) { - return Mono.error(new RuntimeException("broken everywhere")); + public BucketName getDefaultBucketName() { + return BucketName.DEFAULT; } @Override @@ -106,12 +107,12 @@ class UnionBlobStoreTest implements BlobStoreContract { private static class ThrowingBlobStore implements BlobStore { @Override - public Mono<BlobId> save(BucketName bucketName, byte[] data) { + public Mono<BlobId> save(BucketName bucketName, byte[] data, StoragePolicy storagePolicy) { throw new RuntimeException("broken everywhere"); } @Override - public Mono<BlobId> save(BucketName bucketName, String data) { + public Mono<BlobId> save(BucketName bucketName, String data, StoragePolicy storagePolicy) { throw new RuntimeException("broken everywhere"); } @@ -121,7 +122,7 @@ class UnionBlobStoreTest implements BlobStoreContract { } @Override - public Mono<BlobId> save(BucketName bucketName, InputStream data) { + public Mono<BlobId> save(BucketName bucketName, InputStream data, StoragePolicy storagePolicy) { throw new RuntimeException("broken everywhere"); } @@ -190,7 +191,7 @@ class UnionBlobStoreTest implements BlobStoreContract { .current(new ThrowingBlobStore()) .legacy(legacyBlobStore) .build(); - BlobId blobId = unionBlobStore.save(unionBlobStore.getDefaultBucketName(), BLOB_CONTENT).block(); + BlobId blobId = unionBlobStore.save(unionBlobStore.getDefaultBucketName(), BLOB_CONTENT, LowCost).block(); SoftAssertions.assertSoftly(softly -> { softly.assertThat(unionBlobStore.read(unionBlobStore.getDefaultBucketName(), blobId)) @@ -207,7 +208,7 @@ class UnionBlobStoreTest implements BlobStoreContract { .current(new ThrowingBlobStore()) .legacy(legacyBlobStore) .build(); - BlobId blobId = unionBlobStore.save(unionBlobStore.getDefaultBucketName(), new ByteArrayInputStream(BLOB_CONTENT)).block(); + BlobId blobId = unionBlobStore.save(unionBlobStore.getDefaultBucketName(), new ByteArrayInputStream(BLOB_CONTENT), LowCost).block(); SoftAssertions.assertSoftly(softly -> { softly.assertThat(unionBlobStore.read(unionBlobStore.getDefaultBucketName(), blobId)) @@ -228,7 +229,7 @@ class UnionBlobStoreTest implements BlobStoreContract { .current(new FailingBlobStore()) .legacy(legacyBlobStore) .build(); - BlobId blobId = unionBlobStore.save(unionBlobStore.getDefaultBucketName(), BLOB_CONTENT).block(); + BlobId blobId = unionBlobStore.save(unionBlobStore.getDefaultBucketName(), BLOB_CONTENT, LowCost).block(); SoftAssertions.assertSoftly(softly -> { softly.assertThat(unionBlobStore.read(unionBlobStore.getDefaultBucketName(), blobId)) @@ -245,7 +246,7 @@ class UnionBlobStoreTest implements BlobStoreContract { .current(new FailingBlobStore()) .legacy(legacyBlobStore) .build(); - BlobId blobId = unionBlobStore.save(unionBlobStore.getDefaultBucketName(), new ByteArrayInputStream(BLOB_CONTENT)).block(); + BlobId blobId = unionBlobStore.save(unionBlobStore.getDefaultBucketName(), new ByteArrayInputStream(BLOB_CONTENT), LowCost).block(); SoftAssertions.assertSoftly(softly -> { softly.assertThat(unionBlobStore.read(unionBlobStore.getDefaultBucketName(), blobId)) @@ -267,7 +268,7 @@ class UnionBlobStoreTest implements BlobStoreContract { .current(new ThrowingBlobStore()) .legacy(legacyBlobStore) .build(); - BlobId blobId = legacyBlobStore.save(unionBlobStore.getDefaultBucketName(), BLOB_CONTENT).block(); + BlobId blobId = legacyBlobStore.save(unionBlobStore.getDefaultBucketName(), BLOB_CONTENT, LowCost).block(); assertThat(unionBlobStore.read(unionBlobStore.getDefaultBucketName(), blobId)) .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); @@ -281,7 +282,7 @@ class UnionBlobStoreTest implements BlobStoreContract { .current(new ThrowingBlobStore()) .legacy(legacyBlobStore) .build(); - BlobId blobId = legacyBlobStore.save(unionBlobStore.getDefaultBucketName(), BLOB_CONTENT).block(); + BlobId blobId = legacyBlobStore.save(unionBlobStore.getDefaultBucketName(), BLOB_CONTENT, LowCost).block(); assertThat(unionBlobStore.readBytes(unionBlobStore.getDefaultBucketName(), blobId).block()) .isEqualTo(BLOB_CONTENT); @@ -299,7 +300,7 @@ class UnionBlobStoreTest implements BlobStoreContract { .current(new FailingBlobStore()) .legacy(legacyBlobStore) .build(); - BlobId blobId = legacyBlobStore.save(unionBlobStore.getDefaultBucketName(), BLOB_CONTENT).block(); + BlobId blobId = legacyBlobStore.save(unionBlobStore.getDefaultBucketName(), BLOB_CONTENT, LowCost).block(); assertThat(unionBlobStore.read(unionBlobStore.getDefaultBucketName(), blobId)) .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); @@ -312,7 +313,7 @@ class UnionBlobStoreTest implements BlobStoreContract { .current(new FailingBlobStore()) .legacy(legacyBlobStore) .build(); - BlobId blobId = legacyBlobStore.save(unionBlobStore.getDefaultBucketName(), BLOB_CONTENT).block(); + BlobId blobId = legacyBlobStore.save(unionBlobStore.getDefaultBucketName(), BLOB_CONTENT, LowCost).block(); assertThat(unionBlobStore.readBytes(unionBlobStore.getDefaultBucketName(), blobId).block()) .isEqualTo(BLOB_CONTENT); @@ -326,9 +327,9 @@ class UnionBlobStoreTest implements BlobStoreContract { Stream<Function<UnionBlobStore, Mono<?>>> blobStoreOperationsReturnFutures() { return Stream.of( - blobStore -> blobStore.save(blobStore.getDefaultBucketName(), BLOB_CONTENT), - blobStore -> blobStore.save(blobStore.getDefaultBucketName(), STRING_CONTENT), - blobStore -> blobStore.save(blobStore.getDefaultBucketName(), new ByteArrayInputStream(BLOB_CONTENT)), + blobStore -> blobStore.save(blobStore.getDefaultBucketName(), BLOB_CONTENT, LowCost), + blobStore -> blobStore.save(blobStore.getDefaultBucketName(), STRING_CONTENT, LowCost), + blobStore -> blobStore.save(blobStore.getDefaultBucketName(), new ByteArrayInputStream(BLOB_CONTENT), LowCost), blobStore -> blobStore.readBytes(blobStore.getDefaultBucketName(), BLOB_ID_FACTORY.randomId())); } @@ -390,7 +391,7 @@ class UnionBlobStoreTest implements BlobStoreContract { @Test void readShouldReturnFromCurrentWhenAvailable() { - BlobId blobId = currentBlobStore.save(currentBlobStore.getDefaultBucketName(), BLOB_CONTENT).block(); + BlobId blobId = currentBlobStore.save(currentBlobStore.getDefaultBucketName(), BLOB_CONTENT, LowCost).block(); assertThat(unionBlobStore.read(unionBlobStore.getDefaultBucketName(), blobId)) .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); @@ -398,7 +399,7 @@ class UnionBlobStoreTest implements BlobStoreContract { @Test void readShouldReturnFromLegacyWhenCurrentNotAvailable() { - BlobId blobId = legacyBlobStore.save(unionBlobStore.getDefaultBucketName(), BLOB_CONTENT).block(); + BlobId blobId = legacyBlobStore.save(unionBlobStore.getDefaultBucketName(), BLOB_CONTENT, LowCost).block(); assertThat(unionBlobStore.read(unionBlobStore.getDefaultBucketName(), blobId)) .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); @@ -406,7 +407,7 @@ class UnionBlobStoreTest implements BlobStoreContract { @Test void readBytesShouldReturnFromCurrentWhenAvailable() { - BlobId blobId = currentBlobStore.save(currentBlobStore.getDefaultBucketName(), BLOB_CONTENT).block(); + BlobId blobId = currentBlobStore.save(currentBlobStore.getDefaultBucketName(), BLOB_CONTENT, LowCost).block(); assertThat(unionBlobStore.readBytes(currentBlobStore.getDefaultBucketName(), blobId).block()) .isEqualTo(BLOB_CONTENT); @@ -414,7 +415,7 @@ class UnionBlobStoreTest implements BlobStoreContract { @Test void readBytesShouldReturnFromLegacyWhenCurrentNotAvailable() { - BlobId blobId = legacyBlobStore.save(unionBlobStore.getDefaultBucketName(), BLOB_CONTENT).block(); + BlobId blobId = legacyBlobStore.save(unionBlobStore.getDefaultBucketName(), BLOB_CONTENT, LowCost).block(); assertThat(unionBlobStore.readBytes(unionBlobStore.getDefaultBucketName(), blobId).block()) .isEqualTo(BLOB_CONTENT); @@ -422,7 +423,7 @@ class UnionBlobStoreTest implements BlobStoreContract { @Test void saveShouldWriteToCurrent() { - BlobId blobId = unionBlobStore.save(unionBlobStore.getDefaultBucketName(), BLOB_CONTENT).block(); + BlobId blobId = unionBlobStore.save(unionBlobStore.getDefaultBucketName(), BLOB_CONTENT, LowCost).block(); assertThat(currentBlobStore.readBytes(currentBlobStore.getDefaultBucketName(), blobId).block()) .isEqualTo(BLOB_CONTENT); @@ -430,7 +431,7 @@ class UnionBlobStoreTest implements BlobStoreContract { @Test void saveShouldNotWriteToLegacy() { - BlobId blobId = unionBlobStore.save(unionBlobStore.getDefaultBucketName(), BLOB_CONTENT).block(); + BlobId blobId = unionBlobStore.save(unionBlobStore.getDefaultBucketName(), BLOB_CONTENT, LowCost).block(); assertThatThrownBy(() -> legacyBlobStore.readBytes(legacyBlobStore.getDefaultBucketName(), blobId).block()) .isInstanceOf(ObjectStoreException.class); @@ -438,7 +439,7 @@ class UnionBlobStoreTest implements BlobStoreContract { @Test void saveStringShouldWriteToCurrent() { - BlobId blobId = unionBlobStore.save(unionBlobStore.getDefaultBucketName(), STRING_CONTENT).block(); + BlobId blobId = unionBlobStore.save(unionBlobStore.getDefaultBucketName(), STRING_CONTENT, LowCost).block(); assertThat(currentBlobStore.readBytes(currentBlobStore.getDefaultBucketName(), blobId).block()) .isEqualTo(BLOB_CONTENT); @@ -446,7 +447,7 @@ class UnionBlobStoreTest implements BlobStoreContract { @Test void saveStringShouldNotWriteToLegacy() { - BlobId blobId = unionBlobStore.save(unionBlobStore.getDefaultBucketName(), STRING_CONTENT).block(); + BlobId blobId = unionBlobStore.save(unionBlobStore.getDefaultBucketName(), STRING_CONTENT, LowCost).block(); assertThatThrownBy(() -> legacyBlobStore.readBytes(legacyBlobStore.getDefaultBucketName(), blobId).block()) .isInstanceOf(ObjectStoreException.class); @@ -454,7 +455,7 @@ class UnionBlobStoreTest implements BlobStoreContract { @Test void saveInputStreamShouldWriteToCurrent() { - BlobId blobId = unionBlobStore.save(unionBlobStore.getDefaultBucketName(), new ByteArrayInputStream(BLOB_CONTENT)).block(); + BlobId blobId = unionBlobStore.save(unionBlobStore.getDefaultBucketName(), new ByteArrayInputStream(BLOB_CONTENT), LowCost).block(); assertThat(currentBlobStore.readBytes(currentBlobStore.getDefaultBucketName(), blobId).block()) .isEqualTo(BLOB_CONTENT); @@ -462,7 +463,7 @@ class UnionBlobStoreTest implements BlobStoreContract { @Test void saveInputStreamShouldNotWriteToLegacy() { - BlobId blobId = unionBlobStore.save(unionBlobStore.getDefaultBucketName(), new ByteArrayInputStream(BLOB_CONTENT)).block(); + BlobId blobId = unionBlobStore.save(unionBlobStore.getDefaultBucketName(), new ByteArrayInputStream(BLOB_CONTENT), LowCost).block(); assertThatThrownBy(() -> legacyBlobStore.readBytes(legacyBlobStore.getDefaultBucketName(), blobId).block()) .isInstanceOf(ObjectStoreException.class); @@ -512,8 +513,8 @@ class UnionBlobStoreTest implements BlobStoreContract { @Test void deleteBucketShouldDeleteBothCurrentAndLegacyBuckets() { - BlobId legacyBlobId = legacyBlobStore.save(BucketName.DEFAULT, BLOB_CONTENT).block(); - BlobId currentBlobId = currentBlobStore.save(BucketName.DEFAULT, BLOB_CONTENT).block(); + BlobId legacyBlobId = legacyBlobStore.save(BucketName.DEFAULT, BLOB_CONTENT, LowCost).block(); + BlobId currentBlobId = currentBlobStore.save(BucketName.DEFAULT, BLOB_CONTENT, LowCost).block(); unionBlobStore.deleteBucket(BucketName.DEFAULT).block(); @@ -525,7 +526,7 @@ class UnionBlobStoreTest implements BlobStoreContract { @Test void deleteBucketShouldDeleteCurrentBucketEvenWhenLegacyDoesNotExist() { - BlobId currentBlobId = currentBlobStore.save(BucketName.DEFAULT, BLOB_CONTENT).block(); + BlobId currentBlobId = currentBlobStore.save(BucketName.DEFAULT, BLOB_CONTENT, LowCost).block(); unionBlobStore.deleteBucket(BucketName.DEFAULT).block(); @@ -535,7 +536,7 @@ class UnionBlobStoreTest implements BlobStoreContract { @Test void deleteBucketShouldDeleteLegacyBucketEvenWhenCurrentDoesNotExist() { - BlobId legacyBlobId = legacyBlobStore.save(BucketName.DEFAULT, BLOB_CONTENT).block(); + BlobId legacyBlobId = legacyBlobStore.save(BucketName.DEFAULT, BLOB_CONTENT, LowCost).block(); unionBlobStore.deleteBucket(BucketName.DEFAULT).block(); @@ -564,8 +565,8 @@ class UnionBlobStoreTest implements BlobStoreContract { @Test void deleteShouldDeleteBothCurrentAndLegacyBlob() { - BlobId legacyBlobId = legacyBlobStore.save(BucketName.DEFAULT, BLOB_CONTENT).block(); - BlobId currentBlobId = currentBlobStore.save(BucketName.DEFAULT, BLOB_CONTENT).block(); + BlobId legacyBlobId = legacyBlobStore.save(BucketName.DEFAULT, BLOB_CONTENT, LowCost).block(); + BlobId currentBlobId = currentBlobStore.save(BucketName.DEFAULT, BLOB_CONTENT, LowCost).block(); unionBlobStore.delete(BucketName.DEFAULT, currentBlobId).block(); @@ -577,7 +578,7 @@ class UnionBlobStoreTest implements BlobStoreContract { @Test void deleteShouldDeleteCurrentBlobEvenWhenLegacyDoesNotExist() { - BlobId currentBlobId = currentBlobStore.save(BucketName.DEFAULT, BLOB_CONTENT).block(); + BlobId currentBlobId = currentBlobStore.save(BucketName.DEFAULT, BLOB_CONTENT, LowCost).block(); unionBlobStore.delete(BucketName.DEFAULT, currentBlobId).block(); @@ -587,7 +588,7 @@ class UnionBlobStoreTest implements BlobStoreContract { @Test void deleteShouldDeleteLegacyBlobEvenWhenCurrentDoesNotExist() { - BlobId legacyBlobId = legacyBlobStore.save(BucketName.DEFAULT, BLOB_CONTENT).block(); + BlobId legacyBlobId = legacyBlobStore.save(BucketName.DEFAULT, BLOB_CONTENT, LowCost).block(); unionBlobStore.delete(BucketName.DEFAULT, legacyBlobId).block(); diff --git a/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java b/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java index f0c2480..ad11196 100644 --- a/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java +++ b/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java @@ -20,6 +20,8 @@ package org.apache.james.blob.mail; import static org.apache.commons.io.output.NullOutputStream.NULL_OUTPUT_STREAM; +import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST; +import static org.apache.james.blob.api.BlobStore.StoragePolicy.SIZE_BASED; import static org.apache.james.blob.mail.MimeMessagePartsId.BODY_BLOB_TYPE; import static org.apache.james.blob.mail.MimeMessagePartsId.HEADER_BLOB_TYPE; @@ -76,8 +78,8 @@ public class MimeMessageStore { byte[] headerBytes = getHeaderBytes(messageAsArray, bodyStartOctet); byte[] bodyBytes = getBodyBytes(messageAsArray, bodyStartOctet); return Stream.of( - Pair.of(HEADER_BLOB_TYPE, new Store.Impl.BytesToSave(headerBytes)), - Pair.of(BODY_BLOB_TYPE, new Store.Impl.BytesToSave(bodyBytes))); + Pair.of(HEADER_BLOB_TYPE, new Store.Impl.BytesToSave(headerBytes, SIZE_BASED)), + Pair.of(BODY_BLOB_TYPE, new Store.Impl.BytesToSave(bodyBytes, LOW_COST))); } catch (MessagingException | IOException e) { throw new RuntimeException(e); } diff --git a/server/container/guice/blob-objectstorage-guice/src/test/java/org/apache/james/modules/objectstorage/swift/ObjectStorageBlobStoreModuleTest.java b/server/container/guice/blob-objectstorage-guice/src/test/java/org/apache/james/modules/objectstorage/swift/ObjectStorageBlobStoreModuleTest.java index 7911d16..05a386a 100644 --- a/server/container/guice/blob-objectstorage-guice/src/test/java/org/apache/james/modules/objectstorage/swift/ObjectStorageBlobStoreModuleTest.java +++ b/server/container/guice/blob-objectstorage-guice/src/test/java/org/apache/james/modules/objectstorage/swift/ObjectStorageBlobStoreModuleTest.java @@ -19,6 +19,7 @@ package org.apache.james.modules.objectstorage.swift; +import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST; import static org.assertj.core.api.Assertions.assertThatCode; import java.util.Optional; @@ -137,7 +138,7 @@ class ObjectStorageBlobStoreModuleTest { BlobStore blobStore = injector.getInstance(Key.get(BlobStore.class, Names.named(MetricableBlobStore.BLOB_STORE_IMPLEMENTATION))); - assertThatCode(() -> blobStore.save(blobStore.getDefaultBucketName(), new byte[] {0x00})).doesNotThrowAnyException(); + assertThatCode(() -> blobStore.save(blobStore.getDefaultBucketName(), new byte[] {0x00}, LOW_COST)).doesNotThrowAnyException(); } } diff --git a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/ExportService.java b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/ExportService.java index 1c03fdf..34d55f1 100644 --- a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/ExportService.java +++ b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/ExportService.java @@ -19,6 +19,8 @@ package org.apache.james.webadmin.vault.routes; +import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST; + import java.io.IOException; import java.util.Optional; import java.util.function.Predicate; @@ -85,7 +87,7 @@ class ExportService { try (FileBackedOutputStream fileOutputStream = new FileBackedOutputStream(FileUtils.ONE_MB_BI.intValue())) { zipper.zip(contentLoader(username), messages.toStream(), fileOutputStream); ByteSource byteSource = fileOutputStream.asByteSource(); - return blobStore.save(blobStore.getDefaultBucketName(), byteSource.openStream()).block(); + return blobStore.save(blobStore.getDefaultBucketName(), byteSource.openStream(), LOW_COST).block(); } } diff --git a/third-party/linshare/src/test/java/org/apache/james/linshare/LinshareBlobExportMechanismTest.java b/third-party/linshare/src/test/java/org/apache/james/linshare/LinshareBlobExportMechanismTest.java index 542900a..729bbeb 100644 --- a/third-party/linshare/src/test/java/org/apache/james/linshare/LinshareBlobExportMechanismTest.java +++ b/third-party/linshare/src/test/java/org/apache/james/linshare/LinshareBlobExportMechanismTest.java @@ -20,6 +20,7 @@ package org.apache.james.linshare; import static io.restassured.RestAssured.given; +import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST; import static org.apache.james.linshare.LinshareFixture.USER_1; import static org.apache.james.linshare.LinshareFixture.USER_2; import static org.assertj.core.api.Assertions.assertThat; @@ -74,7 +75,7 @@ class LinshareBlobExportMechanismTest { @Test void exportShouldShareTheDocumentViaLinshare() throws Exception { - BlobId blobId = blobStore.save(blobStore.getDefaultBucketName(), FILE_CONTENT).block(); + BlobId blobId = blobStore.save(blobStore.getDefaultBucketName(), FILE_CONTENT, LOW_COST).block(); String filePrefix = "deleted-message-of-...@james.org-"; testee.blobId(blobId) @@ -93,7 +94,7 @@ class LinshareBlobExportMechanismTest { @Test void exportShouldSendAnEmailToSharee() throws Exception { - BlobId blobId = blobStore.save(blobStore.getDefaultBucketName(), FILE_CONTENT).block(); + BlobId blobId = blobStore.save(blobStore.getDefaultBucketName(), FILE_CONTENT, LOW_COST).block(); testee.blobId(blobId) .with(new MailAddress(USER_2.getUsername())) @@ -122,7 +123,7 @@ class LinshareBlobExportMechanismTest { @Test void exportShouldShareTheDocumentAndAllowDownloadViaLinshare() throws Exception { - BlobId blobId = blobStore.save(blobStore.getDefaultBucketName(), FILE_CONTENT).block(); + BlobId blobId = blobStore.save(blobStore.getDefaultBucketName(), FILE_CONTENT, LOW_COST).block(); testee.blobId(blobId) .with(new MailAddress(USER_2.getUsername())) @@ -152,7 +153,7 @@ class LinshareBlobExportMechanismTest { @Test void exportWithFilePrefixShouldCreateFileWithCustomPrefix() throws Exception { - BlobId blobId = blobStore.save(blobStore.getDefaultBucketName(), FILE_CONTENT).block(); + BlobId blobId = blobStore.save(blobStore.getDefaultBucketName(), FILE_CONTENT, LOW_COST).block(); String filePrefix = "deleted-message-of-...@james.org"; testee.blobId(blobId) --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org