This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 5967f8fed6e22de9bd7df6b88498c5777098cbc3
Author: datph <[email protected]>
AuthorDate: Fri Feb 22 09:41:46 2019 +0700

    MAILBOX-380 Create CassandraEventDeadLettersModule With DAO and Table
---
 mailbox/event/event-cassandra/pom.xml              |  51 ++++++++
 .../events/CassandraEventDeadLettersDAO.java       | 131 +++++++++++++++++++++
 .../events/CassandraEventDeadLettersModule.java    |  41 +++++++
 .../tables/CassandraEventDeadLettersTable.java     |  29 +++++
 .../events/CassandraEventDeadLettersDAOTest.java   | 124 +++++++++++++++++++
 mailbox/pom.xml                                    |   1 +
 6 files changed, 377 insertions(+)

diff --git a/mailbox/event/event-cassandra/pom.xml 
b/mailbox/event/event-cassandra/pom.xml
new file mode 100644
index 0000000..1bc6222
--- /dev/null
+++ b/mailbox/event/event-cassandra/pom.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>apache-james-mailbox</artifactId>
+        <groupId>org.apache.james</groupId>
+        <version>3.4.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>apache-james-mailbox-event-cassandra</artifactId>
+    <name>Apache James :: Mailbox :: Event :: In Cassandra implementation</name>
+    <description>In Cassandra implementation for the eventDeadLetter API</description>
+
+    <dependencies>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>apache-james-backends-cassandra</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>apache-james-mailbox-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>apache-james-mailbox-event-json</artifactId>
+        </dependency>
+    </dependencies>
+
+
+</project>
\ No newline at end of file
diff --git 
a/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAO.java
 
