Author: btellier Date: Wed Jun 17 09:12:45 2015 New Revision: 1685967 URL: http://svn.apache.org/r1685967 Log: MAILBOX-209 Correcting race condition for modSeq generation in Cassandra mailbox - patch contributed by Matthieu Baechler
Added: james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageModseqTable.java Modified: james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraTableManager.java james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMailboxCountersTable.java james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUidAndModSeqProviderTest.java Modified: james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraTableManager.java URL: http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraTableManager.java?rev=1685967&r1=1685966&r2=1685967&view=diff ============================================================================== --- james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraTableManager.java (original) +++ james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraTableManager.java Wed Jun 17 09:12:45 2015 @@ -30,13 +30,14 @@ import static com.datastax.driver.core.D import com.datastax.driver.core.querybuilder.QueryBuilder; import com.datastax.driver.core.schemabuilder.Create; - import com.datastax.driver.core.Session; import com.datastax.driver.core.schemabuilder.SchemaBuilder; import com.datastax.driver.core.schemabuilder.SchemaStatement; + import org.apache.james.mailbox.cassandra.table.CassandraACLTable; import org.apache.james.mailbox.cassandra.table.CassandraMailboxCountersTable; import org.apache.james.mailbox.cassandra.table.CassandraMailboxTable; +import org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTable; import org.apache.james.mailbox.cassandra.table.CassandraMessageTable; import org.apache.james.mailbox.cassandra.table.CassandraMessageUidTable; import org.apache.james.mailbox.cassandra.table.CassandraSubscriptionTable; @@ -63,8 +64,7 @@ public class CassandraTableManager { .ifNotExists() .addPartitionKey(CassandraMailboxCountersTable.MAILBOX_ID, timeuuid()) .addColumn(CassandraMailboxCountersTable.COUNT, counter()) - .addColumn(CassandraMailboxCountersTable.UNSEEN, counter()) - .addColumn(CassandraMailboxCountersTable.NEXT_MOD_SEQ, counter())), + .addColumn(CassandraMailboxCountersTable.UNSEEN, counter())), MessageUid(CassandraMessageUidTable.TABLE_NAME, SchemaBuilder.createTable(CassandraMessageUidTable.TABLE_NAME) .ifNotExists() @@ -103,9 +103,14 @@ public class CassandraTableManager { .ifNotExists() .addPartitionKey(CassandraACLTable.ID, timeuuid()) .addColumn(CassandraACLTable.ACL, text()) - .addColumn(CassandraACLTable.VERSION, bigint()) - ) + .addColumn(CassandraACLTable.VERSION, bigint())), + ModSeq(CassandraMessageModseqTable.TABLE_NAME, + SchemaBuilder.createTable(CassandraMessageModseqTable.TABLE_NAME) + .ifNotExists() + .addPartitionKey(CassandraMessageModseqTable.MAILBOX_ID, timeuuid()) + .addColumn(CassandraMessageModseqTable.NEXT_MODSEQ, bigint())) ; + private Create createStatement; private String name; Modified: james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java URL: http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java?rev=1685967&r1=1685966&r2=1685967&view=diff ============================================================================== --- james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java (original) +++ james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java Wed Jun 17 09:12:45 2015 @@ -20,40 +20,130 @@ package org.apache.james.mailbox.cassandra.mail; import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; -import static com.datastax.driver.core.querybuilder.QueryBuilder.incr; +import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; import static com.datastax.driver.core.querybuilder.QueryBuilder.select; +import static com.datastax.driver.core.querybuilder.QueryBuilder.set; import static com.datastax.driver.core.querybuilder.QueryBuilder.update; -import static org.apache.james.mailbox.cassandra.table.CassandraMailboxCountersTable.MAILBOX_ID; -import static org.apache.james.mailbox.cassandra.table.CassandraMailboxCountersTable.NEXT_MOD_SEQ; -import static org.apache.james.mailbox.cassandra.table.CassandraMailboxCountersTable.TABLE_NAME; +import static org.apache.james.mailbox.cassandra.CassandraConstants.LIGHTWEIGHT_TRANSACTION_APPLIED; +import static org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTable.MAILBOX_ID; +import static org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTable.NEXT_MODSEQ; +import static org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTable.TABLE_NAME; +import java.util.Optional; import java.util.UUID; import org.apache.james.mailbox.MailboxSession; +import org.apache.james.mailbox.cassandra.mail.utils.FunctionRunnerWithRetry; import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.store.mail.ModSeqProvider; import org.apache.james.mailbox.store.mail.model.Mailbox; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Session; +import com.datastax.driver.core.querybuilder.BuiltStatement; +import com.google.common.base.Throwables; public class CassandraModSeqProvider implements ModSeqProvider<UUID> { - private Session session; + private static final int DEFAULT_MAX_RETRY = 100000; + private static final Logger LOG = LoggerFactory.getLogger(CassandraModSeqProvider.class); + private static final ModSeq FIRST_MODSEQ = new ModSeq(0); + + private final Session session; + private final FunctionRunnerWithRetry runner; - public CassandraModSeqProvider(Session session) { + public CassandraModSeqProvider(Session session, int maxRetry) { this.session = session; + this.runner = new FunctionRunnerWithRetry(maxRetry); + } + + public CassandraModSeqProvider(Session session) { + this(session, DEFAULT_MAX_RETRY); } @Override public long nextModSeq(MailboxSession mailboxSession, Mailbox<UUID> mailbox) throws MailboxException { - session.execute(update(TABLE_NAME).with(incr(NEXT_MOD_SEQ)).where(eq(MAILBOX_ID, mailbox.getMailboxId()))); - return highestModSeq(mailboxSession, mailbox); + if (findHighestModSeq(mailboxSession, mailbox).isFirst()) { + Optional<ModSeq> optional = tryInsertModSeq(mailbox, FIRST_MODSEQ); + if (optional.isPresent()) { + return optional.get().getValue(); + } + } + + return runner.executeAndRetrieveObject( + () -> { + try { + return tryUpdateModSeq(mailbox, findHighestModSeq(mailboxSession, mailbox)) + .map(ModSeq::getValue); + } catch (Exception exception) { + LOG.error("Can not retrieve next ModSeq", exception); + throw Throwables.propagate(exception); + } + }); } @Override public long highestModSeq(MailboxSession mailboxSession, Mailbox<UUID> mailbox) throws MailboxException { - ResultSet result = session.execute(select(NEXT_MOD_SEQ).from(TABLE_NAME).where(eq(MAILBOX_ID, mailbox.getMailboxId()))); - return result.isExhausted() ? 0 : result.one().getLong(NEXT_MOD_SEQ); + return findHighestModSeq(mailboxSession, mailbox).getValue(); + } + + private ModSeq findHighestModSeq(MailboxSession mailboxSession, Mailbox<UUID> mailbox) throws MailboxException { + ResultSet result = session.execute( + select(NEXT_MODSEQ) + .from(TABLE_NAME) + .where(eq(MAILBOX_ID, mailbox.getMailboxId()))); + if (result.isExhausted()) { + return FIRST_MODSEQ; + } else { + return new ModSeq(result.one().getLong(NEXT_MODSEQ)); + } + } + + private Optional<ModSeq> tryInsertModSeq(Mailbox<UUID> mailbox, ModSeq modSeq) { + ModSeq nextModSeq = modSeq.next(); + return transactionalStatementToOptionalModSeq(nextModSeq, + insertInto(TABLE_NAME) + .value(NEXT_MODSEQ, nextModSeq.getValue()) + .value(MAILBOX_ID, mailbox.getMailboxId()) + .ifNotExists()); + } + + private Optional<ModSeq> tryUpdateModSeq(Mailbox<UUID> mailbox, ModSeq modSeq) { + ModSeq nextModSeq = modSeq.next(); + return transactionalStatementToOptionalModSeq(nextModSeq, + update(TABLE_NAME) + .onlyIf(eq(NEXT_MODSEQ, modSeq.getValue())) + .with(set(NEXT_MODSEQ, nextModSeq.getValue())) + .where(eq(MAILBOX_ID, mailbox.getMailboxId()))); + } + + private Optional<ModSeq> transactionalStatementToOptionalModSeq(ModSeq modSeq, BuiltStatement statement) { + if(session.execute(statement).one().getBool(LIGHTWEIGHT_TRANSACTION_APPLIED)) { + return Optional.of(modSeq); + } + return Optional.empty(); + } + + private static class ModSeq { + + private final long value; + + public ModSeq(long value) { + this.value = value; + } + + public ModSeq next() { + return new ModSeq(value + 1); + } + + public long getValue() { + return value; + } + + public boolean isFirst() { + return value == FIRST_MODSEQ.value; + } } } Modified: james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMailboxCountersTable.java URL: http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMailboxCountersTable.java?rev=1685967&r1=1685966&r2=1685967&view=diff ============================================================================== --- james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMailboxCountersTable.java (original) +++ james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMailboxCountersTable.java Wed Jun 17 09:12:45 2015 @@ -24,5 +24,4 @@ public interface CassandraMailboxCounter String MAILBOX_ID = "mailboxId"; String COUNT = "count"; String UNSEEN = "unseen"; - String NEXT_MOD_SEQ = "nextModSeq"; } \ No newline at end of file Added: james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageModseqTable.java URL: http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageModseqTable.java?rev=1685967&view=auto ============================================================================== --- james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageModseqTable.java (added) +++ james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageModseqTable.java Wed Jun 17 09:12:45 2015 @@ -0,0 +1,26 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.cassandra.table; + +public interface CassandraMessageModseqTable { + String TABLE_NAME = "modseq"; + String MAILBOX_ID = "mailboxId"; + String NEXT_MODSEQ = "nextModseq"; +} Modified: james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUidAndModSeqProviderTest.java URL: http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUidAndModSeqProviderTest.java?rev=1685967&r1=1685966&r2=1685967&view=diff ============================================================================== --- james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUidAndModSeqProviderTest.java (original) +++ james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUidAndModSeqProviderTest.java Wed Jun 17 09:12:45 2015 @@ -18,20 +18,25 @@ ****************************************************************/ package org.apache.james.mailbox.cassandra.mail; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import java.util.ArrayList; import java.util.List; import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.LongConsumer; +import java.util.stream.LongStream; import org.apache.james.mailbox.cassandra.CassandraClusterSingleton; +import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.model.MailboxPath; import org.apache.james.mailbox.store.mail.model.impl.SimpleMailbox; import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import com.google.common.base.Throwables; /** * Unit tests for UidProvider and ModSeqProvider. @@ -39,18 +44,18 @@ import org.slf4j.LoggerFactory; */ public class CassandraUidAndModSeqProviderTest { - private static final Logger LOG = LoggerFactory.getLogger(CassandraUidAndModSeqProviderTest.class); private static final CassandraClusterSingleton CASSANDRA = CassandraClusterSingleton.build(); - private static CassandraUidProvider uidProvider; - private static CassandraModSeqProvider modSeqProvider; - private static CassandraMailboxMapper mapper; - private static List<SimpleMailbox<UUID>> mailboxList; - private static List<MailboxPath> pathsList; private static final int NAMESPACES = 5; private static final int USERS = 5; private static final int MAILBOX_NO = 5; private static final int MAX_RETRY = 100; private static final char SEPARATOR = '%'; + + private CassandraUidProvider uidProvider; + private CassandraModSeqProvider modSeqProvider; + private CassandraMailboxMapper mapper; + private List<SimpleMailbox<UUID>> mailboxList; + private List<MailboxPath> pathsList; @Before public void setUpClass() throws Exception { @@ -63,13 +68,13 @@ public class CassandraUidAndModSeqProvid mapper.save(mailbox); } } - + @After public void cleanUp() { CASSANDRA.clearAllTables(); } - private static void fillMailboxList() { + private void fillMailboxList() { mailboxList = new ArrayList<>(); pathsList = new ArrayList<>(); MailboxPath path; @@ -88,52 +93,40 @@ public class CassandraUidAndModSeqProvid } } } - - LOG.info("Created test case with {} mailboxes and {} paths", mailboxList.size(), pathsList.size()); } - /** - * Test of lastUid method, of class CassandraUidProvider. - */ @Test - public void testLastUid() throws Exception { - LOG.info("lastUid"); - final MailboxPath path = new MailboxPath("gsoc", "ieugen", "Trash"); - final SimpleMailbox<UUID> newBox = new SimpleMailbox<>(path, 1234); + public void lastUidShouldRetrieveValueStoredByNextUid() throws Exception { + MailboxPath path = new MailboxPath("gsoc", "ieugen", "Trash"); + SimpleMailbox<UUID> newBox = new SimpleMailbox<>(path, 1234); mapper.save(newBox); mailboxList.add(newBox); pathsList.add(path); - final long result = uidProvider.lastUid(null, newBox); + long result = uidProvider.lastUid(null, newBox); assertEquals(0, result); - for (int i = 1; i < 10; i++) { - final long uid = uidProvider.nextUid(null, newBox); - assertEquals(uid, uidProvider.lastUid(null, newBox)); - } + LongStream.range(1, 10) + .forEach(propagateException(value -> { + long uid = uidProvider.nextUid(null, newBox); + assertThat(uid).isEqualTo(uidProvider.lastUid(null, newBox)); + }) + ); } - /** - * Test of nextUid method, of class CassandraUidProvider. - */ @Test - public void testNextUid() throws Exception { - LOG.info("nextUid"); - final SimpleMailbox<UUID> mailbox = mailboxList.get(mailboxList.size() / 2); - final long lastUid = uidProvider.lastUid(null, mailbox); - long result; - for (int i = (int) lastUid + 1; i < (lastUid + 10); i++) { - result = uidProvider.nextUid(null, mailbox); - assertEquals(i, result); - } + public void nextUidShouldIncrementValueByOne() throws Exception { + SimpleMailbox<UUID> mailbox = mailboxList.get(mailboxList.size() / 2); + long lastUid = uidProvider.lastUid(null, mailbox); + LongStream.range(lastUid + 1, lastUid + 10) + .forEach(propagateException(value -> { + long result = uidProvider.nextUid(null, mailbox); + assertThat(value).isEqualTo(result); + }) + ); } - /** - * Test of highestModSeq method, of class CassandraModSeqProvider. - */ @Test - public void testHighestModSeq() throws Exception { - LOG.info("highestModSeq"); - LOG.info("lastUid"); + public void highestModSeqShouldRetrieveValueStoredNextModSeq() throws Exception { MailboxPath path = new MailboxPath("gsoc", "ieugen", "Trash"); SimpleMailbox<UUID> newBox = new SimpleMailbox<>(path, 1234); mapper.save(newBox); @@ -142,24 +135,53 @@ public class CassandraUidAndModSeqProvid long result = modSeqProvider.highestModSeq(null, newBox); assertEquals(0, result); - for (int i = 1; i < 10; i++) { - long uid = modSeqProvider.nextModSeq(null, newBox); - assertEquals(uid, modSeqProvider.highestModSeq(null, newBox)); - } + LongStream.range(1, 10) + .forEach(propagateException(value -> { + long uid = modSeqProvider.nextModSeq(null, newBox); + assertThat(uid).isEqualTo(modSeqProvider.highestModSeq(null, newBox)); + }) + ); } - /** - * Test of nextModSeq method, of class CassandraModSeqProvider. - */ @Test - public void testNextModSeq() throws Exception { - LOG.info("nextModSeq"); - final SimpleMailbox<UUID> mailbox = mailboxList.get(mailboxList.size() / 2); - final long lastUid = modSeqProvider.highestModSeq(null, mailbox); - long result; - for (int i = (int) lastUid + 1; i < (lastUid + 10); i++) { - result = modSeqProvider.nextModSeq(null, mailbox); - assertEquals(i, result); - } + public void nextModSeqShouldIncrementValueByOne() throws Exception { + SimpleMailbox<UUID> mailbox = mailboxList.get(mailboxList.size() / 2); + long lastUid = modSeqProvider.highestModSeq(null, mailbox); + LongStream.range(lastUid + 1, lastUid + 10) + .forEach(propagateException(value -> { + long result = modSeqProvider.nextModSeq(null, mailbox); + assertThat(value).isEqualTo(result); + }) + ); + } + + @Test + public void nextModSeqShouldIncrementValueWhenParallelCalls() throws Exception { + SimpleMailbox<UUID> mailbox = mailboxList.get(mailboxList.size() / 2); + long lastUid = modSeqProvider.highestModSeq(null, mailbox); + final AtomicLong previousValue = new AtomicLong(); + LongStream.range(lastUid + 1, lastUid + 10) + .parallel() + .forEach(propagateException(value -> { + long result = modSeqProvider.nextModSeq(null, mailbox); + assertThat(result).isGreaterThan(previousValue.get()); + previousValue.set(result); + }) + ); + } + + @FunctionalInterface + private interface ConsumerThatThrowsMailboxException<T> { + void apply(T arg) throws MailboxException; + } + + private LongConsumer propagateException(ConsumerThatThrowsMailboxException<Long> function) { + return (value) -> { + try { + function.apply(value); + } catch (MailboxException e) { + Throwables.propagate(e); + } + }; } } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org