JAMES-2096 Write migration task. The migration log makes it possible to count migrated messages and to follow the migration process from Kibana.
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/48043b4f Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/48043b4f Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/48043b4f Branch: refs/heads/master Commit: 48043b4f2baca6e45ccea3c3aa7ffcb10cff6995 Parents: fb284d3 Author: benwa <[email protected]> Authored: Mon Jul 24 18:07:19 2017 +0700 Committer: benwa <[email protected]> Committed: Tue Jul 25 17:51:15 2017 +0700 ---------------------------------------------------------------------- .../cassandra/mail/CassandraMessageDAO.java | 107 +++++++++++++++++-- .../cassandra/mail/CassandraMessageDAOV2.java | 56 +++++++++- .../mail/migration/V1ToV2Migration.java | 25 ++++- .../mail/migration/V1ToV2MigrationTest.java | 46 ++++++++ 4 files changed, 224 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/48043b4f/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java index 77e03d9..78f6372 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java @@ -42,6 +42,7 @@ import static org.apache.james.mailbox.cassandra.table.CassandraMessageV1Table.T import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.util.Date; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -56,6 +57,7 @@ import 
org.apache.commons.lang3.tuple.Pair; import org.apache.james.backends.cassandra.CassandraConfiguration; import org.apache.james.backends.cassandra.init.CassandraTypesProvider; import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; +import org.apache.james.backends.cassandra.utils.CassandraUtils; import org.apache.james.mailbox.cassandra.ids.CassandraMessageId; import org.apache.james.mailbox.cassandra.mail.utils.Limit; import org.apache.james.mailbox.cassandra.table.CassandraMessageV1Table.Attachments; @@ -65,6 +67,7 @@ import org.apache.james.mailbox.model.AttachmentId; import org.apache.james.mailbox.model.Cid; import org.apache.james.mailbox.model.ComposedMessageId; import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData; +import org.apache.james.mailbox.model.MessageId; import org.apache.james.mailbox.store.mail.MessageMapper.FetchType; import org.apache.james.mailbox.store.mail.model.MailboxMessage; import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder; @@ -80,6 +83,7 @@ import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; import com.datastax.driver.core.UDTValue; import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.github.steveash.guavate.Guavate; import com.google.common.annotations.VisibleForTesting; import com.google.common.io.ByteStreams; import com.google.common.primitives.Bytes; @@ -93,10 +97,13 @@ public class CassandraMessageDAO { private final PreparedStatement selectHeaders; private final PreparedStatement selectFields; private final PreparedStatement selectBody; + private final PreparedStatement selectAll; + private CassandraUtils cassandraUtils; private final CassandraConfiguration cassandraConfiguration; @Inject - public CassandraMessageDAO(Session session, CassandraTypesProvider typesProvider, CassandraConfiguration cassandraConfiguration) { + public CassandraMessageDAO(Session session, CassandraTypesProvider typesProvider, CassandraConfiguration 
cassandraConfiguration, + CassandraUtils cassandraUtils) { this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session); this.typesProvider = typesProvider; this.insert = prepareInsert(session); @@ -106,11 +113,17 @@ public class CassandraMessageDAO { this.selectFields = prepareSelect(session, FIELDS); this.selectBody = prepareSelect(session, BODY); this.cassandraConfiguration = cassandraConfiguration; + this.selectAll = prepareSelectAll(session); + this.cassandraUtils = cassandraUtils; } @VisibleForTesting public CassandraMessageDAO(Session session, CassandraTypesProvider typesProvider) { - this(session, typesProvider, CassandraConfiguration.DEFAULT_CONFIGURATION); + this(session, typesProvider, CassandraConfiguration.DEFAULT_CONFIGURATION, CassandraUtils.WITH_DEFAULT_CONFIGURATION); + } + + private PreparedStatement prepareSelectAll(Session session) { + return session.prepare(select().from(TABLE_NAME)); } private PreparedStatement prepareSelect(Session session, String[] fields) { @@ -139,6 +152,13 @@ public class CassandraMessageDAO { .where(eq(MESSAGE_ID, bindMarker(MESSAGE_ID)))); } + public Stream<RawMessage> readAll() { + return cassandraUtils.convertToStream( + cassandraAsyncExecutor.execute(selectAll.bind()) + .join()) + .map(this::fromRow); + } + public CompletableFuture<Void> save(MailboxMessage message) throws MailboxException { try { CassandraMessageId messageId = (CassandraMessageId) message.getMessageId(); @@ -223,14 +243,14 @@ public class CassandraMessageDAO { row.getInt(BODY_START_OCTET), buildContent(row, fetchType), messageIdWithMetaData.getFlags(), - getPropertyBuilder(row), + retrievePropertyBuilder(row), messageId.getMailboxId(), messageId.getUid(), messageIdWithMetaData.getModSeq()); - return Pair.of(messageWithoutAttachment, getAttachments(row, fetchType)); + return Pair.of(messageWithoutAttachment, retrieveAttachments(row, fetchType)); } - private PropertyBuilder getPropertyBuilder(Row row) { + private PropertyBuilder 
retrievePropertyBuilder(Row row) { PropertyBuilder property = new PropertyBuilder( row.getList(PROPERTIES, UDTValue.class).stream() .map(x -> new SimpleProperty(x.getString(Properties.NAMESPACE), x.getString(Properties.NAME), x.getString(Properties.VALUE))) @@ -239,7 +259,7 @@ public class CassandraMessageDAO { return property; } - private Stream<MessageAttachmentRepresentation> getAttachments(Row row, FetchType fetchType) { + private Stream<MessageAttachmentRepresentation> retrieveAttachments(Row row, FetchType fetchType) { switch (fetchType) { case Full: case Body: @@ -313,4 +333,79 @@ public class CassandraMessageDAO { row.getBytes(field).get(headerContent); return headerContent; } + + private RawMessage fromRow(Row row) { + return new RawMessage( + row.getTimestamp(INTERNAL_DATE), + new CassandraMessageId.Factory().of(row.getUUID(MESSAGE_ID)), + row.getInt(BODY_START_OCTET), + row.getLong(FULL_CONTENT_OCTETS), + getFieldContent(BODY_CONTENT, row), + getFieldContent(HEADER_CONTENT, row), + retrievePropertyBuilder(row), + row.getLong(TEXTUAL_LINE_COUNT), + retrieveAttachments(row, FetchType.Full).collect(Guavate.toImmutableList())); + } + + public static class RawMessage { + private final Date internalDate; + private final MessageId messageId; + private final int bodyStartOctet; + private final long fullContentOctet; + private final byte[] bodyContent; + private final byte[] headerContent; + private final PropertyBuilder propertyBuilder; + private final long textuaLineCount; + private final List<MessageAttachmentRepresentation> attachments; + + private RawMessage(Date internalDate, MessageId messageId, int bodyStartOctet, long fullContentOctet, byte[] bodyContent, + byte[] headerContent, PropertyBuilder propertyBuilder, long textuaLineCount, + List<MessageAttachmentRepresentation> attachments) { + this.internalDate = internalDate; + this.messageId = messageId; + this.bodyStartOctet = bodyStartOctet; + this.fullContentOctet = fullContentOctet; + this.bodyContent = 
bodyContent; + this.headerContent = headerContent; + this.propertyBuilder = propertyBuilder; + this.textuaLineCount = textuaLineCount; + this.attachments = attachments; + } + + public Date getInternalDate() { + return internalDate; + } + + public MessageId getMessageId() { + return messageId; + } + + public int getBodyStartOctet() { + return bodyStartOctet; + } + + public long getFullContentOctet() { + return fullContentOctet; + } + + public byte[] getBodyContent() { + return bodyContent; + } + + public byte[] getHeaderContent() { + return headerContent; + } + + public PropertyBuilder getPropertyBuilder() { + return propertyBuilder; + } + + public long getTextuaLineCount() { + return textuaLineCount; + } + + public List<MessageAttachmentRepresentation> getAttachments() { + return attachments; + } + } } http://git-wip-us.apache.org/repos/asf/james-project/blob/48043b4f/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2.java index c6817ff..6664028 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2.java @@ -69,6 +69,7 @@ import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData; import org.apache.james.mailbox.model.MessageAttachment; import org.apache.james.mailbox.store.mail.MessageMapper.FetchType; import org.apache.james.mailbox.store.mail.model.MailboxMessage; +import org.apache.james.mailbox.store.mail.model.Property; import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder; import org.apache.james.mailbox.store.mail.model.impl.SimpleProperty; import 
org.apache.james.util.CompletableFutureUtil; @@ -189,6 +190,49 @@ public class CassandraMessageDAOV2 { .collect(Guavate.toImmutableList()); } + private List<UDTValue> buildPropertiesUdt(List<Property> properties) { + return properties.stream() + .map(property -> typesProvider.getDefinedUserType(PROPERTIES) + .newValue() + .setString(Properties.NAMESPACE, property.getNamespace()) + .setString(Properties.NAME, property.getLocalName()) + .setString(Properties.VALUE, property.getValue())) + .collect(Guavate.toImmutableList()); + } + + private UDTValue toUDT(MessageAttachment messageAttachment) { + return typesProvider.getDefinedUserType(ATTACHMENTS) + .newValue() + .setString(Attachments.ID, messageAttachment.getAttachmentId().getId()) + .setString(Attachments.NAME, messageAttachment.getName().orNull()) + .setString(Attachments.CID, messageAttachment.getCid().transform(Cid::getValue).orNull()) + .setBool(Attachments.IS_INLINE, messageAttachment.isInline()); + } + + public CompletableFuture<Void> save(CassandraMessageDAO.RawMessage rawMessage) { + return CompletableFutureUtil.combine( + blobsDAO.save(rawMessage.getHeaderContent()), + blobsDAO.save(rawMessage.getBodyContent()), + Pair::of) + .thenCompose(pair -> + cassandraAsyncExecutor.executeVoid(boundWriteStatement(rawMessage, pair))); + } + + private BoundStatement boundWriteStatement(CassandraMessageDAO.RawMessage message, Pair<Optional<BlobId>, Optional<BlobId>> pair) { + CassandraMessageId messageId = (CassandraMessageId) message.getMessageId(); + return insert.bind() + .setUUID(MESSAGE_ID, messageId.get()) + .setTimestamp(INTERNAL_DATE, message.getInternalDate()) + .setInt(BODY_START_OCTET, message.getBodyStartOctet()) + .setLong(FULL_CONTENT_OCTETS, message.getFullContentOctet()) + .setLong(BODY_OCTECTS, message.getFullContentOctet() - message.getBodyStartOctet()) + .setString(BODY_CONTENT, pair.getRight().map(BlobId::getId).orElse(DEFAULT_OBJECT_VALUE)) + .setString(HEADER_CONTENT, 
pair.getLeft().map(BlobId::getId).orElse(DEFAULT_OBJECT_VALUE)) + .setLong(TEXTUAL_LINE_COUNT, message.getTextuaLineCount()) + .setList(PROPERTIES, buildPropertiesUdt(message.getPropertyBuilder().toProperties())) + .setList(ATTACHMENTS, buildAttachmentUdt(message)); + } + private List<UDTValue> buildPropertiesUdt(MailboxMessage message) { return message.getProperties().stream() .map(x -> typesProvider.getDefinedUserType(PROPERTIES) @@ -199,12 +243,18 @@ public class CassandraMessageDAOV2 { .collect(Guavate.toImmutableList()); } - private UDTValue toUDT(MessageAttachment messageAttachment) { + private ImmutableList<UDTValue> buildAttachmentUdt(CassandraMessageDAO.RawMessage message) { + return message.getAttachments().stream() + .map(this::toUDT) + .collect(Guavate.toImmutableList()); + } + + private UDTValue toUDT(MessageAttachmentRepresentation messageAttachment) { return typesProvider.getDefinedUserType(ATTACHMENTS) .newValue() .setString(Attachments.ID, messageAttachment.getAttachmentId().getId()) - .setString(Attachments.NAME, messageAttachment.getName().orNull()) - .setString(Attachments.CID, messageAttachment.getCid().transform(Cid::getValue).orNull()) + .setString(Attachments.NAME, messageAttachment.getName().orElse(null)) + .setString(Attachments.CID, messageAttachment.getCid().map(Cid::getValue).orElse(null)) .setBool(Attachments.IS_INLINE, messageAttachment.isInline()); } http://git-wip-us.apache.org/repos/asf/james-project/blob/48043b4f/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java index d3a8cb6..0e1885c 100644 --- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java @@ -31,6 +31,7 @@ import javax.inject.Inject; import org.apache.commons.lang3.tuple.Pair; import org.apache.james.backends.cassandra.CassandraConfiguration; +import org.apache.james.mailbox.cassandra.ids.CassandraMessageId; import org.apache.james.mailbox.cassandra.mail.AttachmentLoader; import org.apache.james.mailbox.cassandra.mail.CassandraAttachmentMapper; import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO; @@ -50,6 +51,7 @@ public class V1ToV2Migration implements Migration { private static final Logger LOGGER = LoggerFactory.getLogger(V1ToV2MigrationThread.class); private final CassandraMessageDAO messageDAOV1; + private final CassandraMessageDAOV2 messageDAOV2; private final AttachmentLoader attachmentLoader; private final CassandraConfiguration cassandraConfiguration; private final ExecutorService migrationExecutor; @@ -59,6 +61,7 @@ public class V1ToV2Migration implements Migration { public V1ToV2Migration(CassandraMessageDAO messageDAOV1, CassandraMessageDAOV2 messageDAOV2, CassandraAttachmentMapper attachmentMapper, CassandraConfiguration cassandraConfiguration) { this.messageDAOV1 = messageDAOV1; + this.messageDAOV2 = messageDAOV2; this.attachmentLoader = new AttachmentLoader(attachmentMapper); this.cassandraConfiguration = cassandraConfiguration; this.migrationExecutor = Executors.newFixedThreadPool(cassandraConfiguration.getV1ToV2ThreadCount()); @@ -107,6 +110,26 @@ public class V1ToV2Migration implements Migration { @Override public boolean run() { - return false; + return messageDAOV1.readAll() + .map(this::migrate) + .reduce(true, (b1, b2) -> b1 && b2); + } + + private boolean migrate(CassandraMessageDAO.RawMessage rawMessage) { + try { + CassandraMessageId messageId = (CassandraMessageId) rawMessage.getMessageId(); + + 
messageDAOV2.save(rawMessage) + .thenCompose(any -> messageDAOV1.delete(messageId)) + .join(); + + LOGGER.debug("{} migrated", rawMessage.getMessageId()); + + return true; + } catch (Exception e) { + LOGGER.warn("Error while migrating {}", rawMessage.getMessageId(), e); + + return false; + } } } http://git-wip-us.apache.org/repos/asf/james-project/blob/48043b4f/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java index 57ec014..dff0aad 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java @@ -166,6 +166,52 @@ public class V1ToV2MigrationTest { } @Test + public void runShouldMigrateMessages() throws Exception { + SimpleMailboxMessage originalMessage = createMessage(messageId, CONTENT, BODY_START, + new PropertyBuilder(), ImmutableList.of()); + messageDAOV1.save(originalMessage).join(); + + testee.run(); + + CassandraMessageDAOV2.MessageResult messageResult = retrieveMessageOnV2().get(); + softly.assertThat(messageResult.message().getLeft().getMessageId()).isEqualTo(messageId); + softly.assertThat(IOUtils.toString(messageResult.message().getLeft().getContent(), Charsets.UTF_8)) + .isEqualTo(CONTENT); + softly.assertThat(messageResult.message().getRight().findAny().isPresent()).isFalse(); + } + + @Test + public void runShouldBeIdempotent() throws Exception { + SimpleMailboxMessage originalMessage = createMessage(messageId, CONTENT, BODY_START, + new PropertyBuilder(), ImmutableList.of()); + messageDAOV1.save(originalMessage).join(); + + 
testee.run(); + + testee.run(); + + CassandraMessageDAOV2.MessageResult messageResult = retrieveMessageOnV2().get(); + softly.assertThat(messageResult.message().getLeft().getMessageId()).isEqualTo(messageId); + softly.assertThat(IOUtils.toString(messageResult.message().getLeft().getContent(), Charsets.UTF_8)) + .isEqualTo(CONTENT); + softly.assertThat(messageResult.message().getRight().findAny().isPresent()).isFalse(); + } + + @Test + public void runShouldSucceedWhenOneMessage() throws Exception { + SimpleMailboxMessage originalMessage = createMessage(messageId, CONTENT, BODY_START, + new PropertyBuilder(), ImmutableList.of()); + messageDAOV1.save(originalMessage).join(); + + assertThat(testee.run()).isTrue(); + } + + @Test + public void runShouldSucceedWhenNoMessages() throws Exception { + assertThat(testee.run()).isTrue(); + } + + @Test public void migrationShouldWorkWithAttachments() throws Exception { SimpleMailboxMessage originalMessage = createMessage(messageId, CONTENT, BODY_START, new PropertyBuilder(), ImmutableList.of(messageAttachment)); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
