Author: btellier Date: Fri Jul 3 14:39:09 2015 New Revision: 1689024 URL: http://svn.apache.org/r1689024 Log: MAILBOX-208 Refactor Cassandra Uid provider (similar to Cassandra Modseq provider)
Modified: james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java Modified: james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java URL: http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java?rev=1689024&r1=1689023&r2=1689024&view=diff ============================================================================== --- james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java (original) +++ james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java Fri Jul 3 14:39:09 2015 @@ -19,34 +19,42 @@ package org.apache.james.mailbox.cassandra.mail; +import java.util.Optional; + import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; import static com.datastax.driver.core.querybuilder.QueryBuilder.select; import static com.datastax.driver.core.querybuilder.QueryBuilder.set; import static com.datastax.driver.core.querybuilder.QueryBuilder.update; -import static org.apache.james.mailbox.cassandra.table.CassandraMessageUidTable.MAILBOX_ID; +import static org.apache.james.mailbox.cassandra.CassandraConstants.LIGHTWEIGHT_TRANSACTION_APPLIED; import static org.apache.james.mailbox.cassandra.table.CassandraMessageUidTable.NEXT_UID; -import static org.apache.james.mailbox.cassandra.table.CassandraMessageUidTable.TABLE_NAME; +import com.datastax.driver.core.querybuilder.BuiltStatement; +import com.google.common.base.Throwables; import org.apache.james.mailbox.MailboxSession; import org.apache.james.mailbox.cassandra.CassandraId; +import org.apache.james.mailbox.cassandra.mail.utils.FunctionRunnerWithRetry; +import org.apache.james.mailbox.cassandra.table.CassandraMessageUidTable; import 
org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.store.mail.UidProvider; import org.apache.james.mailbox.store.mail.model.Mailbox; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Session; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class CassandraUidProvider implements UidProvider<CassandraId> { public final static int DEFAULT_MAX_RETRY = 100000; + private static final Logger LOG = LoggerFactory.getLogger(CassandraUidProvider.class); + private static final Uid FIRST_UID = new Uid(0); - private Session session; - private final int applied = 0; - private int maxRetry; + private final Session session; + private final FunctionRunnerWithRetry runner; public CassandraUidProvider(Session session, int maxRetry) { this.session = session; - this.maxRetry = maxRetry; + this.runner = new FunctionRunnerWithRetry(maxRetry); } public CassandraUidProvider(Session session) { @@ -55,42 +63,86 @@ public class CassandraUidProvider implem @Override public long nextUid(MailboxSession mailboxSession, Mailbox<CassandraId> mailbox) throws MailboxException { - long lastUid = lastUid(mailboxSession, mailbox); - if (lastUid == 0) { - ResultSet result = session.execute( - insertInto(TABLE_NAME) - .value(NEXT_UID, ++lastUid) - .value(MAILBOX_ID, mailbox.getMailboxId().asUuid()) - .ifNotExists()); - if(result.one().getBool(applied)) { - return lastUid; + if (findHighestUid(mailbox).isFirst()) { + Optional<Uid> optional = tryInsertUid(mailbox, FIRST_UID); + if (optional.isPresent()) { + return optional.get().getValue(); } } - int tries = 0; - boolean isApplied; - do { - tries++; - lastUid = lastUid(mailboxSession, mailbox); - ResultSet result = session.execute( - update(TABLE_NAME) - .onlyIf(eq(NEXT_UID, lastUid)) - .with(set(NEXT_UID, ++lastUid)) - .where(eq(MAILBOX_ID, mailbox.getMailboxId().asUuid()))); - isApplied = result.one().getBool(applied); - } while (! isApplied && tries < maxRetry); - if( ! 
isApplied ) { - throw new MailboxException("Can not obtain rights to manage UID"); - } - return lastUid; + + return runner.executeAndRetrieveObject( + () -> { + try { + return tryUpdateUid(mailbox, findHighestUid(mailbox)) + .map(Uid::getValue); + } catch (Exception exception) { + LOG.error("Can not retrieve next Uid", exception); + throw Throwables.propagate(exception); + } + }); } @Override public long lastUid(MailboxSession mailboxSession, Mailbox<CassandraId> mailbox) throws MailboxException { + return findHighestUid(mailbox).getValue(); + } + + private Uid findHighestUid(Mailbox<CassandraId> mailbox) throws MailboxException { ResultSet result = session.execute( - select(NEXT_UID) - .from(TABLE_NAME) - .where(eq(MAILBOX_ID, mailbox.getMailboxId().asUuid()))); - return result.isExhausted() ? 0 : result.one().getLong(NEXT_UID); + select(NEXT_UID) + .from(CassandraMessageUidTable.TABLE_NAME) + .where(eq(CassandraMessageUidTable.MAILBOX_ID, mailbox.getMailboxId().asUuid()))); + if (result.isExhausted()) { + return FIRST_UID; + } else { + return new Uid(result.one().getLong(NEXT_UID)); + } + } + + private Optional<Uid> tryInsertUid(Mailbox<CassandraId> mailbox, Uid uid) { + Uid nextUid = uid.next(); + return transactionalStatementToOptionalUid(nextUid, + insertInto(CassandraMessageUidTable.TABLE_NAME) + .value(NEXT_UID, nextUid.getValue()) + .value(CassandraMessageUidTable.MAILBOX_ID, mailbox.getMailboxId().asUuid()) + .ifNotExists()); + } + + private Optional<Uid> tryUpdateUid(Mailbox<CassandraId> mailbox, Uid uid) { + Uid nextUid = uid.next(); + return transactionalStatementToOptionalUid(nextUid, + update(CassandraMessageUidTable.TABLE_NAME) + .onlyIf(eq(NEXT_UID, uid.getValue())) + .with(set(NEXT_UID, nextUid.getValue())) + .where(eq(CassandraMessageUidTable.MAILBOX_ID, mailbox.getMailboxId().asUuid()))); + } + + private Optional<Uid> transactionalStatementToOptionalUid(Uid uid, BuiltStatement statement) { + 
if(session.execute(statement).one().getBool(LIGHTWEIGHT_TRANSACTION_APPLIED)) { + return Optional.of(uid); + } + return Optional.empty(); + } + + private static class Uid { + + private final long value; + + public Uid(long value) { + this.value = value; + } + + public Uid next() { + return new Uid(value + 1); + } + + public long getValue() { + return value; + } + + public boolean isFirst() { + return value == FIRST_UID.value; + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org