Hi,
You will found as an attachment a patch containing a failing test,
corresponding to MAILBOX-206.
It simply launches in two different threads an update on messages flags,
on different flags ( in thread one we mark mails as DELETED and in
thread 2 we mark it as DRAFT ). I added a sleep in
CassandraMessageMapper to simulate network latency ( and be sure to
observe the data race ). The expected result is to have both
modifications applied. The observed result is that mails are only marked
as DRAFT.
I wrote some code that solve the issue ( it passes this test and lead to
no regressions ). I will send it as a patch as soon as we will have
finished to review it at Linagora.
Merry christmas ( in advance ),
Tellier Benoit
diff -uNr james-mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java james-mailbox-new/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
--- james-mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java 2014-12-16 17:49:32.079929004 +0100
+++ james-mailbox-new/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java 2014-12-21 03:51:36.641755606 +0100
@@ -329,6 +329,13 @@
public Iterator<UpdatedFlags> updateFlags(Mailbox<UUID> mailbox, Flags flags, boolean value, boolean replace, MessageRange set) throws MailboxException {
ImmutableList.Builder<UpdatedFlags> result = ImmutableList.builder();
for (Row row : session.execute(buildQuery(mailbox, set))) {
+ // Simulate latency
+ try {
+ Thread.sleep(100);
+ } catch( InterruptedException interrupetedException) {
+ System.out.println("Interrupted");
+ Thread.currentThread().interrupt();
+ }
Message<UUID> message = message(row);
Flags originFlags = message.createFlags();
Flags updatedFlags = buildFlags(message, flags, value, replace);
diff -uNr james-mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java james-mailbox-new/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java
--- james-mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java 2014-12-16 17:49:32.163262333 +0100
+++ james-mailbox-new/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java 2014-12-21 03:51:36.638422272 +0100
@@ -19,12 +19,18 @@
package org.apache.james.mailbox.cassandra.mail;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Date;
+import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.UUID;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import javax.mail.Flags;
import javax.mail.internet.SharedInputStream;
@@ -32,8 +38,11 @@
import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.cassandra.CassandraClusterSingleton;
+import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.mock.MockMailboxSession;
import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.model.MessageRange;
+import org.apache.james.mailbox.store.mail.MessageMapper;
import org.apache.james.mailbox.store.mail.model.Mailbox;
import org.apache.james.mailbox.store.mail.model.Message;
import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
@@ -136,6 +145,96 @@
testAdd();
testGetLastUid();
testGetHighestModSeq();
+ testSetFagsToFalse();
+ testMessageUpdateAddition();
+ testSetFagsToFalse();
+ testSetFagsConcurent();
+ }
+
+ class UpdateThread implements Runnable {
+
+ public Flags.Flag flag;
+
+ public UpdateThread(Flags.Flag flag) {
+ this.flag = flag;
+ }
+
+ public void run() {
+ Flags flags = new Flags();
+ flags.add(flag);
+ try {
+ messageMapper.updateFlags(MBOXES.get(1), flags, true, false, MessageRange.all());
+ } catch(MailboxException e) {
+ LOG.error("Can not update flags");
+ }
+ }
+
+ }
+
+ private void testSetFagsConcurent() throws MailboxException, InterruptedException {
+ UpdateThread th1 = new UpdateThread(Flags.Flag.DELETED);
+ UpdateThread th2 = new UpdateThread(Flags.Flag.DRAFT);
+ ForkJoinPool forkJoinPool = new ForkJoinPool(2);
+ forkJoinPool.execute(th1);
+ Thread.sleep(50);
+ forkJoinPool.execute(th2);
+ forkJoinPool.shutdown();
+ forkJoinPool.awaitTermination(100, TimeUnit.SECONDS);
+ Iterator<Message<UUID>> messageIterator = messageMapper.findInMailbox(MBOXES.get(1), MessageRange.all(), MessageMapper.FetchType.Full, 100);
+ while(messageIterator.hasNext()) {
+ Message<UUID> message = messageIterator.next();
+ assertFalse(message.isAnswered());
+ assertTrue(message.isDeleted());
+ assertTrue(message.isDraft());
+ assertFalse(message.isRecent());
+ assertFalse(message.isSeen());
+ assertFalse(message.isFlagged());
+ }
+ }
+
+ /**
+ * Test message flag set all flags to false
+ */
+ private void testSetFagsToFalse() throws MailboxException {
+ LOG.info("message update : flag addition");
+ Flags flags = new Flags();
+ flags.add(Flags.Flag.SEEN);
+ flags.add(Flags.Flag.ANSWERED);
+ flags.add(Flags.Flag.FLAGGED);
+ flags.add(Flags.Flag.DRAFT);
+ flags.add(Flags.Flag.DELETED);
+ flags.add(Flags.Flag.RECENT);
+ messageMapper.updateFlags(MBOXES.get(1), flags, false, false, MessageRange.all());
+ Iterator<Message<UUID>> messageIterator = messageMapper.findInMailbox(MBOXES.get(1), MessageRange.all(), MessageMapper.FetchType.Full, 100);
+ while(messageIterator.hasNext()) {
+ Message<UUID> message = messageIterator.next();
+ assertFalse(message.isAnswered());
+ assertFalse(message.isDraft());
+ assertFalse(message.isDeleted());
+ assertFalse(message.isRecent());
+ assertFalse(message.isSeen());
+ assertFalse(message.isFlagged());
+ }
+ }
+
+ /**
+ * Test message flag set to true
+ */
+ private void testMessageUpdateAddition() throws MailboxException {
+ LOG.info("message update : flag addition");
+ Flags flags = new Flags();
+ flags.add(Flags.Flag.FLAGGED);
+ messageMapper.updateFlags(MBOXES.get(1), flags, true, false, MessageRange.all());
+ Iterator<Message<UUID>> messageIterator = messageMapper.findInMailbox(MBOXES.get(1), MessageRange.all(), MessageMapper.FetchType.Full, 100);
+ while(messageIterator.hasNext()) {
+ Message<UUID> message = messageIterator.next();
+ assertFalse(message.isAnswered());
+ assertFalse(message.isDraft());
+ assertFalse(message.isDeleted());
+ assertFalse(message.isRecent());
+ assertFalse(message.isSeen());
+ assertTrue(message.isFlagged());
+ }
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]