This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 3d0bc77731d05af96a712e815ecd419822f85e0e Author: Benoit Tellier <[email protected]> AuthorDate: Sun Apr 12 11:18:02 2020 +0700 [REFACTORING] CassandraMessageDAO should only care about retrieving a single element Retrieving several elements is a heritage from the `SELECT ... IN` semantic (allowing several elements to be retrieved at once at the Cassandra level, but suboptimal as it comes at a great coordination cost). This semantic: - Makes the DAO responsible for applying the limit (untested) - Forces to have a found/not found semantic - Forces to have "reactor windowing" rather than a classic "flatMap" concurrency limitation Reactor allows easy composition of Mono, thus this costly semantic is no longer needed. --- .../cassandra/mail/CassandraMessageDAO.java | 8 -------- .../cassandra/mail/CassandraMessageMapper.java | 15 +++++--------- .../cassandra/mail/CassandraMessageDAOTest.java | 24 ++++++++++------------ .../java/org/apache/james/util/streams/Limit.java | 8 ++++++++ 4 files changed, 24 insertions(+), 31 deletions(-) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java index fc92869..58e25e3 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java @@ -74,7 +74,6 @@ import org.apache.james.mailbox.store.mail.MessageMapper.FetchType; import org.apache.james.mailbox.store.mail.model.MailboxMessage; import org.apache.james.mailbox.store.mail.model.Property; import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder; -import org.apache.james.util.streams.Limit; import com.datastax.driver.core.BoundStatement; import com.datastax.driver.core.PreparedStatement; @@ -93,7 +92,6 @@ import com.google.common.primitives.Bytes; import 
reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; import reactor.util.function.Tuple2; public class CassandraMessageDAO { @@ -241,12 +239,6 @@ public class CassandraMessageDAO { .flatMap(resultSet -> message(resultSet, id, fetchType)); } - public Flux<MessageRepresentation> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) { - return Flux.fromStream(limit.applyOnStream(messageIds.stream().distinct())) - .publishOn(Schedulers.elastic()) - .flatMap(id -> retrieveMessage(id, fetchType), configuration.getMessageReadChunkSize()); - } - private Mono<ResultSet> retrieveRow(ComposedMessageIdWithMetaData messageId, FetchType fetchType) { CassandraMessageId cassandraMessageId = (CassandraMessageId) messageId.getComposedMessageId().getMessageId(); diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java index b7e6ed3..919da37 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java @@ -161,22 +161,17 @@ public class CassandraMessageMapper implements MessageMapper { @Override public Iterator<MailboxMessage> findInMailbox(Mailbox mailbox, MessageRange messageRange, FetchType ftype, int max) { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); - return retrieveMessageIds(mailboxId, messageRange) - .flatMap(ids -> retrieveMessages(ids, ftype, Limit.from(max))) + return Limit.from(max).applyOnFlux( + messageIdDAO.retrieveMessages(mailboxId, messageRange) + .flatMap(id -> retrieveMessage(id, ftype), cassandraConfiguration.getMessageReadChunkSize())) .map(MailboxMessage.class::cast) .sort(Comparator.comparing(MailboxMessage::getUid)) .toIterable() 
.iterator(); } - private Flux<List<ComposedMessageIdWithMetaData>> retrieveMessageIds(CassandraId mailboxId, MessageRange messageRange) { - return messageIdDAO.retrieveMessages(mailboxId, messageRange) - .window(cassandraConfiguration.getMessageReadChunkSize()) - .flatMap(flux -> flux.collect(Guavate.toImmutableList())); - } - - private Flux<MailboxMessage> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) { - return messageDAO.retrieveMessages(messageIds, fetchType, limit) + private Mono<MailboxMessage> retrieveMessage(ComposedMessageIdWithMetaData messageId, FetchType fetchType) { + return messageDAO.retrieveMessage(messageId, fetchType) .flatMap(messageRepresentation -> attachmentLoader.addAttachmentToMessage(messageRepresentation, fetchType)); } diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java index daf4767..d7fe3da 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java @@ -53,7 +53,6 @@ import org.apache.james.mailbox.model.MessageId; import org.apache.james.mailbox.store.mail.MessageMapper; import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder; import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage; -import org.apache.james.util.streams.Limit; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -63,7 +62,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.primitives.Bytes; import nl.jqno.equalsverifier.EqualsVerifier; -import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; class CassandraMessageDAOTest { private static 
final int BODY_START = 16; @@ -86,7 +85,7 @@ class CassandraMessageDAOTest { private SimpleMailboxMessage message; private CassandraMessageId messageId; - private List<ComposedMessageIdWithMetaData> messageIds; + private ComposedMessageIdWithMetaData messageIdWithMetadata; @BeforeEach void setUp(CassandraCluster cassandra) { @@ -97,11 +96,11 @@ class CassandraMessageDAOTest { testee = new CassandraMessageDAO(cassandra.getConf(), cassandra.getTypesProvider(), blobStore, blobIdFactory, new CassandraMessageId.Factory()); - messageIds = ImmutableList.of(ComposedMessageIdWithMetaData.builder() + messageIdWithMetadata = ComposedMessageIdWithMetaData.builder() .composedMessageId(new ComposedMessageId(MAILBOX_ID, messageId, messageUid)) .flags(new Flags()) .modSeq(ModSeq.of(1)) - .build()); + .build(); } @Test @@ -111,7 +110,7 @@ class CassandraMessageDAOTest { testee.save(message).block(); MessageRepresentation attachmentRepresentation = - toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Metadata, Limit.unlimited())); + toMessage(testee.retrieveMessage(messageIdWithMetadata, MessageMapper.FetchType.Metadata)); assertThat(attachmentRepresentation.getPropertyBuilder().getTextualLineCount()) .isEqualTo(0L); @@ -127,7 +126,7 @@ class CassandraMessageDAOTest { testee.save(message).block(); MessageRepresentation attachmentRepresentation = - toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Metadata, Limit.unlimited())); + toMessage(testee.retrieveMessage(messageIdWithMetadata, MessageMapper.FetchType.Metadata)); assertThat(attachmentRepresentation.getPropertyBuilder().getTextualLineCount()).isEqualTo(textualLineCount); } @@ -139,7 +138,7 @@ class CassandraMessageDAOTest { testee.save(message).block(); MessageRepresentation attachmentRepresentation = - toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Full, Limit.unlimited())); + toMessage(testee.retrieveMessage(messageIdWithMetadata, MessageMapper.FetchType.Full)); 
assertThat(IOUtils.toString(attachmentRepresentation.getContent(), StandardCharsets.UTF_8)) .isEqualTo(CONTENT); @@ -152,7 +151,7 @@ class CassandraMessageDAOTest { testee.save(message).block(); MessageRepresentation attachmentRepresentation = - toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Body, Limit.unlimited())); + toMessage(testee.retrieveMessage(messageIdWithMetadata, MessageMapper.FetchType.Body)); byte[] expected = Bytes.concat( new byte[BODY_START], @@ -168,7 +167,7 @@ class CassandraMessageDAOTest { testee.save(message).block(); MessageRepresentation attachmentRepresentation = - toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Headers, Limit.unlimited())); + toMessage(testee.retrieveMessage(messageIdWithMetadata, MessageMapper.FetchType.Headers)); assertThat(IOUtils.toString(attachmentRepresentation.getContent(), StandardCharsets.UTF_8)) .isEqualTo(CONTENT.substring(0, BODY_START)); @@ -189,9 +188,8 @@ class CassandraMessageDAOTest { .build(); } - private MessageRepresentation toMessage(Flux<MessageRepresentation> read) { - return read.toStream() - .findAny() + private MessageRepresentation toMessage(Mono<MessageRepresentation> read) { + return read.blockOptional() .orElseThrow(() -> new IllegalStateException("Collection is not supposed to be empty")); } diff --git a/server/container/util/src/main/java/org/apache/james/util/streams/Limit.java b/server/container/util/src/main/java/org/apache/james/util/streams/Limit.java index 268ed5e..466f1fb 100644 --- a/server/container/util/src/main/java/org/apache/james/util/streams/Limit.java +++ b/server/container/util/src/main/java/org/apache/james/util/streams/Limit.java @@ -25,6 +25,8 @@ import java.util.stream.Stream; import com.google.common.base.Preconditions; +import reactor.core.publisher.Flux; + public class Limit { public static Limit from(int limit) { @@ -65,6 +67,12 @@ public class Limit { .orElse(stream); } + public <T> Flux<T> applyOnFlux(Flux<T> flux) { + 
return limit + .map(flux::take) + .orElse(flux); + } + @Override public final boolean equals(Object o) { if (o instanceof Limit) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
