JAMES-2082 Add configuration options for blob part size and on the fly migration


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/9c46faae
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/9c46faae
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/9c46faae

Branch: refs/heads/master
Commit: 9c46faae934374892bfe550aaa80a8bc20dfae8e
Parents: 8cb1905
Author: benwa <btell...@linagora.com>
Authored: Fri Jul 7 08:01:43 2017 +0700
Committer: Antoine Duprat <adup...@linagora.com>
Committed: Mon Jul 10 14:23:56 2017 +0200

----------------------------------------------------------------------
 .../cassandra/CassandraConfiguration.java       | 54 ++++++++++++++++++--
 .../cassandra/CassandraConfigurationTest.java   |  6 +++
 .../destination/conf/cassandra.properties       |  4 +-
 .../destination/conf/cassandra.properties       |  4 +-
 .../cassandra/mail/CassandraBlobsDAO.java       | 17 ++++--
 .../cassandra/mail/CassandraMessageDAOV2.java   | 14 +++--
 .../mail/CassandraMessageIdMapper.java          |  2 +-
 .../cassandra/mail/CassandraMessageMapper.java  |  3 +-
 .../mail/migration/V1ToV2Migration.java         | 22 +++++---
 .../cassandra/mail/CassandraBlobsDAOTest.java   |  9 +++-
 .../mail/migration/V1ToV2MigrationTest.java     |  5 +-
 .../modules/mailbox/CassandraSessionModule.java |  6 +++
 .../mailbox/CassandraSessionModuleTest.java     |  2 +
 .../modules/mailbox/cassandra.properties        |  2 +
 src/site/xdoc/server/config-cassandra.xml       |  4 ++
 15 files changed, 127 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/9c46faae/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/CassandraConfiguration.java
----------------------------------------------------------------------
diff --git 
a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/CassandraConfiguration.java
 
