Author: btellier
Date: Wed Jun 17 09:12:45 2015
New Revision: 1685967

URL: http://svn.apache.org/r1685967
Log:
MAILBOX-209 Correcting race condition for modSeq generation in Cassandra mailbox - patch contributed by Matthieu Baechler

Added:
    james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageModseqTable.java
Modified:
    james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraTableManager.java
    james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
    james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMailboxCountersTable.java
    james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUidAndModSeqProviderTest.java

Modified: james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraTableManager.java
URL: http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraTableManager.java?rev=1685967&r1=1685966&r2=1685967&view=diff
==============================================================================
--- james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraTableManager.java (original)
+++ james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraTableManager.java Wed Jun 17 09:12:45 2015
@@ -30,13 +30,14 @@ import static com.datastax.driver.core.D
 
 import com.datastax.driver.core.querybuilder.QueryBuilder;
 import com.datastax.driver.core.schemabuilder.Create;
-
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.schemabuilder.SchemaBuilder;
 import com.datastax.driver.core.schemabuilder.SchemaStatement;
+
 import org.apache.james.mailbox.cassandra.table.CassandraACLTable;
 import org.apache.james.mailbox.cassandra.table.CassandraMailboxCountersTable;
 import org.apache.james.mailbox.cassandra.table.CassandraMailboxTable;
+import org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTable;
 import org.apache.james.mailbox.cassandra.table.CassandraMessageTable;
 import org.apache.james.mailbox.cassandra.table.CassandraMessageUidTable;
 import org.apache.james.mailbox.cassandra.table.CassandraSubscriptionTable;
@@ -63,8 +64,7 @@ public class CassandraTableManager {
                 .ifNotExists()
                 .addPartitionKey(CassandraMailboxCountersTable.MAILBOX_ID, 
timeuuid())
                 .addColumn(CassandraMailboxCountersTable.COUNT, counter())
-                .addColumn(CassandraMailboxCountersTable.UNSEEN, counter())
-                .addColumn(CassandraMailboxCountersTable.NEXT_MOD_SEQ, 
counter())),
+                .addColumn(CassandraMailboxCountersTable.UNSEEN, counter())),
         MessageUid(CassandraMessageUidTable.TABLE_NAME,
             SchemaBuilder.createTable(CassandraMessageUidTable.TABLE_NAME)
                 .ifNotExists()
@@ -103,9 +103,14 @@ public class CassandraTableManager {
                 .ifNotExists()
                 .addPartitionKey(CassandraACLTable.ID, timeuuid())
                 .addColumn(CassandraACLTable.ACL, text())
-                .addColumn(CassandraACLTable.VERSION, bigint())
-        )
+                .addColumn(CassandraACLTable.VERSION, bigint())),
+        ModSeq(CassandraMessageModseqTable.TABLE_NAME,
+            SchemaBuilder.createTable(CassandraMessageModseqTable.TABLE_NAME)
+                .ifNotExists()
+                .addPartitionKey(CassandraMessageModseqTable.MAILBOX_ID, 
timeuuid())
+                .addColumn(CassandraMessageModseqTable.NEXT_MODSEQ, bigint()))
         ;
+
         private Create createStatement;
         private String name;
 

Modified: james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
URL: http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java?rev=1685967&r1=1685966&r2=1685967&view=diff
==============================================================================
--- james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java (original)
+++ james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java Wed Jun 17 09:12:45 2015
@@ -20,40 +20,130 @@
 package org.apache.james.mailbox.cassandra.mail;
 
 import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
-import static com.datastax.driver.core.querybuilder.QueryBuilder.incr;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.set;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.update;
-import static 
org.apache.james.mailbox.cassandra.table.CassandraMailboxCountersTable.MAILBOX_ID;
-import static 
org.apache.james.mailbox.cassandra.table.CassandraMailboxCountersTable.NEXT_MOD_SEQ;
-import static 
org.apache.james.mailbox.cassandra.table.CassandraMailboxCountersTable.TABLE_NAME;
+import static 
org.apache.james.mailbox.cassandra.CassandraConstants.LIGHTWEIGHT_TRANSACTION_APPLIED;
+import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTable.MAILBOX_ID;
+import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTable.NEXT_MODSEQ;
+import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTable.TABLE_NAME;
 
+import java.util.Optional;
 import java.util.UUID;
 
 import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.cassandra.mail.utils.FunctionRunnerWithRetry;
 import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.store.mail.ModSeqProvider;
 import org.apache.james.mailbox.store.mail.model.Mailbox;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Session;
