Hi,

You will found as an attachment a patch containing a failing test,
corresponding to MAILBOX-206.

It simply launches in two different threads an update on messages flags,
on different flags ( in thread one we mark mails as DELETED and in
thread 2 we mark it as DRAFT ). I added a sleep in
CassandraMessageMapper to simulate network latency ( and be sure to
observe the data race ). The expected result is to have both
modifications applied. The observed result is that mails are only marked
as DRAFT.

I wrote some code that solve the issue ( it passes this test and lead to
no regressions ). I will send it as a patch as soon as we will have
finished to review it at Linagora.

Merry christmas ( in advance ),

Tellier Benoit
diff -uNr james-mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java james-mailbox-new/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
--- james-mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java	2014-12-16 17:49:32.079929004 +0100
+++ james-mailbox-new/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java	2014-12-21 03:51:36.641755606 +0100
@@ -329,6 +329,13 @@
     public Iterator<UpdatedFlags> updateFlags(Mailbox<UUID> mailbox, Flags flags, boolean value, boolean replace, MessageRange set) throws MailboxException {
         ImmutableList.Builder<UpdatedFlags> result = ImmutableList.builder();
         for (Row row : session.execute(buildQuery(mailbox, set))) {
+            // Simulate latency
+            try {
+                Thread.sleep(100);
+            } catch( InterruptedException interrupetedException) {
+                System.out.println("Interrupted");
+                Thread.currentThread().interrupt();
+            }
             Message<UUID> message = message(row);
             Flags originFlags = message.createFlags();
             Flags updatedFlags = buildFlags(message, flags, value, replace);
diff -uNr james-mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java james-mailbox-new/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java
--- james-mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java	2014-12-16 17:49:32.163262333 +0100
+++ james-mailbox-new/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java	2014-12-21 03:51:36.638422272 +0100
@@ -19,12 +19,18 @@
 package org.apache.james.mailbox.cassandra.mail;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import javax.mail.Flags;
 import javax.mail.internet.SharedInputStream;
@@ -32,8 +38,11 @@
 
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.cassandra.CassandraClusterSingleton;
+import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.mock.MockMailboxSession;
 import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.model.MessageRange;
+import org.apache.james.mailbox.store.mail.MessageMapper;
 import org.apache.james.mailbox.store.mail.model.Mailbox;
 import org.apache.james.mailbox.store.mail.model.Message;
 import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
@@ -136,6 +145,96 @@
         testAdd();
         testGetLastUid();
         testGetHighestModSeq();
+        testSetFagsToFalse();
+        testMessageUpdateAddition();
+        testSetFagsToFalse();
+        testSetFagsConcurent();
+    }
+
+    class UpdateThread implements Runnable {
+
+        public Flags.Flag flag;
+
+        public UpdateThread(Flags.Flag flag) {
+            this.flag = flag;
+        }
+
+        public void run() {
+            Flags flags = new Flags();
+            flags.add(flag);
+            try {
+                messageMapper.updateFlags(MBOXES.get(1), flags, true, false, MessageRange.all());
+            } catch(MailboxException e) {
+                LOG.error("Can not update flags");
+            }
+        }
+
+    }
+
+    private void testSetFagsConcurent() throws MailboxException, InterruptedException {
+        UpdateThread th1 = new UpdateThread(Flags.Flag.DELETED);
+        UpdateThread th2 = new UpdateThread(Flags.Flag.DRAFT);
+        ForkJoinPool forkJoinPool = new ForkJoinPool(2);
+        forkJoinPool.execute(th1);
+        Thread.sleep(50);
+        forkJoinPool.execute(th2);
+        forkJoinPool.shutdown();
+        forkJoinPool.awaitTermination(100, TimeUnit.SECONDS);
+        Iterator<Message<UUID>> messageIterator = messageMapper.findInMailbox(MBOXES.get(1), MessageRange.all(), MessageMapper.FetchType.Full, 100);
+        while(messageIterator.hasNext()) {
+            Message<UUID> message = messageIterator.next();
+            assertFalse(message.isAnswered());
+            assertTrue(message.isDeleted());
+            assertTrue(message.isDraft());
+            assertFalse(message.isRecent());
+            assertFalse(message.isSeen());
+            assertFalse(message.isFlagged());
+        }
+    }
+
+    /**
+     * Test message flag set all flags to false
+     */
+    private void testSetFagsToFalse() throws MailboxException {
+        LOG.info("message update : flag addition");
+        Flags flags = new Flags();
+        flags.add(Flags.Flag.SEEN);
+        flags.add(Flags.Flag.ANSWERED);
+        flags.add(Flags.Flag.FLAGGED);
+        flags.add(Flags.Flag.DRAFT);
+        flags.add(Flags.Flag.DELETED);
+        flags.add(Flags.Flag.RECENT);
+        messageMapper.updateFlags(MBOXES.get(1), flags, false, false, MessageRange.all());
+        Iterator<Message<UUID>> messageIterator = messageMapper.findInMailbox(MBOXES.get(1), MessageRange.all(), MessageMapper.FetchType.Full, 100);
+        while(messageIterator.hasNext()) {
+            Message<UUID> message = messageIterator.next();
+            assertFalse(message.isAnswered());
+            assertFalse(message.isDraft());
+            assertFalse(message.isDeleted());
+            assertFalse(message.isRecent());
+            assertFalse(message.isSeen());
+            assertFalse(message.isFlagged());
+        }
+    }
+
+    /**
+     * Test message flag set to true
+     */
+    private void testMessageUpdateAddition() throws MailboxException {
+        LOG.info("message update : flag addition");
+        Flags flags = new Flags();
+        flags.add(Flags.Flag.FLAGGED);
+        messageMapper.updateFlags(MBOXES.get(1), flags, true, false, MessageRange.all());
+        Iterator<Message<UUID>> messageIterator = messageMapper.findInMailbox(MBOXES.get(1), MessageRange.all(), MessageMapper.FetchType.Full, 100);
+        while(messageIterator.hasNext()) {
+            Message<UUID> message = messageIterator.next();
+            assertFalse(message.isAnswered());
+            assertFalse(message.isDraft());
+            assertFalse(message.isDeleted());
+            assertFalse(message.isRecent());
+            assertFalse(message.isSeen());
+            assertTrue(message.isFlagged());
+        }
     }
 
     /**

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to