JAMES-2083 Store Cassandra versions with event sourcing

Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/2474f1b8
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/2474f1b8
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/2474f1b8

Branch: refs/heads/master
Commit: 2474f1b83c13091bc640c6538d0096ef655c0d9d
Parents: 5981b46
Author: benwa <btell...@linagora.com>
Authored: Wed Jul 5 08:38:41 2017 +0700
Committer: Antoine Duprat <adup...@linagora.com>
Committed: Mon Jul 10 14:24:00 2017 +0200

----------------------------------------------------------------------
 .../versions/CassandraSchemaVersionDAO.java     | 31 ++++++++++++--------
 .../versions/CassandraSchemaVersionModule.java  |  3 +-
 .../versions/CassandraSchemaVersionDAOTest.java |  6 ++--
 3 files changed, 23 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/2474f1b8/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionDAO.java
----------------------------------------------------------------------
diff --git 
a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionDAO.java
 
b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionDAO.java
index 555e0d4..83a16ba 100644
--- 
a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionDAO.java
+++ 
b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionDAO.java
@@ -20,58 +20,63 @@
 package org.apache.james.backends.cassandra.versions;
 
 import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
 import static 
org.apache.james.backends.cassandra.versions.table.CassandraSchemaVersionTable.KEY;
-import static 
org.apache.james.backends.cassandra.versions.table.CassandraSchemaVersionTable.KEY_FOR_VERSION;
 import static 
org.apache.james.backends.cassandra.versions.table.CassandraSchemaVersionTable.TABLE_NAME;
 import static 
org.apache.james.backends.cassandra.versions.table.CassandraSchemaVersionTable.VALUE;
 
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
+import javax.inject.Inject;
+
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import 
org.apache.james.backends.cassandra.versions.table.CassandraSchemaVersionTable;
 
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Session;
-import com.datastax.driver.core.querybuilder.QueryBuilder;
-
-import javax.inject.Inject;
+import com.datastax.driver.core.utils.UUIDs;
 
 public class CassandraSchemaVersionDAO {
     private final PreparedStatement readVersionStatement;
     private final PreparedStatement writeVersionStatement;
+    private CassandraUtils cassandraUtils;
     private final CassandraAsyncExecutor cassandraAsyncExecutor;
 
     @Inject
-    public CassandraSchemaVersionDAO(Session session) {
+    public CassandraSchemaVersionDAO(Session session, CassandraUtils 
cassandraUtils) {
         cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
         readVersionStatement = prepareReadVersionStatement(session);
         writeVersionStatement = prepareWriteVersionStatement(session);
+        this.cassandraUtils = cassandraUtils;
     }
 
     private PreparedStatement prepareReadVersionStatement(Session session) {
         return session.prepare(
-            QueryBuilder.select(VALUE)
-                .from(TABLE_NAME)
-                .where(QueryBuilder.eq(KEY, KEY_FOR_VERSION)));
+            select(VALUE)
+                .from(TABLE_NAME));
     }
 
     private PreparedStatement prepareWriteVersionStatement(Session session) {
         return session.prepare(
-            
QueryBuilder.insertInto(CassandraSchemaVersionTable.TABLE_NAME).value(KEY, 
KEY_FOR_VERSION)
+            insertInto(CassandraSchemaVersionTable.TABLE_NAME)
+                .value(KEY, bindMarker(KEY))
                 .value(VALUE, bindMarker(VALUE)));
     }
 
     public CompletableFuture<Optional<Integer>> getCurrentSchemaVersion() {
-        return 
cassandraAsyncExecutor.executeSingleRow(readVersionStatement.bind())
-            .thenApply(rowOptional ->
-                rowOptional
-                    .map(row -> row.getInt(VALUE)));
+        return cassandraAsyncExecutor.execute(readVersionStatement.bind())
+            .thenApply(resultSet -> cassandraUtils.convertToStream(resultSet)
+                .map(row -> row.getInt(VALUE))
+                .reduce(Math::max));
     }
 
     public CompletableFuture<Void> updateVersion(int newVersion) {
         return cassandraAsyncExecutor.executeVoid(
             writeVersionStatement.bind()
+                .setUUID(KEY, UUIDs.timeBased())
                 .setInt(VALUE, newVersion));
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/2474f1b8/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionModule.java
----------------------------------------------------------------------
diff --git 
a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionModule.java
 
b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionModule.java
index decc422..4eb7325 100644
--- 
a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionModule.java
+++ 
b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionModule.java
@@ -20,6 +20,7 @@
 package org.apache.james.backends.cassandra.versions;
 
 import static com.datastax.driver.core.DataType.cint;
+import static com.datastax.driver.core.DataType.timeuuid;
 
 import java.util.List;
 
@@ -40,7 +41,7 @@ public class CassandraSchemaVersionModule implements 
CassandraModule {
             new CassandraTable(CassandraSchemaVersionTable.TABLE_NAME,
                 
SchemaBuilder.createTable(CassandraSchemaVersionTable.TABLE_NAME)
                     .ifNotExists()
-                    .addPartitionKey(CassandraSchemaVersionTable.KEY, cint())
+                    .addPartitionKey(CassandraSchemaVersionTable.KEY, 
timeuuid())
                     .addClusteringColumn(CassandraSchemaVersionTable.VALUE, 
cint())));
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/2474f1b8/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionDAOTest.java
----------------------------------------------------------------------
diff --git 
a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionDAOTest.java
 
b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionDAOTest.java
index e0834f3..41b8efd 100644
--- 
a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionDAOTest.java
+++ 
b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionDAOTest.java
@@ -24,6 +24,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import java.util.Optional;
 
 import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -34,13 +35,12 @@ public class CassandraSchemaVersionDAOTest {
 
     private CassandraSchemaVersionDAO testee;
 
-
     @Before
     public void setUp() {
         cassandra = CassandraCluster.create(new 
CassandraSchemaVersionModule());
         cassandra.ensureAllTables();
 
-        testee = new CassandraSchemaVersionDAO(cassandra.getConf());
+        testee = new CassandraSchemaVersionDAO(cassandra.getConf(), 
CassandraUtils.WITH_DEFAULT_CONFIGURATION);
     }
 
     @After
@@ -61,7 +61,7 @@ public class CassandraSchemaVersionDAOTest {
 
         testee.updateVersion(version).join();
 
-        
assertThat(testee.getCurrentSchemaVersion().join()).isEqualTo(Optional.of(version));
+        assertThat(testee.getCurrentSchemaVersion().join()).contains(version);
     }
 
     @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to