+import com.datastax.driver.core.querybuilder.BuiltStatement;
+import com.google.common.base.Throwables;
 
 public class CassandraModSeqProvider implements ModSeqProvider<UUID> {
 
-    private Session session;
+    private static final int DEFAULT_MAX_RETRY = 100000;
+    private static final Logger LOG = 
LoggerFactory.getLogger(CassandraModSeqProvider.class);
+    private static final ModSeq FIRST_MODSEQ = new ModSeq(0);
+    
+    private final Session session;
+    private final FunctionRunnerWithRetry runner;
 
-    public CassandraModSeqProvider(Session session) {
+    public CassandraModSeqProvider(Session session, int maxRetry) {
         this.session = session;
+        this.runner = new FunctionRunnerWithRetry(maxRetry);
+    }
+
+    public CassandraModSeqProvider(Session session) {
+        this(session, DEFAULT_MAX_RETRY);
     }
 
     @Override
     public long nextModSeq(MailboxSession mailboxSession, Mailbox<UUID> 
mailbox) throws MailboxException {
-        
session.execute(update(TABLE_NAME).with(incr(NEXT_MOD_SEQ)).where(eq(MAILBOX_ID,
 mailbox.getMailboxId())));
-        return highestModSeq(mailboxSession, mailbox);
+        if (findHighestModSeq(mailboxSession, mailbox).isFirst()) {
+            Optional<ModSeq> optional = tryInsertModSeq(mailbox, FIRST_MODSEQ);
+            if (optional.isPresent()) {
+                return optional.get().getValue();
+            }
+        }
+        
+        return runner.executeAndRetrieveObject(
+                    () -> {
+                        try {
+                            return tryUpdateModSeq(mailbox, 
findHighestModSeq(mailboxSession, mailbox))
+                                    .map(ModSeq::getValue);
+                        } catch (Exception exception) {
+                            LOG.error("Can not retrieve next ModSeq", 
exception);
+                            throw Throwables.propagate(exception);
+                        }
+                    });
     }
 
     @Override
     public long highestModSeq(MailboxSession mailboxSession, Mailbox<UUID> 
mailbox) throws MailboxException {
-        ResultSet result = 
session.execute(select(NEXT_MOD_SEQ).from(TABLE_NAME).where(eq(MAILBOX_ID, 
mailbox.getMailboxId())));
-        return result.isExhausted() ? 0 : result.one().getLong(NEXT_MOD_SEQ);
+        return findHighestModSeq(mailboxSession, mailbox).getValue();
+    }
+    
+    private ModSeq findHighestModSeq(MailboxSession mailboxSession, 
Mailbox<UUID> mailbox) throws MailboxException {
+        ResultSet result = session.execute(
+                select(NEXT_MODSEQ)
+                    .from(TABLE_NAME)
+                    .where(eq(MAILBOX_ID, mailbox.getMailboxId())));
+        if (result.isExhausted()) {
+            return FIRST_MODSEQ;
+        } else {
+            return new ModSeq(result.one().getLong(NEXT_MODSEQ));
+        }
+    }
+
+    private Optional<ModSeq> tryInsertModSeq(Mailbox<UUID> mailbox, ModSeq 
modSeq) {
+        ModSeq nextModSeq = modSeq.next();
+        return transactionalStatementToOptionalModSeq(nextModSeq,
+                insertInto(TABLE_NAME)
+                    .value(NEXT_MODSEQ, nextModSeq.getValue())
+                    .value(MAILBOX_ID, mailbox.getMailboxId())
+                    .ifNotExists());
+    }
+    
+    private Optional<ModSeq> tryUpdateModSeq(Mailbox<UUID> mailbox, ModSeq 
modSeq) {
+        ModSeq nextModSeq = modSeq.next();
+        return transactionalStatementToOptionalModSeq(nextModSeq,
+                update(TABLE_NAME)
+                    .onlyIf(eq(NEXT_MODSEQ, modSeq.getValue()))
+                    .with(set(NEXT_MODSEQ, nextModSeq.getValue()))
+                    .where(eq(MAILBOX_ID, mailbox.getMailboxId())));
+    }
+
+    private Optional<ModSeq> transactionalStatementToOptionalModSeq(ModSeq 
modSeq, BuiltStatement statement) {
+        
if(session.execute(statement).one().getBool(LIGHTWEIGHT_TRANSACTION_APPLIED)) {
+            return Optional.of(modSeq);
+        }
+        return Optional.empty();
+    }
+    
+    private static class ModSeq {
+        
+        private final long value;
+        
+        public ModSeq(long value) {
+            this.value = value;
+        }
+        
+        public ModSeq next() {
+            return new ModSeq(value + 1);
+        }
+        
+        public long getValue() {
+            return value;
+        }
+        
+        public boolean isFirst() {
+            return value == FIRST_MODSEQ.value;
+        }
     }
 }

Modified: james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMailboxCountersTable.java
URL: http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMailboxCountersTable.java?rev=1685967&r1=1685966&r2=1685967&view=diff
==============================================================================
--- james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMailboxCountersTable.java (original)
+++ james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMailboxCountersTable.java Wed Jun 17 09:12:45 2015
@@ -24,5 +24,4 @@ public interface CassandraMailboxCounter
     String MAILBOX_ID = "mailboxId";
     String COUNT = "count";
     String UNSEEN = "unseen";
-    String NEXT_MOD_SEQ = "nextModSeq";
 }
