This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 5ab6e6d765a51193254ca1aab1d36a49d0279d0d Author: Benoit Tellier <[email protected]> AuthorDate: Tue Oct 20 16:54:33 2020 +0700 JAMES-3430 Provide migration for MessageV3 table --- .../versions/CassandraSchemaVersionManager.java | 2 +- .../cassandra/mail/CassandraMessageDAO.java | 33 +++++ .../cassandra/mail/CassandraMessageDAOV3.java | 43 ++++++ .../cassandra/mail/MessageRepresentation.java | 20 ++- .../mail/migration/MessageV3Migration.java | 119 ++++++++++++++++ ...ageV3MigrationTaskAdditionalInformationDTO.java | 69 +++++++++ .../mail/migration/MessageV3MigrationTaskDTO.java | 59 ++++++++ .../cassandra/mail/CassandraMessageDAOTest.java | 1 + .../MessageV3MigrationTaskSerializationTest.java | 52 +++++++ .../mail/migration/MessageV3MigrationTest.java | 157 +++++++++++++++++++++ .../mailbox/store/mail/model/impl/Properties.java | 17 +++ .../modules/webadmin/CassandraRoutesModule.java | 3 + 12 files changed, 570 insertions(+), 5 deletions(-) diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java index e38037a..29dae94 100644 --- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java +++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java @@ -36,7 +36,7 @@ import reactor.core.publisher.Mono; public class CassandraSchemaVersionManager { public static final SchemaVersion MIN_VERSION = new SchemaVersion(5); - public static final SchemaVersion MAX_VERSION = new SchemaVersion(8); + public static final SchemaVersion MAX_VERSION = new SchemaVersion(9); public static final SchemaVersion DEFAULT_VERSION = MIN_VERSION; private static final Logger LOGGER = LoggerFactory.getLogger(CassandraSchemaVersionManager.class); diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java index 632af57..8d97ff5 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java @@ -78,6 +78,7 @@ import com.github.steveash.guavate.Guavate; import com.google.common.collect.ImmutableList; import com.google.common.primitives.Bytes; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; @@ -92,7 +93,9 @@ public class CassandraMessageDAO { private final PreparedStatement insert; private final PreparedStatement delete; private final PreparedStatement select; + private final PreparedStatement selectAll; private final Cid.CidParser cidParser; + private final CassandraMessageId.Factory messageIdFactory; private final ConsistencyLevel consistencyLevel; @Inject @@ -100,8 +103,10 @@ public class CassandraMessageDAO { CassandraTypesProvider typesProvider, BlobStore blobStore, BlobId.Factory blobIdFactory, + CassandraMessageId.Factory messageIdFactory, CassandraConsistenciesConfiguration consistenciesConfiguration) { this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session); + this.messageIdFactory = messageIdFactory; this.consistencyLevel = consistenciesConfiguration.getRegular(); this.typesProvider = typesProvider; this.blobStore = blobStore; @@ -110,6 +115,7 @@ public class CassandraMessageDAO { this.insert = prepareInsert(session); this.delete = prepareDelete(session); this.select = prepareSelect(session); + this.selectAll = prepareSelectAll(session); this.cidParser = Cid.parser().relaxed(); } @@ -119,6 +125,11 @@ public class CassandraMessageDAO { .where(eq(MESSAGE_ID, bindMarker(MESSAGE_ID)))); } + private PreparedStatement prepareSelectAll(Session session) { + return session.prepare(select() + .from(TABLE_NAME)); + } + private PreparedStatement prepareInsert(Session session) { return session.prepare(insertInto(TABLE_NAME) .value(MESSAGE_ID, bindMarker(MESSAGE_ID)) @@ -139,6 +150,11 @@ public class CassandraMessageDAO { .where(eq(MESSAGE_ID, bindMarker(MESSAGE_ID)))); } + public Flux<MessageRepresentation> list() { + return cassandraAsyncExecutor.executeRows(selectAll.bind()) + .map(this::message); + } + public Mono<Void> save(MailboxMessage message) throws MailboxException { return saveContent(message) .flatMap(pair -> cassandraAsyncExecutor.executeVoid(boundWriteStatement(message, pair))); @@ -244,6 +260,23 @@ public class CassandraMessageDAO { bodyId)); } + private MessageRepresentation message(Row row) { + BlobId headerId = retrieveBlobId(HEADER_CONTENT, row); + BlobId bodyId = retrieveBlobId(BODY_CONTENT, row); + CassandraMessageId messageId = messageIdFactory.of(row.getUUID(MESSAGE_ID)); + + return new MessageRepresentation( + messageId, + row.getTimestamp(INTERNAL_DATE), + row.getLong(FULL_CONTENT_OCTETS), + row.getInt(BODY_START_OCTET), + new SharedByteArrayInputStream(EMPTY_BYTE_ARRAY), + getProperties(row), + getAttachments(row).collect(Guavate.toImmutableList()), + headerId, + bodyId); + } + private org.apache.james.mailbox.store.mail.model.impl.Properties getProperties(Row row) { PropertyBuilder property = new PropertyBuilder( row.getList(PROPERTIES, UDTValue.class).stream() diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java index 2c6b5ae..8f349da 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java @@ -159,6 +159,31 @@ public class CassandraMessageDAOV3 { .flatMap(pair -> cassandraAsyncExecutor.executeVoid(boundWriteStatement(message, pair))); } + public Mono<Void> save(MessageRepresentation message) { + CassandraMessageId messageId = (CassandraMessageId) message.getMessageId(); + return cassandraAsyncExecutor.executeVoid(insert.bind() + .setUUID(MESSAGE_ID, messageId.get()) + .setTimestamp(INTERNAL_DATE, message.getInternalDate()) + .setInt(BODY_START_OCTET, message.getBodyStartOctet()) + .setLong(FULL_CONTENT_OCTETS, message.getSize()) + .setLong(BODY_OCTECTS, message.getSize() - message.getBodyStartOctet()) + .setString(BODY_CONTENT, message.getBodyId().asString()) + .setString(HEADER_CONTENT, message.getHeaderId().asString()) + .setLong(TEXTUAL_LINE_COUNT, Optional.ofNullable(message.getProperties().getTextualLineCount()).orElse(DEFAULT_LONG_VALUE)) + .setString(CONTENT_DESCRIPTION, message.getProperties().getContentDescription()) + .setString(CONTENT_DISPOSITION_TYPE, message.getProperties().getContentDispositionType()) + .setString(MEDIA_TYPE, message.getProperties().getMediaType()) + .setString(SUB_TYPE, message.getProperties().getSubType()) + .setString(CONTENT_ID, message.getProperties().getContentID()) + .setString(CONTENT_MD5, message.getProperties().getContentMD5()) + .setString(CONTENT_TRANSFER_ENCODING, message.getProperties().getContentTransferEncoding()) + .setString(CONTENT_LOCATION, message.getProperties().getContentLocation()) + .setList(CONTENT_LANGUAGE, message.getProperties().getContentLanguage()) + .setMap(CONTENT_DISPOSITION_PARAMETERS, message.getProperties().getContentDispositionParameters()) + .setMap(CONTENT_TYPE_PARAMETERS, message.getProperties().getContentTypeParameters()) + .setList(ATTACHMENTS, buildAttachmentUdt(message.getAttachments()))); + } + private Mono<Tuple2<BlobId, BlobId>> saveContent(MailboxMessage message) throws MailboxException { try { byte[] headerContent = IOUtils.toByteArray(message.getHeaderContent()); @@ -204,6 +229,12 @@ public class CassandraMessageDAOV3 { .collect(Guavate.toImmutableList()); } + private ImmutableList<UDTValue> buildAttachmentUdt(List<MessageAttachmentRepresentation> attachments) { + return attachments.stream() + .map(this::toUDT) + .collect(Guavate.toImmutableList()); + } + private UDTValue toUDT(MessageAttachmentMetadata messageAttachment) { UDTValue result = typesProvider.getDefinedUserType(ATTACHMENTS) .newValue() @@ -216,6 +247,18 @@ public class CassandraMessageDAOV3 { return result; } + private UDTValue toUDT(MessageAttachmentRepresentation messageAttachment) { + UDTValue result = typesProvider.getDefinedUserType(ATTACHMENTS) + .newValue() + .setString(Attachments.ID, messageAttachment.getAttachmentId().getId()) + .setBool(Attachments.IS_INLINE, messageAttachment.isInline()); + messageAttachment.getName() + .ifPresent(name -> result.setString(Attachments.NAME, name)); + messageAttachment.getCid() + .ifPresent(cid -> result.setString(Attachments.CID, cid.getValue())); + return result; + } + public Mono<MessageRepresentation> retrieveMessage(ComposedMessageIdWithMetaData id, FetchType fetchType) { CassandraMessageId cassandraMessageId = (CassandraMessageId) id.getComposedMessageId().getMessageId(); return retrieveMessage(cassandraMessageId, fetchType); diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageRepresentation.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageRepresentation.java index 19fd945..06274e5 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageRepresentation.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageRepresentation.java @@ -35,19 +35,19 @@ public class MessageRepresentation { private final MessageId messageId; private final Date internalDate; private final Long size; - private final Integer bodySize; + private final Integer bodyStartOctet; private final SharedByteArrayInputStream content; private final Properties properties; private final List<MessageAttachmentRepresentation> attachments; private final BlobId headerId; private final BlobId bodyId; - public MessageRepresentation(MessageId messageId, Date internalDate, Long size, Integer bodySize, SharedByteArrayInputStream content, + public MessageRepresentation(MessageId messageId, Date internalDate, Long size, Integer bodyStartOctet, SharedByteArrayInputStream content, Properties properties, List<MessageAttachmentRepresentation> attachments, BlobId headerId, BlobId bodyId) { this.messageId = messageId; this.internalDate = internalDate; this.size = size; - this.bodySize = bodySize; + this.bodyStartOctet = bodyStartOctet; this.content = content; this.properties = properties; this.attachments = attachments; @@ -62,7 +62,7 @@ public class MessageRepresentation { .uid(metadata.getComposedMessageId().getUid()) .modseq(metadata.getModSeq()) .internalDate(internalDate) - .bodyStartOctet(bodySize) + .bodyStartOctet(bodyStartOctet) .size(size) .content(content) .flags(metadata.getFlags()) @@ -71,6 +71,18 @@ public class MessageRepresentation { .build(); } + public Date getInternalDate() { + return internalDate; + } + + public Long getSize() { + return size; + } + + public Integer getBodyStartOctet() { + return bodyStartOctet; + } + public MessageId getMessageId() { return messageId; } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3Migration.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3Migration.java new file mode 100644 index 0000000..2c37c4c --- /dev/null +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3Migration.java @@ -0,0 +1,119 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.cassandra.mail.migration; + +import java.time.Clock; +import java.time.Instant; +import java.util.Optional; + +import javax.inject.Inject; + +import org.apache.james.backends.cassandra.migration.Migration; +import org.apache.james.mailbox.cassandra.ids.CassandraMessageId; +import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO; +import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAOV3; +import org.apache.james.mailbox.cassandra.mail.MessageRepresentation; +import org.apache.james.task.Task; +import org.apache.james.task.TaskExecutionDetails; +import org.apache.james.task.TaskType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import reactor.core.publisher.Mono; + +public class MessageV3Migration implements Migration { + private static final int CONCURRENCY = 50; + + static class MessageV3MigrationTask implements Task { + private final MessageV3Migration migration; + + MessageV3MigrationTask(MessageV3Migration migration) { + this.migration = migration; + } + + @Override + public Result run() throws InterruptedException { + return migration.runTask(); + } + + @Override + public TaskType type() { + return TYPE; + } + + @Override + public Optional<TaskExecutionDetails.AdditionalInformation> details() { + return Optional.of(migration.getAdditionalInformation()); + } + } + + public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation { + private final Instant timestamp; + + public AdditionalInformation(Instant timestamp) { + this.timestamp = timestamp; + } + + @Override + public Instant timestamp() { + return timestamp; + } + } + + public static final Logger LOGGER = LoggerFactory.getLogger(MessageV3Migration.class); + public static final TaskType TYPE = TaskType.of("cassandra-message-v3-migration"); + private final CassandraMessageDAO daoV2; + private final CassandraMessageDAOV3 daoV3; + + @Inject + public MessageV3Migration(CassandraMessageDAO daoV2, CassandraMessageDAOV3 daoV3) { + this.daoV2 = daoV2; + this.daoV3 = daoV3; + } + + @Override + public void apply() { + daoV2.list() + .flatMap(this::migrate, CONCURRENCY) + .doOnError(t -> LOGGER.error("Error while performing migration", t)) + .blockLast(); + } + + private Mono<Void> migrate(MessageRepresentation messageRepresentation) { + return daoV3.save(messageRepresentation) + .then(daoV2.delete((CassandraMessageId) messageRepresentation.getMessageId())) + .onErrorResume(error -> handleErrorMigrate(messageRepresentation, error)) + .then(); + } + + private Mono<Void> handleErrorMigrate(MessageRepresentation messageRepresentation, Throwable throwable) { + LOGGER.error("Error while performing migration for {}", messageRepresentation.getMessageId(), throwable); + return Mono.empty(); + } + + @Override + public Task asTask() { + return new MessageV3MigrationTask(this); + } + + AdditionalInformation getAdditionalInformation() { + return new AdditionalInformation(Clock.systemUTC().instant()); + } +} diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3MigrationTaskAdditionalInformationDTO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3MigrationTaskAdditionalInformationDTO.java new file mode 100644 index 0000000..c5609cc --- /dev/null +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3MigrationTaskAdditionalInformationDTO.java @@ -0,0 +1,69 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.cassandra.mail.migration; + +import java.time.Instant; + +import org.apache.james.json.DTOModule; +import org.apache.james.server.task.json.dto.AdditionalInformationDTO; +import org.apache.james.server.task.json.dto.AdditionalInformationDTOModule; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class MessageV3MigrationTaskAdditionalInformationDTO implements AdditionalInformationDTO { + private static MessageV3MigrationTaskAdditionalInformationDTO fromDomainObject(MessageV3Migration.AdditionalInformation additionalInformation, String type) { + return new MessageV3MigrationTaskAdditionalInformationDTO( + type, + additionalInformation.timestamp() + ); + } + + public static final AdditionalInformationDTOModule<MessageV3Migration.AdditionalInformation, MessageV3MigrationTaskAdditionalInformationDTO> MODULE = + DTOModule + .forDomainObject(MessageV3Migration.AdditionalInformation.class) + .convertToDTO(MessageV3MigrationTaskAdditionalInformationDTO.class) + .toDomainObjectConverter(MessageV3MigrationTaskAdditionalInformationDTO::toDomainObject) + .toDTOConverter(MessageV3MigrationTaskAdditionalInformationDTO::fromDomainObject) + .typeName(MessageV3Migration.TYPE.asString()) + .withFactory(AdditionalInformationDTOModule::new); + + private final String type; + private final Instant timestamp; + + public MessageV3MigrationTaskAdditionalInformationDTO(@JsonProperty("type") String type, + @JsonProperty("timestamp") Instant timestamp) { + this.type = type; + this.timestamp = timestamp; + } + + @Override + public Instant getTimestamp() { + return timestamp; + } + + @Override + public String getType() { + return type; + } + + private MessageV3Migration.AdditionalInformation toDomainObject() { + return new MessageV3Migration.AdditionalInformation(timestamp); + } +} diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3MigrationTaskDTO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3MigrationTaskDTO.java new file mode 100644 index 0000000..87a1fec --- /dev/null +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3MigrationTaskDTO.java @@ -0,0 +1,59 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.cassandra.mail.migration; + +import java.util.function.Function; + +import org.apache.james.json.DTOModule; +import org.apache.james.server.task.json.dto.TaskDTO; +import org.apache.james.server.task.json.dto.TaskDTOModule; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class MessageV3MigrationTaskDTO implements TaskDTO { + + private static MessageV3MigrationTaskDTO fromDomainObject(MessageV3Migration.MessageV3MigrationTask task, String type) { + return new MessageV3MigrationTaskDTO(type); + } + + public static final Function<MessageV3Migration, TaskDTOModule<MessageV3Migration.MessageV3MigrationTask, MessageV3MigrationTaskDTO>> MODULE = (migration) -> + DTOModule + .forDomainObject(MessageV3Migration.MessageV3MigrationTask.class) + .convertToDTO(MessageV3MigrationTaskDTO.class) + .toDomainObjectConverter(dto -> dto.toDomainObject(migration)) + .toDTOConverter(MessageV3MigrationTaskDTO::fromDomainObject) + .typeName(MessageV3Migration.TYPE.asString()) + .withFactory(TaskDTOModule::new); + + private final String type; + + public MessageV3MigrationTaskDTO(@JsonProperty("type") String type) { + this.type = type; + } + + @Override + public String getType() { + return type; + } + + private MessageV3Migration.MessageV3MigrationTask toDomainObject(MessageV3Migration migration) { + return new MessageV3Migration.MessageV3MigrationTask(migration); + } +} diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java index bb52781..5f044db 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java @@ -94,6 +94,7 @@ class CassandraMessageDAOTest { cassandra.getTypesProvider(), blobStore, blobIdFactory, + messageIdFactory, cassandraCluster.getCassandraConsistenciesConfiguration()); messageIdWithMetadata = ComposedMessageIdWithMetaData.builder() diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3MigrationTaskSerializationTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3MigrationTaskSerializationTest.java new file mode 100644 index 0000000..1799543 --- /dev/null +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3MigrationTaskSerializationTest.java @@ -0,0 +1,52 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.cassandra.mail.migration; + +import static org.mockito.Mockito.mock; + +import java.time.Instant; + +import org.apache.james.JsonSerializationVerifier; +import org.junit.jupiter.api.Test; + +class MessageV3MigrationTaskSerializationTest { + private static final Instant TIMESTAMP = Instant.parse("2018-11-13T12:00:55Z"); + private static final MessageV3Migration MIGRATION = mock(MessageV3Migration.class); + private static final MessageV3Migration.MessageV3MigrationTask TASK = new MessageV3Migration.MessageV3MigrationTask(MIGRATION); + private static final String SERIALIZED_TASK = "{\"type\": \"cassandra-message-v3-migration\"}"; + private static final MessageV3Migration.AdditionalInformation DETAILS = new MessageV3Migration.AdditionalInformation(TIMESTAMP); + private static final String SERIALIZED_ADDITIONAL_INFORMATION = "{\"type\": \"cassandra-message-v3-migration\", \"timestamp\":\"2018-11-13T12:00:55Z\"}"; + + @Test + void taskShouldBeSerializable() throws Exception { + JsonSerializationVerifier.dtoModule(MessageV3MigrationTaskDTO.MODULE.apply(MIGRATION)) + .bean(TASK) + .json(SERIALIZED_TASK) + .verify(); + } + + @Test + void additionalInformationShouldBeSerializable() throws Exception { + JsonSerializationVerifier.dtoModule(MessageV3MigrationTaskAdditionalInformationDTO.MODULE) + .bean(DETAILS) + .json(SERIALIZED_ADDITIONAL_INFORMATION) + .verify(); + } +} diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3MigrationTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3MigrationTest.java new file mode 100644 index 0000000..51d9d31 --- /dev/null +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3MigrationTest.java @@ -0,0 +1,157 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.cassandra.mail.migration; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.nio.charset.StandardCharsets; +import java.util.Date; +import java.util.List; + +import javax.mail.Flags; +import javax.mail.util.SharedByteArrayInputStream; + +import org.apache.james.backends.cassandra.CassandraCluster; +import org.apache.james.backends.cassandra.CassandraClusterExtension; +import org.apache.james.backends.cassandra.components.CassandraModule; +import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule; +import org.apache.james.blob.api.BlobStore; +import org.apache.james.blob.api.HashBlobId; +import org.apache.james.blob.cassandra.CassandraBlobModule; +import org.apache.james.blob.cassandra.CassandraBlobStoreFactory; +import org.apache.james.mailbox.MessageUid; +import org.apache.james.mailbox.cassandra.ids.CassandraId; +import org.apache.james.mailbox.cassandra.ids.CassandraMessageId; +import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO; +import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAOV3; +import org.apache.james.mailbox.cassandra.mail.MessageRepresentation; +import org.apache.james.mailbox.cassandra.modules.CassandraMessageModule; +import org.apache.james.mailbox.model.MessageAttachmentMetadata; +import org.apache.james.mailbox.model.MessageId; +import org.apache.james.mailbox.store.mail.MessageMapper; +import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder; +import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage; +import org.assertj.core.api.SoftAssertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import com.google.common.collect.ImmutableList; + +class MessageV3MigrationTest { + private static final int BODY_START = 16; + private static final CassandraId MAILBOX_ID = CassandraId.timeBased(); + private static final String CONTENT = "Subject: Test7 \n\nBody7\n.\n"; + private static final MessageUid messageUid = MessageUid.of(1); + private static final List<MessageAttachmentMetadata> NO_ATTACHMENT = ImmutableList.of(); + + public static final CassandraModule MODULES = CassandraModule.aggregateModules( + CassandraMessageModule.MODULE, + CassandraBlobModule.MODULE, + CassandraSchemaVersionModule.MODULE); + + @RegisterExtension + static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(MODULES); + + private CassandraMessageDAO daoV2; + private CassandraMessageDAOV3 daoV3; + private CassandraMessageId.Factory messageIdFactory; + + @BeforeEach + void setUp(CassandraCluster cassandra) { + BlobStore blobStore = CassandraBlobStoreFactory.forTesting(cassandra.getConf()) + .passthrough(); + HashBlobId.Factory blobIdFactory = new HashBlobId.Factory(); + daoV2 = new CassandraMessageDAO( + cassandra.getConf(), + cassandra.getTypesProvider(), + blobStore, + blobIdFactory, + new CassandraMessageId.Factory(), + cassandraCluster.getCassandraConsistenciesConfiguration()); + daoV3 = new CassandraMessageDAOV3( + cassandra.getConf(), + cassandra.getTypesProvider(), + blobStore, + blobIdFactory, + cassandraCluster.getCassandraConsistenciesConfiguration()); + messageIdFactory = new CassandraMessageId.Factory(); + } + + @Test + void migrationTaskShouldMoveDataToMostRecentDao() throws Exception{ + SimpleMailboxMessage message1 = createMessage(messageIdFactory.generate()); + SimpleMailboxMessage message2 = createMessage(messageIdFactory.generate()); + SimpleMailboxMessage message3 = createMessage(messageIdFactory.generate()); + SimpleMailboxMessage message4 = createMessage(messageIdFactory.generate()); + + daoV2.save(message1).block(); + daoV2.save(message2).block(); + daoV2.save(message3).block(); + daoV2.save(message4).block(); + + new MessageV3Migration(daoV2, daoV3).apply(); + + SoftAssertions.assertSoftly(softly -> { + softly.assertThat(daoV3.retrieveMessage((CassandraMessageId) message1.getMessageId(), MessageMapper.FetchType.Metadata).block().getMessageId()) + .isEqualTo(message1.getMessageId()); + softly.assertThat(daoV3.retrieveMessage((CassandraMessageId) message2.getMessageId(), MessageMapper.FetchType.Metadata).block().getMessageId()) + .isEqualTo(message2.getMessageId()); + softly.assertThat(daoV3.retrieveMessage((CassandraMessageId) message3.getMessageId(), MessageMapper.FetchType.Metadata).block().getMessageId()) + .isEqualTo(message3.getMessageId()); + softly.assertThat(daoV3.retrieveMessage((CassandraMessageId) message4.getMessageId(), MessageMapper.FetchType.Metadata).block().getMessageId()) + .isEqualTo(message4.getMessageId()); + + softly.assertThat(daoV2.list().collectList().block()).isEmpty(); + }); + } + + @Test + void migrationTaskShouldPreserveMessageContent() throws Exception{ + SimpleMailboxMessage message1 = createMessage(messageIdFactory.generate()); + daoV2.save(message1).block(); + MessageRepresentation original = daoV2.retrieveMessage((CassandraMessageId) message1.getMessageId(), MessageMapper.FetchType.Metadata).block(); + + new MessageV3Migration(daoV2, daoV3).apply(); + MessageRepresentation migrated = daoV3.retrieveMessage((CassandraMessageId) message1.getMessageId(), MessageMapper.FetchType.Metadata).block(); + + int start = 0; + int end = -1; + assertThat(migrated).isEqualToComparingOnlyGivenFields(original, "messageId", + "internalDate", "size", "bodyStartOctet", "properties", "attachments", "headerId", "bodyId"); + assertThat(migrated.getContent().newStream(start, end)) + .hasSameContentAs(original.getContent().newStream(start, end)); + } + + private SimpleMailboxMessage createMessage(MessageId messageId) { + return SimpleMailboxMessage.builder() + .messageId(messageId) + .mailboxId(MAILBOX_ID) + .uid(messageUid) + .internalDate(new Date()) + .bodyStartOctet(MessageV3MigrationTest.BODY_START) + .size(MessageV3MigrationTest.CONTENT.length()) + .content(new SharedByteArrayInputStream(MessageV3MigrationTest.CONTENT.getBytes(StandardCharsets.UTF_8))) + .flags(new Flags()) + .properties(new PropertyBuilder().build()) + .addAttachments(NO_ATTACHMENT) + .build(); + } +} \ No newline at end of file diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/model/impl/Properties.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/model/impl/Properties.java index 5754b71..6b63d07 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/model/impl/Properties.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/model/impl/Properties.java @@ -41,6 +41,7 @@ import static org.apache.james.mailbox.store.mail.model.StandardNames.MIME_SUB_T import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.SortedMap; import java.util.TreeMap; @@ -222,6 +223,22 @@ public class Properties { return new ArrayList<>(properties); } + @Override + public final boolean equals(Object o) { + if (o instanceof Properties) { + Properties that = (Properties) o; + + return Objects.equals(this.textualLineCount, that.textualLineCount) + && Objects.equals(this.properties, that.properties); + } + return false; + } + + @Override + public final int hashCode() { + return Objects.hash(textualLineCount, properties); + } + /** * Constructs a <code>String</code> with all attributes * in name = value format. diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/webadmin/CassandraRoutesModule.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/webadmin/CassandraRoutesModule.java index 90e8911..ff0f783 100644 --- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/webadmin/CassandraRoutesModule.java +++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/webadmin/CassandraRoutesModule.java @@ -27,6 +27,7 @@ import org.apache.james.backends.cassandra.versions.SchemaTransition; import org.apache.james.backends.cassandra.versions.SchemaVersion; import org.apache.james.mailbox.cassandra.mail.migration.MailboxPathV2Migration; import org.apache.james.mailbox.cassandra.mail.migration.MailboxPathV3Migration; +import org.apache.james.mailbox.cassandra.mail.migration.MessageV3Migration; import org.apache.james.rrt.cassandra.migration.MappingsSourcesMigration; import org.apache.james.webadmin.Routes; import org.apache.james.webadmin.routes.CassandraMailboxMergingRoutes; @@ -42,6 +43,7 @@ public class CassandraRoutesModule extends AbstractModule { private static final SchemaTransition FROM_V5_TO_V6 = SchemaTransition.to(new SchemaVersion(6)); private static final SchemaTransition FROM_V6_TO_V7 = SchemaTransition.to(new SchemaVersion(7)); private static final SchemaTransition FROM_V7_TO_V8 = SchemaTransition.to(new SchemaVersion(8)); + private static final SchemaTransition FROM_V8_TO_V9 = SchemaTransition.to(new SchemaVersion(9)); @Override protected void configure() { @@ -60,6 +62,7 @@ public class CassandraRoutesModule extends AbstractModule { allMigrationClazzBinder.addBinding(FROM_V5_TO_V6).to(MailboxPathV2Migration.class); allMigrationClazzBinder.addBinding(FROM_V6_TO_V7).to(MappingsSourcesMigration.class); allMigrationClazzBinder.addBinding(FROM_V7_TO_V8).to(MailboxPathV3Migration.class); + allMigrationClazzBinder.addBinding(FROM_V8_TO_V9).to(MessageV3Migration.class); bind(SchemaVersion.class) .annotatedWith(Names.named(CassandraMigrationService.LATEST_VERSION)) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
