MAILBOX-296 Batch message retrieval and remove the IN clause from CassandraMessageDAO
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/3bcc4efe Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/3bcc4efe Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/3bcc4efe Branch: refs/heads/master Commit: 3bcc4efe39e342de2c18784e3e330383f0f60d8f Parents: c4e085c Author: benwa <[email protected]> Authored: Thu May 18 17:10:03 2017 +0700 Committer: benwa <[email protected]> Committed: Fri May 19 17:30:35 2017 +0700 ---------------------------------------------------------------------- .../cassandra/mail/CassandraMessageDAO.java | 62 +++++++------------- .../CassandraMailboxManagerProvider.java | 2 +- .../cassandra/CassandraTestSystemFixture.java | 2 +- .../CassandraMailboxManagerAttachmentTest.java | 2 +- .../cassandra/mail/CassandraMapperProvider.java | 2 +- .../cassandra/host/CassandraHostSystem.java | 2 +- 6 files changed, 27 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/3bcc4efe/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java index 3b39fac..b0b0f83 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java @@ -21,7 +21,6 @@ package org.apache.james.mailbox.cassandra.mail; import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker; import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; -import static 
com.datastax.driver.core.querybuilder.QueryBuilder.in; import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; import static com.datastax.driver.core.querybuilder.QueryBuilder.select; import static org.apache.james.mailbox.cassandra.table.CassandraMessageIds.MESSAGE_ID; @@ -58,10 +57,8 @@ import javax.mail.util.SharedByteArrayInputStream; import org.apache.commons.lang3.tuple.Pair; import org.apache.james.backends.cassandra.init.CassandraTypesProvider; import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; -import org.apache.james.backends.cassandra.utils.CassandraUtils; import org.apache.james.mailbox.MessageUid; import org.apache.james.mailbox.cassandra.CassandraMessageId; -import org.apache.james.mailbox.cassandra.CassandraMessageId.Factory; import org.apache.james.mailbox.cassandra.table.CassandraMessageTable.Attachments; import org.apache.james.mailbox.cassandra.table.CassandraMessageTable.Properties; import org.apache.james.mailbox.exception.MailboxException; @@ -78,6 +75,7 @@ import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder; import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage; import org.apache.james.mailbox.store.mail.model.impl.SimpleProperty; import org.apache.james.util.CompletableFutureUtil; +import org.apache.james.util.FluentFutureStream; import org.apache.james.util.streams.JamesCollectors; import com.datastax.driver.core.BoundStatement; @@ -85,34 +83,28 @@ import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; -import com.datastax.driver.core.Statement; import com.datastax.driver.core.UDTValue; import com.datastax.driver.core.querybuilder.QueryBuilder; -import com.datastax.driver.core.querybuilder.Select; import com.datastax.driver.core.querybuilder.Select.Where; -import com.github.steveash.guavate.Guavate; import 
com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import com.google.common.base.Objects; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableListMultimap; import com.google.common.io.ByteStreams; import com.google.common.primitives.Bytes; public class CassandraMessageDAO { - public static final int CHUNK_SIZE_ON_READ = 5000; + public static final int CHUNK_SIZE_ON_READ = 100; private final CassandraAsyncExecutor cassandraAsyncExecutor; private final CassandraTypesProvider typesProvider; - private final Factory messageIdFactory; private final PreparedStatement insert; private final PreparedStatement delete; @Inject - public CassandraMessageDAO(Session session, CassandraTypesProvider typesProvider, CassandraMessageId.Factory messageIdFactory) { + public CassandraMessageDAO(Session session, CassandraTypesProvider typesProvider) { this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session); this.typesProvider = typesProvider; - this.messageIdFactory = messageIdFactory; this.insert = prepareInsert(session); this.delete = prepareDelete(session); } @@ -186,40 +178,37 @@ public class CassandraMessageDAO { } public CompletableFuture<Stream<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>>> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Optional<Integer> limit) { - return CompletableFutureUtil.allOf( - messageIds.stream() + return CompletableFutureUtil.chainAll( + getLimitedIdStream(messageIds.stream().distinct(), limit) .collect(JamesCollectors.chunker(CHUNK_SIZE_ON_READ)) .values() - .stream() - .map((List<ComposedMessageIdWithMetaData> ids) -> retrieveRows(ids, fetchType, limit) - .thenApply(resultSet -> toMessagesWithAttachmentRepresentation(messageIds, fetchType, resultSet)))) + .stream(), + ids -> FluentFutureStream.of( + ids.stream() + .map(id -> retrieveRow(id, fetchType) + .thenApply(resultSet -> + message(resultSet.one(), 
id, fetchType)))) + .completableFuture()) .thenApply(stream -> stream.flatMap(Function.identity())); } - private Stream<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>> toMessagesWithAttachmentRepresentation(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, ResultSet resultSet) { - ImmutableListMultimap<MessageId, Row> messagesById = CassandraUtils.convertToStream(resultSet) - .collect(Guavate.toImmutableListMultimap(row -> messageIdFactory.of(row.getUUID(MESSAGE_ID)), row -> row)); - return messageIds.stream() - .filter(composedId -> !messagesById.get(composedId.getComposedMessageId().getMessageId()).isEmpty()) - .map(composedId -> message(messagesById.get(composedId.getComposedMessageId().getMessageId()).get(0), composedId, fetchType)); + private Stream<ComposedMessageIdWithMetaData> getLimitedIdStream(Stream<ComposedMessageIdWithMetaData> messageIds, Optional<Integer> limit) { + return limit + .filter(value -> value > 0) + .map(messageIds::limit) + .orElse(messageIds); } - private CompletableFuture<ResultSet> retrieveRows(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Optional<Integer> limit) { + private CompletableFuture<ResultSet> retrieveRow(ComposedMessageIdWithMetaData messageId, FetchType fetchType) { return cassandraAsyncExecutor.execute( - buildSelectQueryWithLimit( - buildQuery(messageIds, fetchType), - limit)); + buildQuery(messageId, fetchType)); } - private Where buildQuery(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType) { + private Where buildQuery(ComposedMessageIdWithMetaData messageId, FetchType fetchType) { + CassandraMessageId cassandraMessageId = (CassandraMessageId) messageId.getComposedMessageId().getMessageId(); return select(retrieveFields(fetchType)) .from(TABLE_NAME) - .where(in(MESSAGE_ID, messageIds.stream() - .map(ComposedMessageIdWithMetaData::getComposedMessageId) - .map(ComposedMessageId::getMessageId) - .map(messageId -> (CassandraMessageId) 
messageId) - .map(CassandraMessageId::get) - .collect(Collectors.toList()))); + .where(eq(MESSAGE_ID, cassandraMessageId.get())); } private Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> message(Row row,ComposedMessageIdWithMetaData messageIdWithMetaData, FetchType fetchType) { @@ -290,13 +279,6 @@ public class CassandraMessageDAO { } } - private Statement buildSelectQueryWithLimit(Select.Where selectStatement, Optional<Integer> limit) { - if (!limit.isPresent() || limit.get() <= 0) { - return selectStatement; - } - return selectStatement.limit(limit.get()); - } - public CompletableFuture<Void> delete(CassandraMessageId messageId) { return cassandraAsyncExecutor.executeVoid(delete.bind() .setUUID(MESSAGE_ID, messageId.get())); http://git-wip-us.apache.org/repos/asf/james-project/blob/3bcc4efe/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerProvider.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerProvider.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerProvider.java index 41bcd4b..d10b7c3 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerProvider.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerProvider.java @@ -56,7 +56,7 @@ public class CassandraMailboxManagerProvider { CassandraMessageId.Factory messageIdFactory = new CassandraMessageId.Factory(); CassandraMessageIdDAO messageIdDAO = new CassandraMessageIdDAO(session, messageIdFactory); CassandraMessageIdToImapUidDAO imapUidDAO = new CassandraMessageIdToImapUidDAO(session, messageIdFactory); - CassandraMessageDAO messageDAO = new CassandraMessageDAO(session, cassandraTypesProvider, messageIdFactory); + CassandraMessageDAO messageDAO = new CassandraMessageDAO(session, 
cassandraTypesProvider); CassandraMailboxCounterDAO mailboxCounterDAO = new CassandraMailboxCounterDAO(session); CassandraMailboxRecentsDAO mailboxRecentsDAO = new CassandraMailboxRecentsDAO(session); CassandraMailboxDAO mailboxDAO = new CassandraMailboxDAO(session, cassandraTypesProvider, MAX_ACL_RETRY); http://git-wip-us.apache.org/repos/asf/james-project/blob/3bcc4efe/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraTestSystemFixture.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraTestSystemFixture.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraTestSystemFixture.java index 22af497..2ebc801 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraTestSystemFixture.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraTestSystemFixture.java @@ -80,7 +80,7 @@ public class CassandraTestSystemFixture { CassandraMessageId.Factory messageIdFactory = new CassandraMessageId.Factory(); CassandraMessageIdDAO messageIdDAO = new CassandraMessageIdDAO(CASSANDRA.getConf(), messageIdFactory); CassandraMessageIdToImapUidDAO imapUidDAO = new CassandraMessageIdToImapUidDAO(CASSANDRA.getConf(), messageIdFactory); - CassandraMessageDAO messageDAO = new CassandraMessageDAO(CASSANDRA.getConf(), CASSANDRA.getTypesProvider(), messageIdFactory); + CassandraMessageDAO messageDAO = new CassandraMessageDAO(CASSANDRA.getConf(), CASSANDRA.getTypesProvider()); CassandraMailboxCounterDAO mailboxCounterDAO = new CassandraMailboxCounterDAO(CASSANDRA.getConf()); CassandraMailboxRecentsDAO mailboxRecentsDAO = new CassandraMailboxRecentsDAO(CASSANDRA.getConf()); CassandraApplicableFlagDAO applicableFlagDAO = new CassandraApplicableFlagDAO(CASSANDRA.getConf()); 
http://git-wip-us.apache.org/repos/asf/james-project/blob/3bcc4efe/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxManagerAttachmentTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxManagerAttachmentTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxManagerAttachmentTest.java index 56672d1..f7513d9 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxManagerAttachmentTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxManagerAttachmentTest.java @@ -76,7 +76,7 @@ public class CassandraMailboxManagerAttachmentTest extends AbstractMailboxManage new CassandraUidProvider(cassandra.getConf()), new CassandraModSeqProvider(cassandra.getConf()), cassandra.getConf(), - new CassandraMessageDAO(cassandra.getConf(), cassandra.getTypesProvider(), messageIdFactory), + new CassandraMessageDAO(cassandra.getConf(), cassandra.getTypesProvider()), new CassandraMessageIdDAO(cassandra.getConf(), messageIdFactory), new CassandraMessageIdToImapUidDAO(cassandra.getConf(), messageIdFactory), new CassandraMailboxCounterDAO(cassandra.getConf()), http://git-wip-us.apache.org/repos/asf/james-project/blob/3bcc4efe/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMapperProvider.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMapperProvider.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMapperProvider.java index bcbd9cc..0a358db 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMapperProvider.java +++ 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMapperProvider.java @@ -110,7 +110,7 @@ public class CassandraMapperProvider implements MapperProvider { new CassandraUidProvider(cassandra.getConf()), cassandraModSeqProvider, cassandra.getConf(), - new CassandraMessageDAO(cassandra.getConf(), cassandra.getTypesProvider(), MESSAGE_ID_FACTORY), + new CassandraMessageDAO(cassandra.getConf(), cassandra.getTypesProvider()), new CassandraMessageIdDAO(cassandra.getConf(), MESSAGE_ID_FACTORY), new CassandraMessageIdToImapUidDAO(cassandra.getConf(), MESSAGE_ID_FACTORY), new CassandraMailboxCounterDAO(cassandra.getConf()), http://git-wip-us.apache.org/repos/asf/james-project/blob/3bcc4efe/mpt/impl/imap-mailbox/cassandra/src/test/java/org/apache/james/mpt/imapmailbox/cassandra/host/CassandraHostSystem.java ---------------------------------------------------------------------- diff --git a/mpt/impl/imap-mailbox/cassandra/src/test/java/org/apache/james/mpt/imapmailbox/cassandra/host/CassandraHostSystem.java b/mpt/impl/imap-mailbox/cassandra/src/test/java/org/apache/james/mpt/imapmailbox/cassandra/host/CassandraHostSystem.java index 0948a6a..7f6e645 100644 --- a/mpt/impl/imap-mailbox/cassandra/src/test/java/org/apache/james/mpt/imapmailbox/cassandra/host/CassandraHostSystem.java +++ b/mpt/impl/imap-mailbox/cassandra/src/test/java/org/apache/james/mpt/imapmailbox/cassandra/host/CassandraHostSystem.java @@ -107,7 +107,7 @@ public class CassandraHostSystem extends JamesImapHostSystem { CassandraUidProvider uidProvider = new CassandraUidProvider(session); CassandraTypesProvider typesProvider = new CassandraTypesProvider(mailboxModule, session); CassandraMessageId.Factory messageIdFactory = new CassandraMessageId.Factory(); - CassandraMessageDAO messageDAO = new CassandraMessageDAO(session, typesProvider, messageIdFactory); + CassandraMessageDAO messageDAO = new CassandraMessageDAO(session, typesProvider); CassandraMessageIdDAO messageIdDAO = new 
CassandraMessageIdDAO(session, messageIdFactory); CassandraMessageIdToImapUidDAO imapUidDAO = new CassandraMessageIdToImapUidDAO(session, messageIdFactory); CassandraMailboxCounterDAO mailboxCounterDAO = new CassandraMailboxCounterDAO(session); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