\ No newline at end of file

Added: james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageModseqTable.java
URL: http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageModseqTable.java?rev=1685967&view=auto
==============================================================================
--- james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageModseqTable.java (added)
+++ james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageModseqTable.java Wed Jun 17 09:12:45 2015
@@ -0,0 +1,26 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.table;
+
+public interface CassandraMessageModseqTable {
+    String TABLE_NAME = "modseq";
+    String MAILBOX_ID = "mailboxId";
+    String NEXT_MODSEQ = "nextModseq";
+}

Modified: james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUidAndModSeqProviderTest.java
URL: http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUidAndModSeqProviderTest.java?rev=1685967&r1=1685966&r2=1685967&view=diff
==============================================================================
--- james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUidAndModSeqProviderTest.java (original)
+++ james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUidAndModSeqProviderTest.java Wed Jun 17 09:12:45 2015
@@ -18,20 +18,25 @@
  ****************************************************************/
 package org.apache.james.mailbox.cassandra.mail;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.LongConsumer;
+import java.util.stream.LongStream;
 
 import org.apache.james.mailbox.cassandra.CassandraClusterSingleton;
+import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.MailboxPath;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleMailbox;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Throwables;
 
 /**
  * Unit tests for UidProvider and ModSeqProvider.
@@ -39,18 +44,18 @@ import org.slf4j.LoggerFactory;
  */
 public class CassandraUidAndModSeqProviderTest {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(CassandraUidAndModSeqProviderTest.class);
     private static final CassandraClusterSingleton CASSANDRA = 
CassandraClusterSingleton.build();
-    private static CassandraUidProvider uidProvider;
-    private static CassandraModSeqProvider modSeqProvider;
-    private static CassandraMailboxMapper mapper;
-    private static List<SimpleMailbox<UUID>> mailboxList;
-    private static List<MailboxPath> pathsList;
     private static final int NAMESPACES = 5;
     private static final int USERS = 5;
     private static final int MAILBOX_NO = 5;
     private static final int MAX_RETRY = 100;
     private static final char SEPARATOR = '%';
+    
+    private CassandraUidProvider uidProvider;
+    private CassandraModSeqProvider modSeqProvider;
+    private CassandraMailboxMapper mapper;
+    private List<SimpleMailbox<UUID>> mailboxList;
+    private List<MailboxPath> pathsList;
 
     @Before
     public void setUpClass() throws Exception {
@@ -63,13 +68,13 @@ public class CassandraUidAndModSeqProvid
             mapper.save(mailbox);
         }
     }
-
+    
     @After
     public void cleanUp() {
         CASSANDRA.clearAllTables();
     }
 
-    private static void fillMailboxList() {
+    private void fillMailboxList() {
         mailboxList = new ArrayList<>();
         pathsList = new ArrayList<>();
         MailboxPath path;
@@ -88,52 +93,40 @@ public class CassandraUidAndModSeqProvid
                 }
             }
         }
-
-        LOG.info("Created test case with {} mailboxes and {} paths", 
mailboxList.size(), pathsList.size());
     }
 
-    /**
-     * Test of lastUid method, of class CassandraUidProvider.
-     */
     @Test
