JAMES-1945 Handle modSeq computation as future Prepare statements for Cassandra ModSeqs
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/f2b7cd70 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/f2b7cd70 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/f2b7cd70 Branch: refs/heads/master Commit: f2b7cd70787072ceccc13442951118272c56cc85 Parents: 9200405 Author: Benoit Tellier <[email protected]> Authored: Mon Feb 20 10:47:03 2017 +0700 Committer: benwa <[email protected]> Committed: Thu Feb 23 10:37:41 2017 +0700 ---------------------------------------------------------------------- .../cassandra/mail/CassandraModSeqProvider.java | 173 ++++++++++++------- 1 file changed, 110 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/f2b7cd70/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java index af4ca5d..1a6a885 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java @@ -19,6 +19,7 @@ package org.apache.james.mailbox.cassandra.mail; +import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker; import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; import static com.datastax.driver.core.querybuilder.QueryBuilder.select; @@ -29,38 +30,87 @@ import static org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTab import static org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTable.TABLE_NAME; import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import javax.inject.Inject; -import org.apache.james.backends.cassandra.utils.CassandraConstants; +import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; import org.apache.james.backends.cassandra.utils.FunctionRunnerWithRetry; -import org.apache.james.backends.cassandra.utils.LightweightTransactionException; import org.apache.james.mailbox.MailboxSession; import org.apache.james.mailbox.cassandra.CassandraId; import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.model.MailboxId; import org.apache.james.mailbox.store.mail.ModSeqProvider; import org.apache.james.mailbox.store.mail.model.Mailbox; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Session; -import com.datastax.driver.core.querybuilder.BuiltStatement; -import com.google.common.base.Throwables; +import com.google.common.base.Supplier; public class CassandraModSeqProvider implements ModSeqProvider { + public static final String MOD_SEQ_CONDITION = "modSeqCondition"; + + public static class ExceptionRelay extends RuntimeException { + private final MailboxException underlying; + + public ExceptionRelay(MailboxException underlying) { + super(underlying); + this.underlying = underlying; + } + + public MailboxException getUnderlying() { + return underlying; + } + } + + private static <T> T unbox(Supplier<T> supplier) throws MailboxException { + try { + return supplier.get(); + } catch (CompletionException e) { + if (e.getCause() instanceof ExceptionRelay) { + throw ((ExceptionRelay) e.getCause()).getUnderlying(); + } + throw e; + } + } + private static final int DEFAULT_MAX_RETRY = 100000; - private static final Logger LOG = LoggerFactory.getLogger(CassandraModSeqProvider.class); private static final ModSeq FIRST_MODSEQ = new ModSeq(0); - - private final Session session; + + private final CassandraAsyncExecutor cassandraAsyncExecutor; private final FunctionRunnerWithRetry runner; + private final PreparedStatement select; + private final PreparedStatement update; + private final PreparedStatement insert; public CassandraModSeqProvider(Session session, int maxRetry) { - this.session = session; + this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session); this.runner = new FunctionRunnerWithRetry(maxRetry); + this.insert = prepareInsert(session); + this.update = prepareUpdate(session); + this.select = prepareSelect(session); + } + + private PreparedStatement prepareInsert(Session session) { + return session.prepare(insertInto(TABLE_NAME) + .value(NEXT_MODSEQ, bindMarker(NEXT_MODSEQ)) + .value(MAILBOX_ID, bindMarker(MAILBOX_ID)) + .ifNotExists()); + } + + private PreparedStatement prepareUpdate(Session session) { + return session.prepare(update(TABLE_NAME) + .onlyIf(eq(NEXT_MODSEQ, bindMarker(MOD_SEQ_CONDITION))) + .with(set(NEXT_MODSEQ, bindMarker(NEXT_MODSEQ))) + .where(eq(MAILBOX_ID, bindMarker(MAILBOX_ID)))); + } + + private PreparedStatement prepareSelect(Session session) { + return session.prepare(select(NEXT_MODSEQ) + .from(TABLE_NAME) + .where(eq(MAILBOX_ID, bindMarker(MAILBOX_ID)))); } @Inject @@ -71,87 +121,84 @@ public class CassandraModSeqProvider implements ModSeqProvider { @Override public long nextModSeq(MailboxSession mailboxSession, Mailbox mailbox) throws MailboxException { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); - return nextModSeq(mailboxId); + return nextModSeq(mailboxId).join() + .orElseThrow(() -> new MailboxException("Can not retrieve modseq for " + mailboxId)); } @Override public long nextModSeq(MailboxSession session, MailboxId mailboxId) throws MailboxException { - return nextModSeq((CassandraId)mailboxId); + return nextModSeq((CassandraId) mailboxId) + .join() + .orElseThrow(() -> new MailboxException("Can not retrieve modseq for " + mailboxId)); } @Override public long highestModSeq(MailboxSession mailboxSession, Mailbox mailbox) throws MailboxException { - return findHighestModSeq((CassandraId) mailbox.getMailboxId()).getValue(); + return unbox(() -> findHighestModSeq((CassandraId) mailbox.getMailboxId()).join().getValue()); } @Override public long highestModSeq(MailboxSession mailboxSession, MailboxId mailboxId) throws MailboxException { - return findHighestModSeq((CassandraId) mailboxId).getValue(); - } - - private ModSeq findHighestModSeq(CassandraId mailboxId) throws MailboxException { - ResultSet result = session.execute( - select(NEXT_MODSEQ) - .from(TABLE_NAME) - .where(eq(MAILBOX_ID, mailboxId.asUuid()))); - if (result.isExhausted()) { - return FIRST_MODSEQ; - } else { - return new ModSeq(result.one().getLong(NEXT_MODSEQ)); - } + return unbox(() -> findHighestModSeq((CassandraId) mailboxId).join().getValue()); + } + + private CompletableFuture<ModSeq> findHighestModSeq(CassandraId mailboxId) { + return cassandraAsyncExecutor.executeSingleRow( + select.bind() + .setUUID(MAILBOX_ID, mailboxId.asUuid())) + .thenApply(optional -> optional.map(row -> new ModSeq(row.getLong(NEXT_MODSEQ))) + .orElse(FIRST_MODSEQ)); } - private Optional<ModSeq> tryInsertModSeq(CassandraId mailboxId, ModSeq modSeq) { + private CompletableFuture<Optional<ModSeq>> tryInsertModSeq(CassandraId mailboxId, ModSeq modSeq) { ModSeq nextModSeq = modSeq.next(); - return transactionalStatementToOptionalModSeq(nextModSeq, - insertInto(TABLE_NAME) - .value(NEXT_MODSEQ, nextModSeq.getValue()) - .value(MAILBOX_ID, mailboxId.asUuid()) - .ifNotExists()); + return cassandraAsyncExecutor.executeReturnApplied( + insert.bind() + .setUUID(MAILBOX_ID, mailboxId.asUuid()) + .setLong(NEXT_MODSEQ, nextModSeq.getValue())) + .thenApply(success -> successToModSeq(nextModSeq, success)); } - private Optional<ModSeq> tryUpdateModSeq(CassandraId mailboxId, ModSeq modSeq) { + private CompletableFuture<Optional<ModSeq>> tryUpdateModSeq(CassandraId mailboxId, ModSeq modSeq) { ModSeq nextModSeq = modSeq.next(); - return transactionalStatementToOptionalModSeq(nextModSeq, - update(TABLE_NAME) - .onlyIf(eq(NEXT_MODSEQ, modSeq.getValue())) - .with(set(NEXT_MODSEQ, nextModSeq.getValue())) - .where(eq(MAILBOX_ID, mailboxId.asUuid()))); + return cassandraAsyncExecutor.executeReturnApplied( + update.bind() + .setUUID(MAILBOX_ID, mailboxId.asUuid()) + .setLong(NEXT_MODSEQ, nextModSeq.getValue()) + .setLong(MOD_SEQ_CONDITION, modSeq.getValue())) + .thenApply(success -> successToModSeq(nextModSeq, success)); } - private Optional<ModSeq> transactionalStatementToOptionalModSeq(ModSeq modSeq, BuiltStatement statement) { - if(session.execute(statement).one().getBool(CassandraConstants.LIGHTWEIGHT_TRANSACTION_APPLIED)) { + private Optional<ModSeq> successToModSeq(ModSeq modSeq, Boolean success) { + if (success) { return Optional.of(modSeq); } return Optional.empty(); } - private long nextModSeq(CassandraId mailboxId) throws MailboxException { - if (findHighestModSeq(mailboxId).isFirst()) { - Optional<ModSeq> optional = tryInsertModSeq(mailboxId, FIRST_MODSEQ); - if (optional.isPresent()) { - return optional.get().getValue(); - } - } - - try { - return runner.executeAndRetrieveObject( - () -> { - try { - return tryUpdateModSeq(mailboxId, findHighestModSeq(mailboxId)) - .map(ModSeq::getValue); - } catch (Exception exception) { - LOG.error("Can not retrieve next ModSeq", exception); - throw Throwables.propagate(exception); + public CompletableFuture<Optional<Long>> nextModSeq(CassandraId mailboxId) { + return findHighestModSeq(mailboxId) + .thenCompose(modSeq -> { + if (modSeq.isFirst()) { + return tryInsertModSeq(mailboxId, FIRST_MODSEQ); + } + return tryUpdateModSeq(mailboxId, modSeq); + }).thenCompose(firstInsert -> { + if (firstInsert.isPresent()) { + return CompletableFuture.completedFuture(firstInsert); } - }); - } catch (LightweightTransactionException e) { - throw new MailboxException("Error during ModSeq update", e); - } + return handleRetries(mailboxId); + }) + .thenApply(optional -> optional.map(ModSeq::getValue)); + } + + private CompletableFuture<Optional<ModSeq>> handleRetries(CassandraId mailboxId) { + return runner.executeAsyncAndRetrieveObject( + () -> findHighestModSeq(mailboxId) + .thenCompose(newModSeq -> tryUpdateModSeq(mailboxId, newModSeq))); } private static class ModSeq { - private final long value; public ModSeq(long value) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
