This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit e51448c6c6455ce76bdb865357386190ce2407f9 Author: datph <[email protected]> AuthorDate: Wed Mar 13 17:22:15 2019 +0700 MAILBOX-373 Refactor CassandraEventDeadLetter --- .../mailbox/events/CassandraEventDeadLetters.java | 21 ++++++----- .../events/CassandraEventDeadLettersDAO.java | 44 ++++++++-------------- .../events/CassandraEventDeadLettersModule.java | 2 +- .../tables/CassandraEventDeadLettersTable.java | 2 +- .../events/CassandraEventDeadLettersDAOTest.java | 36 +++++++++--------- 5 files changed, 46 insertions(+), 59 deletions(-) diff --git a/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLetters.java b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLetters.java index b155b3b..39eae1e 100644 --- a/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLetters.java +++ b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLetters.java @@ -39,35 +39,36 @@ public class CassandraEventDeadLetters implements EventDeadLetters { } @Override - public Mono<Void> store(Group registeredGroup, Event failDeliveredEvent) { + public Mono<Void> store(Group registeredGroup, Event failDeliveredEvent, InsertionId insertionId) { Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL); Preconditions.checkArgument(failDeliveredEvent != null, FAIL_DELIVERED_EVENT_CANNOT_BE_NULL); + Preconditions.checkArgument(insertionId != null, FAIL_DELIVERED_ID_INSERTION_CANNOT_BE_NULL); - return cassandraEventDeadLettersDAO.store(registeredGroup, failDeliveredEvent) + return cassandraEventDeadLettersDAO.store(registeredGroup, failDeliveredEvent, insertionId) .then(cassandraEventDeadLettersGroupDAO.storeGroup(registeredGroup)); } @Override - public Mono<Void> remove(Group registeredGroup, Event.EventId failDeliveredEventId) { + public Mono<Void> remove(Group registeredGroup, InsertionId failDeliveredInsertionId) { Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL); - Preconditions.checkArgument(failDeliveredEventId != null, FAIL_DELIVERED_ID_EVENT_CANNOT_BE_NULL); + Preconditions.checkArgument(failDeliveredInsertionId != null, FAIL_DELIVERED_ID_INSERTION_CANNOT_BE_NULL); - return cassandraEventDeadLettersDAO.removeEvent(registeredGroup, failDeliveredEventId); + return cassandraEventDeadLettersDAO.removeEvent(registeredGroup, failDeliveredInsertionId); } @Override - public Mono<Event> failedEvent(Group registeredGroup, Event.EventId failDeliveredEventId) { + public Mono<Event> failedEvent(Group registeredGroup, InsertionId failDeliveredInsertionId) { Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL); - Preconditions.checkArgument(failDeliveredEventId != null, FAIL_DELIVERED_ID_EVENT_CANNOT_BE_NULL); + Preconditions.checkArgument(failDeliveredInsertionId != null, FAIL_DELIVERED_ID_INSERTION_CANNOT_BE_NULL); - return cassandraEventDeadLettersDAO.retrieveFailedEvent(registeredGroup, failDeliveredEventId); + return cassandraEventDeadLettersDAO.retrieveFailedEvent(registeredGroup, failDeliveredInsertionId); } @Override - public Flux<Event.EventId> failedEventIds(Group registeredGroup) { + public Flux<InsertionId> failedIds(Group registeredGroup) { Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL); - return cassandraEventDeadLettersDAO.retrieveEventIdsWithGroup(registeredGroup); + return cassandraEventDeadLettersDAO.retrieveInsertionIdsWithGroup(registeredGroup); } @Override diff --git a/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAO.java b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAO.java index 2056f55..0afe2ba 100644 --- a/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAO.java +++ b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAO.java @@ -25,8 +25,8 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; import static com.datastax.driver.core.querybuilder.QueryBuilder.select; import static org.apache.james.mailbox.events.tables.CassandraEventDeadLettersTable.EVENT; -import static org.apache.james.mailbox.events.tables.CassandraEventDeadLettersTable.EVENT_ID; import static org.apache.james.mailbox.events.tables.CassandraEventDeadLettersTable.GROUP; +import static org.apache.james.mailbox.events.tables.CassandraEventDeadLettersTable.INSERTION_ID; import static org.apache.james.mailbox.events.tables.CassandraEventDeadLettersTable.TABLE_NAME; import javax.inject.Inject; @@ -36,7 +36,6 @@ import org.apache.james.event.json.EventSerializer; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Session; -import com.github.fge.lambdas.Throwing; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -46,7 +45,6 @@ public class CassandraEventDeadLettersDAO { private final EventSerializer eventSerializer; private final PreparedStatement insertStatement; private final PreparedStatement deleteStatement; - private final PreparedStatement selectAllGroupStatement; private final PreparedStatement selectEventStatement; private final PreparedStatement selectEventIdsWithGroupStatement; @@ -56,15 +54,14 @@ public class CassandraEventDeadLettersDAO { this.eventSerializer = eventSerializer; this.insertStatement = prepareInsertStatement(session); this.deleteStatement = prepareDeleteStatement(session); - this.selectAllGroupStatement = prepareSelectAllGroupStatement(session); this.selectEventStatement = prepareSelectEventStatement(session); - this.selectEventIdsWithGroupStatement = prepareSelectEventIdsWithGroupStatement(session); + this.selectEventIdsWithGroupStatement = prepareSelectInsertionIdsWithGroupStatement(session); } private PreparedStatement prepareInsertStatement(Session session) { return session.prepare(insertInto(TABLE_NAME) .value(GROUP, bindMarker(GROUP)) - .value(EVENT_ID, bindMarker(EVENT_ID)) + .value(INSERTION_ID, bindMarker(INSERTION_ID)) .value(EVENT, bindMarker(EVENT))); } @@ -72,57 +69,46 @@ public class CassandraEventDeadLettersDAO { return session.prepare(delete() .from(TABLE_NAME) .where(eq(GROUP, bindMarker(GROUP))) - .and(eq(EVENT_ID, bindMarker(EVENT_ID)))); - } - - private PreparedStatement prepareSelectAllGroupStatement(Session session) { - return session.prepare(select(GROUP) - .from(TABLE_NAME)); + .and(eq(INSERTION_ID, bindMarker(INSERTION_ID)))); } private PreparedStatement prepareSelectEventStatement(Session session) { return session.prepare(select(EVENT) .from(TABLE_NAME) .where(eq(GROUP, bindMarker(GROUP))) - .and(eq(EVENT_ID, bindMarker(EVENT_ID)))); + .and(eq(INSERTION_ID, bindMarker(INSERTION_ID)))); } - private PreparedStatement prepareSelectEventIdsWithGroupStatement(Session session) { - return session.prepare(select(EVENT_ID) + private PreparedStatement prepareSelectInsertionIdsWithGroupStatement(Session session) { + return session.prepare(select(INSERTION_ID) .from(TABLE_NAME) .where(eq(GROUP, bindMarker(GROUP)))); } - Mono<Void> store(Group group, Event failedEvent) { + Mono<Void> store(Group group, Event failedEvent, EventDeadLetters.InsertionId insertionId) { return executor.executeVoid(insertStatement.bind() .setString(GROUP, group.asString()) - .setUUID(EVENT_ID, failedEvent.getEventId().getId()) + .setUUID(INSERTION_ID, insertionId.getId()) .setString(EVENT, eventSerializer.toJson(failedEvent))); } - Mono<Void> removeEvent(Group group, Event.EventId failedEventId) { + Mono<Void> removeEvent(Group group, EventDeadLetters.InsertionId failedInsertionId) { return executor.executeVoid(deleteStatement.bind() .setString(GROUP, group.asString()) - .setUUID(EVENT_ID, failedEventId.getId())); + .setUUID(INSERTION_ID, failedInsertionId.getId())); } - Mono<Event> retrieveFailedEvent(Group group, Event.EventId failedEventId) { + Mono<Event> retrieveFailedEvent(Group group, EventDeadLetters.InsertionId insertionId) { return executor.executeSingleRow(selectEventStatement.bind() .setString(GROUP, group.asString()) - .setUUID(EVENT_ID, failedEventId.getId())) + .setUUID(INSERTION_ID, insertionId.getId())) .map(row -> deserializeEvent(row.getString(EVENT))); } - Flux<Event.EventId> retrieveEventIdsWithGroup(Group group) { + Flux<EventDeadLetters.InsertionId> retrieveInsertionIdsWithGroup(Group group) { return executor.executeRows(selectEventIdsWithGroupStatement.bind() .setString(GROUP, group.asString())) - .map(row -> Event.EventId.of(row.getUUID(EVENT_ID))); - } - - Flux<Group> retrieveAllGroups() { - return executor.executeRows(selectAllGroupStatement.bind()) - .map(Throwing.function(row -> Group.deserialize(row.getString(GROUP)))) - .distinct(); + .map(row -> EventDeadLetters.InsertionId.of(row.getUUID(INSERTION_ID))); } private Event deserializeEvent(String serializedEvent) { diff --git a/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersModule.java b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersModule.java index 72e354c..6c4095c 100644 --- a/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersModule.java +++ b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersModule.java @@ -36,7 +36,7 @@ public interface CassandraEventDeadLettersModule { SchemaBuilder.rows(CassandraConstants.DEFAULT_CACHED_ROW_PER_PARTITION))) .statement(statement -> statement .addPartitionKey(CassandraEventDeadLettersTable.GROUP, DataType.text()) - .addClusteringColumn(CassandraEventDeadLettersTable.EVENT_ID, DataType.uuid()) + .addClusteringColumn(CassandraEventDeadLettersTable.INSERTION_ID, DataType.uuid()) .addColumn(CassandraEventDeadLettersTable.EVENT, DataType.text())) .table(CassandraEventDeadLettersGroupTable.TABLE_NAME) .comment("Projection table for retrieving groups for all failed events") diff --git a/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/tables/CassandraEventDeadLettersTable.java b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/tables/CassandraEventDeadLettersTable.java index 2418ffe..cbf272c 100644 --- a/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/tables/CassandraEventDeadLettersTable.java +++ b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/tables/CassandraEventDeadLettersTable.java @@ -24,6 +24,6 @@ public interface CassandraEventDeadLettersTable { String TABLE_NAME = "event_dead_letters"; String GROUP = "group"; - String EVENT_ID = "eventId"; + String INSERTION_ID = "insertionId"; String EVENT = "event"; } diff --git a/mailbox/event/event-cassandra/src/test/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAOTest.java b/mailbox/event/event-cassandra/src/test/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAOTest.java index e5da4df..2fba1d8 100644 --- a/mailbox/event/event-cassandra/src/test/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAOTest.java +++ b/mailbox/event/event-cassandra/src/test/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAOTest.java @@ -22,11 +22,11 @@ package org.apache.james.mailbox.events; import static org.apache.james.mailbox.events.EventDeadLettersContract.EVENT_1; import static org.apache.james.mailbox.events.EventDeadLettersContract.EVENT_2; import static org.apache.james.mailbox.events.EventDeadLettersContract.EVENT_3; -import static org.apache.james.mailbox.events.EventDeadLettersContract.EVENT_ID_1; -import static org.apache.james.mailbox.events.EventDeadLettersContract.EVENT_ID_2; -import static org.apache.james.mailbox.events.EventDeadLettersContract.EVENT_ID_3; import static org.apache.james.mailbox.events.EventDeadLettersContract.GROUP_A; import static org.apache.james.mailbox.events.EventDeadLettersContract.GROUP_B; +import static org.apache.james.mailbox.events.EventDeadLettersContract.INSERTION_ID_1; +import static org.apache.james.mailbox.events.EventDeadLettersContract.INSERTION_ID_2; +import static org.apache.james.mailbox.events.EventDeadLettersContract.INSERTION_ID_3; import static org.assertj.core.api.Assertions.assertThat; import org.apache.james.backends.cassandra.CassandraCluster; @@ -53,12 +53,12 @@ class CassandraEventDeadLettersDAOTest { @Test void removeEventShouldSucceededWhenRemoveStoredEvent() { - cassandraEventDeadLettersDAO.store(GROUP_A, EVENT_1).block(); + cassandraEventDeadLettersDAO.store(GROUP_A, EVENT_1, INSERTION_ID_1).block(); - cassandraEventDeadLettersDAO.removeEvent(GROUP_A, EVENT_ID_1).block(); + cassandraEventDeadLettersDAO.removeEvent(GROUP_A, INSERTION_ID_1).block(); assertThat(cassandraEventDeadLettersDAO - .retrieveEventIdsWithGroup(GROUP_A) + .retrieveInsertionIdsWithGroup(GROUP_A) .collectList().block()) .isEmpty(); } @@ -66,39 +66,39 @@ class CassandraEventDeadLettersDAOTest { @Test void retrieveFailedEventShouldReturnEmptyWhenDefault() { assertThat(cassandraEventDeadLettersDAO - .retrieveFailedEvent(GROUP_A, EVENT_ID_1) + .retrieveFailedEvent(GROUP_A, INSERTION_ID_1) .blockOptional().isPresent()) .isFalse(); } @Test void retrieveFailedEventShouldReturnStoredEvent() { - cassandraEventDeadLettersDAO.store(GROUP_A, EVENT_1).block(); - cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_2).block(); + cassandraEventDeadLettersDAO.store(GROUP_A, EVENT_1, INSERTION_ID_1).block(); + cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_2, INSERTION_ID_2).block(); assertThat(cassandraEventDeadLettersDAO - .retrieveFailedEvent(GROUP_B, EVENT_ID_2) + .retrieveFailedEvent(GROUP_B, INSERTION_ID_2) .blockOptional().get()) .isEqualTo(EVENT_2); } @Test - void retrieveEventIdsWithGroupShouldReturnEmptyWhenDefault() { + void retrieveInsertionIdsWithGroupShouldReturnEmptyWhenDefault() { assertThat(cassandraEventDeadLettersDAO - .retrieveEventIdsWithGroup(GROUP_A) + .retrieveInsertionIdsWithGroup(GROUP_A) .collectList().block()) .isEmpty(); } @Test - void retrieveEventIdsWithGroupShouldReturnStoredEventId() { - cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_1).block(); - cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_2).block(); - cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_3).block(); + void retrieveInsertionIdsWithGroupShouldReturnStoredInsertionId() { + cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_1, INSERTION_ID_1).block(); + cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_2, INSERTION_ID_2).block(); + cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_3, INSERTION_ID_3).block(); assertThat(cassandraEventDeadLettersDAO - .retrieveEventIdsWithGroup(GROUP_B) + .retrieveInsertionIdsWithGroup(GROUP_B) .collectList().block()) - .containsOnly(EVENT_ID_1, EVENT_ID_2, EVENT_ID_3); + .containsOnly(INSERTION_ID_1, INSERTION_ID_2, INSERTION_ID_3); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
