Repository: james-project Updated Branches: refs/heads/master f8b587a7a -> 7b38d06c6
MAILBOX-339 New CassandraMailboxPathDAO implementation Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/3861fa41 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/3861fa41 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/3861fa41 Branch: refs/heads/master Commit: 3861fa41482ff668d357b2fce00858bf20feef43 Parents: 2fae24a Author: Antoine Duprat <adup...@linagora.com> Authored: Mon May 14 15:32:45 2018 +0200 Committer: Matthieu Baechler <matth...@apache.org> Committed: Mon May 28 17:38:00 2018 +0200 ---------------------------------------------------------------------- .../mail/CassandraMailboxPathV2DAO.java | 191 +++++++++++++++++++ .../modules/CassandraMailboxModule.java | 15 +- .../table/CassandraMailboxPathV2Table.java | 36 ++++ .../mail/CassandraMailboxPathV2DAOTest.java | 30 +++ 4 files changed, 271 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/3861fa41/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV2DAO.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV2DAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV2DAO.java new file mode 100644 index 0000000..1f58898 --- /dev/null +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV2DAO.java @@ -0,0 +1,191 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.cassandra.mail; + +import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker; +import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; +import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; +import static com.datastax.driver.core.querybuilder.QueryBuilder.select; +import static org.apache.james.mailbox.cassandra.GhostMailbox.TYPE; +import static org.apache.james.mailbox.cassandra.table.CassandraMailboxPathV2Table.FIELDS; +import static org.apache.james.mailbox.cassandra.table.CassandraMailboxPathV2Table.MAILBOX_ID; +import static org.apache.james.mailbox.cassandra.table.CassandraMailboxPathV2Table.MAILBOX_NAME; +import static org.apache.james.mailbox.cassandra.table.CassandraMailboxPathV2Table.NAMESPACE; +import static org.apache.james.mailbox.cassandra.table.CassandraMailboxPathV2Table.TABLE_NAME; +import static org.apache.james.mailbox.cassandra.table.CassandraMailboxPathV2Table.USER; + +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Stream; + +import javax.inject.Inject; + +import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; +import org.apache.james.backends.cassandra.utils.CassandraUtils; +import org.apache.james.mailbox.cassandra.GhostMailbox; +import org.apache.james.mailbox.cassandra.ids.CassandraId; +import org.apache.james.mailbox.model.MailboxPath; + +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.querybuilder.QueryBuilder; + +public class CassandraMailboxPathV2DAO implements CassandraMailboxPathDAO { + + + private final CassandraAsyncExecutor cassandraAsyncExecutor; + private final CassandraUtils cassandraUtils; + private final PreparedStatement delete; + private final PreparedStatement insert; + private final PreparedStatement select; + private final PreparedStatement selectAll; + + @Inject + public CassandraMailboxPathV2DAO(Session session, CassandraUtils cassandraUtils) { + this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session); + this.cassandraUtils = cassandraUtils; + this.insert = prepareInsert(session); + this.delete = prepareDelete(session); + this.select = prepareSelect(session); + this.selectAll = prepareSelectAll(session); + } + + private PreparedStatement prepareDelete(Session session) { + return session.prepare(QueryBuilder.delete() + .from(TABLE_NAME) + .where(eq(NAMESPACE, bindMarker(NAMESPACE))) + .and(eq(USER, bindMarker(USER))) + .and(eq(MAILBOX_NAME, bindMarker(MAILBOX_NAME)))); + } + + private PreparedStatement prepareInsert(Session session) { + return session.prepare(insertInto(TABLE_NAME) + .value(NAMESPACE, bindMarker(NAMESPACE)) + .value(USER, bindMarker(USER)) + .value(MAILBOX_NAME, bindMarker(MAILBOX_NAME)) + .value(MAILBOX_ID, bindMarker(MAILBOX_ID)) + .ifNotExists()); + } + + private PreparedStatement prepareSelect(Session session) { + return session.prepare(select(FIELDS) + .from(TABLE_NAME) + .where(eq(NAMESPACE, bindMarker(NAMESPACE))) + .and(eq(USER, bindMarker(USER))) + .and(eq(MAILBOX_NAME, bindMarker(MAILBOX_NAME)))); + } + + private PreparedStatement prepareSelectAll(Session session) { + return session.prepare(select(FIELDS) + .from(TABLE_NAME) + .where(eq(NAMESPACE, bindMarker(NAMESPACE))) + .and(eq(USER, bindMarker(USER)))); + } + + @Override + public CompletableFuture<Optional<CassandraIdAndPath>> retrieveId(MailboxPath mailboxPath) { + return cassandraAsyncExecutor.executeSingleRow( + select.bind() + .setString(NAMESPACE, mailboxPath.getNamespace()) + .setString(USER, sanitizeUser(mailboxPath.getUser())) + .setString(MAILBOX_NAME, mailboxPath.getName())) + .thenApply(rowOptional -> + rowOptional.map(this::fromRowToCassandraIdAndPath)) + .thenApply(value -> logGhostMailbox(mailboxPath, value)); + } + + @Override + public CompletableFuture<Stream<CassandraIdAndPath>> listUserMailboxes(String namespace, String user) { + return cassandraAsyncExecutor.execute( + selectAll.bind() + .setString(NAMESPACE, namespace) + .setString(USER, sanitizeUser(user))) + .thenApply(resultSet -> cassandraUtils.convertToStream(resultSet) + .map(this::fromRowToCassandraIdAndPath) + .peek(this::logReadSuccess)); + } + + /** + * See https://issues.apache.org/jira/browse/MAILBOX-322 to read about the Ghost mailbox bug. + * + * A missed read on an existing mailbox is the cause of the ghost mailbox bug. Here we log missing reads. Successful + * reads and write operations are also added in order to allow audit in order to know if the mailbox existed. + */ + @Override + public Optional<CassandraIdAndPath> logGhostMailbox(MailboxPath mailboxPath, Optional<CassandraIdAndPath> value) { + if (value.isPresent()) { + CassandraIdAndPath cassandraIdAndPath = value.get(); + logReadSuccess(cassandraIdAndPath); + } else { + GhostMailbox.logger() + .addField(GhostMailbox.MAILBOX_NAME, mailboxPath) + .addField(TYPE, "readMiss") + .log(logger -> logger.info("Read mailbox missed")); + } + return value; + } + + /** + * See https://issues.apache.org/jira/browse/MAILBOX-322 to read about the Ghost mailbox bug. + * + * Read success allows to know if a mailbox existed before (mailbox write history might be older than this log introduction + * or log history might have been dropped) + */ + private void logReadSuccess(CassandraIdAndPath cassandraIdAndPath) { + GhostMailbox.logger() + .addField(GhostMailbox.MAILBOX_NAME, cassandraIdAndPath.getMailboxPath()) + .addField(TYPE, "readSuccess") + .addField(GhostMailbox.MAILBOX_ID, cassandraIdAndPath.getCassandraId()) + .log(logger -> logger.info("Read mailbox succeeded")); + } + + private CassandraIdAndPath fromRowToCassandraIdAndPath(Row row) { + return new CassandraIdAndPath( + CassandraId.of(row.getUUID(MAILBOX_ID)), + new MailboxPath(row.getString(NAMESPACE), + row.getString(USER), + row.getString(MAILBOX_NAME))); + } + + @Override + public CompletableFuture<Boolean> save(MailboxPath mailboxPath, CassandraId mailboxId) { + return cassandraAsyncExecutor.executeReturnApplied(insert.bind() + .setString(NAMESPACE, mailboxPath.getNamespace()) + .setString(USER, sanitizeUser(mailboxPath.getUser())) + .setString(MAILBOX_NAME, mailboxPath.getName()) + .setUUID(MAILBOX_ID, mailboxId.asUuid())); + } + + @Override + public CompletableFuture<Void> delete(MailboxPath mailboxPath) { + return cassandraAsyncExecutor.executeVoid(delete.bind() + .setString(NAMESPACE, mailboxPath.getNamespace()) + .setString(USER, sanitizeUser(mailboxPath.getUser())) + .setString(MAILBOX_NAME, mailboxPath.getName())); + } + + private String sanitizeUser(String user) { + if (user == null) { + return ""; + } + return user; + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/3861fa41/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraMailboxModule.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraMailboxModule.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraMailboxModule.java index 20e05fb..85fd62d 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraMailboxModule.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraMailboxModule.java @@ -30,6 +30,7 @@ import org.apache.james.backends.cassandra.components.CassandraTable; import org.apache.james.backends.cassandra.components.CassandraType; import org.apache.james.backends.cassandra.utils.CassandraConstants; import org.apache.james.mailbox.cassandra.table.CassandraMailboxPathTable; +import org.apache.james.mailbox.cassandra.table.CassandraMailboxPathV2Table; import org.apache.james.mailbox.cassandra.table.CassandraMailboxTable; import com.datastax.driver.core.schemabuilder.SchemaBuilder; @@ -63,7 +64,19 @@ public class CassandraMailboxModule implements CassandraModule { .comment("Denormalisation table. Allow to retrieve mailboxes belonging to a certain user. This is a " + "LIST optimisation.") .caching(SchemaBuilder.KeyCaching.ALL, - SchemaBuilder.rows(CassandraConstants.DEFAULT_CACHED_ROW_PER_PARTITION)))); + SchemaBuilder.rows(CassandraConstants.DEFAULT_CACHED_ROW_PER_PARTITION))), + new CassandraTable(CassandraMailboxPathV2Table.TABLE_NAME, + SchemaBuilder.createTable(CassandraMailboxPathV2Table.TABLE_NAME) + .ifNotExists() + .addPartitionKey(CassandraMailboxPathV2Table.NAMESPACE, text()) + .addPartitionKey(CassandraMailboxPathV2Table.USER, text()) + .addClusteringColumn(CassandraMailboxPathV2Table.MAILBOX_NAME, text()) + .addColumn(CassandraMailboxPathV2Table.MAILBOX_ID, timeuuid()) + .withOptions() + .comment("Denormalisation table. Allow to retrieve mailboxes belonging to a certain user. This is a " + + "LIST optimisation.") + .caching(SchemaBuilder.KeyCaching.ALL, + SchemaBuilder.rows(CassandraConstants.DEFAULT_CACHED_ROW_PER_PARTITION)))); types = ImmutableList.of( new CassandraType(CassandraMailboxTable.MAILBOX_BASE, SchemaBuilder.createType(CassandraMailboxTable.MAILBOX_BASE) http://git-wip-us.apache.org/repos/asf/james-project/blob/3861fa41/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMailboxPathV2Table.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMailboxPathV2Table.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMailboxPathV2Table.java new file mode 100644 index 0000000..6788606 --- /dev/null +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMailboxPathV2Table.java @@ -0,0 +1,36 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.cassandra.table; + +public interface CassandraMailboxPathV2Table { + + String TABLE_NAME = "mailboxPathV2"; + + String NAMESPACE = "namespace"; + + String USER = "user"; + + String MAILBOX_NAME = "mailboxName"; + + String MAILBOX_ID = "mailboxId"; + + String[] FIELDS = { NAMESPACE, USER, MAILBOX_NAME, MAILBOX_ID}; + +} http://git-wip-us.apache.org/repos/asf/james-project/blob/3861fa41/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV2DAOTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV2DAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV2DAOTest.java new file mode 100644 index 0000000..fdae69a --- /dev/null +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV2DAOTest.java @@ -0,0 +1,30 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.cassandra.mail; + +import org.apache.james.backends.cassandra.utils.CassandraUtils; + +public class CassandraMailboxPathV2DAOTest extends CassandraMailboxPathDAOTest { + + @Override + CassandraMailboxPathDAO testee() { + return new CassandraMailboxPathV2DAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION); + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org