This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 49df4ff7a51c4754bec8679b0dffb6f2a2ad7382 Author: datph <[email protected]> AuthorDate: Fri Feb 22 15:40:34 2019 +0700 MAILBOX-380 Create CassandraEventDeadLettersGroupTable With DAO to increase the performance when retrieve all group of failed event --- .../mailbox/events/EventDeadLettersContract.java | 2 +- .../mailbox/events/CassandraEventDeadLetters.java | 10 +++- .../events/CassandraEventDeadLettersGroupDAO.java | 70 ++++++++++++++++++++++ .../events/CassandraEventDeadLettersModule.java | 5 ++ .../CassandraEventDeadLettersGroupTable.java} | 22 ++----- .../events/CassandraEventDeadLettersDAOTest.java | 22 +------ ... => CassandraEventDeadLettersGroupDAOTest.java} | 45 ++++++++------ .../events/CassandraEventDeadLettersTest.java | 3 +- 8 files changed, 116 insertions(+), 63 deletions(-) diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventDeadLettersContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventDeadLettersContract.java index d1d6a37..6bb00e6 100644 --- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventDeadLettersContract.java +++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventDeadLettersContract.java @@ -396,7 +396,7 @@ interface EventDeadLettersContract { eventDeadLetters.store(GROUP_A, EVENT_1).block(); eventDeadLetters.store(GROUP_B, EVENT_1).block(); - assertThat(eventDeadLetters.groupsWithFailedEvents().toStream()) + assertThat(eventDeadLetters.groupsWithFailedEvents().collectList().block()) .containsOnly(GROUP_A, GROUP_B); } diff --git a/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLetters.java b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLetters.java index 6530d1d..b155b3b 100644 --- a/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLetters.java +++ b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLetters.java @@ -29,10 +29,13 @@ import reactor.core.publisher.Mono; public class CassandraEventDeadLetters implements EventDeadLetters { private final CassandraEventDeadLettersDAO cassandraEventDeadLettersDAO; + private final CassandraEventDeadLettersGroupDAO cassandraEventDeadLettersGroupDAO; @Inject - public CassandraEventDeadLetters(CassandraEventDeadLettersDAO cassandraEventDeadLettersDAO) { + public CassandraEventDeadLetters(CassandraEventDeadLettersDAO cassandraEventDeadLettersDAO, + CassandraEventDeadLettersGroupDAO cassandraEventDeadLettersGroupDAO) { this.cassandraEventDeadLettersDAO = cassandraEventDeadLettersDAO; + this.cassandraEventDeadLettersGroupDAO = cassandraEventDeadLettersGroupDAO; } @Override @@ -40,7 +43,8 @@ public class CassandraEventDeadLetters implements EventDeadLetters { Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL); Preconditions.checkArgument(failDeliveredEvent != null, FAIL_DELIVERED_EVENT_CANNOT_BE_NULL); - return cassandraEventDeadLettersDAO.store(registeredGroup, failDeliveredEvent); + return cassandraEventDeadLettersDAO.store(registeredGroup, failDeliveredEvent) + .then(cassandraEventDeadLettersGroupDAO.storeGroup(registeredGroup)); } @Override @@ -68,6 +72,6 @@ public class CassandraEventDeadLetters implements EventDeadLetters { @Override public Flux<Group> groupsWithFailedEvents() { - return cassandraEventDeadLettersDAO.retrieveAllGroups(); + return cassandraEventDeadLettersGroupDAO.retrieveAllGroups(); } } diff --git a/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersGroupDAO.java b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersGroupDAO.java new file mode 100644 index 0000000..bb37a3d --- /dev/null +++ b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersGroupDAO.java @@ -0,0 +1,70 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.events; + +import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker; +import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; +import static com.datastax.driver.core.querybuilder.QueryBuilder.select; +import static org.apache.james.mailbox.events.tables.CassandraEventDeadLettersGroupTable.GROUP; +import static org.apache.james.mailbox.events.tables.CassandraEventDeadLettersGroupTable.TABLE_NAME; + +import javax.inject.Inject; + +import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; + +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; +import com.github.fge.lambdas.Throwing; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class CassandraEventDeadLettersGroupDAO { + private final CassandraAsyncExecutor executor; + private final PreparedStatement insertStatement; + private final PreparedStatement selectAllStatement; + + @Inject + CassandraEventDeadLettersGroupDAO(Session session) { + this.executor = new CassandraAsyncExecutor(session); + this.insertStatement = prepareInsertStatement(session); + this.selectAllStatement = prepareSelectStatement(session); + } + + private PreparedStatement prepareInsertStatement(Session session) { + return session.prepare(insertInto(TABLE_NAME) + .value(GROUP, bindMarker(GROUP))); + } + + private PreparedStatement prepareSelectStatement(Session session) { + return session.prepare(select(GROUP) + .from(TABLE_NAME)); + } + + Mono<Void> storeGroup(Group group) { + return executor.executeVoid(insertStatement.bind() + .setString(GROUP, group.asString())); + } + + Flux<Group> retrieveAllGroups() { + return executor.executeRows(selectAllStatement.bind()) + .map(Throwing.function(row -> Group.deserialize(row.getString(GROUP)))); + } +} diff --git a/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersModule.java b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersModule.java index f8b1e30..72e354c 100644 --- a/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersModule.java +++ b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersModule.java @@ -21,6 +21,7 @@ package org.apache.james.mailbox.events; import org.apache.james.backends.cassandra.components.CassandraModule; import org.apache.james.backends.cassandra.utils.CassandraConstants; +import org.apache.james.mailbox.events.tables.CassandraEventDeadLettersGroupTable; import org.apache.james.mailbox.events.tables.CassandraEventDeadLettersTable; import com.datastax.driver.core.DataType; @@ -37,5 +38,9 @@ public interface CassandraEventDeadLettersModule { .addPartitionKey(CassandraEventDeadLettersTable.GROUP, DataType.text()) .addClusteringColumn(CassandraEventDeadLettersTable.EVENT_ID, DataType.uuid()) .addColumn(CassandraEventDeadLettersTable.EVENT, DataType.text())) + .table(CassandraEventDeadLettersGroupTable.TABLE_NAME) + .comment("Projection table for retrieving groups for all failed events") + .statement(statement -> statement + .addPartitionKey(CassandraEventDeadLettersGroupTable.GROUP, DataType.text())) .build(); } diff --git a/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersModule.java b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/tables/CassandraEventDeadLettersGroupTable.java similarity index 52% copy from mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersModule.java copy to mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/tables/CassandraEventDeadLettersGroupTable.java index f8b1e30..b1d6219 100644 --- a/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersModule.java +++ b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/tables/CassandraEventDeadLettersGroupTable.java @@ -17,25 +17,11 @@ * under the License. * ****************************************************************/ -package org.apache.james.mailbox.events; +package org.apache.james.mailbox.events.tables; -import org.apache.james.backends.cassandra.components.CassandraModule; -import org.apache.james.backends.cassandra.utils.CassandraConstants; -import org.apache.james.mailbox.events.tables.CassandraEventDeadLettersTable; +public interface CassandraEventDeadLettersGroupTable { -import com.datastax.driver.core.DataType; -import com.datastax.driver.core.schemabuilder.SchemaBuilder; + String TABLE_NAME = "group_table"; -public interface CassandraEventDeadLettersModule { - CassandraModule MODULE = CassandraModule.builder() - .table(CassandraEventDeadLettersTable.TABLE_NAME) - .comment("Holds event dead letter") - .options(options -> options - .caching(SchemaBuilder.KeyCaching.ALL, - SchemaBuilder.rows(CassandraConstants.DEFAULT_CACHED_ROW_PER_PARTITION))) - .statement(statement -> statement - .addPartitionKey(CassandraEventDeadLettersTable.GROUP, DataType.text()) - .addClusteringColumn(CassandraEventDeadLettersTable.EVENT_ID, DataType.uuid()) - .addColumn(CassandraEventDeadLettersTable.EVENT, DataType.text())) - .build(); + String GROUP = "group"; } diff --git a/mailbox/event/event-cassandra/src/test/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAOTest.java b/mailbox/event/event-cassandra/src/test/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAOTest.java index 7bc66a7..e5da4df 100644 --- a/mailbox/event/event-cassandra/src/test/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAOTest.java +++ b/mailbox/event/event-cassandra/src/test/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAOTest.java @@ -58,7 +58,7 @@ class CassandraEventDeadLettersDAOTest { cassandraEventDeadLettersDAO.removeEvent(GROUP_A, EVENT_ID_1).block(); assertThat(cassandraEventDeadLettersDAO - .retrieveAllGroups() + .retrieveEventIdsWithGroup(GROUP_A) .collectList().block()) .isEmpty(); } @@ -101,24 +101,4 @@ class CassandraEventDeadLettersDAOTest { .collectList().block()) .containsOnly(EVENT_ID_1, EVENT_ID_2, EVENT_ID_3); } - - @Test - void retrieveAllGroupsShouldReturnEmptyWhenDefault() { - assertThat(cassandraEventDeadLettersDAO - .retrieveAllGroups() - .collectList().block()) - .isEmpty(); - } - - @Test - void retrieveAllGroupsShouldReturnStoredGroups() { - cassandraEventDeadLettersDAO.store(GROUP_A, EVENT_1).block(); - cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_2).block(); - cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_3).block(); - - assertThat(cassandraEventDeadLettersDAO - .retrieveAllGroups() - .collectList().block()) - .containsOnly(GROUP_A, GROUP_B); - } } diff --git a/mailbox/event/event-cassandra/src/test/java/org/apache/james/mailbox/events/CassandraEventDeadLettersTest.java b/mailbox/event/event-cassandra/src/test/java/org/apache/james/mailbox/events/CassandraEventDeadLettersGroupDAOTest.java similarity index 56% copy from mailbox/event/event-cassandra/src/test/java/org/apache/james/mailbox/events/CassandraEventDeadLettersTest.java copy to mailbox/event/event-cassandra/src/test/java/org/apache/james/mailbox/events/CassandraEventDeadLettersGroupDAOTest.java index 2c108d4..c5bc092 100644 --- a/mailbox/event/event-cassandra/src/test/java/org/apache/james/mailbox/events/CassandraEventDeadLettersTest.java +++ b/mailbox/event/event-cassandra/src/test/java/org/apache/james/mailbox/events/CassandraEventDeadLettersGroupDAOTest.java @@ -19,35 +19,42 @@ package org.apache.james.mailbox.events; +import static org.apache.james.mailbox.events.EventDeadLettersContract.GROUP_A; +import static org.apache.james.mailbox.events.EventDeadLettersContract.GROUP_B; +import static org.assertj.core.api.Assertions.assertThat; + import org.apache.james.backends.cassandra.CassandraCluster; import org.apache.james.backends.cassandra.CassandraClusterExtension; -import org.apache.james.event.json.EventSerializer; -import org.apache.james.mailbox.events.EventDeadLettersContract.FailedEventContract; -import org.apache.james.mailbox.events.EventDeadLettersContract.FailedEventsContract; -import org.apache.james.mailbox.events.EventDeadLettersContract.GroupsWithFailedEventsContract; -import org.apache.james.mailbox.events.EventDeadLettersContract.RemoveContract; -import org.apache.james.mailbox.events.EventDeadLettersContract.StoreContract; -import org.apache.james.mailbox.model.TestId; -import org.apache.james.mailbox.model.TestMessageId; -import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -class CassandraEventDeadLettersTest implements StoreContract, RemoveContract, FailedEventContract, FailedEventsContract, - GroupsWithFailedEventsContract { +public class CassandraEventDeadLettersGroupDAOTest { @RegisterExtension static CassandraClusterExtension cassandraClusterExtension = new CassandraClusterExtension(CassandraEventDeadLettersModule.MODULE); - private CassandraEventDeadLetters eventDeadLetters; + private static CassandraEventDeadLettersGroupDAO GROUP_DAO; + + @BeforeAll + static void setUp(CassandraCluster cassandraCluster) { + GROUP_DAO = new CassandraEventDeadLettersGroupDAO(cassandraCluster.getConf()); + } - @BeforeEach - void setUp(CassandraCluster cassandraCluster) { - EventSerializer eventSerializer = new EventSerializer(new TestId.Factory(), new TestMessageId.Factory()); - eventDeadLetters = new CassandraEventDeadLetters(new CassandraEventDeadLettersDAO(cassandraCluster.getConf(), eventSerializer)); + @Test + void retrieveAllGroupsShouldReturnEmptyWhenDefault() { + assertThat(GROUP_DAO.retrieveAllGroups() + .collectList().block()) + .isEmpty(); } - @Override - public EventDeadLetters eventDeadLetters() { - return eventDeadLetters; + @Test + void retrieveAllGroupsShouldReturnStoredGroups() { + GROUP_DAO.storeGroup(GROUP_A).block(); + GROUP_DAO.storeGroup(GROUP_B).block(); + + assertThat(GROUP_DAO.retrieveAllGroups() + .collectList().block()) + .containsOnly(GROUP_A, GROUP_B); } } diff --git a/mailbox/event/event-cassandra/src/test/java/org/apache/james/mailbox/events/CassandraEventDeadLettersTest.java b/mailbox/event/event-cassandra/src/test/java/org/apache/james/mailbox/events/CassandraEventDeadLettersTest.java index 2c108d4..20810d3 100644 --- a/mailbox/event/event-cassandra/src/test/java/org/apache/james/mailbox/events/CassandraEventDeadLettersTest.java +++ b/mailbox/event/event-cassandra/src/test/java/org/apache/james/mailbox/events/CassandraEventDeadLettersTest.java @@ -43,7 +43,8 @@ class CassandraEventDeadLettersTest implements StoreContract, RemoveContract, Fa @BeforeEach void setUp(CassandraCluster cassandraCluster) { EventSerializer eventSerializer = new EventSerializer(new TestId.Factory(), new TestMessageId.Factory()); - eventDeadLetters = new CassandraEventDeadLetters(new CassandraEventDeadLettersDAO(cassandraCluster.getConf(), eventSerializer)); + eventDeadLetters = new CassandraEventDeadLetters(new CassandraEventDeadLettersDAO(cassandraCluster.getConf(), eventSerializer), + new CassandraEventDeadLettersGroupDAO(cassandraCluster.getConf())); } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