b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAO.java
new file mode 100644
index 0000000..0eab33b
--- /dev/null
+++ 
b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAO.java
@@ -0,0 +1,131 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.delete;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
+import static 
org.apache.james.mailbox.events.tables.CassandraEventDeadLettersTable.EVENT;
+import static 
org.apache.james.mailbox.events.tables.CassandraEventDeadLettersTable.EVENT_ID;
+import static 
org.apache.james.mailbox.events.tables.CassandraEventDeadLettersTable.GROUP;
+import static 
org.apache.james.mailbox.events.tables.CassandraEventDeadLettersTable.TABLE_NAME;
+
+import javax.inject.Inject;
+
+import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.event.json.EventSerializer;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import com.github.fge.lambdas.Throwing;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * Cassandra data access object for the event dead letters table.
+ *
+ * <p>A row is addressed by a group and an event id and holds the JSON form of the failed event, as
+ * produced by the {@link EventSerializer}. Every CQL statement is prepared once in the constructor
+ * and bound on each call.</p>
+ */
+public class CassandraEventDeadLettersDAO {
+    private final CassandraAsyncExecutor executor;
+    private final EventSerializer eventSerializer;
+    private final PreparedStatement insertStatement;
+    private final PreparedStatement deleteStatement;
+    private final PreparedStatement selectAllGroupStatement;
+    private final PreparedStatement selectEventStatement;
+    private final PreparedStatement selectEventIdsWithGroupStatement;
+
+    /**
+     * Builds the DAO and prepares all of its statements against the given session.
+     *
+     * @param session Cassandra session used to prepare and execute the statements
+     * @param eventSerializer converts events to and from their JSON representation
+     */
+    @Inject
+    CassandraEventDeadLettersDAO(Session session, EventSerializer eventSerializer) {
+        this.executor = new CassandraAsyncExecutor(session);
+        this.eventSerializer = eventSerializer;
+        this.insertStatement = prepareInsertStatement(session);
+        this.deleteStatement = prepareDeleteStatement(session);
+        this.selectAllGroupStatement = prepareSelectAllGroupStatement(session);
+        this.selectEventStatement = prepareSelectEventStatement(session);
+        this.selectEventIdsWithGroupStatement = prepareSelectEventIdsWithGroupStatement(session);
+    }
+
+    /** Prepares the insert of a (group, event id, event) row. */
+    private PreparedStatement prepareInsertStatement(Session session) {
+        return session.prepare(insertInto(TABLE_NAME)
+            .value(GROUP, bindMarker(GROUP))
+            .value(EVENT_ID, bindMarker(EVENT_ID))
+            .value(EVENT, bindMarker(EVENT)));
+    }
+
+    /** Prepares the deletion of the row matching a group and an event id. */
+    private PreparedStatement prepareDeleteStatement(Session session) {
+        return session.prepare(delete()
+            .from(TABLE_NAME)
+            .where(eq(GROUP, bindMarker(GROUP)))
+            .and(eq(EVENT_ID, bindMarker(EVENT_ID))));
+    }
+
+    /** Prepares the listing of the groups present in the table. */
+    private PreparedStatement prepareSelectAllGroupStatement(Session session) {
+        // GROUP is the whole partition key of the table, so DISTINCT makes Cassandra return one row
+        // per group instead of one row per stored event that would then be de-duplicated client side.
+        return session.prepare(select()
+            .distinct()
+            .column(GROUP)
+            .from(TABLE_NAME));
+    }
+
+    /** Prepares the lookup of the serialized event matching a group and an event id. */
+    private PreparedStatement prepareSelectEventStatement(Session session) {
+        return session.prepare(select(EVENT)
+            .from(TABLE_NAME)
+            .where(eq(GROUP, bindMarker(GROUP)))
+            .and(eq(EVENT_ID, bindMarker(EVENT_ID))));
+    }
+
+    /** Prepares the listing of the event ids stored for a group. */
+    private PreparedStatement prepareSelectEventIdsWithGroupStatement(Session session) {
+        return session.prepare(select(EVENT_ID)
+            .from(TABLE_NAME)
+            .where(eq(GROUP, bindMarker(GROUP))));
+    }
+
+    /**
+     * Stores a failed event, serialized as JSON, under the given group and the event's own id.
+     *
+     * @param group group the event failed for
+     * @param failedEvent event to store
+     * @return a {@link Mono} wrapping the execution of the insert
+     */
+    Mono<Void> store(Group group, Event failedEvent) {
+        return executor.executeVoid(insertStatement.bind()
+            .setString(GROUP, group.asString())
+            .setUUID(EVENT_ID, failedEvent.getEventId().getId())
+            .setString(EVENT, eventSerializer.toJson(failedEvent)));
+    }
+
+    /**
+     * Deletes the row identified by the given group and event id.
+     *
+     * @param group group of the event to remove
+     * @param failedEventId id of the event to remove
+     * @return a {@link Mono} wrapping the execution of the delete
+     */
+    Mono<Void> removeEvent(Group group, Event.EventId failedEventId) {
+        return executor.executeVoid(deleteStatement.bind()
+            .setString(GROUP, group.asString())
+            .setUUID(EVENT_ID, failedEventId.getId()));
+    }
+
+    /**
+     * Reads and deserializes the event stored for the given group and event id.
+     *
+     * @param group group of the event to read
+     * @param failedEventId id of the event to read
+     * @return the deserialized event, for the row returned by the select
+     */
+    Mono<Event> retrieveFailedEvent(Group group, Event.EventId failedEventId) {
+        return executor.executeSingleRow(selectEventStatement.bind()
+                .setString(GROUP, group.asString())
+                .setUUID(EVENT_ID, failedEventId.getId()))
+            .map(row -> deserializeEvent(row.getString(EVENT)));
+    }
+
+    /**
+     * Lists the ids of the events stored for the given group.
+     *
+     * @param group group to list the event ids of
+     * @return one {@link Event.EventId} per row returned for that group
+     */
+    Flux<Event.EventId> retrieveEventIdsWithGroup(Group group) {
+        return executor.executeRows(selectEventIdsWithGroupStatement.bind()
+                .setString(GROUP, group.asString()))
+            .map(row -> Event.EventId.of(row.getUUID(EVENT_ID)));
+    }
+
+    /**
+     * Lists the groups having at least one row in the table.
+     *
+     * @return the groups, each one emitted once
+     */
+    Flux<Group> retrieveAllGroups() {
+        // De-duplication is done by the SELECT DISTINCT statement; Throwing.function is needed
+        // because Group.deserialize is wrapped as a throwing function here.
+        return executor.executeRows(selectAllGroupStatement.bind())
+            .map(Throwing.function(row -> Group.deserialize(row.getString(GROUP))));
+    }
+
+    /** Converts the stored JSON back into an {@link Event}. */
+    private Event deserializeEvent(String serializedEvent) {
+        // NOTE(review): get() is called without checking that parsing succeeded; presumably it
+        // throws on malformed JSON. Confirm against the EventSerializer API.
+        return eventSerializer.fromJson(serializedEvent).get();
+    }
+}
diff --git 
a/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersModule.java
 