-    public void testLastUid() throws Exception {
-        LOG.info("lastUid");
-        final MailboxPath path = new MailboxPath("gsoc", "ieugen", "Trash");
-        final SimpleMailbox<UUID> newBox = new SimpleMailbox<>(path, 1234);
+    public void lastUidShouldRetrieveValueStoredByNextUid() throws Exception {
+        MailboxPath path = new MailboxPath("gsoc", "ieugen", "Trash");
+        SimpleMailbox<UUID> newBox = new SimpleMailbox<>(path, 1234);
         mapper.save(newBox);
         mailboxList.add(newBox);
         pathsList.add(path);
 
-        final long result = uidProvider.lastUid(null, newBox);
+        long result = uidProvider.lastUid(null, newBox);
         assertEquals(0, result);
-        for (int i = 1; i < 10; i++) {
-            final long uid = uidProvider.nextUid(null, newBox);
-            assertEquals(uid, uidProvider.lastUid(null, newBox));
-        }
+        LongStream.range(1, 10)
+            .forEach(propagateException(value -> {
+                        long uid = uidProvider.nextUid(null, newBox);
+                        assertThat(uid).isEqualTo(uidProvider.lastUid(null, 
newBox));
+                })
+            );
     }
 
-    /**
-     * Test of nextUid method, of class CassandraUidProvider.
-     */
     @Test
-    public void testNextUid() throws Exception {
-        LOG.info("nextUid");
-        final SimpleMailbox<UUID> mailbox = mailboxList.get(mailboxList.size() 
/ 2);
-        final long lastUid = uidProvider.lastUid(null, mailbox);
-        long result;
-        for (int i = (int) lastUid + 1; i < (lastUid + 10); i++) {
-            result = uidProvider.nextUid(null, mailbox);
-            assertEquals(i, result);
-        }
+    public void nextUidShouldIncrementValueByOne() throws Exception {
+        SimpleMailbox<UUID> mailbox = mailboxList.get(mailboxList.size() / 2);
+        long lastUid = uidProvider.lastUid(null, mailbox);
+        LongStream.range(lastUid + 1, lastUid + 10)
+            .forEach(propagateException(value -> {
+                        long result = uidProvider.nextUid(null, mailbox);
+                        assertThat(value).isEqualTo(result);
+                })
+            );
     }
 
-    /**
-     * Test of highestModSeq method, of class CassandraModSeqProvider.
-     */
     @Test
-    public void testHighestModSeq() throws Exception {
-        LOG.info("highestModSeq");
-        LOG.info("lastUid");
+    public void highestModSeqShouldRetrieveValueStoredNextModSeq() throws 
Exception {
         MailboxPath path = new MailboxPath("gsoc", "ieugen", "Trash");
         SimpleMailbox<UUID> newBox = new SimpleMailbox<>(path, 1234);
         mapper.save(newBox);
@@ -142,24 +135,53 @@ public class CassandraUidAndModSeqProvid
 
         long result = modSeqProvider.highestModSeq(null, newBox);
         assertEquals(0, result);
-        for (int i = 1; i < 10; i++) {
-            long uid = modSeqProvider.nextModSeq(null, newBox);
-            assertEquals(uid, modSeqProvider.highestModSeq(null, newBox));
-        }
+        LongStream.range(1, 10)
+            .forEach(propagateException(value -> {
+                        long uid = modSeqProvider.nextModSeq(null, newBox);
+                        
assertThat(uid).isEqualTo(modSeqProvider.highestModSeq(null, newBox));
+                })
+            );
     }
 
-    /**
-     * Test of nextModSeq method, of class CassandraModSeqProvider.
-     */
     @Test
-    public void testNextModSeq() throws Exception {
-        LOG.info("nextModSeq");
-        final SimpleMailbox<UUID> mailbox = mailboxList.get(mailboxList.size() 
/ 2);
-        final long lastUid = modSeqProvider.highestModSeq(null, mailbox);
-        long result;
-        for (int i = (int) lastUid + 1; i < (lastUid + 10); i++) {
-            result = modSeqProvider.nextModSeq(null, mailbox);
-            assertEquals(i, result);
-        }
+    public void nextModSeqShouldIncrementValueByOne() throws Exception {
+        SimpleMailbox<UUID> mailbox = mailboxList.get(mailboxList.size() / 2);
+        long lastUid = modSeqProvider.highestModSeq(null, mailbox);
+        LongStream.range(lastUid + 1, lastUid + 10)
+            .forEach(propagateException(value -> {
+                        long result = modSeqProvider.nextModSeq(null, mailbox);
+                        assertThat(value).isEqualTo(result);
+                })
+            );
+    }
+
+    @Test
+    public void nextModSeqShouldIncrementValueWhenParallelCalls() throws 
Exception {
+        SimpleMailbox<UUID> mailbox = mailboxList.get(mailboxList.size() / 2);
+        long lastUid = modSeqProvider.highestModSeq(null, mailbox);
+        final AtomicLong previousValue = new AtomicLong();
+        LongStream.range(lastUid + 1, lastUid + 10)
+            .parallel()
+            .forEach(propagateException(value -> {
+                        long result = modSeqProvider.nextModSeq(null, mailbox);
+                        assertThat(result).isGreaterThan(previousValue.get());
+                        previousValue.set(result);
+                })
+            );
+    }
+    
+    @FunctionalInterface
+    private interface ConsumerThatThrowsMailboxException<T> {
+        void apply(T arg) throws MailboxException;
+    }
+    
+    private LongConsumer 
propagateException(ConsumerThatThrowsMailboxException<Long> function) {
+        return (value) -> {
+            try {
+                function.apply(value);
+            } catch (MailboxException e) {
+                Throwables.propagate(e);
+            }
+        };
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to