MAILBOX-296 Batch message retrieval and get rid of IN clause in CassandraMessageDAO


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/3bcc4efe
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/3bcc4efe
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/3bcc4efe

Branch: refs/heads/master
Commit: 3bcc4efe39e342de2c18784e3e330383f0f60d8f
Parents: c4e085c
Author: benwa <[email protected]>
Authored: Thu May 18 17:10:03 2017 +0700
Committer: benwa <[email protected]>
Committed: Fri May 19 17:30:35 2017 +0700

----------------------------------------------------------------------
 .../cassandra/mail/CassandraMessageDAO.java     | 62 +++++++-------------
 .../CassandraMailboxManagerProvider.java        |  2 +-
 .../cassandra/CassandraTestSystemFixture.java   |  2 +-
 .../CassandraMailboxManagerAttachmentTest.java  |  2 +-
 .../cassandra/mail/CassandraMapperProvider.java |  2 +-
 .../cassandra/host/CassandraHostSystem.java     |  2 +-
 6 files changed, 27 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/3bcc4efe/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
index 3b39fac..b0b0f83 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
@@ -21,7 +21,6 @@ package org.apache.james.mailbox.cassandra.mail;
 
 import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
-import static com.datastax.driver.core.querybuilder.QueryBuilder.in;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
 import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageIds.MESSAGE_ID;
@@ -58,10 +57,8 @@ import javax.mail.util.SharedByteArrayInputStream;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.backends.cassandra.init.CassandraTypesProvider;
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
-import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.mailbox.MessageUid;
 import org.apache.james.mailbox.cassandra.CassandraMessageId;
-import org.apache.james.mailbox.cassandra.CassandraMessageId.Factory;
 import 
org.apache.james.mailbox.cassandra.table.CassandraMessageTable.Attachments;
 import 
org.apache.james.mailbox.cassandra.table.CassandraMessageTable.Properties;
 import org.apache.james.mailbox.exception.MailboxException;
@@ -78,6 +75,7 @@ import 
org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleProperty;
 import org.apache.james.util.CompletableFutureUtil;
+import org.apache.james.util.FluentFutureStream;
 import org.apache.james.util.streams.JamesCollectors;
 
 import com.datastax.driver.core.BoundStatement;
@@ -85,34 +83,28 @@ import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
-import com.datastax.driver.core.Statement;
 import com.datastax.driver.core.UDTValue;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
-import com.datastax.driver.core.querybuilder.Select;
 import com.datastax.driver.core.querybuilder.Select.Where;