b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersModule.java
new file mode 100644
index 0000000..f8b1e30
--- /dev/null
+++ 
b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersModule.java
@@ -0,0 +1,41 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.backends.cassandra.utils.CassandraConstants;
+import org.apache.james.mailbox.events.tables.CassandraEventDeadLettersTable;
+
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.schemabuilder.SchemaBuilder;
+
+/**
+ * Declares the Cassandra schema used to persist event dead letters.
+ */
+public interface CassandraEventDeadLettersModule {
+    /**
+     * Definition of the event dead letters table: the group is the partition key, the event id the
+     * clustering column, and the event itself is kept in a text column.
+     */
+    CassandraModule MODULE = CassandraModule.builder()
+        .table(CassandraEventDeadLettersTable.TABLE_NAME)
+        .comment("Holds event dead letter")
+        .options(tableOptions -> tableOptions
+            .caching(
+                SchemaBuilder.KeyCaching.ALL,
+                SchemaBuilder.rows(CassandraConstants.DEFAULT_CACHED_ROW_PER_PARTITION)))
+        .statement(create -> create
+            .addPartitionKey(CassandraEventDeadLettersTable.GROUP, DataType.text())
+            .addClusteringColumn(CassandraEventDeadLettersTable.EVENT_ID, DataType.uuid())
+            .addColumn(CassandraEventDeadLettersTable.EVENT, DataType.text()))
+        .build();
+}
diff --git 
a/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/tables/CassandraEventDeadLettersTable.java
 
b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/tables/CassandraEventDeadLettersTable.java
new file mode 100644
index 0000000..2418ffe
--- /dev/null
+++ 
b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/tables/CassandraEventDeadLettersTable.java
@@ -0,0 +1,29 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events.tables;
+
+/**
+ * Names of the event dead letters table and of its columns.
+ */
+public interface CassandraEventDeadLettersTable {
+    /** Name of the table. */
+    String TABLE_NAME = "event_dead_letters";
+
+    /** Column holding the group. */
+    String GROUP = "group";
+    /** Column holding the event id. */
+    String EVENT_ID = "eventId";
+    /** Column holding the event. */
+    String EVENT = "event";
+}
diff --git 
a/mailbox/event/event-cassandra/src/test/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAOTest.java
 
