Repository: james-project Updated Branches: refs/heads/master 07bc44463 -> a760297cc
JAMES-2645 Refactor cassandra RRTs by adding a proper DAO class with basic tests Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/c216bce6 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/c216bce6 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/c216bce6 Branch: refs/heads/master Commit: c216bce6a30037fc7115068599dc15c89a400a07 Parents: 07bc444 Author: Rene Cordier <[email protected]> Authored: Thu Jan 17 11:40:38 2019 +0700 Committer: Benoit Tellier <[email protected]> Committed: Fri Jan 18 18:01:20 2019 +0700 ---------------------------------------------------------------------- .../CassandraRecipientRewriteTable.java | 128 ++-------------- .../CassandraRecipientRewriteTableDAO.java | 150 +++++++++++++++++++ .../CassandraRecipientRewriteTableDAOTest.java | 100 +++++++++++++ .../CassandraRecipientRewriteTableTest.java | 3 +- .../james/rrt/cassandra/CassandraStepdefs.java | 3 +- 5 files changed, 264 insertions(+), 120 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/c216bce6/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java ---------------------------------------------------------------------- diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java index 332ec43..27bc7d4 100644 --- a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java +++ b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java @@ -18,24 +18,10 @@ ****************************************************************/ package org.apache.james.rrt.cassandra; -import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker; -import static com.datastax.driver.core.querybuilder.QueryBuilder.delete; -import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; -import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; -import static com.datastax.driver.core.querybuilder.QueryBuilder.select; -import static org.apache.james.rrt.cassandra.tables.CassandraRecipientRewriteTableTable.DOMAIN; -import static org.apache.james.rrt.cassandra.tables.CassandraRecipientRewriteTableTable.MAPPING; -import static org.apache.james.rrt.cassandra.tables.CassandraRecipientRewriteTableTable.TABLE_NAME; -import static org.apache.james.rrt.cassandra.tables.CassandraRecipientRewriteTableTable.USER; - -import java.util.List; import java.util.Map; -import java.util.Optional; import javax.inject.Inject; -import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; -import org.apache.james.backends.cassandra.utils.CassandraUtils; import org.apache.james.core.Domain; import org.apache.james.rrt.lib.AbstractRecipientRewriteTable; import org.apache.james.rrt.lib.Mapping; @@ -44,135 +30,41 @@ import org.apache.james.rrt.lib.Mappings; import org.apache.james.rrt.lib.MappingsImpl; import org.apache.james.util.OptionalUtils; -import com.datastax.driver.core.PreparedStatement; -import com.datastax.driver.core.Session; -import com.github.steveash.guavate.Guavate; - public class CassandraRecipientRewriteTable extends AbstractRecipientRewriteTable { - - private final CassandraAsyncExecutor executor; - private final CassandraUtils cassandraUtils; - private final PreparedStatement insertStatement; - private final PreparedStatement deleteStatement; - private final PreparedStatement retrieveMappingStatement; - private final PreparedStatement retrieveAllMappingsStatement; + private final CassandraRecipientRewriteTableDAO dao; @Inject - public CassandraRecipientRewriteTable(Session session, CassandraUtils cassandraUtils) { - this.executor = new CassandraAsyncExecutor(session); - this.cassandraUtils = cassandraUtils; - this.insertStatement = prepareInsertStatement(session); - this.deleteStatement = prepareDelete(session); - this.retrieveMappingStatement = prepareRetrieveMappingStatement(session); - this.retrieveAllMappingsStatement = prepareRetrieveAllMappingStatement(session); - } - - private PreparedStatement prepareRetrieveAllMappingStatement(Session session) { - return session.prepare(select(USER, DOMAIN, MAPPING) - .from(TABLE_NAME)); - } - - private PreparedStatement prepareRetrieveMappingStatement(Session session) { - return session.prepare(select(MAPPING) - .from(TABLE_NAME) - .where(eq(USER, bindMarker(USER))) - .and(eq(DOMAIN, bindMarker(DOMAIN)))); - } - - private PreparedStatement prepareDelete(Session session) { - return session.prepare(delete() - .from(TABLE_NAME) - .where(eq(USER, bindMarker(USER))) - .and(eq(DOMAIN, bindMarker(DOMAIN))) - .and(eq(MAPPING, bindMarker(MAPPING)))); - } - - private PreparedStatement prepareInsertStatement(Session session) { - return session.prepare(insertInto(TABLE_NAME) - .value(USER, bindMarker(USER)) - .value(DOMAIN, bindMarker(DOMAIN)) - .value(MAPPING, bindMarker(MAPPING))); + public CassandraRecipientRewriteTable(CassandraRecipientRewriteTableDAO dao) { + this.dao = dao; } @Override public void addMapping(MappingSource source, Mapping mapping) { - executor.executeVoid(insertStatement.bind() - .setString(USER, source.getFixedUser()) - .setString(DOMAIN, source.getFixedDomain()) - .setString(MAPPING, mapping.asString())) - .join(); + dao.addMapping(source, mapping).block(); } @Override public void removeMapping(MappingSource source, Mapping mapping) { - executor.executeVoid(deleteStatement.bind() - .setString(USER, source.getFixedUser()) - .setString(DOMAIN, source.getFixedDomain()) - .setString(MAPPING, mapping.asString())) - .join(); + dao.removeMapping(source, mapping).block(); } @Override public Mappings getStoredMappings(MappingSource source) { - return retrieveMappings(source) + return dao.retrieveMappings(source) + .blockOptional() .orElse(MappingsImpl.empty()); } - private Optional<Mappings> retrieveMappings(MappingSource source) { - List<String> mappings = executor.execute(retrieveMappingStatement.bind() - .setString(USER, source.getFixedUser()) - .setString(DOMAIN, source.getFixedDomain())) - .thenApply(resultSet -> cassandraUtils.convertToStream(resultSet) - .map(row -> row.getString(MAPPING)) - .collect(Guavate.toImmutableList())) - .join(); - - return MappingsImpl.fromCollection(mappings).toOptional(); - } - @Override public Map<MappingSource, Mappings> getAllMappings() { - return executor.execute(retrieveAllMappingsStatement.bind()) - .thenApply(resultSet -> cassandraUtils.convertToStream(resultSet) - .map(row -> new UserMapping(MappingSource.fromUser(row.getString(USER), row.getString(DOMAIN)), row.getString(MAPPING))) - .collect(Guavate.toImmutableMap( - UserMapping::getSource, - UserMapping::toMapping, - Mappings::union))) - .join(); - } - - private static class UserMapping { - - private final MappingSource source; - private final String mapping; - - public UserMapping(MappingSource source, String mapping) { - this.source = source; - this.mapping = mapping; - } - - - public MappingSource getSource() { - return source; - } - - public String getMapping() { - return mapping; - } - - public Mappings toMapping() { - return MappingsImpl.fromRawString(getMapping()); - } - + return dao.getAllMappings().block(); } @Override protected Mappings mapAddress(String user, Domain domain) { return OptionalUtils.orSuppliers( - () -> retrieveMappings(MappingSource.fromUser(user, domain)), - () -> retrieveMappings(MappingSource.fromDomain(domain))) + () -> dao.retrieveMappings(MappingSource.fromUser(user, domain)).blockOptional(), + () -> dao.retrieveMappings(MappingSource.fromDomain(domain)).blockOptional()) .orElse(MappingsImpl.empty()); } - } http://git-wip-us.apache.org/repos/asf/james-project/blob/c216bce6/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java ---------------------------------------------------------------------- diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java new file mode 100644 index 0000000..52f3516 --- /dev/null +++ b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java @@ -0,0 +1,150 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.rrt.cassandra; + +import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker; +import static com.datastax.driver.core.querybuilder.QueryBuilder.delete; +import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; +import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; +import static com.datastax.driver.core.querybuilder.QueryBuilder.select; +import static org.apache.james.rrt.cassandra.tables.CassandraRecipientRewriteTableTable.DOMAIN; +import static org.apache.james.rrt.cassandra.tables.CassandraRecipientRewriteTableTable.MAPPING; +import static org.apache.james.rrt.cassandra.tables.CassandraRecipientRewriteTableTable.TABLE_NAME; +import static org.apache.james.rrt.cassandra.tables.CassandraRecipientRewriteTableTable.USER; + +import java.util.Map; + +import javax.inject.Inject; + +import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; +import org.apache.james.backends.cassandra.utils.CassandraUtils; +import org.apache.james.rrt.lib.Mapping; +import org.apache.james.rrt.lib.MappingSource; +import org.apache.james.rrt.lib.Mappings; +import org.apache.james.rrt.lib.MappingsImpl; + +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; +import com.github.steveash.guavate.Guavate; + +import reactor.core.publisher.Mono; + +class CassandraRecipientRewriteTableDAO { + private final CassandraAsyncExecutor executor; + private final CassandraUtils cassandraUtils; + private final PreparedStatement insertStatement; + private final PreparedStatement deleteStatement; + private final PreparedStatement retrieveMappingStatement; + private final PreparedStatement retrieveAllMappingsStatement; + + @Inject + CassandraRecipientRewriteTableDAO(Session session, CassandraUtils cassandraUtils) { + this.executor = new CassandraAsyncExecutor(session); + this.cassandraUtils = cassandraUtils; + this.insertStatement = prepareInsertStatement(session); + this.deleteStatement = prepareDelete(session); + this.retrieveMappingStatement = prepareRetrieveMappingStatement(session); + this.retrieveAllMappingsStatement = prepareRetrieveAllMappingStatement(session); + } + + private PreparedStatement prepareRetrieveAllMappingStatement(Session session) { + return session.prepare(select(USER, DOMAIN, MAPPING) + .from(TABLE_NAME)); + } + + private PreparedStatement prepareRetrieveMappingStatement(Session session) { + return session.prepare(select(MAPPING) + .from(TABLE_NAME) + .where(eq(USER, bindMarker(USER))) + .and(eq(DOMAIN, bindMarker(DOMAIN)))); + } + + private PreparedStatement prepareDelete(Session session) { + return session.prepare(delete() + .from(TABLE_NAME) + .where(eq(USER, bindMarker(USER))) + .and(eq(DOMAIN, bindMarker(DOMAIN))) + .and(eq(MAPPING, bindMarker(MAPPING)))); + } + + private PreparedStatement prepareInsertStatement(Session session) { + return session.prepare(insertInto(TABLE_NAME) + .value(USER, bindMarker(USER)) + .value(DOMAIN, bindMarker(DOMAIN)) + .value(MAPPING, bindMarker(MAPPING))); + } + + Mono<Void> addMapping(MappingSource source, Mapping mapping) { + return executor.executeVoidReactor(insertStatement.bind() + .setString(USER, source.getFixedUser()) + .setString(DOMAIN, source.getFixedDomain()) + .setString(MAPPING, mapping.asString())); + } + + Mono<Void> removeMapping(MappingSource source, Mapping mapping) { + return executor.executeVoidReactor(deleteStatement.bind() + .setString(USER, source.getFixedUser()) + .setString(DOMAIN, source.getFixedDomain()) + .setString(MAPPING, mapping.asString())); + } + + Mono<MappingsImpl> retrieveMappings(MappingSource source) { + return executor.executeReactor(retrieveMappingStatement.bind() + .setString(USER, source.getFixedUser()) + .setString(DOMAIN, source.getFixedDomain())) + .map(resultSet -> cassandraUtils.convertToStream(resultSet) + .map(row -> row.getString(MAPPING)) + .collect(Guavate.toImmutableList())) + .map(MappingsImpl::fromCollection) + .filter(mappings -> !mappings.isEmpty()); + } + + Mono<Map<MappingSource, Mappings>> getAllMappings() { + return executor.executeReactor(retrieveAllMappingsStatement.bind()) + .map(resultSet -> cassandraUtils.convertToStream(resultSet) + .map(row -> new UserMapping(MappingSource.fromUser(row.getString(USER), row.getString(DOMAIN)), row.getString(MAPPING))) + .collect(Guavate.toImmutableMap( + UserMapping::getSource, + UserMapping::toMapping, + Mappings::union))); + } + + private static class UserMapping { + private final MappingSource source; + private final String mapping; + + UserMapping(MappingSource source, String mapping) { + this.source = source; + this.mapping = mapping; + } + + MappingSource getSource() { + return source; + } + + String getMapping() { + return mapping; + } + + Mappings toMapping() { + return MappingsImpl.fromRawString(getMapping()); + } + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/c216bce6/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java ---------------------------------------------------------------------- diff --git a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java new file mode 100644 index 0000000..0d2f096 --- /dev/null +++ b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java @@ -0,0 +1,100 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.rrt.cassandra; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.james.backends.cassandra.CassandraCluster; +import org.apache.james.backends.cassandra.CassandraClusterExtension; +import org.apache.james.backends.cassandra.utils.CassandraUtils; +import org.apache.james.core.Domain; +import org.apache.james.rrt.lib.Mapping; +import org.apache.james.rrt.lib.MappingSource; +import org.apache.james.rrt.lib.MappingsImpl; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +class CassandraRecipientRewriteTableDAOTest { + private static final String USER = "test"; + private static final String ADDRESS = "test@domain"; + private static final MappingSource SOURCE = MappingSource.fromUser(USER, Domain.LOCALHOST); + private static final Mapping MAPPING = Mapping.alias(ADDRESS); + + @RegisterExtension + static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraRRTModule.MODULE); + + private CassandraRecipientRewriteTableDAO cassandraRecipientRewriteTableDAO; + + @BeforeEach + void setUp(CassandraCluster cassandra) { + cassandraRecipientRewriteTableDAO = new CassandraRecipientRewriteTableDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION); + } + + @Test + void retrieveMappingsShouldReturnEmptyByDefault() { + assertThat(cassandraRecipientRewriteTableDAO.retrieveMappings(SOURCE).blockOptional()) + .isEmpty(); + } + + @Test + void getAllMappingsShouldReturnEmptyByDefault() { + assertThat(cassandraRecipientRewriteTableDAO.getAllMappings().block()) + .isEmpty(); + } + + @Test + void retrieveMappingsShouldReturnStoredMapping() { + cassandraRecipientRewriteTableDAO.addMapping(SOURCE, MAPPING).block(); + + assertThat(cassandraRecipientRewriteTableDAO.retrieveMappings(SOURCE).blockOptional()) + .contains(MappingsImpl.fromMappings(MAPPING)); + } + + @Test + void getAllMappingsShouldReturnStoredMapping() { + cassandraRecipientRewriteTableDAO.addMapping(SOURCE, MAPPING).block(); + + assertThat(cassandraRecipientRewriteTableDAO.getAllMappings().block()) + .contains(Pair.of(SOURCE, MappingsImpl.fromMappings(MAPPING))); + } + + @Test + void retrieveMappingsShouldNotReturnRemovedMapping() { + cassandraRecipientRewriteTableDAO.addMapping(SOURCE, MAPPING).block(); + + cassandraRecipientRewriteTableDAO.removeMapping(SOURCE, MAPPING).block(); + + assertThat(cassandraRecipientRewriteTableDAO.retrieveMappings(SOURCE).blockOptional()) + .isEmpty(); + } + + @Test + void getAllMappingsShouldNotReturnRemovedMapping() { + cassandraRecipientRewriteTableDAO.addMapping(SOURCE, MAPPING).block(); + + cassandraRecipientRewriteTableDAO.removeMapping(SOURCE, MAPPING).block(); + + assertThat(cassandraRecipientRewriteTableDAO.getAllMappings().block()) + .isEmpty(); + } + +} http://git-wip-us.apache.org/repos/asf/james-project/blob/c216bce6/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableTest.java ---------------------------------------------------------------------- diff --git a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableTest.java b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableTest.java index c7d6d6f..d0bc05c 100644 --- a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableTest.java +++ b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableTest.java @@ -61,7 +61,8 @@ public class CassandraRecipientRewriteTableTest extends AbstractRecipientRewrite @Override protected AbstractRecipientRewriteTable getRecipientRewriteTable() throws Exception { - CassandraRecipientRewriteTable rrt = new CassandraRecipientRewriteTable(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION); + CassandraRecipientRewriteTable rrt = new CassandraRecipientRewriteTable( + new CassandraRecipientRewriteTableDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION)); rrt.configure(new DefaultConfigurationBuilder()); return rrt; } http://git-wip-us.apache.org/repos/asf/james-project/blob/c216bce6/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraStepdefs.java ---------------------------------------------------------------------- diff --git a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraStepdefs.java b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraStepdefs.java index 3c2916c..edec0ca 100644 --- a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraStepdefs.java +++ b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraStepdefs.java @@ -49,7 +49,8 @@ public class CassandraStepdefs { } private AbstractRecipientRewriteTable getRecipientRewriteTable() throws Exception { - CassandraRecipientRewriteTable rrt = new CassandraRecipientRewriteTable(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION); + CassandraRecipientRewriteTable rrt = new CassandraRecipientRewriteTable( + new CassandraRecipientRewriteTableDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION)); rrt.configure(new DefaultConfigurationBuilder()); return rrt; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
