Author: eric
Date: Sun Mar  8 12:09:42 2015
New Revision: 1664983

URL: http://svn.apache.org/r1664983
Log:
Data races are possible while updating flags in Cassandra, patch provided by 
Benoit Tellier (MAILBOX-206)

Modified:
    
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraSession.java
    
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
    
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageTable.java
    
james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraClusterSingleton.java
    
james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java

Modified: 
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraSession.java
URL: 
http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraSession.java?rev=1664983&r1=1664982&r2=1664983&view=diff
==============================================================================
--- 
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraSession.java
 (original)
+++ 
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraSession.java
 Sun Mar  8 12:09:42 2015
@@ -57,7 +57,7 @@ public class CassandraSession implements
         session.execute("CREATE TABLE IF NOT EXISTS " + keyspace + 
".messageCounter (" + "mailboxId UUID PRIMARY KEY," + "nextUid bigint," + ");");
         session.execute("CREATE TABLE IF NOT EXISTS " + keyspace + 
".mailboxCounters (" + "mailboxId UUID PRIMARY KEY," + "count counter," + 
"unseen counter," + "nextModSeq counter" + ");");
         session.execute("CREATE TABLE IF NOT EXISTS " + keyspace + ".message 
(" + "mailboxId UUID," + "uid bigint," + "internalDate timestamp," + 
"bodyStartOctet int," + "content blob," + "modSeq bigint," + "mediaType text," 
+ "subType text," + "fullContentOctets int," + "bodyOctets int,"
-                + "textualLineCount bigint," + "bodyContent blob," + 
"headerContent blob," + "flagAnswered boolean," + "flagDeleted boolean," + 
"flagDraft boolean," + "flagRecent boolean," + "flagSeen boolean," + 
"flagFlagged boolean," + "flagUser boolean," + "PRIMARY KEY (mailboxId, uid)" + 
");");
+                + "textualLineCount bigint," + "bodyContent blob," + 
"headerContent blob," + "flagAnswered boolean," + "flagDeleted boolean," + 
"flagDraft boolean," + "flagRecent boolean," + "flagSeen boolean," + 
"flagFlagged boolean," + "flagUser boolean," + "flagVersion bigint," + "PRIMARY 
KEY (mailboxId, uid)" + ");");
         session.execute("CREATE TABLE IF NOT EXISTS " + keyspace + 
".subscription (" + "user text," + "mailbox text," + "PRIMARY KEY (mailbox, 
user)" + ");");
         session.close();
     }

Modified: 
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
URL: 
http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java?rev=1664983&r1=1664982&r2=1664983&view=diff
==============================================================================
--- 
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
 (original)
+++ 
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
 Sun Mar  8 12:09:42 2015
@@ -29,6 +29,7 @@ import static com.datastax.driver.core.q
 import static com.datastax.driver.core.querybuilder.QueryBuilder.lt;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.update;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.set;
 import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageTable.BODY_CONTENT;
 import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageTable.BODY_OCTECTS;
 import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageTable.BODY_START_OCTET;
@@ -51,6 +52,7 @@ import static org.apache.james.mailbox.c
 import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageTable.Flag.RECENT;
 import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageTable.Flag.SEEN;
 import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageTable.Flag.USER;
+import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageTable.FLAG_VERSION;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -64,6 +66,7 @@ import javax.mail.Flags;
 import javax.mail.Flags.Flag;
 import javax.mail.util.SharedByteArrayInputStream;
 
+import com.datastax.driver.core.querybuilder.Update;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.cassandra.table.CassandraMailboxCountersTable;
 import org.apache.james.mailbox.cassandra.table.CassandraMessageTable;
@@ -106,15 +109,25 @@ public class CassandraMessageMapper impl
     private MailboxSession mailboxSession;
     private UidProvider<UUID> uidProvider;
 
