Author: reschke Date: Mon Dec 9 14:22:50 2013 New Revision: 1549579 URL: http://svn.apache.org/r1549579 Log: OAK-1266 - work in progress SQL/JDBC DocumentStore implementation - add modCount support & check
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/sqlpersistence/SQLDocumentStore.java Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/sqlpersistence/SQLDocumentStore.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/sqlpersistence/SQLDocumentStore.java?rev=1549579&r1=1549578&r2=1549579&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/sqlpersistence/SQLDocumentStore.java (original) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/sqlpersistence/SQLDocumentStore.java Mon Dec 9 14:22:50 2013 @@ -46,6 +46,8 @@ import org.apache.jackrabbit.oak.plugins import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; import org.json.simple.parser.ParseException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class SQLDocumentStore implements DocumentStore { @@ -153,6 +155,8 @@ public class SQLDocumentStore implements // implementation + private static final Logger LOG = LoggerFactory.getLogger(SQLDocumentStore.class); + private final Comparator<Revision> comparator = Collections.reverseOrder(new StableRevisionComparator()); private Connection connection; @@ -166,8 +170,8 @@ public class SQLDocumentStore implements // memory document store stmt.execute("drop table if exists CLUSTERNODES"); stmt.execute("drop table if exists NODES"); - stmt.execute("create table if not exists CLUSTERNODES(ID varchar primary key, MODIFIED bigint, DATA varchar)"); - stmt.execute("create table if not exists NODES(ID varchar primary key, MODIFIED bigint, DATA varchar)"); + stmt.execute("create table if not exists CLUSTERNODES(ID varchar primary key, MODIFIED bigint, MODCOUNT bigint, DATA varchar)"); + stmt.execute("create table if not exists NODES(ID varchar primary key, MODIFIED bigint, MODCOUNT bigint, DATA varchar)"); } @CheckForNull @@ -175,8 +179,9 @@ public class SQLDocumentStore implements try { for (UpdateOp update : updates) { T doc = collection.newDocument(this); + update.increment("_modCount", 1); MemoryDocumentStore.applyChanges(doc, update, comparator); - writeDocument(collection, doc, true); + writeDocument(collection, doc, null, true); } // FIXME to be atomic return true; @@ -200,15 +205,16 @@ public class SQLDocumentStore implements if (checkConditions && !MemoryDocumentStore.checkConditions(doc, update)) { return null; } + update.increment("_modCount", 1); MemoryDocumentStore.applyChanges(doc, update, comparator); - writeDocument(collection, doc, oldDoc == null); + writeDocument(collection, doc, oldDoc != null ? (Long) oldDoc.get("_modCount") : null, oldDoc == null); doc.seal(); return oldDoc; } @CheckForNull - private <T extends Document> void internalUpdate(Collection<T> collection, List<String> ids, UpdateOp updateOp) { + private <T extends Document> void internalUpdate(Collection<T> collection, List<String> ids, UpdateOp update) { String tableName = getTable(collection); try { for (String id : ids) { @@ -217,9 +223,13 @@ public class SQLDocumentStore implements throw new MicroKernelException(tableName + " " + id + " not found"); } T doc = fromString(collection, in); - MemoryDocumentStore.applyChanges(doc, updateOp, comparator); + Long oldmodcount = (Long) doc.get("_modCount"); + update.increment("_modCount", 1); + MemoryDocumentStore.applyChanges(doc, update, comparator); String data = asString(doc); - dbUpdate(connection, tableName, id, (Long) doc.get("_modified"), data); + Long modified = (Long) doc.get("_modified"); + Long modcount = (Long) doc.get("_modCount"); + dbUpdate(connection, tableName, id, modified, modcount, oldmodcount, data); } connection.commit(); } catch (Exception ex) { @@ -318,14 +328,16 @@ public class SQLDocumentStore implements } } - private <T extends Document> void writeDocument(Collection<T> collection, T document, boolean insert) { + private <T extends Document> void writeDocument(Collection<T> collection, T document, Long oldmodcount, boolean insert) { String tableName = getTable(collection); try { String data = asString(document); + Long modified = (Long) document.get("_modified"); + Long modcount = (Long) document.get("_modCount"); if (insert) { - dbInsert(connection, tableName, document.getId(), (Long) document.get("_modified"), data); + dbInsert(connection, tableName, document.getId(), modified, modcount, data); } else { - dbUpdate(connection, tableName, document.getId(), (Long) document.get("_modified"), data); + dbUpdate(connection, tableName, document.getId(), modified, modcount, oldmodcount, data); } connection.commit(); } catch (SQLException ex) { @@ -376,24 +388,40 @@ public class SQLDocumentStore implements return result; } - private void dbUpdate(Connection connection, String tableName, String id, Long modified, String data) throws SQLException { - PreparedStatement stmt = connection.prepareStatement("UPDATE " + tableName + " SET MODIFIED = ?, DATA = ? WHERE ID = ?"); + private void dbUpdate(Connection connection, String tableName, String id, Long modified, Long modcount, Long oldmodcount, + String data) throws SQLException { + String t = "update " + tableName + " set MODIFIED = ?, MODCOUNT = ?, DATA = ? where ID = ?"; + if (oldmodcount != null) { + t += " and MODCOUNT = ?"; + } + PreparedStatement stmt = connection.prepareStatement(t); try { stmt.setObject(1, modified, Types.BIGINT); - stmt.setString(2, data); - stmt.setString(3, id); - stmt.executeUpdate(); + stmt.setObject(2, modcount, Types.BIGINT); + stmt.setString(3, data); + stmt.setString(4, id); + if (oldmodcount != null) { + stmt.setObject(5, oldmodcount, Types.BIGINT); + } + int result = stmt.executeUpdate(); + if (result != 1) { + String message = "update failed for key=" + id + ", modCount=" + modcount + ", oldmodcount=" + oldmodcount; + LOG.error(message); + throw new MicroKernelException(message); + } } finally { stmt.close(); } } - private void dbInsert(Connection connection, String tableName, String id, Long modified, String data) throws SQLException { - PreparedStatement stmt = connection.prepareStatement("INSERT INTO " + tableName + " VALUES(?, ?, ?)"); + private void dbInsert(Connection connection, String tableName, String id, Long modified, Long modcount, String data) + throws SQLException { + PreparedStatement stmt = connection.prepareStatement("insert into " + tableName + " values(?, ?, ?, ?)"); try { stmt.setString(1, id); stmt.setObject(2, modified, Types.BIGINT); - stmt.setString(3, data); + stmt.setObject(3, modcount, Types.BIGINT); + stmt.setString(4, data); stmt.executeUpdate(); } finally { stmt.close();