b/mailbox/event/event-cassandra/src/test/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAOTest.java
new file mode 100644
index 0000000..7bc66a7
--- /dev/null
+++ 
b/mailbox/event/event-cassandra/src/test/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAOTest.java
@@ -0,0 +1,124 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import static org.apache.james.mailbox.events.EventDeadLettersContract.EVENT_1;
+import static org.apache.james.mailbox.events.EventDeadLettersContract.EVENT_2;
+import static org.apache.james.mailbox.events.EventDeadLettersContract.EVENT_3;
+import static 
org.apache.james.mailbox.events.EventDeadLettersContract.EVENT_ID_1;
+import static 
org.apache.james.mailbox.events.EventDeadLettersContract.EVENT_ID_2;
+import static 
org.apache.james.mailbox.events.EventDeadLettersContract.EVENT_ID_3;
+import static org.apache.james.mailbox.events.EventDeadLettersContract.GROUP_A;
+import static org.apache.james.mailbox.events.EventDeadLettersContract.GROUP_B;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.event.json.EventSerializer;
+import org.apache.james.mailbox.model.TestId;
+import org.apache.james.mailbox.model.TestMessageId;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+/**
+ * Exercises {@link CassandraEventDeadLettersDAO} against the Cassandra cluster supplied by
+ * {@link CassandraClusterExtension}, using the fixtures of {@code EventDeadLettersContract}.
+ */
+class CassandraEventDeadLettersDAOTest {
+
+    @RegisterExtension
+    static CassandraClusterExtension cassandraClusterExtension =
+        new CassandraClusterExtension(CassandraEventDeadLettersModule.MODULE);
+
+    private CassandraEventDeadLettersDAO cassandraEventDeadLettersDAO;
+
+    @BeforeEach
+    void setUp(CassandraCluster cassandraCluster) {
+        EventSerializer eventSerializer = new EventSerializer(new TestId.Factory(), new TestMessageId.Factory());
+        cassandraEventDeadLettersDAO = new CassandraEventDeadLettersDAO(cassandraCluster.getConf(), eventSerializer);
+    }
+
+    @Test
+    void removeEventShouldSucceedWhenRemoveStoredEvent() {
+        cassandraEventDeadLettersDAO.store(GROUP_A, EVENT_1).block();
+
+        cassandraEventDeadLettersDAO.removeEvent(GROUP_A, EVENT_ID_1).block();
+
+        assertThat(cassandraEventDeadLettersDAO
+                .retrieveAllGroups()
+                .collectList().block())
+            .isEmpty();
+    }
+
+    @Test
+    void retrieveFailedEventShouldReturnEmptyWhenDefault() {
+        // Asserting on the Optional itself reports its content when the assertion fails.
+        assertThat(cassandraEventDeadLettersDAO
+                .retrieveFailedEvent(GROUP_A, EVENT_ID_1)
+                .blockOptional())
+            .isEmpty();
+    }
+
+    @Test
+    void retrieveFailedEventShouldReturnStoredEvent() {
+        cassandraEventDeadLettersDAO.store(GROUP_A, EVENT_1).block();
+        cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_2).block();
+
+        // contains() fails with a readable message on an empty Optional, unlike an unchecked get().
+        assertThat(cassandraEventDeadLettersDAO
+                .retrieveFailedEvent(GROUP_B, EVENT_ID_2)
+                .blockOptional())
+            .contains(EVENT_2);
+    }
+
+    @Test
+    void retrieveEventIdsWithGroupShouldReturnEmptyWhenDefault() {
+        assertThat(cassandraEventDeadLettersDAO
+                .retrieveEventIdsWithGroup(GROUP_A)
+                .collectList().block())
+            .isEmpty();
+    }
+
+    @Test
+    void retrieveEventIdsWithGroupShouldReturnStoredEventId() {
+        cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_1).block();
+        cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_2).block();
+        cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_3).block();
+
+        assertThat(cassandraEventDeadLettersDAO
+                .retrieveEventIdsWithGroup(GROUP_B)
+                .collectList().block())
+            .containsExactlyInAnyOrder(EVENT_ID_1, EVENT_ID_2, EVENT_ID_3);
+    }
+
+    @Test
+    void retrieveAllGroupsShouldReturnEmptyWhenDefault() {
+        assertThat(cassandraEventDeadLettersDAO
+                .retrieveAllGroups()
+                .collectList().block())
+            .isEmpty();
+    }
+
+    @Test
+    void retrieveAllGroupsShouldReturnStoredGroups() {
+        cassandraEventDeadLettersDAO.store(GROUP_A, EVENT_1).block();
+        cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_2).block();
+        cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_3).block();
+
+        // GROUP_B holds two events: the exact-match assertion fails if it is returned twice,
+        // which containsOnly would have let through.
+        assertThat(cassandraEventDeadLettersDAO
+                .retrieveAllGroups()
+                .collectList().block())
+            .containsExactlyInAnyOrder(GROUP_A, GROUP_B);
+    }
+}
diff --git a/mailbox/pom.xml b/mailbox/pom.xml
index 08102cc..55fa3ab 100644
--- a/mailbox/pom.xml
+++ b/mailbox/pom.xml
@@ -41,6 +41,7 @@
         <module>cassandra</module>
         <module>elasticsearch</module>
 
+        <module>event/event-cassandra</module>
         <module>event/event-memory</module>
         <module>event/event-rabbitmq</module>
         <module>event/json</module>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to