+    private final int applied = 0;
+    private int maxRetries;
+
+    private final static int DEFAULT_MAX_RETRIES = 10000;
+
     public CassandraMessageMapper(Session session, CassandraUidProvider 
uidProvider, ModSeqProvider<UUID> modSeqProvider) {
+        this(session, uidProvider, modSeqProvider, null, DEFAULT_MAX_RETRIES);
+    }
+
+    public CassandraMessageMapper(Session session, CassandraUidProvider 
uidProvider, ModSeqProvider<UUID> modSeqProvider, MailboxSession 
mailboxSession) {
+        this(session, uidProvider, modSeqProvider, mailboxSession, 
DEFAULT_MAX_RETRIES);
+    }
+
+    public CassandraMessageMapper(Session session, CassandraUidProvider 
uidProvider, ModSeqProvider<UUID> modSeqProvider, MailboxSession 
mailboxSession, int maxRetries) {
         this.session = session;
         this.uidProvider = uidProvider;
         this.modSeqProvider = modSeqProvider;
-    }
-
-    public CassandraMessageMapper(Session session, CassandraUidProvider 
uidProvider, CassandraModSeqProvider modSeqProvider, MailboxSession 
mailboxSession) {
-        this(session, uidProvider, modSeqProvider);
         this.mailboxSession = mailboxSession;
+        this.maxRetries = maxRetries;
     }
 
     @Override
@@ -311,7 +324,7 @@ public class CassandraMessageMapper impl
             Insert query = insertInto(TABLE_NAME).value(MAILBOX_ID, 
mailbox.getMailboxId()).value(IMAP_UID, message.getUid()).value(MOD_SEQ, 
message.getModSeq()).value(INTERNAL_DATE, 
message.getInternalDate()).value(MEDIA_TYPE, message.getMediaType())
                     .value(BODY_START_OCTET, message.getFullContentOctets() - 
message.getBodyOctets()).value(SUB_TYPE, 
message.getSubType()).value(FULL_CONTENT_OCTETS, 
message.getFullContentOctets()).value(BODY_OCTECTS, 
message.getBodyOctets()).value(ANSWERED, message.isAnswered())
                     .value(DELETED, message.isDeleted()).value(DRAFT, 
message.isDraft()).value(FLAGGED, message.isFlagged()).value(RECENT, 
message.isRecent()).value(SEEN, message.isSeen()).value(USER, 
message.createFlags().contains(Flag.USER)).value(BODY_CONTENT, bindMarker())
-                    .value(HEADER_CONTENT, 
bindMarker()).value(TEXTUAL_LINE_COUNT, message.getTextualLineCount());
+                    .value(HEADER_CONTENT, 
bindMarker()).value(TEXTUAL_LINE_COUNT, 
message.getTextualLineCount()).value(FLAG_VERSION, 0);
             PreparedStatement preparedStatement = 
session.prepare(query.toString());
             BoundStatement boundStatement = 
preparedStatement.bind(toByteBuffer(message.getBodyContent()), 
toByteBuffer(message.getHeaderContent()));
             session.execute(boundStatement);
@@ -321,28 +334,106 @@ public class CassandraMessageMapper impl
         }
     }
 
+    private boolean conditionalSave(Mailbox<UUID> mailbox, Message<UUID> 
message, long flagVersion) throws MailboxException {
+        ResultSet resultSet = session.execute(
+                update(TABLE_NAME)
+                        .with(set(ANSWERED, message.isAnswered()))
+                        .and(set(DELETED, message.isDeleted()))
+                        .and(set(DRAFT, message.isDraft()))
+                        .and(set(FLAGGED, message.isFlagged()))
+                        .and(set(RECENT, message.isRecent()))
+                        .and(set(SEEN, message.isSeen()))
+                        .and(set(USER, 
message.createFlags().contains(Flag.USER)))
+                        .and(set(FLAG_VERSION, flagVersion + 1))
+                        .where(eq(IMAP_UID, message.getUid()))
+                        .and(eq(MAILBOX_ID, message.getMailboxId()))
+                        .onlyIf(eq(FLAG_VERSION, flagVersion))
+        );
+        return resultSet.one().getBool(applied);
+    }
+
     private ByteBuffer toByteBuffer(InputStream stream) throws IOException {
         return ByteBuffer.wrap(ByteStreams.toByteArray(stream));
     }
 
