This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit cf243e0f46c721eb0798bd756e03fdfa546b88b8 Author: Benoit Tellier <[email protected]> AuthorDate: Sun Apr 12 11:03:33 2020 +0700 [REFACTORING] Simplify Cassandra Message POJO MessageResult POJO is confusing as it has the same name as a store POJO. Its "not found" concept can efficiently be represented with reactor emptiness. Thus we can "push" the attachmentRepresentation into the MessageRepresentation type to keep a single unified POJO instead of a Pair, simplifying the signatures of the corresponding methods. --- .../mailbox/cassandra/mail/AttachmentLoader.java | 7 +-- .../cassandra/mail/CassandraMessageDAO.java | 69 ++++++---------------- .../cassandra/mail/CassandraMessageIdMapper.java | 2 - .../cassandra/mail/CassandraMessageMapper.java | 8 +-- ...tAttachment.java => MessageRepresentation.java} | 14 +++-- .../cassandra/mail/CassandraMessageDAOTest.java | 15 ++--- 6 files changed, 39 insertions(+), 76 deletions(-) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoader.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoader.java index ab281d0..c458fae 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoader.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoader.java @@ -21,7 +21,6 @@ package org.apache.james.mailbox.cassandra.mail; import java.util.List; import java.util.stream.Stream; -import org.apache.commons.lang3.tuple.Pair; import org.apache.james.mailbox.model.Attachment; import org.apache.james.mailbox.model.MessageAttachment; import org.apache.james.mailbox.store.mail.MessageMapper; @@ -42,9 +41,9 @@ public class AttachmentLoader { this.attachmentMapper = attachmentMapper; } - public Mono<MailboxMessage> addAttachmentToMessage(Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> messageRepresentation, MessageMapper.FetchType fetchType) { - return 
loadAttachments(messageRepresentation.getRight(), fetchType) - .map(attachments -> messageRepresentation.getLeft().toMailboxMessage(attachments)); + public Mono<MailboxMessage> addAttachmentToMessage(MessageRepresentation messageRepresentation, MessageMapper.FetchType fetchType) { + return loadAttachments(messageRepresentation.getAttachments().stream(), fetchType) + .map(messageRepresentation::toMailboxMessage); } private Mono<List<MessageAttachment>> loadAttachments(Stream<MessageAttachmentRepresentation> messageAttachmentRepresentations, MessageMapper.FetchType fetchType) { diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java index d861ab9..fc92869 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java @@ -54,7 +54,6 @@ import javax.inject.Inject; import javax.mail.util.SharedByteArrayInputStream; import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.tuple.Pair; import org.apache.james.backends.cassandra.init.CassandraTypesProvider; import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration; import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; @@ -237,12 +236,12 @@ public class CassandraMessageDAO { .collect(Guavate.toImmutableList()); } - public Mono<MessageResult> retrieveMessage(ComposedMessageIdWithMetaData id, FetchType fetchType) { + public Mono<MessageRepresentation> retrieveMessage(ComposedMessageIdWithMetaData id, FetchType fetchType) { return retrieveRow(id, fetchType) .flatMap(resultSet -> message(resultSet, id, fetchType)); } - public Flux<MessageResult> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) { + public 
Flux<MessageRepresentation> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) { return Flux.fromStream(limit.applyOnStream(messageIds.stream().distinct())) .publishOn(Schedulers.elastic()) .flatMap(id -> retrieveMessage(id, fetchType), configuration.getMessageReadChunkSize()); @@ -257,31 +256,29 @@ public class CassandraMessageDAO { .setConsistencyLevel(QUORUM)); } - private Mono<MessageResult> + private Mono<MessageRepresentation> message(ResultSet rows,ComposedMessageIdWithMetaData messageIdWithMetaData, FetchType fetchType) { ComposedMessageId messageId = messageIdWithMetaData.getComposedMessageId(); if (rows.isExhausted()) { - return Mono.just(notFound(messageIdWithMetaData)); + return Mono.empty(); } Row row = rows.one(); - return buildContentRetriever(fetchType, row).map(content -> { - MessageWithoutAttachment messageWithoutAttachment = - new MessageWithoutAttachment( - messageId.getMessageId(), - row.getTimestamp(INTERNAL_DATE), - row.getLong(FULL_CONTENT_OCTETS), - row.getInt(BODY_START_OCTET), - new SharedByteArrayInputStream(content), - messageIdWithMetaData.getFlags(), - getPropertyBuilder(row), - messageId.getMailboxId(), - messageId.getUid(), - messageIdWithMetaData.getModSeq(), - hasAttachment(row)); - return found(Pair.of(messageWithoutAttachment, getAttachments(row))); - }); + return buildContentRetriever(fetchType, row).map(content -> + new MessageRepresentation( + messageId.getMessageId(), + row.getTimestamp(INTERNAL_DATE), + row.getLong(FULL_CONTENT_OCTETS), + row.getInt(BODY_START_OCTET), + new SharedByteArrayInputStream(content), + messageIdWithMetaData.getFlags(), + getPropertyBuilder(row), + messageId.getMailboxId(), + messageId.getUid(), + messageIdWithMetaData.getModSeq(), + hasAttachment(row), + getAttachments(row).collect(Guavate.toImmutableList()))); } private PropertyBuilder getPropertyBuilder(Row row) { @@ -374,36 +371,6 @@ public class CassandraMessageDAO { return 
Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), blobIdFactory.from(row.getString(field)))); } - public static MessageResult notFound(ComposedMessageIdWithMetaData id) { - return new MessageResult(id, Optional.empty()); - } - - public static MessageResult found(Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> message) { - return new MessageResult(message.getLeft().getMetadata(), Optional.of(message)); - } - - public static class MessageResult { - private final ComposedMessageIdWithMetaData metaData; - private final Optional<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>> message; - - public MessageResult(ComposedMessageIdWithMetaData metaData, Optional<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>> message) { - this.metaData = metaData; - this.message = message; - } - - public ComposedMessageIdWithMetaData getMetadata() { - return metaData; - } - - public boolean isFound() { - return message.isPresent(); - } - - public Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> message() { - return message.get(); - } - } - public Flux<MessageIdAttachmentIds> retrieveAllMessageIdAttachmentIds() { return cassandraAsyncExecutor.executeRows( selectAllMessagesWithAttachment.bind() diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java index 8e07fda..26caba1 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java @@ -96,8 +96,6 @@ public class CassandraMessageIdMapper implements MessageIdMapper { .publishOn(Schedulers.elastic()) .flatMap(messageId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty()), 
cassandraConfiguration.getMessageReadChunkSize()) .flatMap(composedMessageId -> messageDAO.retrieveMessage(composedMessageId, fetchType), cassandraConfiguration.getMessageReadChunkSize()) - .filter(CassandraMessageDAO.MessageResult::isFound) - .map(CassandraMessageDAO.MessageResult::message) .flatMap(messageRepresentation -> attachmentLoader.addAttachmentToMessage(messageRepresentation, fetchType), cassandraConfiguration.getMessageReadChunkSize()) .groupBy(MailboxMessage::getMailboxId) .flatMap(this::keepMessageIfMailboxExists) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java index 7f7a0b7..b7e6ed3 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java @@ -177,9 +177,7 @@ public class CassandraMessageMapper implements MessageMapper { private Flux<MailboxMessage> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) { return messageDAO.retrieveMessages(messageIds, fetchType, limit) - .filter(CassandraMessageDAO.MessageResult::isFound) - .map(CassandraMessageDAO.MessageResult::message) - .flatMap(stream -> attachmentLoader.addAttachmentToMessage(stream, fetchType)); + .flatMap(messageRepresentation -> attachmentLoader.addAttachmentToMessage(messageRepresentation, fetchType)); } @Override @@ -221,9 +219,7 @@ public class CassandraMessageMapper implements MessageMapper { return retrieveComposedId(mailboxId, messageUid) .flatMap(idWithMetadata -> deleteUsingMailboxId(idWithMetadata).thenReturn(idWithMetadata)) .flatMapMany(idWithMetadata -> messageDAO.retrieveMessage(idWithMetadata, FetchType.Metadata)) - .filter(CassandraMessageDAO.MessageResult::isFound) - 
.map(CassandraMessageDAO.MessageResult::message) - .map(pair -> pair.getKey().toMailboxMessage(ImmutableList.of())); + .map(pair -> pair.toMailboxMessage(ImmutableList.of())); } private Mono<ComposedMessageIdWithMetaData> retrieveComposedId(CassandraId mailboxId, MessageUid uid) { diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageWithoutAttachment.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageRepresentation.java similarity index 85% rename from mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageWithoutAttachment.java rename to mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageRepresentation.java index 618ebb5..728f0d2 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageWithoutAttachment.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageRepresentation.java @@ -35,7 +35,7 @@ import org.apache.james.mailbox.model.MessageId; import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder; import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage; -public class MessageWithoutAttachment { +public class MessageRepresentation { private final MessageId messageId; private final Date internalDate; private final Long size; @@ -47,10 +47,11 @@ public class MessageWithoutAttachment { private final MessageUid messageUid; private final ModSeq modSeq; private final boolean hasAttachment; + private final List<MessageAttachmentRepresentation> attachments; - public MessageWithoutAttachment(MessageId messageId, Date internalDate, Long size, Integer bodySize, SharedByteArrayInputStream content, - Flags flags, PropertyBuilder propertyBuilder, MailboxId mailboxId, MessageUid messageUid, ModSeq modSeq, - boolean hasAttachment) { + public MessageRepresentation(MessageId messageId, Date internalDate, Long size, Integer bodySize, 
SharedByteArrayInputStream content, + Flags flags, PropertyBuilder propertyBuilder, MailboxId mailboxId, MessageUid messageUid, ModSeq modSeq, + boolean hasAttachment, List<MessageAttachmentRepresentation> attachments) { this.messageId = messageId; this.internalDate = internalDate; this.size = size; @@ -62,6 +63,7 @@ public class MessageWithoutAttachment { this.messageUid = messageUid; this.modSeq = modSeq; this.hasAttachment = hasAttachment; + this.attachments = attachments; } public SimpleMailboxMessage toMailboxMessage(List<MessageAttachment> attachments) { @@ -100,4 +102,8 @@ public class MessageWithoutAttachment { public PropertyBuilder getPropertyBuilder() { return propertyBuilder; } + + public List<MessageAttachmentRepresentation> getAttachments() { + return attachments; + } } diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java index bec5100..daf4767 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java @@ -32,7 +32,6 @@ import javax.mail.Flags; import javax.mail.util.SharedByteArrayInputStream; import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.tuple.Pair; import org.apache.james.backends.cassandra.CassandraCluster; import org.apache.james.backends.cassandra.CassandraClusterExtension; import org.apache.james.backends.cassandra.components.CassandraModule; @@ -111,7 +110,7 @@ class CassandraMessageDAOTest { testee.save(message).block(); - MessageWithoutAttachment attachmentRepresentation = + MessageRepresentation attachmentRepresentation = toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Metadata, Limit.unlimited())); 
assertThat(attachmentRepresentation.getPropertyBuilder().getTextualLineCount()) @@ -127,7 +126,7 @@ class CassandraMessageDAOTest { testee.save(message).block(); - MessageWithoutAttachment attachmentRepresentation = + MessageRepresentation attachmentRepresentation = toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Metadata, Limit.unlimited())); assertThat(attachmentRepresentation.getPropertyBuilder().getTextualLineCount()).isEqualTo(textualLineCount); @@ -139,7 +138,7 @@ class CassandraMessageDAOTest { testee.save(message).block(); - MessageWithoutAttachment attachmentRepresentation = + MessageRepresentation attachmentRepresentation = toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Full, Limit.unlimited())); assertThat(IOUtils.toString(attachmentRepresentation.getContent(), StandardCharsets.UTF_8)) @@ -152,7 +151,7 @@ class CassandraMessageDAOTest { testee.save(message).block(); - MessageWithoutAttachment attachmentRepresentation = + MessageRepresentation attachmentRepresentation = toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Body, Limit.unlimited())); byte[] expected = Bytes.concat( @@ -168,7 +167,7 @@ class CassandraMessageDAOTest { testee.save(message).block(); - MessageWithoutAttachment attachmentRepresentation = + MessageRepresentation attachmentRepresentation = toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Headers, Limit.unlimited())); assertThat(IOUtils.toString(attachmentRepresentation.getContent(), StandardCharsets.UTF_8)) @@ -190,10 +189,8 @@ class CassandraMessageDAOTest { .build(); } - private MessageWithoutAttachment toMessage(Flux<CassandraMessageDAO.MessageResult> read) { + private MessageRepresentation toMessage(Flux<MessageRepresentation> read) { return read.toStream() - .map(CassandraMessageDAO.MessageResult::message) - .map(Pair::getLeft) .findAny() .orElseThrow(() -> new IllegalStateException("Collection is not supposed to be empty")); } 
--------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
