Repository: james-project Updated Branches: refs/heads/master 8256ad1b3 -> 14be7a1d5
MAILBOX-285 Cassandra MessageMapper should support concurrent flags updates Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/16bba839 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/16bba839 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/16bba839 Branch: refs/heads/master Commit: 16bba839c2f104098c5db000416dfa282c0209df Parents: f8eb73e Author: Benoit Tellier <btell...@linagora.com> Authored: Wed Jan 11 18:28:48 2017 +0700 Committer: Benoit Tellier <btell...@linagora.com> Committed: Tue Jan 17 10:44:40 2017 +0700 ---------------------------------------------------------------------- .../cassandra/mail/CassandraMessageMapper.java | 17 +++--- .../store/mail/model/MessageMapperTest.java | 57 ++++++++++++++++++++ 2 files changed, 66 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/16bba839/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java index e6f1ed7..51f2528 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java @@ -42,6 +42,7 @@ import org.apache.james.mailbox.cassandra.mail.utils.MessageDeletedDuringFlagsUp import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.model.ComposedMessageId; import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData; +import org.apache.james.mailbox.model.MessageId; import org.apache.james.mailbox.model.MessageMetaData; import org.apache.james.mailbox.model.MessageRange; import org.apache.james.mailbox.model.UpdatedFlags; @@ -335,11 +336,7 @@ public class CassandraMessageMapper implements MessageMapper { return Optional.of( new FunctionRunnerWithRetry(maxRetries) .executeAndRetrieveObject(() -> retryMessageFlagsUpdate(mailbox, - ComposedMessageIdWithMetaData.builder() - .composedMessageId(new ComposedMessageId(mailbox.getMailboxId(), message.getMessageId(), message.getUid())) - .modSeq(message.getModSeq()) - .flags(message.createFlags()) - .build(), + message.getMessageId(), flagUpdateCalculator))); } catch (MessageDeletedDuringFlagsUpdateException e) { mailboxSession.getLog().warn(e.getMessage()); @@ -349,13 +346,17 @@ public class CassandraMessageMapper implements MessageMapper { } } - private Optional<UpdatedFlags> retryMessageFlagsUpdate(Mailbox mailbox, ComposedMessageIdWithMetaData composedMessageIdWithMetaData, FlagsUpdateCalculator flagUpdateCalculator) { - CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); + private Optional<UpdatedFlags> retryMessageFlagsUpdate(Mailbox mailbox, MessageId messageId, FlagsUpdateCalculator flagUpdateCalculator) { + CassandraId cassandraId = (CassandraId) mailbox.getMailboxId(); + ComposedMessageIdWithMetaData composedMessageIdWithMetaData = imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of(cassandraId)) + .join() + .findFirst() + .orElseThrow(MailboxDeleteDuringUpdate::new); return tryMessageFlagsUpdate(flagUpdateCalculator, mailbox, messageDAO.retrieveMessages(ImmutableList.of(composedMessageIdWithMetaData), FetchType.Metadata, Optional.empty()).join() .findFirst() .map(pair -> pair.getLeft().toMailboxMessage(ImmutableList.of())) - .orElseThrow(() -> new MessageDeletedDuringFlagsUpdateException(mailboxId, (CassandraMessageId) composedMessageIdWithMetaData.getComposedMessageId().getMessageId()))); + .orElseThrow(() -> new MessageDeletedDuringFlagsUpdateException(cassandraId, (CassandraMessageId) messageId))); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/16bba839/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java ---------------------------------------------------------------------- diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java index ec87df8..a4e6053 100644 --- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java +++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java @@ -28,6 +28,10 @@ import java.io.IOException; import java.util.Date; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import javax.mail.Flags; import javax.mail.util.SharedByteArrayInputStream; @@ -62,6 +66,37 @@ import com.google.common.collect.Lists; @Contract(MapperProvider.class) public class MessageMapperTest<T extends MapperProvider> { + private class ConcurrentSetFlagTestRunnable implements Runnable { + private final int threadNumber; + private final int updateCount; + private final Mailbox mailbox; + private final MessageUid uid; + private final CountDownLatch countDownLatch; + + public ConcurrentSetFlagTestRunnable(int threadNumber, int updateCount, Mailbox mailbox, MessageUid uid, CountDownLatch countDownLatch) { + this.threadNumber = threadNumber; + this.updateCount = updateCount; + this.mailbox = mailbox; + this.uid = uid; + this.countDownLatch = countDownLatch; + } + + @Override + public void run() { + countDownLatch.countDown(); + for (int i = 0; i < updateCount; i++) { + try { + messageMapper.updateFlags(mailbox, + new FlagsUpdateCalculator(new Flags("custom-" + threadNumber + "-" + i), + FlagsUpdateMode.ADD), + MessageRange.one(uid)); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + } + private final static char DELIMITER = '.'; private static final int LIMIT = 10; private static final int BODY_START = 16; @@ -695,6 +730,28 @@ public class MessageMapperTest<T extends MapperProvider> { } @ContractTest + public void userFlagsUpdateShouldWorkInConcurrentEnvironment() throws Exception { + saveMessages(); + + int threadCount = 2; + int updateCount = 10; + CountDownLatch countDownLatch = new CountDownLatch(threadCount); + ExecutorService executorService = Executors.newFixedThreadPool(threadCount); + for (int i = 0; i < threadCount; i++) { + executorService.submit(new ConcurrentSetFlagTestRunnable(i, updateCount, + benwaInboxMailbox, message1.getUid(), countDownLatch)); + } + executorService.shutdown(); + assertThat(executorService.awaitTermination(1, TimeUnit.MINUTES)) + .isTrue(); + + Iterator<MailboxMessage> messages = messageMapper.findInMailbox(benwaInboxMailbox, MessageRange.one(message1.getUid()), + FetchType.Metadata, 1); + assertThat(messages.hasNext()).isTrue(); + assertThat(messages.next().createFlags().getUserFlags()).hasSize(threadCount * updateCount); + } + + @ContractTest public void messagesShouldBeSavedWithTheirUserFlags() throws Exception { MailboxMessage message = SimpleMailboxMessage.copy(benwaInboxMailbox.getMailboxId(), message1); messageMapper.add(benwaInboxMailbox, message); --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org