+    /**
+     *
+     * @param mailbox Mailbox where the messages are located
+     * @param flags Flags used for update
+     * @param value True if you want to set provided flags to true, false to 
set these flags to false
+     * @param replace true if we want to replace current flags by those 
provided
+     * @param set Range of messages to update
+     * @return Updated flag information
+     * @throws MailboxException
+     */
     @Override
     public Iterator<UpdatedFlags> updateFlags(Mailbox<UUID> mailbox, Flags 
flags, boolean value, boolean replace, MessageRange set) throws 
MailboxException {
         ImmutableList.Builder<UpdatedFlags> result = ImmutableList.builder();
         for (Row row : session.execute(buildQuery(mailbox, set))) {
-            Message<UUID> message = message(row);
-            Flags originFlags = message.createFlags();
-            Flags updatedFlags = buildFlags(message, flags, value, replace);
-            message.setFlags(updatedFlags);
-            message.setModSeq(modSeqProvider.nextModSeq(mailboxSession, 
mailbox));
-            save(mailbox, message);
-            result.add(new UpdatedFlags(message.getUid(), message.getModSeq(), 
originFlags, updatedFlags));
+            updateMessage(mailbox, flags, value, replace, result, row);
         }
         return result.build().iterator();
     }
 
+    private void updateMessage(Mailbox<UUID> mailbox, Flags flags, boolean 
value, boolean replace, ImmutableList.Builder<UpdatedFlags> result, Row row) 
throws MailboxException {
+        // Get the message and basic information about it
+        Message<UUID> message = message(row);
+        long flagVersion = row.getLong(FLAG_VERSION);
+        long uid = message.getUid();
+        // update flags
+        Flags originFlags = message.createFlags();
+        Flags updatedFlags = buildFlags(message, flags, value, replace);
+        message.setFlags(updatedFlags);
+        // Update the ModSeq
+        long previousModSeq = message.getModSeq();
+        long modSeq = modSeqProvider.nextModSeq(mailboxSession, mailbox);
+        message.setModSeq(modSeq);
+        // Try a first update
+        if(!conditionalSave(mailbox, message, flagVersion)) {
+            int tries = 0;
+            // The conditional update failed: someone updated the flags before us.
+            do {
+                tries++;
+                // Retrieve the message from uid
+                Row newRow = findMessageByUid(mailbox, uid);
+                if(newRow == null) {
+                    // Someone deleted this result while we were doing other 
stuff
+                    // Skip it
+                    break;
+                }
+                message = message(newRow);
+                flagVersion = newRow.getLong(FLAG_VERSION);
+                // update flags
+                originFlags = message.createFlags();
+                updatedFlags = buildFlags(message, flags, value, replace);
+                message.setFlags(updatedFlags);
+                // Update ModSeq
+                if (previousModSeq != message.getModSeq()) {
+                    // Here someone updated the ModSeq, so we cannot use the 
previously generated value
+                    previousModSeq = message.getModSeq();
+                    modSeq = modSeqProvider.nextModSeq(mailboxSession, 
mailbox);
+                }
+                message.setModSeq(modSeq);
+                // and retry
+            } while (!conditionalSave(mailbox, message, flagVersion) && tries 
< maxRetries);
+            if(tries == maxRetries) {
+                throw new MailboxException("Max retries reached when asking an 
update of flags on message " + uid + " for mailbox " + mailbox.getMailboxId());
+            }
+        }
+        result.add(new UpdatedFlags(message.getUid(), message.getModSeq(), 
originFlags, updatedFlags));
+    }
+
+    private Row findMessageByUid(Mailbox<UUID> mailbox, long uid) {
+        ResultSet resultSet = session.execute(selectMessage(mailbox, uid));
+        if ( resultSet.isExhausted() ) {
+            return null;
+        }
+        return resultSet.one();
+    }
+
     private Flags buildFlags(Message<UUID> message, Flags flags, boolean 
value, boolean replace) {
         if (replace) {
-            return message.createFlags();
+            return flags;
         } else {
             Flags updatedFlags = message.createFlags();
             if (value) {

Modified: 
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageTable.java
URL: 
http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageTable.java?rev=1664983&r1=1664982&r2=1664983&view=diff
==============================================================================
--- 
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageTable.java
 (original)
+++ 
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageTable.java
 Sun Mar  8 12:09:42 2015
@@ -38,7 +38,8 @@ public interface CassandraMessageTable {
     String TEXTUAL_LINE_COUNT = "textualLineCount";
     String BODY_CONTENT = "bodyContent";
     String HEADER_CONTENT = "headerContent";
-    String[] FIELDS = { MAILBOX_ID, IMAP_UID, INTERNAL_DATE, MOD_SEQ, 
BODY_START_OCTET, MEDIA_TYPE, SUB_TYPE, FULL_CONTENT_OCTETS, BODY_OCTECTS, 
Flag.ANSWERED, Flag.DELETED, Flag.DRAFT, Flag.FLAGGED, Flag.RECENT, Flag.SEEN, 
Flag.USER, BODY_CONTENT, HEADER_CONTENT, TEXTUAL_LINE_COUNT };
+    String FLAG_VERSION = "flagVersion";
+    String[] FIELDS = { MAILBOX_ID, IMAP_UID, INTERNAL_DATE, MOD_SEQ, 
BODY_START_OCTET, MEDIA_TYPE, SUB_TYPE, FULL_CONTENT_OCTETS, BODY_OCTECTS, 
Flag.ANSWERED, Flag.DELETED, Flag.DRAFT, Flag.FLAGGED, Flag.RECENT, Flag.SEEN, 
Flag.USER, BODY_CONTENT, HEADER_CONTENT, TEXTUAL_LINE_COUNT, FLAG_VERSION };
 
     interface Flag {
         String ANSWERED = "flagAnswered";

Modified: 
james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraClusterSingleton.java
URL: 
http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraClusterSingleton.java?rev=1664983&r1=1664982&r2=1664983&view=diff
==============================================================================
--- 
james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraClusterSingleton.java
 (original)
+++ 
james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraClusterSingleton.java
 Sun Mar  8 12:09:42 2015
@@ -91,7 +91,7 @@ public final class CassandraClusterSingl
         } else if (tableName.equals("message")) {
             session.execute("CREATE TABLE IF NOT EXISTS " + 
session.getLoggedKeyspace() + ".message (" + "mailboxId UUID," + "uid bigint," 
+ "internalDate timestamp," + "bodyStartOctet int," + "content blob," + "modSeq 
bigint," + "mediaType text," + "subType text," + "fullContentOctets int,"
                     + "bodyOctets int," + "textualLineCount bigint," + 
"bodyContent blob," + "headerContent blob," + "flagAnswered boolean," + 
"flagDeleted boolean," + "flagDraft boolean," + "flagRecent boolean," + 
"flagSeen boolean," + "flagFlagged boolean," + "flagUser boolean,"
-                    + "PRIMARY KEY (mailboxId, uid)" + ");");
+                    + "flagVersion bigint,"+ "PRIMARY KEY (mailboxId, uid)" + 
");");
         } else if (tableName.equals("subscription")) {
             session.execute("CREATE TABLE IF NOT EXISTS " + 
session.getLoggedKeyspace() + ".subscription (" + "user text," + "mailbox 
text," + "PRIMARY KEY (mailbox, user)" + ");");
         } else {

Modified: 
james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java
URL: 
http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java?rev=1664983&r1=1664982&r2=1664983&view=diff
==============================================================================
--- 
james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java
 (original)
+++ 
james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java
 Sun Mar  8 12:09:42 2015
@@ -19,9 +19,12 @@
 package org.apache.james.mailbox.cassandra.mail;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
 import java.util.UUID;
@@ -32,8 +35,11 @@ import javax.mail.util.SharedByteArrayIn
 
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.cassandra.CassandraClusterSingleton;
+import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.mock.MockMailboxSession;
 import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.model.MessageRange;
+import org.apache.james.mailbox.store.mail.MessageMapper;
 import org.apache.james.mailbox.store.mail.model.Mailbox;
 import org.apache.james.mailbox.store.mail.model.Message;
 import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
@@ -136,6 +142,69 @@ public class CassandraMessageMapperTest
         testAdd();
         testGetLastUid();
         testGetHighestModSeq();
+        testMessageUpdateReplace();
+        testMessageUpdateAddition();
+    }
+
+    /**
+     * Test message flag replacement
+     */
+    private void testMessageUpdateReplace() throws MailboxException {
+        LOG.info("message update : replace flags");
+        Flags flags = new Flags();
+        flags.add(Flags.Flag.ANSWERED);
+        flags.add(Flags.Flag.DRAFT);
+        messageMapper.updateFlags(MBOXES.get(1), flags, true, true, 
MessageRange.all());
+        Iterator<Message<UUID>> messageIterator = 
messageMapper.findInMailbox(MBOXES.get(1), MessageRange.all(), 
MessageMapper.FetchType.Full, 100);
+        while(messageIterator.hasNext()) {
+            Message<UUID> message = messageIterator.next();
+            assertTrue(message.isAnswered());
+            assertTrue(message.isDraft());
+            assertFalse(message.isDeleted());
+            assertFalse(message.isRecent());
+            assertFalse(message.isSeen());
+            assertFalse(message.isFlagged());
+        }
+    }
+
+    /**
+     * Test message flag set to true
+     */
+    private void testMessageUpdateAddition() throws MailboxException {
+        LOG.info("message update : flag addition");
+        Flags flags = new Flags();
+        flags.add(Flags.Flag.FLAGGED);
+        messageMapper.updateFlags(MBOXES.get(1), flags, true, false, 
MessageRange.all());
+        Iterator<Message<UUID>> messageIterator = 
messageMapper.findInMailbox(MBOXES.get(1), MessageRange.all(), 
MessageMapper.FetchType.Full, 100);
+        while(messageIterator.hasNext()) {
+            Message<UUID> message = messageIterator.next();
+            assertTrue(message.isAnswered());
+            assertTrue(message.isDraft());
+            assertFalse(message.isDeleted());
+            assertFalse(message.isRecent());
+            assertFalse(message.isSeen());
+            assertTrue(message.isFlagged());
+        }
+    }
+
+    /**
+     * Test message flag removal
+     */
+    private void testMessageUpdateRemove() throws MailboxException {
+        LOG.info("message update : flag removal");
+        Flags flags = new Flags();
+        flags.add(Flags.Flag.ANSWERED);
+        messageMapper.updateFlags(MBOXES.get(1), flags, false, false, 
MessageRange.all());
+        Iterator<Message<UUID>> messageIterator = 
messageMapper.findInMailbox(MBOXES.get(1), MessageRange.all(), 
MessageMapper.FetchType.Full, 100);
+        while(messageIterator.hasNext()) {
+            Message<UUID> message = messageIterator.next();
+            assertFalse(message.isAnswered());
+            assertTrue(message.isDraft());
+            assertFalse(message.isDeleted());
+            assertFalse(message.isRecent());
+            assertFalse(message.isSeen());
+            assertTrue(message.isFlagged());
+        }
     }
 
     /**



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to