MAILBOX-307 Implement setAcl with a Cassandra read before writes
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/d5e1cf8f Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/d5e1cf8f Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/d5e1cf8f Branch: refs/heads/master Commit: d5e1cf8f39550d5cc7234a9d864414dd2d881c65 Parents: 998446b Author: benwa <btell...@linagora.com> Authored: Thu Sep 28 15:19:47 2017 +0700 Committer: Matthieu Baechler <matth...@apache.org> Committed: Fri Sep 29 09:20:41 2017 +0200 ---------------------------------------------------------------------- .../cassandra/mail/CassandraACLMapper.java | 75 +++++++++----------- 1 file changed, 34 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/d5e1cf8f/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java index a0dccf0..1e67d41 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java @@ -29,10 +29,10 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.update; import java.io.IOException; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.function.Function; import org.apache.james.backends.cassandra.init.CassandraConfiguration; import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; -import org.apache.james.backends.cassandra.utils.CassandraConstants; import org.apache.james.backends.cassandra.utils.FunctionRunnerWithRetry; import org.apache.james.backends.cassandra.utils.LightweightTransactionException; import org.apache.james.mailbox.cassandra.ids.CassandraId; @@ -49,7 +49,6 @@ import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; -import com.datastax.driver.core.querybuilder.Insert; import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.base.Throwables; @@ -66,7 +65,6 @@ public class CassandraACLMapper { private final CassandraAsyncExecutor executor; private final int maxRetry; private final CodeInjector codeInjector; - private final PreparedStatement insertStatement; private final PreparedStatement conditionalInsertStatement; private final PreparedStatement conditionalUpdateStatement; private final PreparedStatement readStatement; @@ -79,12 +77,20 @@ public class CassandraACLMapper { this.executor = new CassandraAsyncExecutor(session); this.maxRetry = cassandraConfiguration.getAclMaxRetry(); this.codeInjector = codeInjector; - this.insertStatement = session.prepare(insertCqlBase()); - this.conditionalInsertStatement = session.prepare(insertCqlBase().ifNotExists()); + this.conditionalInsertStatement = prepareConditionalInsert(session); this.conditionalUpdateStatement = prepareConditionalUpdate(session); this.readStatement = prepareReadStatement(session); } + private PreparedStatement prepareConditionalInsert(Session session) { + return session.prepare( + insertInto(CassandraACLTable.TABLE_NAME) + .value(CassandraACLTable.ID, bindMarker(CassandraACLTable.ID)) + .value(CassandraACLTable.ACL, bindMarker(CassandraACLTable.ACL)) + .value(CassandraACLTable.VERSION, INITIAL_VALUE) + .ifNotExists()); + } + private PreparedStatement prepareConditionalUpdate(Session session) { return session.prepare( update(CassandraACLTable.TABLE_NAME) @@ -101,13 +107,6 @@ public class CassandraACLMapper { .where(eq(CassandraMailboxTable.ID, bindMarker(CassandraACLTable.ID)))); } - private Insert insertCqlBase() { - return insertInto(CassandraACLTable.TABLE_NAME) - .value(CassandraACLTable.ID, bindMarker(CassandraACLTable.ID)) - .value(CassandraACLTable.ACL, bindMarker(CassandraACLTable.ACL)) - .value(CassandraACLTable.VERSION, INITIAL_VALUE); - } - public CompletableFuture<MailboxACL> getACL(CassandraId cassandraId) { return getStoredACLRow(cassandraId) .thenApply(resultSet -> getAcl(cassandraId, resultSet)); @@ -122,51 +121,42 @@ public class CassandraACLMapper { } public void updateACL(CassandraId cassandraId, MailboxACL.ACLCommand command) throws MailboxException { + MailboxACL replacement = MailboxACL.EMPTY.apply(command); + + updateAcl(cassandraId, aclWithVersion -> aclWithVersion.apply(command), replacement); + } + + public void setACL(CassandraId cassandraId, MailboxACL mailboxACL) throws MailboxException { + updateAcl(cassandraId, + acl -> new ACLWithVersion(acl.version, mailboxACL), + mailboxACL); + } + + private void updateAcl(CassandraId cassandraId, Function<ACLWithVersion, ACLWithVersion> aclTransformation, MailboxACL replacement) throws MailboxException { try { new FunctionRunnerWithRetry(maxRetry) .execute( () -> { codeInjector.inject(); - ResultSet resultSet = getAclWithVersion(cassandraId) - .map(aclWithVersion -> aclWithVersion.apply(command)) + return getAclWithVersion(cassandraId) + .map(aclTransformation) .map(aclWithVersion -> updateStoredACL(cassandraId, aclWithVersion)) - .orElseGet(() -> insertACL(cassandraId, applyCommandOnEmptyACL(command))); - return resultSet.one().getBool(CassandraConstants.LIGHTWEIGHT_TRANSACTION_APPLIED); + .orElseGet(() -> insertACL(cassandraId, replacement)); }); } catch (LightweightTransactionException e) { throw new MailboxException("Exception during lightweight transaction", e); } } - public void setACL(CassandraId cassandraId, MailboxACL mailboxACL) { - try { - executor.executeVoid( - insertStatement.bind() - .setUUID(CassandraACLTable.ID, cassandraId.asUuid()) - .setString(CassandraACLTable.ACL, MailboxACLJsonConverter.toJson(mailboxACL))) - .join(); - } catch (JsonProcessingException e) { - throw Throwables.propagate(e); - } - } - - private MailboxACL applyCommandOnEmptyACL(MailboxACL.ACLCommand command) { - try { - return MailboxACL.EMPTY.apply(command); - } catch (UnsupportedRightException exception) { - throw Throwables.propagate(exception); - } - } - private CompletableFuture<ResultSet> getStoredACLRow(CassandraId cassandraId) { return executor.execute( readStatement.bind() .setUUID(CassandraACLTable.ID, cassandraId.asUuid())); } - private ResultSet updateStoredACL(CassandraId cassandraId, ACLWithVersion aclWithVersion) { + private boolean updateStoredACL(CassandraId cassandraId, ACLWithVersion aclWithVersion) { try { - return executor.execute( + return executor.executeReturnApplied( conditionalUpdateStatement.bind() .setUUID(CassandraACLTable.ID, cassandraId.asUuid()) .setString(CassandraACLTable.ACL, MailboxACLJsonConverter.toJson(aclWithVersion.mailboxACL)) @@ -178,9 +168,9 @@ public class CassandraACLMapper { } } - private ResultSet insertACL(CassandraId cassandraId, MailboxACL acl) { + private boolean insertACL(CassandraId cassandraId, MailboxACL acl) { try { - return executor.execute( + return executor.executeReturnApplied( conditionalInsertStatement.bind() .setUUID(CassandraACLTable.ID, cassandraId.asUuid()) .setString(CassandraACLTable.ACL, MailboxACLJsonConverter.toJson(acl))) @@ -196,7 +186,10 @@ public class CassandraACLMapper { return Optional.empty(); } Row row = resultSet.one(); - return Optional.of(new ACLWithVersion(row.getLong(CassandraACLTable.VERSION), deserializeACL(cassandraId, row.getString(CassandraACLTable.ACL)))); + return Optional.of( + new ACLWithVersion( + row.getLong(CassandraACLTable.VERSION), + deserializeACL(cassandraId, row.getString(CassandraACLTable.ACL)))); } private MailboxACL deserializeACL(CassandraId cassandraId, String serializedACL) { --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org