JAMES-2083 Store Cassandra versions with event sourcing
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/2474f1b8 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/2474f1b8 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/2474f1b8 Branch: refs/heads/master Commit: 2474f1b83c13091bc640c6538d0096ef655c0d9d Parents: 5981b46 Author: benwa <btell...@linagora.com> Authored: Wed Jul 5 08:38:41 2017 +0700 Committer: Antoine Duprat <adup...@linagora.com> Committed: Mon Jul 10 14:24:00 2017 +0200 ---------------------------------------------------------------------- .../versions/CassandraSchemaVersionDAO.java | 31 ++++++++++++-------- .../versions/CassandraSchemaVersionModule.java | 3 +- .../versions/CassandraSchemaVersionDAOTest.java | 6 ++-- 3 files changed, 23 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/2474f1b8/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionDAO.java ---------------------------------------------------------------------- diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionDAO.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionDAO.java index 555e0d4..83a16ba 100644 --- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionDAO.java +++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionDAO.java @@ -20,58 +20,63 @@ package org.apache.james.backends.cassandra.versions; import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker; +import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; +import static com.datastax.driver.core.querybuilder.QueryBuilder.select; import static org.apache.james.backends.cassandra.versions.table.CassandraSchemaVersionTable.KEY; -import static org.apache.james.backends.cassandra.versions.table.CassandraSchemaVersionTable.KEY_FOR_VERSION; import static org.apache.james.backends.cassandra.versions.table.CassandraSchemaVersionTable.TABLE_NAME; import static org.apache.james.backends.cassandra.versions.table.CassandraSchemaVersionTable.VALUE; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import javax.inject.Inject; + import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; +import org.apache.james.backends.cassandra.utils.CassandraUtils; import org.apache.james.backends.cassandra.versions.table.CassandraSchemaVersionTable; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Session; -import com.datastax.driver.core.querybuilder.QueryBuilder; - -import javax.inject.Inject; +import com.datastax.driver.core.utils.UUIDs; public class CassandraSchemaVersionDAO { private final PreparedStatement readVersionStatement; private final PreparedStatement writeVersionStatement; + private CassandraUtils cassandraUtils; private final CassandraAsyncExecutor cassandraAsyncExecutor; @Inject - public CassandraSchemaVersionDAO(Session session) { + public CassandraSchemaVersionDAO(Session session, CassandraUtils cassandraUtils) { cassandraAsyncExecutor = new CassandraAsyncExecutor(session); readVersionStatement = prepareReadVersionStatement(session); writeVersionStatement = prepareWriteVersionStatement(session); + this.cassandraUtils = cassandraUtils; } private PreparedStatement prepareReadVersionStatement(Session session) { return session.prepare( - QueryBuilder.select(VALUE) - .from(TABLE_NAME) - .where(QueryBuilder.eq(KEY, KEY_FOR_VERSION))); + select(VALUE) + .from(TABLE_NAME)); } private PreparedStatement prepareWriteVersionStatement(Session session) { return session.prepare( - QueryBuilder.insertInto(CassandraSchemaVersionTable.TABLE_NAME).value(KEY, KEY_FOR_VERSION) + insertInto(CassandraSchemaVersionTable.TABLE_NAME) + .value(KEY, bindMarker(KEY)) .value(VALUE, bindMarker(VALUE))); } public CompletableFuture<Optional<Integer>> getCurrentSchemaVersion() { - return cassandraAsyncExecutor.executeSingleRow(readVersionStatement.bind()) - .thenApply(rowOptional -> - rowOptional - .map(row -> row.getInt(VALUE))); + return cassandraAsyncExecutor.execute(readVersionStatement.bind()) + .thenApply(resultSet -> cassandraUtils.convertToStream(resultSet) + .map(row -> row.getInt(VALUE)) + .reduce(Math::max)); } public CompletableFuture<Void> updateVersion(int newVersion) { return cassandraAsyncExecutor.executeVoid( writeVersionStatement.bind() + .setUUID(KEY, UUIDs.timeBased()) .setInt(VALUE, newVersion)); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/2474f1b8/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionModule.java ---------------------------------------------------------------------- diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionModule.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionModule.java index decc422..4eb7325 100644 --- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionModule.java +++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionModule.java @@ -20,6 +20,7 @@ package org.apache.james.backends.cassandra.versions; import static com.datastax.driver.core.DataType.cint; +import static com.datastax.driver.core.DataType.timeuuid; import java.util.List; @@ -40,7 +41,7 @@ public class CassandraSchemaVersionModule implements CassandraModule { new CassandraTable(CassandraSchemaVersionTable.TABLE_NAME, SchemaBuilder.createTable(CassandraSchemaVersionTable.TABLE_NAME) .ifNotExists() - .addPartitionKey(CassandraSchemaVersionTable.KEY, cint()) + .addPartitionKey(CassandraSchemaVersionTable.KEY, timeuuid()) .addClusteringColumn(CassandraSchemaVersionTable.VALUE, cint()))); } http://git-wip-us.apache.org/repos/asf/james-project/blob/2474f1b8/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionDAOTest.java ---------------------------------------------------------------------- diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionDAOTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionDAOTest.java index e0834f3..41b8efd 100644 --- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionDAOTest.java +++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionDAOTest.java @@ -24,6 +24,7 @@ import static org.assertj.core.api.Assertions.assertThat; import java.util.Optional; import org.apache.james.backends.cassandra.CassandraCluster; +import org.apache.james.backends.cassandra.utils.CassandraUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -34,13 +35,12 @@ public class CassandraSchemaVersionDAOTest { private CassandraSchemaVersionDAO testee; - @Before public void setUp() { cassandra = CassandraCluster.create(new CassandraSchemaVersionModule()); cassandra.ensureAllTables(); - testee = new CassandraSchemaVersionDAO(cassandra.getConf()); + testee = new CassandraSchemaVersionDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION); } @After @@ -61,7 +61,7 @@ public class CassandraSchemaVersionDAOTest { testee.updateVersion(version).join(); - assertThat(testee.getCurrentSchemaVersion().join()).isEqualTo(Optional.of(version)); + assertThat(testee.getCurrentSchemaVersion().join()).contains(version); } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org