b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/CassandraConfiguration.java
index ff3cad9..6ca8487 100644
--- 
a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/CassandraConfiguration.java
+++ 
b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/CassandraConfiguration.java
@@ -38,6 +38,8 @@ public class CassandraConfiguration {
     public static final int DEFAULT_UID_MAX_RETRY = 100000;
     public static final int DEFAULT_ACL_MAX_RETRY = 1000;
     public static final int DEFAULT_FETCH_NEXT_PAGE_ADVANCE_IN_ROW = 100;
+    public static final boolean DEFAULT_ON_THE_FLY_MIGRATION_V1_TO_V2 = false;
+    public static final int DEFAULT_BLOB_PART_SIZE = 100 * 1024;
 
     public static class Builder {
         private Optional<Integer> messageReadChunkSize = Optional.empty();
@@ -49,6 +51,8 @@ public class CassandraConfiguration {
         private Optional<Integer> uidMaxRetry = Optional.empty();
         private Optional<Integer> aclMaxRetry = Optional.empty();
         private Optional<Integer> fetchNextPageInAdvanceRow = Optional.empty();
+        private Optional<Integer> blobPartSize = Optional.empty();
+        private Optional<Boolean> onTheFlyV1ToV2Migration = Optional.empty();
 
         public Builder messageReadChunkSize(int value) {
             Preconditions.checkArgument(value > 0, "messageReadChunkSize needs 
to be strictly positive");
@@ -104,6 +108,17 @@ public class CassandraConfiguration {
             return this;
         }
 
+        public Builder blobPartSize(int value) {
+            Preconditions.checkArgument(value > 0, "blobPartSize needs to be 
strictly positive");
+            this.blobPartSize = Optional.of(value);
+            return this;
+        }
+
+        public Builder onTheFlyV1ToV2Migration(boolean value) {
+            this.onTheFlyV1ToV2Migration = Optional.of(value);
+            return this;
+        }
+
         public Builder messageReadChunkSize(Optional<Integer> value) {
             value.ifPresent(this::messageReadChunkSize);
             return this;
@@ -149,6 +164,16 @@ public class CassandraConfiguration {
             return this;
         }
 
+        public Builder blobPartSize(Optional<Integer> value) {
+            value.ifPresent(this::blobPartSize);
+            return this;
+        }
+
+        public Builder onTheFlyV1ToV2Migration(Optional<Boolean> value) {
+            value.ifPresent(this::onTheFlyV1ToV2Migration);
+            return this;
+        }
+
         public CassandraConfiguration build() {
             return new 
CassandraConfiguration(aclMaxRetry.orElse(DEFAULT_ACL_MAX_RETRY),
                 
messageReadChunkSize.orElse(DEFAULT_MESSAGE_CHUNK_SIZE_ON_READ),
@@ -158,7 +183,9 @@ public class CassandraConfiguration {
                 
flagsUpdateMessageMaxRetry.orElse(DEFAULT_FLAGS_UPDATE_MESSAGE_MAX_RETRY),
                 modSeqMaxRetry.orElse(DEFAULT_MODSEQ_MAX_RETRY),
                 uidMaxRetry.orElse(DEFAULT_UID_MAX_RETRY),
-                
fetchNextPageInAdvanceRow.orElse(DEFAULT_FETCH_NEXT_PAGE_ADVANCE_IN_ROW));
+                
fetchNextPageInAdvanceRow.orElse(DEFAULT_FETCH_NEXT_PAGE_ADVANCE_IN_ROW),
+                blobPartSize.orElse(DEFAULT_BLOB_PART_SIZE),
+                
onTheFlyV1ToV2Migration.orElse(DEFAULT_ON_THE_FLY_MIGRATION_V1_TO_V2));
         }
     }
 
@@ -175,11 +202,13 @@ public class CassandraConfiguration {
     private final int uidMaxRetry;
     private final int aclMaxRetry;
     private final int fetchNextPageInAdvanceRow;
+    private final int blobPartSize;
+    private final boolean onTheFlyV1ToV2Migration;
 
     @VisibleForTesting
     CassandraConfiguration(int aclMaxRetry, int messageReadChunkSize, int 
expungeChunkSize, int flagsUpdateChunkSize,
-                                  int flagsUpdateMessageIdMaxRetry, int 
flagsUpdateMessageMaxRetry, int modSeqMaxRetry,
-                                   int uidMaxRetry, int 
fetchNextPageInAdvanceRow) {
+                           int flagsUpdateMessageIdMaxRetry, int 
flagsUpdateMessageMaxRetry, int modSeqMaxRetry,
+                           int uidMaxRetry, int fetchNextPageInAdvanceRow, int 
blobPartSize, boolean onTheFlyV1ToV2Migration) {
         this.aclMaxRetry = aclMaxRetry;
         this.messageReadChunkSize = messageReadChunkSize;
         this.expungeChunkSize = expungeChunkSize;
@@ -189,6 +218,16 @@ public class CassandraConfiguration {
         this.uidMaxRetry = uidMaxRetry;
         this.fetchNextPageInAdvanceRow = fetchNextPageInAdvanceRow;
         this.flagsUpdateChunkSize = flagsUpdateChunkSize;
+        this.blobPartSize = blobPartSize;
+        this.onTheFlyV1ToV2Migration = onTheFlyV1ToV2Migration;
+    }
+
+    public int getBlobPartSize() {
+        return blobPartSize;
+    }
+
+    public boolean isOnTheFlyV1ToV2Migration() {
+        return onTheFlyV1ToV2Migration;
     }
 
     public int getFlagsUpdateChunkSize() {
@@ -240,7 +279,9 @@ public class CassandraConfiguration {
                 && Objects.equals(this.modSeqMaxRetry, that.modSeqMaxRetry)
                 && Objects.equals(this.uidMaxRetry, that.uidMaxRetry)
                 && Objects.equals(this.flagsUpdateChunkSize, 
that.flagsUpdateChunkSize)
-                && Objects.equals(this.fetchNextPageInAdvanceRow, 
that.fetchNextPageInAdvanceRow);
+                && Objects.equals(this.fetchNextPageInAdvanceRow, 
that.fetchNextPageInAdvanceRow)
+                && Objects.equals(this.blobPartSize, that.blobPartSize)
+                && Objects.equals(this.onTheFlyV1ToV2Migration, 
that.onTheFlyV1ToV2Migration);
         }
         return false;
     }
@@ -248,7 +289,8 @@ public class CassandraConfiguration {
     @Override
     public final int hashCode() {
         return Objects.hash(aclMaxRetry, messageReadChunkSize, 
expungeChunkSize, flagsUpdateMessageIdMaxRetry,
-            flagsUpdateMessageMaxRetry, modSeqMaxRetry, uidMaxRetry, 
fetchNextPageInAdvanceRow, flagsUpdateChunkSize);
+            flagsUpdateMessageMaxRetry, modSeqMaxRetry, uidMaxRetry, 
fetchNextPageInAdvanceRow, flagsUpdateChunkSize,
+            blobPartSize, onTheFlyV1ToV2Migration);
     }
 
     @Override
@@ -263,6 +305,8 @@ public class CassandraConfiguration {
             .add("fetchNextPageInAdvanceRow", fetchNextPageInAdvanceRow)
             .add("flagsUpdateChunkSize", flagsUpdateChunkSize)
             .add("uidMaxRetry", uidMaxRetry)
+            .add("blobPartSize", blobPartSize)
+            .add("onTheFlyV1ToV2Migration", onTheFlyV1ToV2Migration)
             .toString();
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/9c46faae/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraConfigurationTest.java
 
b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraConfigurationTest.java
index 10f111e..e0cc667 100644
--- 
a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraConfigurationTest.java
+++ 
b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraConfigurationTest.java
@@ -202,6 +202,8 @@ public class CassandraConfigurationTest {
         int flagsUpdateChunkSize = 7;
         int messageReadChunkSize = 8;
         int expungeChunkSize = 9;
+        int blobPartSize = 10;
+        boolean onTheFlyV1ToV2Migration = true;
 
         CassandraConfiguration configuration = CassandraConfiguration.builder()
             .aclMaxRetry(aclMaxRetry)
@@ -213,6 +215,8 @@ public class CassandraConfigurationTest {
             .flagsUpdateChunkSize(flagsUpdateChunkSize)
             .messageReadChunkSize(messageReadChunkSize)
             .expungeChunkSize(expungeChunkSize)
+            .blobPartSize(blobPartSize)
+            .onTheFlyV1ToV2Migration(onTheFlyV1ToV2Migration)
             .build();
 
         
softly.assertThat(configuration.getAclMaxRetry()).isEqualTo(aclMaxRetry);
@@ -224,6 +228,8 @@ public class CassandraConfigurationTest {
         
softly.assertThat(configuration.getFlagsUpdateChunkSize()).isEqualTo(flagsUpdateChunkSize);
         
softly.assertThat(configuration.getMessageReadChunkSize()).isEqualTo(messageReadChunkSize);
         
softly.assertThat(configuration.getExpungeChunkSize()).isEqualTo(expungeChunkSize);
+        
softly.assertThat(configuration.getBlobPartSize()).isEqualTo(blobPartSize);
+        
softly.assertThat(configuration.isOnTheFlyV1ToV2Migration()).isEqualTo(onTheFlyV1ToV2Migration);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/9c46faae/dockerfiles/run/guice/cassandra-ldap/destination/conf/cassandra.properties
----------------------------------------------------------------------
diff --git 
a/dockerfiles/run/guice/cassandra-ldap/destination/conf/cassandra.properties 
b/dockerfiles/run/guice/cassandra-ldap/destination/conf/cassandra.properties
index 3b273e6..54c9900 100644
--- a/dockerfiles/run/guice/cassandra-ldap/destination/conf/cassandra.properties
+++ b/dockerfiles/run/guice/cassandra-ldap/destination/conf/cassandra.properties
@@ -16,4 +16,6 @@ cassandra.retryConnection.minDelay=5000
 # fetch.advance.row.count=1000
 # chunk.size.flags.update=20
 # chunk.size.message.read=100
-# chunk.size.expunge=100
\ No newline at end of file
+# chunk.size.expunge=100
+# mailbox.blob.part.size=102400
+# migration.v1.v2.on.the.fly=false
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/9c46faae/dockerfiles/run/guice/cassandra/destination/conf/cassandra.properties
----------------------------------------------------------------------
diff --git 
a/dockerfiles/run/guice/cassandra/destination/conf/cassandra.properties 
b/dockerfiles/run/guice/cassandra/destination/conf/cassandra.properties
index 54540e4..710674e 100644
--- a/dockerfiles/run/guice/cassandra/destination/conf/cassandra.properties
+++ b/dockerfiles/run/guice/cassandra/destination/conf/cassandra.properties
@@ -25,4 +25,6 @@ cassandra.retryConnection.minDelay=5000
 # fetch.advance.row.count=1000
 # chunk.size.flags.update=20
 # chunk.size.message.read=100
-# chunk.size.expunge=100
\ No newline at end of file
+# chunk.size.expunge=100
+# mailbox.blob.part.size=102400
+# migration.v1.v2.on.the.fly=false
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/9c46faae/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java
index e73fd32..f6e3d21 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java
@@ -33,6 +33,7 @@ import java.util.stream.Stream;
 import javax.inject.Inject;
 
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.backends.cassandra.CassandraConfiguration;
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
 import org.apache.james.mailbox.cassandra.ids.BlobId;
 import org.apache.james.mailbox.cassandra.mail.utils.DataChunker;
@@ -45,22 +46,23 @@ import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.github.steveash.guavate.Guavate;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.primitives.Bytes;
 
 public class CassandraBlobsDAO {
-
-    public static final int CHUNK_SIZE = 1024 * 100;
     private final CassandraAsyncExecutor cassandraAsyncExecutor;
     private final PreparedStatement insert;
     private final PreparedStatement insertPart;
     private final PreparedStatement select;
     private final PreparedStatement selectPart;
     private final DataChunker dataChunker;
+    private final CassandraConfiguration configuration;
 
     @Inject
-    public CassandraBlobsDAO(Session session) {
+    public CassandraBlobsDAO(Session session, CassandraConfiguration 
cassandraConfiguration) {
         this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
+        this.configuration = cassandraConfiguration;
         this.dataChunker = new DataChunker();
         this.insert = prepareInsert(session);
         this.select = prepareSelect(session);
@@ -69,6 +71,11 @@ public class CassandraBlobsDAO {
         this.selectPart = prepareSelectPart(session);
     }
 
+    @VisibleForTesting
+    public CassandraBlobsDAO(Session session) {
+        this(session, CassandraConfiguration.DEFAULT_CONFIGURATION);
+    }
+
     private PreparedStatement prepareSelect(Session session) {
         return session.prepare(select()
             .from(BlobTable.TABLE_NAME)
@@ -106,8 +113,8 @@ public class CassandraBlobsDAO {
     }
 
     private CompletableFuture<Integer> saveBlobParts(byte[] data, BlobId 
blobId) {
-        return FluentFutureStream.of(
-            dataChunker.chunk(data, CHUNK_SIZE)
+        return FluentFutureStream.<Pair<Integer, Void>> of(
+            dataChunker.chunk(data, configuration.getBlobPartSize())
                 .map(pair -> writePart(pair.getRight(), blobId, pair.getKey())
                     .thenApply(partId -> Pair.of(pair.getKey(), partId))))
             .completableFuture()

http://git-wip-us.apache.org/repos/asf/james-project/blob/9c46faae/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2.java
index 5fb4fed..ad126fc 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2.java
@@ -53,6 +53,7 @@ import javax.mail.util.SharedByteArrayInputStream;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.backends.cassandra.CassandraConfiguration;
 import org.apache.james.backends.cassandra.init.CassandraTypesProvider;
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
 import org.apache.james.mailbox.cassandra.ids.BlobId;
@@ -82,10 +83,10 @@ import com.datastax.driver.core.Session;
 import com.datastax.driver.core.UDTValue;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
 import com.github.steveash.guavate.Guavate;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.Bytes;
 
 public class CassandraMessageDAOV2 {
-    public static final int CHUNK_SIZE_ON_READ = 100;
     public static final long DEFAULT_LONG_VALUE = 0L;
     public static final String DEFAULT_OBJECT_VALUE = null;
     private static final byte[] EMPTY_BYTE_ARRAY = {};
@@ -93,6 +94,7 @@ public class CassandraMessageDAOV2 {
     private final CassandraAsyncExecutor cassandraAsyncExecutor;
     private final CassandraTypesProvider typesProvider;
     private final CassandraBlobsDAO blobsDAO;
+    private final CassandraConfiguration configuration;
     private final PreparedStatement insert;
     private final PreparedStatement delete;
     private final PreparedStatement selectMetadata;
@@ -101,10 +103,11 @@ public class CassandraMessageDAOV2 {
     private final PreparedStatement selectBody;
 
     @Inject
-    public CassandraMessageDAOV2(Session session, CassandraTypesProvider 
typesProvider, CassandraBlobsDAO blobsDAO) {
+    public CassandraMessageDAOV2(Session session, CassandraTypesProvider 
typesProvider, CassandraBlobsDAO blobsDAO, CassandraConfiguration 
cassandraConfiguration) {
         this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
         this.typesProvider = typesProvider;
         this.blobsDAO = blobsDAO;
+        this.configuration = cassandraConfiguration;
         this.insert = prepareInsert(session);
         this.delete = prepareDelete(session);
         this.selectMetadata = prepareSelect(session, METADATA);
@@ -113,6 +116,11 @@ public class CassandraMessageDAOV2 {
         this.selectBody = prepareSelect(session, BODY);
     }
 
+    @VisibleForTesting
+    public CassandraMessageDAOV2(Session session, CassandraTypesProvider 
typesProvider, CassandraBlobsDAO blobsDAO) {
+        this(session, typesProvider, blobsDAO, 
CassandraConfiguration.DEFAULT_CONFIGURATION);
+    }
+
     private PreparedStatement prepareSelect(Session session, String[] fields) {
         return session.prepare(select(fields)
             .from(TABLE_NAME)
@@ -196,7 +204,7 @@ public class CassandraMessageDAOV2 {
     public CompletableFuture<Stream<MessageResult>> 
retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType 
fetchType, Limit limit) {
         return CompletableFutureUtil.chainAll(
                 limit.applyOnStream(messageIds.stream().distinct())
-                    .collect(JamesCollectors.chunker(CHUNK_SIZE_ON_READ)),
+                    
.collect(JamesCollectors.chunker(configuration.getMessageReadChunkSize())),
             ids -> rowToMessages(fetchType, ids))
             .thenApply(stream -> stream.flatMap(Function.identity()));
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/9c46faae/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 62e9f85..1d554f4 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -89,7 +89,7 @@ public class CassandraMessageIdMapper implements 
MessageIdMapper {
         this.mailboxSession = mailboxSession;
         this.attachmentLoader = new AttachmentLoader(attachmentMapper);
         this.cassandraConfiguration = cassandraConfiguration;
-        this.v1ToV2Migration = new V1ToV2Migration(messageDAOV1, messageDAOV2, 
attachmentMapper);
+        this.v1ToV2Migration = new V1ToV2Migration(messageDAOV1, messageDAOV2, 
attachmentMapper, cassandraConfiguration);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/james-project/blob/9c46faae/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 6335389..9ce7d41 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -55,7 +55,6 @@ import org.apache.james.mailbox.store.mail.MessageMapper;
 import org.apache.james.mailbox.store.mail.model.Mailbox;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
-import org.apache.james.util.CompletableFutureUtil;
 import org.apache.james.util.FluentFutureStream;
 import org.apache.james.util.streams.JamesCollectors;
 import org.slf4j.Logger;
@@ -109,7 +108,7 @@ public class CassandraMessageMapper implements 
MessageMapper {
         this.attachmentLoader = new AttachmentLoader(attachmentMapper);
         this.applicableFlagDAO = applicableFlagDAO;
         this.deletedMessageDAO = deletedMessageDAO;
-        this.v1ToV2Migration = new V1ToV2Migration(messageDAO, messageDAOV2, 
attachmentMapper);
+        this.v1ToV2Migration = new V1ToV2Migration(messageDAO, messageDAOV2, 
attachmentMapper, cassandraConfiguration);
         this.cassandraConfiguration = cassandraConfiguration;
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/9c46faae/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
index ea14568..30b93cb 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
@@ -21,18 +21,18 @@ package org.apache.james.mailbox.cassandra.mail.migration;
 
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
 import java.util.stream.Stream;
 
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.backends.cassandra.CassandraConfiguration;
 import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
-import org.apache.james.mailbox.cassandra.mail.utils.Limit;
 import org.apache.james.mailbox.cassandra.mail.AttachmentLoader;
 import org.apache.james.mailbox.cassandra.mail.CassandraAttachmentMapper;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAOV2;
 import org.apache.james.mailbox.cassandra.mail.MessageAttachmentRepresentation;
 import org.apache.james.mailbox.cassandra.mail.MessageWithoutAttachment;
+import org.apache.james.mailbox.cassandra.mail.utils.Limit;
 import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.store.mail.MessageMapper;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
@@ -47,11 +47,14 @@ public class V1ToV2Migration {
     private final CassandraMessageDAO messageDAOV1;
     private final CassandraMessageDAOV2 messageDAOV2;
     private final AttachmentLoader attachmentLoader;
+    private final CassandraConfiguration cassandraConfiguration;
 
-    public V1ToV2Migration(CassandraMessageDAO messageDAOV1, 
CassandraMessageDAOV2 messageDAOV2, CassandraAttachmentMapper attachmentMapper) 
{
+    public V1ToV2Migration(CassandraMessageDAO messageDAOV1, 
CassandraMessageDAOV2 messageDAOV2,
+                           CassandraAttachmentMapper attachmentMapper, 
CassandraConfiguration cassandraConfiguration) {
         this.messageDAOV1 = messageDAOV1;
         this.messageDAOV2 = messageDAOV2;
         this.attachmentLoader = new AttachmentLoader(attachmentMapper);
+        this.cassandraConfiguration = cassandraConfiguration;
     }
 
     public CompletableFuture<Pair<MessageWithoutAttachment, 
Stream<MessageAttachmentRepresentation>>>
@@ -70,11 +73,18 @@ public class V1ToV2Migration {
     private CompletableFuture<Pair<MessageWithoutAttachment, 
Stream<MessageAttachmentRepresentation>>> 
performV1ToV2Migration(Pair<MessageWithoutAttachment, 
Stream<MessageAttachmentRepresentation>> messageV1) {
         return attachmentLoader.addAttachmentToMessages(Stream.of(messageV1), 
MessageMapper.FetchType.Full)
             .thenApply(stream -> stream.findAny().get())
-            .thenCompose(this::saveInV2FromV1)
-            .thenCompose(this::deleteInV1)
+            .thenCompose(this::performV1ToV2Migration)
             .thenApply(any -> messageV1);
     }
 
+    private CompletableFuture<Void> 
performV1ToV2Migration(SimpleMailboxMessage message) {
+        if (!cassandraConfiguration.isOnTheFlyV1ToV2Migration()) {
+            return CompletableFuture.completedFuture(null);
+        }
+        return saveInV2FromV1(message)
+            .thenCompose(this::deleteInV1);
+    }
+
     private CompletableFuture<Void> deleteInV1(Optional<SimpleMailboxMessage> 
optional) {
         return optional.map(SimpleMailboxMessage::getMessageId)
             .map(messageId -> (CassandraMessageId) messageId)
@@ -82,7 +92,7 @@ public class V1ToV2Migration {
             .orElse(CompletableFuture.completedFuture(null));
     }
 
-    private CompletionStage<Optional<SimpleMailboxMessage>> 
saveInV2FromV1(SimpleMailboxMessage message) {
+    private CompletableFuture<Optional<SimpleMailboxMessage>> 
saveInV2FromV1(SimpleMailboxMessage message) {
         try {
             return messageDAOV2.save(message).thenApply(any -> 
Optional.of(message));
         } catch (MailboxException e) {

http://git-wip-us.apache.org/repos/asf/james-project/blob/9c46faae/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAOTest.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAOTest.java
 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAOTest.java
index 8890e53..96275e2 100644
--- 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAOTest.java
+++ 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAOTest.java
@@ -26,6 +26,7 @@ import java.util.Optional;
 
 import org.apache.commons.io.Charsets;
 import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.CassandraConfiguration;
 import org.apache.james.mailbox.cassandra.ids.BlobId;
 import org.apache.james.mailbox.cassandra.modules.CassandraBlobModule;
 import org.junit.After;
@@ -35,7 +36,8 @@ import org.junit.Test;
 import com.google.common.base.Strings;
 
 public class CassandraBlobsDAOTest {
-    private static final int MULTIPLE_CHUNK_SIZE = 3 * 
CassandraBlobsDAO.CHUNK_SIZE;
+    private static final int CHUNK_SIZE = 1024;
+    private static final int MULTIPLE_CHUNK_SIZE = 3;
     private CassandraCluster cassandra;
     private CassandraBlobsDAO testee;
 
@@ -44,7 +46,10 @@ public class CassandraBlobsDAOTest {
         cassandra = CassandraCluster.create(new CassandraBlobModule());
         cassandra.ensureAllTables();
 
-        testee = new CassandraBlobsDAO(cassandra.getConf());
+        testee = new CassandraBlobsDAO(cassandra.getConf(),
+            CassandraConfiguration.builder()
+                .blobPartSize(CHUNK_SIZE)
+                .build());
     }
 
     @After

http://git-wip-us.apache.org/repos/asf/james-project/blob/9c46faae/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java
 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java
index 6416d62..0f1f9d1 100644
--- 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java
+++ 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java
@@ -28,6 +28,7 @@ import javax.mail.util.SharedByteArrayInputStream;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.CassandraConfiguration;
 import org.apache.james.backends.cassandra.init.CassandraModuleComposite;
 import org.apache.james.mailbox.MessageUid;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
@@ -97,7 +98,9 @@ public class V1ToV2MigrationTest {
         CassandraBlobsDAO blobsDAO = new 
CassandraBlobsDAO(cassandra.getConf());
         messageDAOV2 = new CassandraMessageDAOV2(cassandra.getConf(), 
cassandra.getTypesProvider(), blobsDAO);
         attachmentMapper = new CassandraAttachmentMapper(cassandra.getConf());
-        testee = new V1ToV2Migration(messageDAOV1, messageDAOV2, 
attachmentMapper);
+        testee = new V1ToV2Migration(messageDAOV1, messageDAOV2, 
attachmentMapper, CassandraConfiguration.builder()
+            .onTheFlyV1ToV2Migration(true)
+            .build());
 
 
         messageIdFactory = new CassandraMessageId.Factory();

http://git-wip-us.apache.org/repos/asf/james-project/blob/9c46faae/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java
----------------------------------------------------------------------
diff --git 
a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java
 
b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java
index 9b4806f..037b04b 100644
--- 
a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java
+++ 
b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java
@@ -86,6 +86,8 @@ public class CassandraSessionModule extends AbstractModule {
     private static final String CHUNK_SIZE_FLAGS_UPDATE = 
"chunk.size.flags.update";
     private static final String CHUNK_SIZE_MESSAGE_READ = 
"chunk.size.message.read";
     private static final String CHUNK_SIZE_EXPUNGE = "chunk.size.expunge";
+    private static final String BLOB_PART_SIZE = "mailbox.blob.part.size";
+    private static final String MIGRATION_V1_V2_ON_THE_FLY = 
"migration.v1.v2.on.the.fly";
 
     @Override
     protected void configure() {
@@ -277,6 +279,10 @@ public class CassandraSessionModule extends AbstractModule 
{
                 propertiesConfiguration.getInteger(CHUNK_SIZE_MESSAGE_READ, 
null)))
             .expungeChunkSize(Optional.ofNullable(
                 propertiesConfiguration.getInteger(CHUNK_SIZE_EXPUNGE, null)))
+            .blobPartSize(Optional.ofNullable(
+                propertiesConfiguration.getInteger(BLOB_PART_SIZE, null)))
+            .onTheFlyV1ToV2Migration(Optional.ofNullable(
+                propertiesConfiguration.getBoolean(MIGRATION_V1_V2_ON_THE_FLY, 
null)))
             .build();
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/9c46faae/server/container/guice/cassandra-guice/src/test/java/org/apache/james/modules/mailbox/CassandraSessionModuleTest.java
----------------------------------------------------------------------
diff --git 
a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/modules/mailbox/CassandraSessionModuleTest.java
 
b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/modules/mailbox/CassandraSessionModuleTest.java
index 893f76d..10a0398 100644
--- 
a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/modules/mailbox/CassandraSessionModuleTest.java
+++ 
b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/modules/mailbox/CassandraSessionModuleTest.java
@@ -59,6 +59,8 @@ public class CassandraSessionModuleTest {
                 .flagsUpdateChunkSize(7)
                 .messageReadChunkSize(8)
                 .expungeChunkSize(9)
+                .blobPartSize(10)
+                .onTheFlyV1ToV2Migration(true)
                 .build());
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/9c46faae/server/container/guice/cassandra-guice/src/test/resources/modules/mailbox/cassandra.properties
----------------------------------------------------------------------
diff --git 
a/server/container/guice/cassandra-guice/src/test/resources/modules/mailbox/cassandra.properties
 
b/server/container/guice/cassandra-guice/src/test/resources/modules/mailbox/cassandra.properties
index 217be79..e420c68 100644
--- 
a/server/container/guice/cassandra-guice/src/test/resources/modules/mailbox/cassandra.properties
+++ 
b/server/container/guice/cassandra-guice/src/test/resources/modules/mailbox/cassandra.properties
@@ -7,3 +7,5 @@ fetch.advance.row.count=6
 chunk.size.flags.update=7
 chunk.size.message.read=8
 chunk.size.expunge=9
+mailbox.blob.part.size=10
+migration.v1.v2.on.the.fly=true

http://git-wip-us.apache.org/repos/asf/james-project/blob/9c46faae/src/site/xdoc/server/config-cassandra.xml
----------------------------------------------------------------------
diff --git a/src/site/xdoc/server/config-cassandra.xml 
b/src/site/xdoc/server/config-cassandra.xml
index bf9fb90..4006387 100644
--- a/src/site/xdoc/server/config-cassandra.xml
+++ b/src/site/xdoc/server/config-cassandra.xml
@@ -111,6 +111,10 @@
         <dd>Optional. Defaults to 100.<br/> Controls the number of messages to 
be retrieved in parallel.</dd>
         <dt><strong>chunk.size.expunge</strong></dt>
         <dd>Optional. Defaults to 100.<br/> Controls the number of messages to 
be expunged in parallel.</dd>
+        <dt><strong>mailbox.blob.part.size</strong></dt>
+        <dd>Optional. Defaults to 102400 (100KB).<br/> Controls the size of 
blob parts used to store messages.</dd>
+        <dt><strong>migration.v1.v2.on.the.fly</strong></dt>
+        <dd>Optional. Defaults to false.<br/> Controls wether v1 to v2 
migration should be run on the fly.</dd>
       </dl>
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to