JAMES-2082 Add configuration options for blob part size and on the fly migration
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/9c46faae Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/9c46faae Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/9c46faae Branch: refs/heads/master Commit: 9c46faae934374892bfe550aaa80a8bc20dfae8e Parents: 8cb1905 Author: benwa <btell...@linagora.com> Authored: Fri Jul 7 08:01:43 2017 +0700 Committer: Antoine Duprat <adup...@linagora.com> Committed: Mon Jul 10 14:23:56 2017 +0200 ---------------------------------------------------------------------- .../cassandra/CassandraConfiguration.java | 54 ++++++++++++++++++-- .../cassandra/CassandraConfigurationTest.java | 6 +++ .../destination/conf/cassandra.properties | 4 +- .../destination/conf/cassandra.properties | 4 +- .../cassandra/mail/CassandraBlobsDAO.java | 17 ++++-- .../cassandra/mail/CassandraMessageDAOV2.java | 14 +++-- .../mail/CassandraMessageIdMapper.java | 2 +- .../cassandra/mail/CassandraMessageMapper.java | 3 +- .../mail/migration/V1ToV2Migration.java | 22 +++++--- .../cassandra/mail/CassandraBlobsDAOTest.java | 9 +++- .../mail/migration/V1ToV2MigrationTest.java | 5 +- .../modules/mailbox/CassandraSessionModule.java | 6 +++ .../mailbox/CassandraSessionModuleTest.java | 2 + .../modules/mailbox/cassandra.properties | 2 + src/site/xdoc/server/config-cassandra.xml | 4 ++ 15 files changed, 127 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/9c46faae/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/CassandraConfiguration.java ---------------------------------------------------------------------- diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/CassandraConfiguration.java 
b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/CassandraConfiguration.java index ff3cad9..6ca8487 100644 --- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/CassandraConfiguration.java +++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/CassandraConfiguration.java @@ -38,6 +38,8 @@ public class CassandraConfiguration { public static final int DEFAULT_UID_MAX_RETRY = 100000; public static final int DEFAULT_ACL_MAX_RETRY = 1000; public static final int DEFAULT_FETCH_NEXT_PAGE_ADVANCE_IN_ROW = 100; + public static final boolean DEFAULT_ON_THE_FLY_MIGRATION_V1_TO_V2 = false; + public static final int DEFAULT_BLOB_PART_SIZE = 100 * 1024; public static class Builder { private Optional<Integer> messageReadChunkSize = Optional.empty(); @@ -49,6 +51,8 @@ public class CassandraConfiguration { private Optional<Integer> uidMaxRetry = Optional.empty(); private Optional<Integer> aclMaxRetry = Optional.empty(); private Optional<Integer> fetchNextPageInAdvanceRow = Optional.empty(); + private Optional<Integer> blobPartSize = Optional.empty(); + private Optional<Boolean> onTheFlyV1ToV2Migration = Optional.empty(); public Builder messageReadChunkSize(int value) { Preconditions.checkArgument(value > 0, "messageReadChunkSize needs to be strictly positive"); @@ -104,6 +108,17 @@ public class CassandraConfiguration { return this; } + public Builder blobPartSize(int value) { + Preconditions.checkArgument(value > 0, "blobPartSize needs to be strictly positive"); + this.blobPartSize = Optional.of(value); + return this; + } + + public Builder onTheFlyV1ToV2Migration(boolean value) { + this.onTheFlyV1ToV2Migration = Optional.of(value); + return this; + } + public Builder messageReadChunkSize(Optional<Integer> value) { value.ifPresent(this::messageReadChunkSize); return this; @@ -149,6 +164,16 @@ public class CassandraConfiguration { return this; } + public Builder blobPartSize(Optional<Integer> value) 
{ + value.ifPresent(this::blobPartSize); + return this; + } + + public Builder onTheFlyV1ToV2Migration(Optional<Boolean> value) { + value.ifPresent(this::onTheFlyV1ToV2Migration); + return this; + } + public CassandraConfiguration build() { return new CassandraConfiguration(aclMaxRetry.orElse(DEFAULT_ACL_MAX_RETRY), messageReadChunkSize.orElse(DEFAULT_MESSAGE_CHUNK_SIZE_ON_READ), @@ -158,7 +183,9 @@ public class CassandraConfiguration { flagsUpdateMessageMaxRetry.orElse(DEFAULT_FLAGS_UPDATE_MESSAGE_MAX_RETRY), modSeqMaxRetry.orElse(DEFAULT_MODSEQ_MAX_RETRY), uidMaxRetry.orElse(DEFAULT_UID_MAX_RETRY), - fetchNextPageInAdvanceRow.orElse(DEFAULT_FETCH_NEXT_PAGE_ADVANCE_IN_ROW)); + fetchNextPageInAdvanceRow.orElse(DEFAULT_FETCH_NEXT_PAGE_ADVANCE_IN_ROW), + blobPartSize.orElse(DEFAULT_BLOB_PART_SIZE), + onTheFlyV1ToV2Migration.orElse(DEFAULT_ON_THE_FLY_MIGRATION_V1_TO_V2)); } } @@ -175,11 +202,13 @@ public class CassandraConfiguration { private final int uidMaxRetry; private final int aclMaxRetry; private final int fetchNextPageInAdvanceRow; + private final int blobPartSize; + private final boolean onTheFlyV1ToV2Migration; @VisibleForTesting CassandraConfiguration(int aclMaxRetry, int messageReadChunkSize, int expungeChunkSize, int flagsUpdateChunkSize, - int flagsUpdateMessageIdMaxRetry, int flagsUpdateMessageMaxRetry, int modSeqMaxRetry, - int uidMaxRetry, int fetchNextPageInAdvanceRow) { + int flagsUpdateMessageIdMaxRetry, int flagsUpdateMessageMaxRetry, int modSeqMaxRetry, + int uidMaxRetry, int fetchNextPageInAdvanceRow, int blobPartSize, boolean onTheFlyV1ToV2Migration) { this.aclMaxRetry = aclMaxRetry; this.messageReadChunkSize = messageReadChunkSize; this.expungeChunkSize = expungeChunkSize; @@ -189,6 +218,16 @@ public class CassandraConfiguration { this.uidMaxRetry = uidMaxRetry; this.fetchNextPageInAdvanceRow = fetchNextPageInAdvanceRow; this.flagsUpdateChunkSize = flagsUpdateChunkSize; + this.blobPartSize = blobPartSize; + this.onTheFlyV1ToV2Migration = 
onTheFlyV1ToV2Migration; + } + + public int getBlobPartSize() { + return blobPartSize; + } + + public boolean isOnTheFlyV1ToV2Migration() { + return onTheFlyV1ToV2Migration; } public int getFlagsUpdateChunkSize() { @@ -240,7 +279,9 @@ public class CassandraConfiguration { && Objects.equals(this.modSeqMaxRetry, that.modSeqMaxRetry) && Objects.equals(this.uidMaxRetry, that.uidMaxRetry) && Objects.equals(this.flagsUpdateChunkSize, that.flagsUpdateChunkSize) - && Objects.equals(this.fetchNextPageInAdvanceRow, that.fetchNextPageInAdvanceRow); + && Objects.equals(this.fetchNextPageInAdvanceRow, that.fetchNextPageInAdvanceRow) + && Objects.equals(this.blobPartSize, that.blobPartSize) + && Objects.equals(this.onTheFlyV1ToV2Migration, that.onTheFlyV1ToV2Migration); } return false; } @@ -248,7 +289,8 @@ public class CassandraConfiguration { @Override public final int hashCode() { return Objects.hash(aclMaxRetry, messageReadChunkSize, expungeChunkSize, flagsUpdateMessageIdMaxRetry, - flagsUpdateMessageMaxRetry, modSeqMaxRetry, uidMaxRetry, fetchNextPageInAdvanceRow, flagsUpdateChunkSize); + flagsUpdateMessageMaxRetry, modSeqMaxRetry, uidMaxRetry, fetchNextPageInAdvanceRow, flagsUpdateChunkSize, + blobPartSize, onTheFlyV1ToV2Migration); } @Override @@ -263,6 +305,8 @@ public class CassandraConfiguration { .add("fetchNextPageInAdvanceRow", fetchNextPageInAdvanceRow) .add("flagsUpdateChunkSize", flagsUpdateChunkSize) .add("uidMaxRetry", uidMaxRetry) + .add("blobPartSize", blobPartSize) + .add("onTheFlyV1ToV2Migration", onTheFlyV1ToV2Migration) .toString(); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/9c46faae/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraConfigurationTest.java ---------------------------------------------------------------------- diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraConfigurationTest.java 
b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraConfigurationTest.java index 10f111e..e0cc667 100644 --- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraConfigurationTest.java +++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraConfigurationTest.java @@ -202,6 +202,8 @@ public class CassandraConfigurationTest { int flagsUpdateChunkSize = 7; int messageReadChunkSize = 8; int expungeChunkSize = 9; + int blobPartSize = 10; + boolean onTheFlyV1ToV2Migration = true; CassandraConfiguration configuration = CassandraConfiguration.builder() .aclMaxRetry(aclMaxRetry) @@ -213,6 +215,8 @@ public class CassandraConfigurationTest { .flagsUpdateChunkSize(flagsUpdateChunkSize) .messageReadChunkSize(messageReadChunkSize) .expungeChunkSize(expungeChunkSize) + .blobPartSize(blobPartSize) + .onTheFlyV1ToV2Migration(onTheFlyV1ToV2Migration) .build(); softly.assertThat(configuration.getAclMaxRetry()).isEqualTo(aclMaxRetry); @@ -224,6 +228,8 @@ public class CassandraConfigurationTest { softly.assertThat(configuration.getFlagsUpdateChunkSize()).isEqualTo(flagsUpdateChunkSize); softly.assertThat(configuration.getMessageReadChunkSize()).isEqualTo(messageReadChunkSize); softly.assertThat(configuration.getExpungeChunkSize()).isEqualTo(expungeChunkSize); + softly.assertThat(configuration.getBlobPartSize()).isEqualTo(blobPartSize); + softly.assertThat(configuration.isOnTheFlyV1ToV2Migration()).isEqualTo(onTheFlyV1ToV2Migration); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/9c46faae/dockerfiles/run/guice/cassandra-ldap/destination/conf/cassandra.properties ---------------------------------------------------------------------- diff --git a/dockerfiles/run/guice/cassandra-ldap/destination/conf/cassandra.properties b/dockerfiles/run/guice/cassandra-ldap/destination/conf/cassandra.properties index 3b273e6..54c9900 100644 --- 
a/dockerfiles/run/guice/cassandra-ldap/destination/conf/cassandra.properties +++ b/dockerfiles/run/guice/cassandra-ldap/destination/conf/cassandra.properties @@ -16,4 +16,6 @@ cassandra.retryConnection.minDelay=5000 # fetch.advance.row.count=1000 # chunk.size.flags.update=20 # chunk.size.message.read=100 -# chunk.size.expunge=100 \ No newline at end of file +# chunk.size.expunge=100 +# mailbox.blob.part.size=102400 +# migration.v1.v2.on.the.fly=false \ No newline at end of file http://git-wip-us.apache.org/repos/asf/james-project/blob/9c46faae/dockerfiles/run/guice/cassandra/destination/conf/cassandra.properties ---------------------------------------------------------------------- diff --git a/dockerfiles/run/guice/cassandra/destination/conf/cassandra.properties b/dockerfiles/run/guice/cassandra/destination/conf/cassandra.properties index 54540e4..710674e 100644 --- a/dockerfiles/run/guice/cassandra/destination/conf/cassandra.properties +++ b/dockerfiles/run/guice/cassandra/destination/conf/cassandra.properties @@ -25,4 +25,6 @@ cassandra.retryConnection.minDelay=5000 # fetch.advance.row.count=1000 # chunk.size.flags.update=20 # chunk.size.message.read=100 -# chunk.size.expunge=100 \ No newline at end of file +# chunk.size.expunge=100 +# mailbox.blob.part.size=102400 +# migration.v1.v2.on.the.fly=false \ No newline at end of file http://git-wip-us.apache.org/repos/asf/james-project/blob/9c46faae/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java index e73fd32..f6e3d21 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java +++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java @@ -33,6 +33,7 @@ import java.util.stream.Stream; import javax.inject.Inject; import org.apache.commons.lang3.tuple.Pair; +import org.apache.james.backends.cassandra.CassandraConfiguration; import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; import org.apache.james.mailbox.cassandra.ids.BlobId; import org.apache.james.mailbox.cassandra.mail.utils.DataChunker; @@ -45,22 +46,23 @@ import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; import com.github.steveash.guavate.Guavate; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.primitives.Bytes; public class CassandraBlobsDAO { - - public static final int CHUNK_SIZE = 1024 * 100; private final CassandraAsyncExecutor cassandraAsyncExecutor; private final PreparedStatement insert; private final PreparedStatement insertPart; private final PreparedStatement select; private final PreparedStatement selectPart; private final DataChunker dataChunker; + private final CassandraConfiguration configuration; @Inject - public CassandraBlobsDAO(Session session) { + public CassandraBlobsDAO(Session session, CassandraConfiguration cassandraConfiguration) { this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session); + this.configuration = cassandraConfiguration; this.dataChunker = new DataChunker(); this.insert = prepareInsert(session); this.select = prepareSelect(session); @@ -69,6 +71,11 @@ public class CassandraBlobsDAO { this.selectPart = prepareSelectPart(session); } + @VisibleForTesting + public CassandraBlobsDAO(Session session) { + this(session, CassandraConfiguration.DEFAULT_CONFIGURATION); + } + private PreparedStatement prepareSelect(Session session) { return session.prepare(select() .from(BlobTable.TABLE_NAME) @@ -106,8 +113,8 @@ public class 
CassandraBlobsDAO { } private CompletableFuture<Integer> saveBlobParts(byte[] data, BlobId blobId) { - return FluentFutureStream.of( - dataChunker.chunk(data, CHUNK_SIZE) + return FluentFutureStream.<Pair<Integer, Void>> of( + dataChunker.chunk(data, configuration.getBlobPartSize()) .map(pair -> writePart(pair.getRight(), blobId, pair.getKey()) .thenApply(partId -> Pair.of(pair.getKey(), partId)))) .completableFuture() http://git-wip-us.apache.org/repos/asf/james-project/blob/9c46faae/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2.java index 5fb4fed..ad126fc 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2.java @@ -53,6 +53,7 @@ import javax.mail.util.SharedByteArrayInputStream; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.tuple.Pair; +import org.apache.james.backends.cassandra.CassandraConfiguration; import org.apache.james.backends.cassandra.init.CassandraTypesProvider; import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; import org.apache.james.mailbox.cassandra.ids.BlobId; @@ -82,10 +83,10 @@ import com.datastax.driver.core.Session; import com.datastax.driver.core.UDTValue; import com.datastax.driver.core.querybuilder.QueryBuilder; import com.github.steveash.guavate.Guavate; +import com.google.common.annotations.VisibleForTesting; import com.google.common.primitives.Bytes; public class CassandraMessageDAOV2 { - public static final int CHUNK_SIZE_ON_READ = 100; public static final long DEFAULT_LONG_VALUE = 0L; public static final 
String DEFAULT_OBJECT_VALUE = null; private static final byte[] EMPTY_BYTE_ARRAY = {}; @@ -93,6 +94,7 @@ public class CassandraMessageDAOV2 { private final CassandraAsyncExecutor cassandraAsyncExecutor; private final CassandraTypesProvider typesProvider; private final CassandraBlobsDAO blobsDAO; + private final CassandraConfiguration configuration; private final PreparedStatement insert; private final PreparedStatement delete; private final PreparedStatement selectMetadata; @@ -101,10 +103,11 @@ public class CassandraMessageDAOV2 { private final PreparedStatement selectBody; @Inject - public CassandraMessageDAOV2(Session session, CassandraTypesProvider typesProvider, CassandraBlobsDAO blobsDAO) { + public CassandraMessageDAOV2(Session session, CassandraTypesProvider typesProvider, CassandraBlobsDAO blobsDAO, CassandraConfiguration cassandraConfiguration) { this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session); this.typesProvider = typesProvider; this.blobsDAO = blobsDAO; + this.configuration = cassandraConfiguration; this.insert = prepareInsert(session); this.delete = prepareDelete(session); this.selectMetadata = prepareSelect(session, METADATA); @@ -113,6 +116,11 @@ public class CassandraMessageDAOV2 { this.selectBody = prepareSelect(session, BODY); } + @VisibleForTesting + public CassandraMessageDAOV2(Session session, CassandraTypesProvider typesProvider, CassandraBlobsDAO blobsDAO) { + this(session, typesProvider, blobsDAO, CassandraConfiguration.DEFAULT_CONFIGURATION); + } + private PreparedStatement prepareSelect(Session session, String[] fields) { return session.prepare(select(fields) .from(TABLE_NAME) @@ -196,7 +204,7 @@ public class CassandraMessageDAOV2 { public CompletableFuture<Stream<MessageResult>> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) { return CompletableFutureUtil.chainAll( limit.applyOnStream(messageIds.stream().distinct()) - 
.collect(JamesCollectors.chunker(CHUNK_SIZE_ON_READ)), + .collect(JamesCollectors.chunker(configuration.getMessageReadChunkSize())), ids -> rowToMessages(fetchType, ids)) .thenApply(stream -> stream.flatMap(Function.identity())); } http://git-wip-us.apache.org/repos/asf/james-project/blob/9c46faae/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java index 62e9f85..1d554f4 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java @@ -89,7 +89,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper { this.mailboxSession = mailboxSession; this.attachmentLoader = new AttachmentLoader(attachmentMapper); this.cassandraConfiguration = cassandraConfiguration; - this.v1ToV2Migration = new V1ToV2Migration(messageDAOV1, messageDAOV2, attachmentMapper); + this.v1ToV2Migration = new V1ToV2Migration(messageDAOV1, messageDAOV2, attachmentMapper, cassandraConfiguration); } @Override http://git-wip-us.apache.org/repos/asf/james-project/blob/9c46faae/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java index 6335389..9ce7d41 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java +++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java @@ -55,7 +55,6 @@ import org.apache.james.mailbox.store.mail.MessageMapper; import org.apache.james.mailbox.store.mail.model.Mailbox; import org.apache.james.mailbox.store.mail.model.MailboxMessage; import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage; -import org.apache.james.util.CompletableFutureUtil; import org.apache.james.util.FluentFutureStream; import org.apache.james.util.streams.JamesCollectors; import org.slf4j.Logger; @@ -109,7 +108,7 @@ public class CassandraMessageMapper implements MessageMapper { this.attachmentLoader = new AttachmentLoader(attachmentMapper); this.applicableFlagDAO = applicableFlagDAO; this.deletedMessageDAO = deletedMessageDAO; - this.v1ToV2Migration = new V1ToV2Migration(messageDAO, messageDAOV2, attachmentMapper); + this.v1ToV2Migration = new V1ToV2Migration(messageDAO, messageDAOV2, attachmentMapper, cassandraConfiguration); this.cassandraConfiguration = cassandraConfiguration; } http://git-wip-us.apache.org/repos/asf/james-project/blob/9c46faae/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java index ea14568..30b93cb 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java @@ -21,18 +21,18 @@ package org.apache.james.mailbox.cassandra.mail.migration; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; import java.util.stream.Stream; 
import org.apache.commons.lang3.tuple.Pair; +import org.apache.james.backends.cassandra.CassandraConfiguration; import org.apache.james.mailbox.cassandra.ids.CassandraMessageId; -import org.apache.james.mailbox.cassandra.mail.utils.Limit; import org.apache.james.mailbox.cassandra.mail.AttachmentLoader; import org.apache.james.mailbox.cassandra.mail.CassandraAttachmentMapper; import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO; import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAOV2; import org.apache.james.mailbox.cassandra.mail.MessageAttachmentRepresentation; import org.apache.james.mailbox.cassandra.mail.MessageWithoutAttachment; +import org.apache.james.mailbox.cassandra.mail.utils.Limit; import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.store.mail.MessageMapper; import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage; @@ -47,11 +47,14 @@ public class V1ToV2Migration { private final CassandraMessageDAO messageDAOV1; private final CassandraMessageDAOV2 messageDAOV2; private final AttachmentLoader attachmentLoader; + private final CassandraConfiguration cassandraConfiguration; - public V1ToV2Migration(CassandraMessageDAO messageDAOV1, CassandraMessageDAOV2 messageDAOV2, CassandraAttachmentMapper attachmentMapper) { + public V1ToV2Migration(CassandraMessageDAO messageDAOV1, CassandraMessageDAOV2 messageDAOV2, + CassandraAttachmentMapper attachmentMapper, CassandraConfiguration cassandraConfiguration) { this.messageDAOV1 = messageDAOV1; this.messageDAOV2 = messageDAOV2; this.attachmentLoader = new AttachmentLoader(attachmentMapper); + this.cassandraConfiguration = cassandraConfiguration; } public CompletableFuture<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>> @@ -70,11 +73,18 @@ public class V1ToV2Migration { private CompletableFuture<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>> 
performV1ToV2Migration(Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> messageV1) { return attachmentLoader.addAttachmentToMessages(Stream.of(messageV1), MessageMapper.FetchType.Full) .thenApply(stream -> stream.findAny().get()) - .thenCompose(this::saveInV2FromV1) - .thenCompose(this::deleteInV1) + .thenCompose(this::performV1ToV2Migration) .thenApply(any -> messageV1); } + private CompletableFuture<Void> performV1ToV2Migration(SimpleMailboxMessage message) { + if (!cassandraConfiguration.isOnTheFlyV1ToV2Migration()) { + return CompletableFuture.completedFuture(null); + } + return saveInV2FromV1(message) + .thenCompose(this::deleteInV1); + } + private CompletableFuture<Void> deleteInV1(Optional<SimpleMailboxMessage> optional) { return optional.map(SimpleMailboxMessage::getMessageId) .map(messageId -> (CassandraMessageId) messageId) @@ -82,7 +92,7 @@ public class V1ToV2Migration { .orElse(CompletableFuture.completedFuture(null)); } - private CompletionStage<Optional<SimpleMailboxMessage>> saveInV2FromV1(SimpleMailboxMessage message) { + private CompletableFuture<Optional<SimpleMailboxMessage>> saveInV2FromV1(SimpleMailboxMessage message) { try { return messageDAOV2.save(message).thenApply(any -> Optional.of(message)); } catch (MailboxException e) { http://git-wip-us.apache.org/repos/asf/james-project/blob/9c46faae/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAOTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAOTest.java index 8890e53..96275e2 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAOTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAOTest.java @@ -26,6 +26,7 
@@ import java.util.Optional; import org.apache.commons.io.Charsets; import org.apache.james.backends.cassandra.CassandraCluster; +import org.apache.james.backends.cassandra.CassandraConfiguration; import org.apache.james.mailbox.cassandra.ids.BlobId; import org.apache.james.mailbox.cassandra.modules.CassandraBlobModule; import org.junit.After; @@ -35,7 +36,8 @@ import org.junit.Test; import com.google.common.base.Strings; public class CassandraBlobsDAOTest { - private static final int MULTIPLE_CHUNK_SIZE = 3 * CassandraBlobsDAO.CHUNK_SIZE; + private static final int CHUNK_SIZE = 1024; + private static final int MULTIPLE_CHUNK_SIZE = 3; private CassandraCluster cassandra; private CassandraBlobsDAO testee; @@ -44,7 +46,10 @@ public class CassandraBlobsDAOTest { cassandra = CassandraCluster.create(new CassandraBlobModule()); cassandra.ensureAllTables(); - testee = new CassandraBlobsDAO(cassandra.getConf()); + testee = new CassandraBlobsDAO(cassandra.getConf(), + CassandraConfiguration.builder() + .blobPartSize(CHUNK_SIZE) + .build()); } @After http://git-wip-us.apache.org/repos/asf/james-project/blob/9c46faae/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java index 6416d62..0f1f9d1 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java @@ -28,6 +28,7 @@ import javax.mail.util.SharedByteArrayInputStream; import org.apache.commons.io.IOUtils; import org.apache.james.backends.cassandra.CassandraCluster; +import 
org.apache.james.backends.cassandra.CassandraConfiguration; import org.apache.james.backends.cassandra.init.CassandraModuleComposite; import org.apache.james.mailbox.MessageUid; import org.apache.james.mailbox.cassandra.ids.CassandraId; @@ -97,7 +98,9 @@ public class V1ToV2MigrationTest { CassandraBlobsDAO blobsDAO = new CassandraBlobsDAO(cassandra.getConf()); messageDAOV2 = new CassandraMessageDAOV2(cassandra.getConf(), cassandra.getTypesProvider(), blobsDAO); attachmentMapper = new CassandraAttachmentMapper(cassandra.getConf()); - testee = new V1ToV2Migration(messageDAOV1, messageDAOV2, attachmentMapper); + testee = new V1ToV2Migration(messageDAOV1, messageDAOV2, attachmentMapper, CassandraConfiguration.builder() + .onTheFlyV1ToV2Migration(true) + .build()); messageIdFactory = new CassandraMessageId.Factory(); http://git-wip-us.apache.org/repos/asf/james-project/blob/9c46faae/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java ---------------------------------------------------------------------- diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java index 9b4806f..037b04b 100644 --- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java +++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java @@ -86,6 +86,8 @@ public class CassandraSessionModule extends AbstractModule { private static final String CHUNK_SIZE_FLAGS_UPDATE = "chunk.size.flags.update"; private static final String CHUNK_SIZE_MESSAGE_READ = "chunk.size.message.read"; private static final String CHUNK_SIZE_EXPUNGE = "chunk.size.expunge"; + private static final String BLOB_PART_SIZE = "mailbox.blob.part.size"; + private static final 
String MIGRATION_V1_V2_ON_THE_FLY = "migration.v1.v2.on.the.fly"; @Override protected void configure() { @@ -277,6 +279,10 @@ public class CassandraSessionModule extends AbstractModule { propertiesConfiguration.getInteger(CHUNK_SIZE_MESSAGE_READ, null))) .expungeChunkSize(Optional.ofNullable( propertiesConfiguration.getInteger(CHUNK_SIZE_EXPUNGE, null))) + .blobPartSize(Optional.ofNullable( + propertiesConfiguration.getInteger(BLOB_PART_SIZE, null))) + .onTheFlyV1ToV2Migration(Optional.ofNullable( + propertiesConfiguration.getBoolean(MIGRATION_V1_V2_ON_THE_FLY, null))) .build(); } http://git-wip-us.apache.org/repos/asf/james-project/blob/9c46faae/server/container/guice/cassandra-guice/src/test/java/org/apache/james/modules/mailbox/CassandraSessionModuleTest.java ---------------------------------------------------------------------- diff --git a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/modules/mailbox/CassandraSessionModuleTest.java b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/modules/mailbox/CassandraSessionModuleTest.java index 893f76d..10a0398 100644 --- a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/modules/mailbox/CassandraSessionModuleTest.java +++ b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/modules/mailbox/CassandraSessionModuleTest.java @@ -59,6 +59,8 @@ public class CassandraSessionModuleTest { .flagsUpdateChunkSize(7) .messageReadChunkSize(8) .expungeChunkSize(9) + .blobPartSize(10) + .onTheFlyV1ToV2Migration(true) .build()); } http://git-wip-us.apache.org/repos/asf/james-project/blob/9c46faae/server/container/guice/cassandra-guice/src/test/resources/modules/mailbox/cassandra.properties ---------------------------------------------------------------------- diff --git a/server/container/guice/cassandra-guice/src/test/resources/modules/mailbox/cassandra.properties 
b/server/container/guice/cassandra-guice/src/test/resources/modules/mailbox/cassandra.properties index 217be79..e420c68 100644 --- a/server/container/guice/cassandra-guice/src/test/resources/modules/mailbox/cassandra.properties +++ b/server/container/guice/cassandra-guice/src/test/resources/modules/mailbox/cassandra.properties @@ -7,3 +7,5 @@ fetch.advance.row.count=6 chunk.size.flags.update=7 chunk.size.message.read=8 chunk.size.expunge=9 +mailbox.blob.part.size=10 +migration.v1.v2.on.the.fly=true http://git-wip-us.apache.org/repos/asf/james-project/blob/9c46faae/src/site/xdoc/server/config-cassandra.xml ---------------------------------------------------------------------- diff --git a/src/site/xdoc/server/config-cassandra.xml b/src/site/xdoc/server/config-cassandra.xml index bf9fb90..4006387 100644 --- a/src/site/xdoc/server/config-cassandra.xml +++ b/src/site/xdoc/server/config-cassandra.xml @@ -111,6 +111,10 @@ <dd>Optional. Defaults to 100.<br/> Controls the number of messages to be retrieved in parallel.</dd> <dt><strong>chunk.size.expunge</strong></dt> <dd>Optional. Defaults to 100.<br/> Controls the number of messages to be expunged in parallel.</dd> + <dt><strong>mailbox.blob.part.size</strong></dt> + <dd>Optional. Defaults to 102400 (100KB).<br/> Controls the size of blob parts used to store messages.</dd> + <dt><strong>migration.v1.v2.on.the.fly</strong></dt> + <dd>Optional. Defaults to false.<br/> Controls whether v1 to v2 migration should be run on the fly.</dd> </dl> --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org