MAILBOX-307 Prepare Cassandra ACL statements
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/91d303b1 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/91d303b1 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/91d303b1 Branch: refs/heads/master Commit: 91d303b1dc72bd2a69a3b1ed8e7b3b86e7d85ca9 Parents: ee68d17 Author: benwa <btell...@linagora.com> Authored: Mon Sep 25 17:17:47 2017 +0700 Committer: Matthieu Baechler <matth...@apache.org> Committed: Fri Sep 29 09:20:40 2017 +0200 ---------------------------------------------------------------------- .../cassandra/mail/CassandraACLMapper.java | 118 ++++++++++++------- 1 file changed, 76 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/91d303b1/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java index 5695ffc..b2603a2 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java @@ -19,6 +19,7 @@ package org.apache.james.mailbox.cassandra.mail; +import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker; import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; import static com.datastax.driver.core.querybuilder.QueryBuilder.select; @@ -45,15 +46,18 @@ import org.apache.james.mailbox.store.json.SimpleMailboxACLJsonConverter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; +import com.datastax.driver.core.querybuilder.Insert; import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.base.Throwables; public class CassandraACLMapper { - private static final Logger LOG = LoggerFactory.getLogger(CassandraACLMapper.class); public static final int INITIAL_VALUE = 0; + private static final Logger LOG = LoggerFactory.getLogger(CassandraACLMapper.class); + private static final String OLD_VERSION = "oldVersion"; @FunctionalInterface public interface CodeInjector { @@ -61,43 +65,75 @@ public class CassandraACLMapper { } private final CassandraAsyncExecutor executor; - private final Session session; private final int maxRetry; private final CodeInjector codeInjector; + private final PreparedStatement insertStatement; + private final PreparedStatement conditionalInsertStatement; + private final PreparedStatement conditionalUpdateStatement; + private final PreparedStatement readStatement; public CassandraACLMapper(Session session, CassandraConfiguration cassandraConfiguration) { this(session, cassandraConfiguration, () -> {}); } public CassandraACLMapper(Session session, CassandraConfiguration cassandraConfiguration, CodeInjector codeInjector) { - this.session = session; this.executor = new CassandraAsyncExecutor(session); this.maxRetry = cassandraConfiguration.getAclMaxRetry(); this.codeInjector = codeInjector; + this.insertStatement = session.prepare(insertCqlBase()); + this.conditionalInsertStatement = session.prepare(insertCqlBase().ifNotExists()); + this.conditionalUpdateStatement = prepareConditionalUpdate(session); + this.readStatement = prepareReadStatement(session); + } + + private PreparedStatement prepareConditionalUpdate(Session session) { + return session.prepare( + update(CassandraACLTable.TABLE_NAME) + .where(eq(CassandraACLTable.ID, bindMarker(CassandraACLTable.ID))) + .with(set(CassandraACLTable.ACL, bindMarker(CassandraACLTable.ACL))) + .and(set(CassandraACLTable.VERSION, bindMarker(CassandraACLTable.VERSION))) + .onlyIf(eq(CassandraACLTable.VERSION, bindMarker(OLD_VERSION)))); + } + + private PreparedStatement prepareReadStatement(Session session) { + return session.prepare( + select(CassandraACLTable.ACL, CassandraACLTable.VERSION) + .from(CassandraACLTable.TABLE_NAME) + .where(eq(CassandraMailboxTable.ID, bindMarker(CassandraACLTable.ID)))); + } + + private Insert insertCqlBase() { + return insertInto(CassandraACLTable.TABLE_NAME) + .value(CassandraACLTable.ID, bindMarker(CassandraACLTable.ID)) + .value(CassandraACLTable.ACL, bindMarker(CassandraACLTable.ACL)) + .value(CassandraACLTable.VERSION, INITIAL_VALUE); } public CompletableFuture<MailboxACL> getACL(CassandraId cassandraId) { - return getStoredACLRow(cassandraId).thenApply(resultSet -> { - if (resultSet.isExhausted()) { - return SimpleMailboxACL.EMPTY; - } - String serializedACL = resultSet.one().getString(CassandraACLTable.ACL); - return deserializeACL(cassandraId, serializedACL); - }); + return getStoredACLRow(cassandraId) + .thenApply(resultSet -> getAcl(cassandraId, resultSet)); + } + + private MailboxACL getAcl(CassandraId cassandraId, ResultSet resultSet) { + if (resultSet.isExhausted()) { + return SimpleMailboxACL.EMPTY; + } + String serializedACL = resultSet.one().getString(CassandraACLTable.ACL); + return deserializeACL(cassandraId, serializedACL); } public void updateACL(CassandraId cassandraId, MailboxACL.MailboxACLCommand command) throws MailboxException { try { - new FunctionRunnerWithRetry(maxRetry).execute( - () -> { - codeInjector.inject(); - ResultSet resultSet = getAclWithVersion(cassandraId) - .map(aclWithVersion -> aclWithVersion.apply(command)) - .map(aclWithVersion -> updateStoredACL(cassandraId, aclWithVersion)) - .orElseGet(() -> insertACL(cassandraId, applyCommandOnEmptyACL(command))); - return resultSet.one().getBool(CassandraConstants.LIGHTWEIGHT_TRANSACTION_APPLIED); - } - ); + new FunctionRunnerWithRetry(maxRetry) + .execute( + () -> { + codeInjector.inject(); + ResultSet resultSet = getAclWithVersion(cassandraId) + .map(aclWithVersion -> aclWithVersion.apply(command)) + .map(aclWithVersion -> updateStoredACL(cassandraId, aclWithVersion)) + .orElseGet(() -> insertACL(cassandraId, applyCommandOnEmptyACL(command))); + return resultSet.one().getBool(CassandraConstants.LIGHTWEIGHT_TRANSACTION_APPLIED); + }); } catch (LightweightTransactionException e) { throw new MailboxException("Exception during lightweight transaction", e); } @@ -105,11 +141,11 @@ public class CassandraACLMapper { public void resetACL(CassandraId cassandraId, MailboxACL mailboxACL) { try { - session.execute( - insertInto(CassandraACLTable.TABLE_NAME) - .value(CassandraACLTable.ID, cassandraId.asUuid()) - .value(CassandraACLTable.ACL, SimpleMailboxACLJsonConverter.toJson(mailboxACL)) - .value(CassandraACLTable.VERSION, INITIAL_VALUE)); + executor.executeVoid( + insertStatement.bind() + .setUUID(CassandraACLTable.ID, cassandraId.asUuid()) + .setString(CassandraACLTable.ACL, SimpleMailboxACLJsonConverter.toJson(mailboxACL))) + .join(); } catch (JsonProcessingException e) { throw Throwables.propagate(e); } @@ -124,20 +160,20 @@ public class CassandraACLMapper { } private CompletableFuture<ResultSet> getStoredACLRow(CassandraId cassandraId) { - return executor.execute(select(CassandraACLTable.ACL, CassandraACLTable.VERSION) - .from(CassandraACLTable.TABLE_NAME) - .where(eq(CassandraMailboxTable.ID, cassandraId.asUuid()))); + return executor.execute( + readStatement.bind() + .setUUID(CassandraACLTable.ID, cassandraId.asUuid())); } private ResultSet updateStoredACL(CassandraId cassandraId, ACLWithVersion aclWithVersion) { try { - return session.execute( - update(CassandraACLTable.TABLE_NAME) - .with(set(CassandraACLTable.ACL, SimpleMailboxACLJsonConverter.toJson(aclWithVersion.mailboxACL))) - .and(set(CassandraACLTable.VERSION, aclWithVersion.version + 1)) - .where(eq(CassandraACLTable.ID, cassandraId.asUuid())) - .onlyIf(eq(CassandraACLTable.VERSION, aclWithVersion.version)) - ); + return executor.execute( + conditionalUpdateStatement.bind() + .setUUID(CassandraACLTable.ID, cassandraId.asUuid()) + .setString(CassandraACLTable.ACL, SimpleMailboxACLJsonConverter.toJson(aclWithVersion.mailboxACL)) + .setLong(CassandraACLTable.VERSION, aclWithVersion.version + 1) + .setLong(OLD_VERSION, aclWithVersion.version)) + .join(); } catch (JsonProcessingException exception) { throw Throwables.propagate(exception); } @@ -145,13 +181,11 @@ public class CassandraACLMapper { private ResultSet insertACL(CassandraId cassandraId, MailboxACL acl) { try { - return session.execute( - insertInto(CassandraACLTable.TABLE_NAME) - .value(CassandraACLTable.ID, cassandraId.asUuid()) - .value(CassandraACLTable.ACL, SimpleMailboxACLJsonConverter.toJson(acl)) - .value(CassandraACLTable.VERSION, 0) - .ifNotExists() - ); + return executor.execute( + conditionalInsertStatement.bind() + .setUUID(CassandraACLTable.ID, cassandraId.asUuid()) + .setString(CassandraACLTable.ACL, SimpleMailboxACLJsonConverter.toJson(acl))) + .join(); } catch (JsonProcessingException exception) { throw Throwables.propagate(exception); } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org