JAMES-2096 Write migration task

The migration log allows counting migrated messages and following the
migration process from Kibana.


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/48043b4f
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/48043b4f
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/48043b4f

Branch: refs/heads/master
Commit: 48043b4f2baca6e45ccea3c3aa7ffcb10cff6995
Parents: fb284d3
Author: benwa <[email protected]>
Authored: Mon Jul 24 18:07:19 2017 +0700
Committer: benwa <[email protected]>
Committed: Tue Jul 25 17:51:15 2017 +0700

----------------------------------------------------------------------
 .../cassandra/mail/CassandraMessageDAO.java     | 107 +++++++++++++++++--
 .../cassandra/mail/CassandraMessageDAOV2.java   |  56 +++++++++-
 .../mail/migration/V1ToV2Migration.java         |  25 ++++-
 .../mail/migration/V1ToV2MigrationTest.java     |  46 ++++++++
 4 files changed, 224 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/48043b4f/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
index 77e03d9..78f6372 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
@@ -42,6 +42,7 @@ import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageV1Table.T
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.util.Date;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -56,6 +57,7 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.backends.cassandra.CassandraConfiguration;
 import org.apache.james.backends.cassandra.init.CassandraTypesProvider;
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
 import org.apache.james.mailbox.cassandra.mail.utils.Limit;
 import 
org.apache.james.mailbox.cassandra.table.CassandraMessageV1Table.Attachments;
@@ -65,6 +67,7 @@ import org.apache.james.mailbox.model.AttachmentId;
 import org.apache.james.mailbox.model.Cid;
 import org.apache.james.mailbox.model.ComposedMessageId;
 import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
+import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mailbox.store.mail.MessageMapper.FetchType;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
@@ -80,6 +83,7 @@ import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.UDTValue;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.github.steveash.guavate.Guavate;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.io.ByteStreams;
 import com.google.common.primitives.Bytes;
