JAMES-1717 Cassandra implementation for notification registry
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/64d58083 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/64d58083 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/64d58083 Branch: refs/heads/master Commit: 64d580837cc30815646b1dc487024e8902233d3e Parents: caebbc0 Author: Benoit Tellier <btell...@linagora.com> Authored: Tue Apr 19 14:37:50 2016 +0700 Committer: Benoit Tellier <btell...@linagora.com> Committed: Fri May 27 18:02:46 2016 +0700 ---------------------------------------------------------------------- server/data/data-jmap-cassandra/pom.xml | 5 + .../vacation/CassandraNotificationRegistry.java | 75 +++++++++++++++ .../CassandraNotificationRegistryDAO.java | 97 ++++++++++++++++++++ .../CassandraNotificationRegistryModule.java | 66 +++++++++++++ .../tables/CassandraNotificationTable.java | 29 ++++++ .../CassandraNotificationRegistryTest.java | 42 +++++++++ 6 files changed, 314 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/64d58083/server/data/data-jmap-cassandra/pom.xml ---------------------------------------------------------------------- diff --git a/server/data/data-jmap-cassandra/pom.xml b/server/data/data-jmap-cassandra/pom.xml index f07ab0d..dec6ebc 100644 --- a/server/data/data-jmap-cassandra/pom.xml +++ b/server/data/data-jmap-cassandra/pom.xml @@ -200,6 +200,11 @@ <artifactId>guava</artifactId> </dependency> <dependency> + <groupId>com.jayway.awaitility</groupId> + <artifactId>awaitility</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>javax.inject</groupId> <artifactId>javax.inject</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/james-project/blob/64d58083/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/vacation/CassandraNotificationRegistry.java ---------------------------------------------------------------------- diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/vacation/CassandraNotificationRegistry.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/vacation/CassandraNotificationRegistry.java new file mode 100644 index 0000000..e253e33 --- /dev/null +++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/vacation/CassandraNotificationRegistry.java @@ -0,0 +1,75 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.jmap.cassandra.vacation; + +import java.time.ZonedDateTime; +import java.time.temporal.ChronoUnit; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import javax.inject.Inject; + +import org.apache.james.jmap.api.vacation.AccountId; +import org.apache.james.jmap.api.vacation.NotificationRegistry; +import org.apache.james.jmap.api.vacation.RecipientId; +import org.apache.james.util.date.ZonedDateTimeProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.primitives.Ints; + +public class CassandraNotificationRegistry implements NotificationRegistry { + + private static final Logger LOGGER = LoggerFactory.getLogger(CassandraNotificationRegistry.class); + + private final ZonedDateTimeProvider zonedDateTimeProvider; + private final CassandraNotificationRegistryDAO cassandraNotificationRegistryDAO; + + @Inject + public CassandraNotificationRegistry(ZonedDateTimeProvider zonedDateTimeProvider, CassandraNotificationRegistryDAO cassandraNotificationRegistryDAO) { + this.zonedDateTimeProvider = zonedDateTimeProvider; + this.cassandraNotificationRegistryDAO = cassandraNotificationRegistryDAO; + } + + @Override + public CompletableFuture<Void> register(AccountId accountId, RecipientId recipientId, Optional<ZonedDateTime> expiryDate) { + Optional<Integer> waitDelay = expiryDate.map(expiry -> Ints.checkedCast(zonedDateTimeProvider.get().until(expiry, ChronoUnit.SECONDS))); + if (isValid(waitDelay)) { + return cassandraNotificationRegistryDAO.register(accountId, recipientId, waitDelay); + } else { + LOGGER.warn("Invalid wait delay for {} {} : {}", accountId, recipientId, waitDelay); + return CompletableFuture.completedFuture(null); + } + } + + @Override + public CompletableFuture<Boolean> isRegistered(AccountId accountId, RecipientId recipientId) { + return cassandraNotificationRegistryDAO.isRegistered(accountId, recipientId); + } + + @Override + public CompletableFuture<Void> flush(AccountId accountId) { + return cassandraNotificationRegistryDAO.flush(accountId); + } + + private boolean isValid(Optional<Integer> waitDelay) { + return !waitDelay.isPresent() || waitDelay.get() >= 0; + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/64d58083/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/vacation/CassandraNotificationRegistryDAO.java ---------------------------------------------------------------------- diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/vacation/CassandraNotificationRegistryDAO.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/vacation/CassandraNotificationRegistryDAO.java new file mode 100644 index 0000000..555713e --- /dev/null +++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/vacation/CassandraNotificationRegistryDAO.java @@ -0,0 +1,97 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.jmap.cassandra.vacation; + +import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker; +import static com.datastax.driver.core.querybuilder.QueryBuilder.delete; +import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; +import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; +import static com.datastax.driver.core.querybuilder.QueryBuilder.select; +import static com.datastax.driver.core.querybuilder.QueryBuilder.ttl; + +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import javax.inject.Inject; + +import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; +import org.apache.james.jmap.api.vacation.AccountId; +import org.apache.james.jmap.api.vacation.RecipientId; +import org.apache.james.jmap.cassandra.vacation.tables.CassandraNotificationTable; + +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.querybuilder.Insert; + +public class CassandraNotificationRegistryDAO { + + public static final String TTL = "TTL"; + private final CassandraAsyncExecutor cassandraAsyncExecutor; + private final PreparedStatement registerStatement; + private final PreparedStatement registerWithTTLStatement; + private final PreparedStatement isRegisteredStatement; + private final PreparedStatement flushStatement; + + @Inject + public CassandraNotificationRegistryDAO(Session session) { + this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session); + + this.registerStatement = session.prepare(createInsert()); + + this.registerWithTTLStatement = session.prepare(createInsert().using(ttl(bindMarker(TTL)))); + + this.isRegisteredStatement = session.prepare(select() + .from(CassandraNotificationTable.TABLE_NAME) + .where(eq(CassandraNotificationTable.ACCOUNT_ID, bindMarker(CassandraNotificationTable.ACCOUNT_ID))) + .and(eq(CassandraNotificationTable.RECIPIENT_ID, bindMarker(CassandraNotificationTable.RECIPIENT_ID)))); + + this.flushStatement = session.prepare(delete() + .from(CassandraNotificationTable.TABLE_NAME) + .where(eq(CassandraNotificationTable.ACCOUNT_ID, bindMarker(CassandraNotificationTable.ACCOUNT_ID)))); + } + + private Insert createInsert() { + return insertInto(CassandraNotificationTable.TABLE_NAME) + .value(CassandraNotificationTable.ACCOUNT_ID, bindMarker(CassandraNotificationTable.ACCOUNT_ID)) + .value(CassandraNotificationTable.RECIPIENT_ID, bindMarker(CassandraNotificationTable.RECIPIENT_ID)); + } + + public CompletableFuture<Void> register(AccountId accountId, RecipientId recipientId, Optional<Integer> ttl) { + return cassandraAsyncExecutor.executeVoid( + ttl.map(value -> registerWithTTLStatement.bind().setInt(TTL, value)) + .orElse(registerStatement.bind()) + .setString(CassandraNotificationTable.ACCOUNT_ID, accountId.getIdentifier()) + .setString(CassandraNotificationTable.RECIPIENT_ID, recipientId.getAsString())); + } + + public CompletableFuture<Boolean> isRegistered(AccountId accountId, RecipientId recipientId) { + return cassandraAsyncExecutor.executeSingleRow( + isRegisteredStatement.bind() + .setString(CassandraNotificationTable.ACCOUNT_ID, accountId.getIdentifier()) + .setString(CassandraNotificationTable.RECIPIENT_ID, recipientId.getAsString())) + .thenApply(Optional::isPresent); + } + + public CompletableFuture<Void> flush(AccountId accountId) { + return cassandraAsyncExecutor.executeVoid( + flushStatement.bind() + .setString(CassandraNotificationTable.ACCOUNT_ID, accountId.getIdentifier())); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/64d58083/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/vacation/CassandraNotificationRegistryModule.java ---------------------------------------------------------------------- diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/vacation/CassandraNotificationRegistryModule.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/vacation/CassandraNotificationRegistryModule.java new file mode 100644 index 0000000..aeb03d6 --- /dev/null +++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/vacation/CassandraNotificationRegistryModule.java @@ -0,0 +1,66 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.jmap.cassandra.vacation; + +import static com.datastax.driver.core.DataType.text; + +import java.util.List; + +import org.apache.james.backends.cassandra.components.CassandraIndex; +import org.apache.james.backends.cassandra.components.CassandraModule; +import org.apache.james.backends.cassandra.components.CassandraTable; +import org.apache.james.backends.cassandra.components.CassandraType; +import org.apache.james.jmap.cassandra.vacation.tables.CassandraNotificationTable; + +import com.datastax.driver.core.schemabuilder.SchemaBuilder; +import com.google.common.collect.ImmutableList; + +public class CassandraNotificationRegistryModule implements CassandraModule { + + private final List<CassandraTable> tables; + private final List<CassandraIndex> index; + private final List<CassandraType> types; + + public CassandraNotificationRegistryModule() { + tables = ImmutableList.of( + new CassandraTable(CassandraNotificationTable.TABLE_NAME, + SchemaBuilder.createTable(CassandraNotificationTable.TABLE_NAME) + .ifNotExists() + .addPartitionKey(CassandraNotificationTable.ACCOUNT_ID, text()) + .addClusteringColumn(CassandraNotificationTable.RECIPIENT_ID, text()))); + index = ImmutableList.of(); + types = ImmutableList.of(); + } + + @Override + public List<CassandraTable> moduleTables() { + return tables; + } + + @Override + public List<CassandraIndex> moduleIndex() { + return index; + } + + @Override + public List<CassandraType> moduleTypes() { + return types; + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/64d58083/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/vacation/tables/CassandraNotificationTable.java ---------------------------------------------------------------------- diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/vacation/tables/CassandraNotificationTable.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/vacation/tables/CassandraNotificationTable.java new file mode 100644 index 0000000..f110a6a --- /dev/null +++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/vacation/tables/CassandraNotificationTable.java @@ -0,0 +1,29 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.jmap.cassandra.vacation.tables; + +public interface CassandraNotificationTable { + + String TABLE_NAME = "vacation_notification_registry"; + + String ACCOUNT_ID = "accountId"; + String RECIPIENT_ID = "recipientId"; + +} http://git-wip-us.apache.org/repos/asf/james-project/blob/64d58083/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/vacation/CassandraNotificationRegistryTest.java ---------------------------------------------------------------------- diff --git a/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/vacation/CassandraNotificationRegistryTest.java b/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/vacation/CassandraNotificationRegistryTest.java new file mode 100644 index 0000000..f09e276 --- /dev/null +++ b/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/vacation/CassandraNotificationRegistryTest.java @@ -0,0 +1,42 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.jmap.cassandra.vacation; + +import org.apache.james.backends.cassandra.CassandraCluster; +import org.apache.james.jmap.api.vacation.AbstractNotificationRegistryTest; +import org.apache.james.jmap.api.vacation.NotificationRegistry; +import org.apache.james.util.date.ZonedDateTimeProvider; +import org.junit.After; + +public class CassandraNotificationRegistryTest extends AbstractNotificationRegistryTest { + + private CassandraCluster cassandra; + + @Override + protected NotificationRegistry createNotificationRegistry(ZonedDateTimeProvider zonedDateTimeProvider) { + cassandra = CassandraCluster.create(new CassandraNotificationRegistryModule()); + return new CassandraNotificationRegistry(zonedDateTimeProvider, new CassandraNotificationRegistryDAO(cassandra.getConf())); + } + + @After + public void tearDown() { + cassandra.clearAllTables(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org