-import com.github.steveash.guavate.Guavate;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableListMultimap;
 import com.google.common.io.ByteStreams;
 import com.google.common.primitives.Bytes;
 
 public class CassandraMessageDAO {
 
-    public static final int CHUNK_SIZE_ON_READ = 5000;
+    public static final int CHUNK_SIZE_ON_READ = 100;
     private final CassandraAsyncExecutor cassandraAsyncExecutor;
     private final CassandraTypesProvider typesProvider;
-    private final Factory messageIdFactory;
     private final PreparedStatement insert;
     private final PreparedStatement delete;
 
     @Inject
-    public CassandraMessageDAO(Session session, CassandraTypesProvider 
typesProvider, CassandraMessageId.Factory messageIdFactory) {
+    public CassandraMessageDAO(Session session, CassandraTypesProvider 
typesProvider) {
         this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
         this.typesProvider = typesProvider;
-        this.messageIdFactory = messageIdFactory;
         this.insert = prepareInsert(session);
         this.delete = prepareDelete(session);
     }
@@ -186,40 +178,37 @@ public class CassandraMessageDAO {
     }
 
     public CompletableFuture<Stream<Pair<MessageWithoutAttachment, 
Stream<MessageAttachmentRepresentation>>>> 
retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType 
fetchType, Optional<Integer> limit) {
-        return CompletableFutureUtil.allOf(
-            messageIds.stream()
+        return CompletableFutureUtil.chainAll(
+            getLimitedIdStream(messageIds.stream().distinct(), limit)
                 .collect(JamesCollectors.chunker(CHUNK_SIZE_ON_READ))
                 .values()
-                .stream()
-                .map((List<ComposedMessageIdWithMetaData> ids) -> 
retrieveRows(ids, fetchType, limit)
-                    .thenApply(resultSet -> 
toMessagesWithAttachmentRepresentation(messageIds, fetchType, resultSet))))
+                .stream(),
+            ids -> FluentFutureStream.of(
+                ids.stream()
+                    .map(id -> retrieveRow(id, fetchType)
+                        .thenApply(resultSet ->
+                            message(resultSet.one(), id, fetchType))))
+                .completableFuture())
             .thenApply(stream -> stream.flatMap(Function.identity()));
     }
 
-    private Stream<Pair<MessageWithoutAttachment, 
Stream<MessageAttachmentRepresentation>>> 
toMessagesWithAttachmentRepresentation(List<ComposedMessageIdWithMetaData> 
messageIds, FetchType fetchType, ResultSet resultSet) {
-        ImmutableListMultimap<MessageId, Row> messagesById = 
CassandraUtils.convertToStream(resultSet)
-            .collect(Guavate.toImmutableListMultimap(row -> 
messageIdFactory.of(row.getUUID(MESSAGE_ID)), row -> row));
-        return messageIds.stream()
-            .filter(composedId -> 
!messagesById.get(composedId.getComposedMessageId().getMessageId()).isEmpty())
-            .map(composedId -> 
message(messagesById.get(composedId.getComposedMessageId().getMessageId()).get(0),
 composedId, fetchType));
+    private Stream<ComposedMessageIdWithMetaData> 
getLimitedIdStream(Stream<ComposedMessageIdWithMetaData> messageIds, 
Optional<Integer> limit) {
+        return limit
+            .filter(value -> value > 0)
+            .map(messageIds::limit)
+            .orElse(messageIds);
     }
 
-    private CompletableFuture<ResultSet> 
retrieveRows(List<ComposedMessageIdWithMetaData> messageIds, FetchType 
fetchType, Optional<Integer> limit) {
+    private CompletableFuture<ResultSet> 
retrieveRow(ComposedMessageIdWithMetaData messageId, FetchType fetchType) {
         return cassandraAsyncExecutor.execute(
-                buildSelectQueryWithLimit(
-                        buildQuery(messageIds, fetchType),
-                        limit));
+                buildQuery(messageId, fetchType));
     }
     
-    private Where buildQuery(List<ComposedMessageIdWithMetaData> messageIds, 
FetchType fetchType) {
+    private Where buildQuery(ComposedMessageIdWithMetaData messageId, 
FetchType fetchType) {
+        CassandraMessageId cassandraMessageId = (CassandraMessageId) 
messageId.getComposedMessageId().getMessageId();
         return select(retrieveFields(fetchType))
                 .from(TABLE_NAME)
-                .where(in(MESSAGE_ID, messageIds.stream()
-                        
.map(ComposedMessageIdWithMetaData::getComposedMessageId)
-                        .map(ComposedMessageId::getMessageId)
-                        .map(messageId -> (CassandraMessageId) messageId)
-                        .map(CassandraMessageId::get)
-                        .collect(Collectors.toList())));
+                .where(eq(MESSAGE_ID, cassandraMessageId.get()));
     }
 
     private Pair<MessageWithoutAttachment, 
Stream<MessageAttachmentRepresentation>> message(Row 
row,ComposedMessageIdWithMetaData messageIdWithMetaData, FetchType fetchType) {
@@ -290,13 +279,6 @@ public class CassandraMessageDAO {
         }
     }
 
-    private Statement buildSelectQueryWithLimit(Select.Where selectStatement, 
Optional<Integer> limit) {
-        if (!limit.isPresent() || limit.get() <= 0) {
-            return selectStatement;
-        }
-        return selectStatement.limit(limit.get());
-    }
-
     public CompletableFuture<Void> delete(CassandraMessageId messageId) {
         return cassandraAsyncExecutor.executeVoid(delete.bind()
             .setUUID(MESSAGE_ID, messageId.get()));

http://git-wip-us.apache.org/repos/asf/james-project/blob/3bcc4efe/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerProvider.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerProvider.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerProvider.java
index 41bcd4b..d10b7c3 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerProvider.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerProvider.java
@@ -56,7 +56,7 @@ public class CassandraMailboxManagerProvider {
         CassandraMessageId.Factory messageIdFactory = new 
CassandraMessageId.Factory();
         CassandraMessageIdDAO messageIdDAO = new 
CassandraMessageIdDAO(session, messageIdFactory);
         CassandraMessageIdToImapUidDAO imapUidDAO = new 
CassandraMessageIdToImapUidDAO(session, messageIdFactory);
-        CassandraMessageDAO messageDAO = new CassandraMessageDAO(session, 
cassandraTypesProvider, messageIdFactory);
+        CassandraMessageDAO messageDAO = new CassandraMessageDAO(session, 
cassandraTypesProvider);
         CassandraMailboxCounterDAO mailboxCounterDAO = new 
CassandraMailboxCounterDAO(session);
         CassandraMailboxRecentsDAO mailboxRecentsDAO = new 
CassandraMailboxRecentsDAO(session);
         CassandraMailboxDAO mailboxDAO = new CassandraMailboxDAO(session, 
cassandraTypesProvider, MAX_ACL_RETRY);

http://git-wip-us.apache.org/repos/asf/james-project/blob/3bcc4efe/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraTestSystemFixture.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraTestSystemFixture.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraTestSystemFixture.java
index 22af497..2ebc801 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraTestSystemFixture.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraTestSystemFixture.java
@@ -80,7 +80,7 @@ public class CassandraTestSystemFixture {
         CassandraMessageId.Factory messageIdFactory = new 
CassandraMessageId.Factory();
         CassandraMessageIdDAO messageIdDAO = new 
CassandraMessageIdDAO(CASSANDRA.getConf(), messageIdFactory);
         CassandraMessageIdToImapUidDAO imapUidDAO = new 
CassandraMessageIdToImapUidDAO(CASSANDRA.getConf(), messageIdFactory);
-        CassandraMessageDAO messageDAO = new 
CassandraMessageDAO(CASSANDRA.getConf(), CASSANDRA.getTypesProvider(), 
messageIdFactory);
+        CassandraMessageDAO messageDAO = new 
CassandraMessageDAO(CASSANDRA.getConf(), CASSANDRA.getTypesProvider());
         CassandraMailboxCounterDAO mailboxCounterDAO = new 
CassandraMailboxCounterDAO(CASSANDRA.getConf());
         CassandraMailboxRecentsDAO mailboxRecentsDAO = new 
CassandraMailboxRecentsDAO(CASSANDRA.getConf());
         CassandraApplicableFlagDAO applicableFlagDAO = new 
CassandraApplicableFlagDAO(CASSANDRA.getConf());

http://git-wip-us.apache.org/repos/asf/james-project/blob/3bcc4efe/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxManagerAttachmentTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxManagerAttachmentTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxManagerAttachmentTest.java
index 56672d1..f7513d9 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxManagerAttachmentTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxManagerAttachmentTest.java
@@ -76,7 +76,7 @@ public class CassandraMailboxManagerAttachmentTest extends 
AbstractMailboxManage
                 new CassandraUidProvider(cassandra.getConf()),
                 new CassandraModSeqProvider(cassandra.getConf()),
                 cassandra.getConf(),
-                new CassandraMessageDAO(cassandra.getConf(), 
cassandra.getTypesProvider(), messageIdFactory),
+                new CassandraMessageDAO(cassandra.getConf(), 
cassandra.getTypesProvider()),
                 new CassandraMessageIdDAO(cassandra.getConf(), 
messageIdFactory),
                 new CassandraMessageIdToImapUidDAO(cassandra.getConf(), 
messageIdFactory),
                 new CassandraMailboxCounterDAO(cassandra.getConf()),

http://git-wip-us.apache.org/repos/asf/james-project/blob/3bcc4efe/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMapperProvider.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMapperProvider.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMapperProvider.java
index bcbd9cc..0a358db 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMapperProvider.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMapperProvider.java
@@ -110,7 +110,7 @@ public class CassandraMapperProvider implements 
MapperProvider {
             new CassandraUidProvider(cassandra.getConf()),
             cassandraModSeqProvider,
             cassandra.getConf(),
-            new CassandraMessageDAO(cassandra.getConf(), 
cassandra.getTypesProvider(), MESSAGE_ID_FACTORY),
+            new CassandraMessageDAO(cassandra.getConf(), 
cassandra.getTypesProvider()),
             new CassandraMessageIdDAO(cassandra.getConf(), MESSAGE_ID_FACTORY),
             new CassandraMessageIdToImapUidDAO(cassandra.getConf(), 
MESSAGE_ID_FACTORY),
             new CassandraMailboxCounterDAO(cassandra.getConf()),

http://git-wip-us.apache.org/repos/asf/james-project/blob/3bcc4efe/mpt/impl/imap-mailbox/cassandra/src/test/java/org/apache/james/mpt/imapmailbox/cassandra/host/CassandraHostSystem.java
----------------------------------------------------------------------
diff --git a/mpt/impl/imap-mailbox/cassandra/src/test/java/org/apache/james/mpt/imapmailbox/cassandra/host/CassandraHostSystem.java b/mpt/impl/imap-mailbox/cassandra/src/test/java/org/apache/james/mpt/imapmailbox/cassandra/host/CassandraHostSystem.java
index 0948a6a..7f6e645 100644
--- a/mpt/impl/imap-mailbox/cassandra/src/test/java/org/apache/james/mpt/imapmailbox/cassandra/host/CassandraHostSystem.java
+++ b/mpt/impl/imap-mailbox/cassandra/src/test/java/org/apache/james/mpt/imapmailbox/cassandra/host/CassandraHostSystem.java
@@ -107,7 +107,7 @@ public class CassandraHostSystem extends 
JamesImapHostSystem {
         CassandraUidProvider uidProvider = new CassandraUidProvider(session);
         CassandraTypesProvider typesProvider = new 
CassandraTypesProvider(mailboxModule, session);
         CassandraMessageId.Factory messageIdFactory = new 
CassandraMessageId.Factory();
-        CassandraMessageDAO messageDAO = new CassandraMessageDAO(session, 
typesProvider, messageIdFactory);
+        CassandraMessageDAO messageDAO = new CassandraMessageDAO(session, 
typesProvider);
         CassandraMessageIdDAO messageIdDAO = new 
CassandraMessageIdDAO(session, messageIdFactory);
         CassandraMessageIdToImapUidDAO imapUidDAO = new 
CassandraMessageIdToImapUidDAO(session, messageIdFactory);
         CassandraMailboxCounterDAO mailboxCounterDAO = new 
CassandraMailboxCounterDAO(session);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to