@@ -93,10 +97,13 @@ public class CassandraMessageDAO {
     private final PreparedStatement selectHeaders;
     private final PreparedStatement selectFields;
     private final PreparedStatement selectBody;
+    private final PreparedStatement selectAll;
+    private CassandraUtils cassandraUtils;
     private final CassandraConfiguration cassandraConfiguration;
 
     @Inject
-    public CassandraMessageDAO(Session session, CassandraTypesProvider 
typesProvider, CassandraConfiguration cassandraConfiguration) {
+    public CassandraMessageDAO(Session session, CassandraTypesProvider 
typesProvider, CassandraConfiguration cassandraConfiguration,
+                               CassandraUtils cassandraUtils) {
         this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
         this.typesProvider = typesProvider;
         this.insert = prepareInsert(session);
@@ -106,11 +113,17 @@ public class CassandraMessageDAO {
         this.selectFields = prepareSelect(session, FIELDS);
         this.selectBody = prepareSelect(session, BODY);
         this.cassandraConfiguration = cassandraConfiguration;
+        this.selectAll = prepareSelectAll(session);
+        this.cassandraUtils = cassandraUtils;
     }
 
     @VisibleForTesting
     public CassandraMessageDAO(Session session, CassandraTypesProvider 
typesProvider) {
-        this(session, typesProvider, 
CassandraConfiguration.DEFAULT_CONFIGURATION);
+        this(session, typesProvider, 
CassandraConfiguration.DEFAULT_CONFIGURATION, 
CassandraUtils.WITH_DEFAULT_CONFIGURATION);
+    }
+
+    private PreparedStatement prepareSelectAll(Session session) {
+        return session.prepare(select().from(TABLE_NAME));
     }
 
     private PreparedStatement prepareSelect(Session session, String[] fields) {
@@ -139,6 +152,13 @@ public class CassandraMessageDAO {
                 .where(eq(MESSAGE_ID, bindMarker(MESSAGE_ID))));
     }
 
+    public Stream<RawMessage> readAll() {
+        return cassandraUtils.convertToStream(
+            cassandraAsyncExecutor.execute(selectAll.bind())
+                .join())
+            .map(this::fromRow);
+    }
+
     public CompletableFuture<Void> save(MailboxMessage message) throws 
MailboxException {
         try {
             CassandraMessageId messageId = (CassandraMessageId) 
message.getMessageId();
@@ -223,14 +243,14 @@ public class CassandraMessageDAO {
                 row.getInt(BODY_START_OCTET),
                 buildContent(row, fetchType),
                 messageIdWithMetaData.getFlags(),
-                getPropertyBuilder(row),
+                retrievePropertyBuilder(row),
                 messageId.getMailboxId(),
                 messageId.getUid(),
                 messageIdWithMetaData.getModSeq());
-        return Pair.of(messageWithoutAttachment, getAttachments(row, 
fetchType));
+        return Pair.of(messageWithoutAttachment, retrieveAttachments(row, 
fetchType));
     }
 
-    private PropertyBuilder getPropertyBuilder(Row row) {
+    private PropertyBuilder retrievePropertyBuilder(Row row) {
         PropertyBuilder property = new PropertyBuilder(
             row.getList(PROPERTIES, UDTValue.class).stream()
                 .map(x -> new 
SimpleProperty(x.getString(Properties.NAMESPACE), x.getString(Properties.NAME), 
x.getString(Properties.VALUE)))
@@ -239,7 +259,7 @@ public class CassandraMessageDAO {
         return property;
     }
 
-    private Stream<MessageAttachmentRepresentation> getAttachments(Row row, 
FetchType fetchType) {
+    private Stream<MessageAttachmentRepresentation> retrieveAttachments(Row 
row, FetchType fetchType) {
         switch (fetchType) {
         case Full:
         case Body:
@@ -313,4 +333,79 @@ public class CassandraMessageDAO {
         row.getBytes(field).get(headerContent);
         return headerContent;
     }
+
+    private RawMessage fromRow(Row row) {
+        return new RawMessage(
+            row.getTimestamp(INTERNAL_DATE),
+            new CassandraMessageId.Factory().of(row.getUUID(MESSAGE_ID)),
+            row.getInt(BODY_START_OCTET),
+            row.getLong(FULL_CONTENT_OCTETS),
+            getFieldContent(BODY_CONTENT, row),
+            getFieldContent(HEADER_CONTENT, row),
+            retrievePropertyBuilder(row),
+            row.getLong(TEXTUAL_LINE_COUNT),
+            retrieveAttachments(row, 
FetchType.Full).collect(Guavate.toImmutableList()));
+    }
+
+    public static class RawMessage {
+        private final Date internalDate;
+        private final MessageId messageId;
+        private final int bodyStartOctet;
+        private final long fullContentOctet;
+        private final byte[] bodyContent;
+        private final byte[] headerContent;
+        private final PropertyBuilder propertyBuilder;
+        private final long textuaLineCount;
+        private final List<MessageAttachmentRepresentation> attachments;
+
+        private RawMessage(Date internalDate, MessageId messageId, int 
bodyStartOctet, long fullContentOctet, byte[] bodyContent,
+                          byte[] headerContent, PropertyBuilder 
propertyBuilder, long textuaLineCount,
+                          List<MessageAttachmentRepresentation> attachments) {
+            this.internalDate = internalDate;
+            this.messageId = messageId;
+            this.bodyStartOctet = bodyStartOctet;
+            this.fullContentOctet = fullContentOctet;
+            this.bodyContent = bodyContent;
+            this.headerContent = headerContent;
+            this.propertyBuilder = propertyBuilder;
+            this.textuaLineCount = textuaLineCount;
+            this.attachments = attachments;
+        }
+
+        public Date getInternalDate() {
+            return internalDate;
+        }
+
+        public MessageId getMessageId() {
+            return messageId;
+        }
+
+        public int getBodyStartOctet() {
+            return bodyStartOctet;
+        }
+
+        public long getFullContentOctet() {
+            return fullContentOctet;
+        }
+
+        public byte[] getBodyContent() {
+            return bodyContent;
+        }
+
+        public byte[] getHeaderContent() {
+            return headerContent;
+        }
+
+        public PropertyBuilder getPropertyBuilder() {
+            return propertyBuilder;
+        }
+
+        public long getTextuaLineCount() {
+            return textuaLineCount;
+        }
+
+        public List<MessageAttachmentRepresentation> getAttachments() {
+            return attachments;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/48043b4f/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2.java
index c6817ff..6664028 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2.java
@@ -69,6 +69,7 @@ import 
org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
 import org.apache.james.mailbox.model.MessageAttachment;
 import org.apache.james.mailbox.store.mail.MessageMapper.FetchType;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
+import org.apache.james.mailbox.store.mail.model.Property;
 import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleProperty;
 import org.apache.james.util.CompletableFutureUtil;
@@ -189,6 +190,49 @@ public class CassandraMessageDAOV2 {
             .collect(Guavate.toImmutableList());
     }
 
+    private List<UDTValue> buildPropertiesUdt(List<Property> properties) {
+        return properties.stream()
+            .map(property -> typesProvider.getDefinedUserType(PROPERTIES)
+                .newValue()
+                .setString(Properties.NAMESPACE, property.getNamespace())
+                .setString(Properties.NAME, property.getLocalName())
+                .setString(Properties.VALUE, property.getValue()))
+            .collect(Guavate.toImmutableList());
+    }
+
+    private UDTValue toUDT(MessageAttachment messageAttachment) {
+        return typesProvider.getDefinedUserType(ATTACHMENTS)
+            .newValue()
+            .setString(Attachments.ID, 
messageAttachment.getAttachmentId().getId())
+            .setString(Attachments.NAME, messageAttachment.getName().orNull())
+            .setString(Attachments.CID, 
messageAttachment.getCid().transform(Cid::getValue).orNull())
+            .setBool(Attachments.IS_INLINE, messageAttachment.isInline());
+    }
+
+    public CompletableFuture<Void> save(CassandraMessageDAO.RawMessage 
rawMessage) {
+        return CompletableFutureUtil.combine(
+            blobsDAO.save(rawMessage.getHeaderContent()),
+            blobsDAO.save(rawMessage.getBodyContent()),
+            Pair::of)
+            .thenCompose(pair ->
+                
cassandraAsyncExecutor.executeVoid(boundWriteStatement(rawMessage, pair)));
+    }
+
+    private BoundStatement boundWriteStatement(CassandraMessageDAO.RawMessage 
message, Pair<Optional<BlobId>, Optional<BlobId>> pair) {
+        CassandraMessageId messageId = (CassandraMessageId) 
message.getMessageId();
+        return insert.bind()
+            .setUUID(MESSAGE_ID, messageId.get())
+            .setTimestamp(INTERNAL_DATE, message.getInternalDate())
+            .setInt(BODY_START_OCTET, message.getBodyStartOctet())
+            .setLong(FULL_CONTENT_OCTETS, message.getFullContentOctet())
+            .setLong(BODY_OCTECTS, message.getFullContentOctet() - 
message.getBodyStartOctet())
+            .setString(BODY_CONTENT, 
pair.getRight().map(BlobId::getId).orElse(DEFAULT_OBJECT_VALUE))
+            .setString(HEADER_CONTENT, 
pair.getLeft().map(BlobId::getId).orElse(DEFAULT_OBJECT_VALUE))
+            .setLong(TEXTUAL_LINE_COUNT, message.getTextuaLineCount())
+            .setList(PROPERTIES, 
buildPropertiesUdt(message.getPropertyBuilder().toProperties()))
+            .setList(ATTACHMENTS, buildAttachmentUdt(message));
+    }
+
     private List<UDTValue> buildPropertiesUdt(MailboxMessage message) {
         return message.getProperties().stream()
             .map(x -> typesProvider.getDefinedUserType(PROPERTIES)
@@ -199,12 +243,18 @@ public class CassandraMessageDAOV2 {
             .collect(Guavate.toImmutableList());
     }
 
-    private UDTValue toUDT(MessageAttachment messageAttachment) {
+    private ImmutableList<UDTValue> 
buildAttachmentUdt(CassandraMessageDAO.RawMessage message) {
+        return message.getAttachments().stream()
+            .map(this::toUDT)
+            .collect(Guavate.toImmutableList());
+    }
+
+    private UDTValue toUDT(MessageAttachmentRepresentation messageAttachment) {
         return typesProvider.getDefinedUserType(ATTACHMENTS)
             .newValue()
             .setString(Attachments.ID, 
messageAttachment.getAttachmentId().getId())
-            .setString(Attachments.NAME, messageAttachment.getName().orNull())
-            .setString(Attachments.CID, 
messageAttachment.getCid().transform(Cid::getValue).orNull())
+            .setString(Attachments.NAME, 
messageAttachment.getName().orElse(null))
+            .setString(Attachments.CID, 
messageAttachment.getCid().map(Cid::getValue).orElse(null))
             .setBool(Attachments.IS_INLINE, messageAttachment.isInline());
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/48043b4f/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
index d3a8cb6..0e1885c 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
@@ -31,6 +31,7 @@ import javax.inject.Inject;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.backends.cassandra.CassandraConfiguration;
+import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
 import org.apache.james.mailbox.cassandra.mail.AttachmentLoader;
 import org.apache.james.mailbox.cassandra.mail.CassandraAttachmentMapper;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO;
@@ -50,6 +51,7 @@ public class V1ToV2Migration implements Migration {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(V1ToV2MigrationThread.class);
 
     private final CassandraMessageDAO messageDAOV1;
+    private final CassandraMessageDAOV2 messageDAOV2;
     private final AttachmentLoader attachmentLoader;
     private final CassandraConfiguration cassandraConfiguration;
     private final ExecutorService migrationExecutor;
@@ -59,6 +61,7 @@ public class V1ToV2Migration implements Migration {
     public V1ToV2Migration(CassandraMessageDAO messageDAOV1, 
CassandraMessageDAOV2 messageDAOV2,
                            CassandraAttachmentMapper attachmentMapper, 
CassandraConfiguration cassandraConfiguration) {
         this.messageDAOV1 = messageDAOV1;
+        this.messageDAOV2 = messageDAOV2;
         this.attachmentLoader = new AttachmentLoader(attachmentMapper);
         this.cassandraConfiguration = cassandraConfiguration;
         this.migrationExecutor = 
Executors.newFixedThreadPool(cassandraConfiguration.getV1ToV2ThreadCount());
@@ -107,6 +110,26 @@ public class V1ToV2Migration implements Migration {
 
     @Override
     public boolean run() {
-        return false;
+        return messageDAOV1.readAll()
+            .map(this::migrate)
+            .reduce(true, (b1, b2) -> b1 && b2);
+    }
+
+    private boolean migrate(CassandraMessageDAO.RawMessage rawMessage) {
+        try {
+            CassandraMessageId messageId = (CassandraMessageId) 
rawMessage.getMessageId();
+
+            messageDAOV2.save(rawMessage)
+                .thenCompose(any -> messageDAOV1.delete(messageId))
+                .join();
+
+            LOGGER.debug("{} migrated", rawMessage.getMessageId());
+
+            return true;
+        } catch (Exception e) {
+            LOGGER.warn("Error while migrating {}", rawMessage.getMessageId(), 
e);
+
+            return false;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/48043b4f/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java
 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java
index 57ec014..dff0aad 100644
--- 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java
+++ 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java
@@ -166,6 +166,52 @@ public class V1ToV2MigrationTest {
     }
 
     @Test
+    public void runShouldMigrateMessages() throws Exception {
+        SimpleMailboxMessage originalMessage = createMessage(messageId, 
CONTENT, BODY_START,
+            new PropertyBuilder(), ImmutableList.of());
+        messageDAOV1.save(originalMessage).join();
+
+        testee.run();
+
+        CassandraMessageDAOV2.MessageResult messageResult = 
retrieveMessageOnV2().get();
+        
softly.assertThat(messageResult.message().getLeft().getMessageId()).isEqualTo(messageId);
+        
softly.assertThat(IOUtils.toString(messageResult.message().getLeft().getContent(),
 Charsets.UTF_8))
+            .isEqualTo(CONTENT);
+        
softly.assertThat(messageResult.message().getRight().findAny().isPresent()).isFalse();
+    }
+
+    @Test
+    public void runShouldBeIdempotent() throws Exception {
+        SimpleMailboxMessage originalMessage = createMessage(messageId, 
CONTENT, BODY_START,
+            new PropertyBuilder(), ImmutableList.of());
+        messageDAOV1.save(originalMessage).join();
+
+        testee.run();
+
+        testee.run();
+
+        CassandraMessageDAOV2.MessageResult messageResult = 
retrieveMessageOnV2().get();
+        
softly.assertThat(messageResult.message().getLeft().getMessageId()).isEqualTo(messageId);
+        
softly.assertThat(IOUtils.toString(messageResult.message().getLeft().getContent(),
 Charsets.UTF_8))
+            .isEqualTo(CONTENT);
+        
softly.assertThat(messageResult.message().getRight().findAny().isPresent()).isFalse();
+    }
+
+    @Test
+    public void runShouldSucceedWhenOneMessage() throws Exception {
+        SimpleMailboxMessage originalMessage = createMessage(messageId, 
CONTENT, BODY_START,
+            new PropertyBuilder(), ImmutableList.of());
+        messageDAOV1.save(originalMessage).join();
+
+        assertThat(testee.run()).isTrue();
+    }
+
+    @Test
+    public void runShouldSucceedWhenNoMessages() throws Exception {
+        assertThat(testee.run()).isTrue();
+    }
+
+    @Test
     public void migrationShouldWorkWithAttachments() throws Exception {
         SimpleMailboxMessage originalMessage = createMessage(messageId, 
CONTENT, BODY_START,
             new PropertyBuilder(), ImmutableList.of(messageAttachment));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to