Author: reschke
Date: Thu Dec 5 17:24:33 2013
New Revision: 1548211
URL: http://svn.apache.org/r1548211
Log:
OAK-1266 - work in progress SQL/JDBC DocumentStore implementation
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/sqlpersistence/
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/sqlpersistence/SQLDocumentStore.java
(with props)
Modified:
jackrabbit/oak/trunk/oak-core/pom.xml
Modified: jackrabbit/oak/trunk/oak-core/pom.xml
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/pom.xml?rev=1548211&r1=1548210&r2=1548211&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-core/pom.xml Thu Dec 5 17:24:33 2013
@@ -278,7 +278,7 @@
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1</version>
- <scope>test</scope>
+ <!--<scope>test</scope> temporarily changed the scope for OAK-1266-->
</dependency>
</dependencies>
</project>
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/sqlpersistence/SQLDocumentStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/sqlpersistence/SQLDocumentStore.java?rev=1548211&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/sqlpersistence/SQLDocumentStore.java
(added)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/sqlpersistence/SQLDocumentStore.java
Thu Dec 5 17:24:33 2013
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.sqlpersistence;
+
+import java.io.File;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+
+import org.apache.jackrabbit.mk.api.MicroKernelException;
+import org.apache.jackrabbit.oak.plugins.mongomk.Collection;
+import org.apache.jackrabbit.oak.plugins.mongomk.Document;
+import org.apache.jackrabbit.oak.plugins.mongomk.DocumentStore;
+import org.apache.jackrabbit.oak.plugins.mongomk.MemoryDocumentStore;
+import org.apache.jackrabbit.oak.plugins.mongomk.NodeDocument;
+import org.apache.jackrabbit.oak.plugins.mongomk.Revision;
+import org.apache.jackrabbit.oak.plugins.mongomk.StableRevisionComparator;
+import org.apache.jackrabbit.oak.plugins.mongomk.UpdateOp;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+
+/**
+ * Work-in-progress {@link DocumentStore} implementation that persists
+ * documents through JDBC (OAK-1266).
+ * <p>
+ * Every supported collection maps to one table (<code>NODES</code>,
+ * <code>CLUSTERNODES</code>) with the columns <code>ID</code>,
+ * <code>MODIFIED</code> and <code>DATA</code>; <code>DATA</code> holds the
+ * document serialized as a JSON object. All operations share a single JDBC
+ * connection with auto-commit disabled, and every modifying operation either
+ * commits or rolls back before it returns. This class does no
+ * synchronization of its own.
+ * <p>
+ * Current limitations: both tables are dropped and re-created whenever an
+ * instance is constructed, nothing is cached, and <code>maxCacheAge</code>
+ * is ignored.
+ */
+public class SQLDocumentStore implements DocumentStore {
+
+    /** Document property that is mirrored into the MODIFIED column. */
+    private static final String MODIFIED = "_modified";
+
+    /** Ordering used for the revision maps of deserialized documents. */
+    private final Comparator<Revision> comparator = Collections.reverseOrder(new StableRevisionComparator());
+
+    /** The one connection used by all operations; <code>null</code> once disposed. */
+    private Connection connection;
+
+    /**
+     * Creates a store backed by an H2 database located in the <code>db</code>
+     * directory below the current working directory.
+     *
+     * @throws RuntimeException if the database cannot be opened or set up
+     */
+    public SQLDocumentStore() {
+        try {
+            initialize(new File("."));
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    @Override
+    public <T extends Document> T find(Collection<T> collection, String id) {
+        return find(collection, id, 0);
+    }
+
+    @Override
+    public <T extends Document> T find(Collection<T> collection, String id, int maxCacheAge) {
+        // TODO handle maxCacheAge
+        return readDocument(collection, id);
+    }
+
+    @Override
+    public <T extends Document> List<T> query(Collection<T> collection, String fromKey, String toKey, int limit) {
+        // forward the caller's limit (it used to be replaced by a literal 0)
+        return query(collection, fromKey, toKey, null, 0, limit);
+    }
+
+    @Override
+    public <T extends Document> List<T> query(Collection<T> collection, String fromKey, String toKey,
+            String indexedProperty, long startValue, int limit) {
+        return internalQuery(collection, fromKey, toKey, indexedProperty, startValue, limit);
+    }
+
+    @Override
+    public <T extends Document> void remove(Collection<T> collection, String id) {
+        delete(collection, id);
+    }
+
+    @Override
+    public <T extends Document> boolean create(Collection<T> collection, List<UpdateOp> updateOps) {
+        return internalCreate(collection, updateOps);
+    }
+
+    @Override
+    public <T extends Document> void update(Collection<T> collection, List<String> keys, UpdateOp updateOp) {
+        internalUpdate(collection, keys, updateOp);
+    }
+
+    @Override
+    public <T extends Document> T createOrUpdate(Collection<T> collection, UpdateOp update)
+            throws MicroKernelException {
+        return internalCreateOrUpdate(collection, update, false);
+    }
+
+    @Override
+    public <T extends Document> T findAndUpdate(Collection<T> collection, UpdateOp update)
+            throws MicroKernelException {
+        return internalCreateOrUpdate(collection, update, true);
+    }
+
+    @Override
+    public void invalidateCache() {
+        // TODO
+    }
+
+    @Override
+    public <T extends Document> void invalidateCache(Collection<T> collection, String id) {
+        // TODO
+    }
+
+    /**
+     * Closes the JDBC connection. Calling this method again after the
+     * connection has been released is a no-op.
+     *
+     * @throws MicroKernelException if closing the connection fails
+     */
+    @Override
+    public void dispose() {
+        // drop the reference first so that it is cleared even if close() fails
+        Connection c = this.connection;
+        this.connection = null;
+        if (c != null) {
+            try {
+                c.close();
+            } catch (SQLException ex) {
+                throw new MicroKernelException(ex);
+            }
+        }
+    }
+
+    @Override
+    public <T extends Document> T getIfCached(Collection<T> collection, String id) {
+        // there is no cache yet
+        return null;
+    }
+
+    // implementation
+
+    /**
+     * Opens the H2 database <code>db/revs</code> below <code>homeDir</code>,
+     * disables auto-commit and (re-)creates the two tables.
+     *
+     * @param homeDir directory below which the <code>db</code> directory lives
+     * @throws Exception if the directory cannot be created or any JDBC call fails;
+     *             the connection is closed again in that case
+     */
+    private void initialize(File homeDir) throws Exception {
+        File dbDir = new File(homeDir, "db");
+        // second isDirectory() check: mkdirs() also returns false when the
+        // directory was created by somebody else in the meantime
+        if (!dbDir.isDirectory() && !dbDir.mkdirs() && !dbDir.isDirectory()) {
+            throw new IllegalStateException("Unable to create database directory: " + dbDir);
+        }
+
+        String jdbcuri = "jdbc:h2:" + dbDir.getCanonicalPath() + "/revs";
+        connection = DriverManager.getConnection(jdbcuri, "sa", "");
+        try {
+            connection.setAutoCommit(false);
+            Statement stmt = connection.createStatement();
+            try {
+                // work in progress: always start from empty tables
+                stmt.execute("drop table if exists CLUSTERNODES");
+                stmt.execute("drop table if exists NODES");
+                stmt.execute("create table if not exists CLUSTERNODES(ID varchar primary key, MODIFIED bigint, DATA varchar)");
+                stmt.execute("create table if not exists NODES(ID varchar primary key, MODIFIED bigint, DATA varchar)");
+                connection.commit();
+            } finally {
+                stmt.close();
+            }
+        } catch (Exception ex) {
+            // do not leave a half-initialized connection behind
+            try {
+                connection.close();
+            } catch (SQLException ignored) {
+                // the setup failure is the one worth reporting
+            }
+            connection = null;
+            throw ex;
+        }
+    }
+
+    /**
+     * Creates one new document per update operation. All inserts are
+     * committed together; if any of them fails, the whole batch is rolled
+     * back.
+     *
+     * @param collection the collection the documents belong to
+     * @param updates the operations describing the new documents
+     * @return <code>true</code> if all documents were stored, <code>false</code>
+     *         if a {@link MicroKernelException} or {@link SQLException}
+     *         occurred (for instance because an ID already exists)
+     */
+    private <T extends Document> boolean internalCreate(Collection<T> collection, List<UpdateOp> updates) {
+        try {
+            for (UpdateOp update : updates) {
+                T doc = collection.newDocument(this);
+                MemoryDocumentStore.applyChanges(doc, update, comparator);
+                dbInsert(connection, getTable(collection), doc.getId(), (Long) doc.get(MODIFIED), asString(doc));
+            }
+            connection.commit();
+            return true;
+        } catch (SQLException ex) {
+            // failure is reported through the return value, see above
+            rollbackQuietly();
+            return false;
+        } catch (MicroKernelException ex) {
+            rollbackQuietly();
+            return false;
+        }
+    }
+
+    /**
+     * Reads the current document, applies the update to a copy and writes the
+     * copy back (insert if there was no document, update otherwise).
+     *
+     * @param collection the collection the document belongs to
+     * @param update the changes to apply
+     * @param checkConditions whether the conditions of the update have to be
+     *            checked (via {@link MemoryDocumentStore#checkConditions})
+     *            before it is applied
+     * @return the document as it was before the update; <code>null</code> if
+     *         it did not exist or if the conditions were checked and not met
+     * @throws MicroKernelException if the document does not exist and the
+     *             update is not flagged as new, or if reading or writing fails
+     */
+    @CheckForNull
+    private <T extends Document> T internalCreateOrUpdate(Collection<T> collection, UpdateOp update,
+            boolean checkConditions) {
+        T oldDoc = readDocument(collection, update.getId());
+
+        T doc = collection.newDocument(this);
+        if (oldDoc == null) {
+            if (!update.isNew()) {
+                throw new MicroKernelException("Document does not exist: " + update.getId());
+            }
+        } else {
+            oldDoc.deepCopy(doc);
+        }
+        if (checkConditions && !MemoryDocumentStore.checkConditions(doc, update)) {
+            return null;
+        }
+        MemoryDocumentStore.applyChanges(doc, update, comparator);
+        writeDocument(collection, doc, oldDoc == null);
+        doc.seal();
+
+        return oldDoc;
+    }
+
+    /**
+     * Applies the same update to every listed document. All changes are
+     * committed together; on any failure they are rolled back.
+     *
+     * @param collection the collection the documents belong to
+     * @param ids the IDs of the documents to update
+     * @param updateOp the changes to apply to each document
+     * @throws MicroKernelException if one of the documents does not exist or
+     *             if reading, parsing or writing fails
+     */
+    private <T extends Document> void internalUpdate(Collection<T> collection, List<String> ids, UpdateOp updateOp) {
+        String tableName = getTable(collection);
+        try {
+            for (String id : ids) {
+                String in = dbRead(connection, tableName, id);
+                if (in == null) {
+                    throw new MicroKernelException(tableName + " " + id + " not found");
+                }
+                T doc = fromString(collection, in);
+                MemoryDocumentStore.applyChanges(doc, updateOp, comparator);
+                String data = asString(doc);
+                dbUpdate(connection, tableName, id, (Long) doc.get(MODIFIED), data);
+            }
+            connection.commit();
+        } catch (MicroKernelException ex) {
+            // already the right type: do not wrap it a second time
+            rollbackQuietly();
+            throw ex;
+        } catch (Exception ex) {
+            rollbackQuietly();
+            throw new MicroKernelException(ex);
+        }
+    }
+
+    /**
+     * Returns the sealed documents whose ID lies strictly between
+     * <code>fromKey</code> and <code>toKey</code>, ordered by ID.
+     *
+     * @param collection the collection to query
+     * @param fromKey exclusive lower bound for the ID
+     * @param toKey exclusive upper bound for the ID
+     * @param indexedProperty <code>null</code> or <code>"_modified"</code>; in
+     *            the latter case only rows with MODIFIED &gt;=
+     *            <code>startValue</code> are returned
+     * @param startValue minimum value of the indexed property
+     * @param limit maximum number of documents to return; a value &lt;= 0
+     *            means "no limit"
+     * @return the matching documents, never <code>null</code>
+     * @throws RuntimeException if any other indexed property is requested
+     * @throws MicroKernelException if the query or the parsing of a row fails
+     */
+    private <T extends Document> List<T> internalQuery(Collection<T> collection, String fromKey, String toKey,
+            String indexedProperty, long startValue, int limit) {
+        String tableName = getTable(collection);
+        if (indexedProperty != null && !MODIFIED.equals(indexedProperty)) {
+            throw new RuntimeException("indexed property " + indexedProperty + " not supported");
+        }
+        List<T> result = new ArrayList<T>();
+        try {
+            List<String> dbresult = dbQuery(connection, tableName, fromKey, toKey, indexedProperty, startValue,
+                    limit);
+            for (String data : dbresult) {
+                T doc = fromString(collection, data);
+                doc.seal();
+                result.add(doc);
+            }
+        } catch (Exception ex) {
+            throw new MicroKernelException(ex);
+        }
+        return result;
+    }
+
+    /**
+     * Maps a collection to the name of the table that stores it.
+     *
+     * @throws IllegalArgumentException for collections other than
+     *             {@link Collection#NODES} and {@link Collection#CLUSTER_NODES}
+     */
+    private static <T extends Document> String getTable(Collection<T> collection) {
+        if (collection == Collection.CLUSTER_NODES) {
+            return "CLUSTERNODES";
+        } else if (collection == Collection.NODES) {
+            return "NODES";
+        } else {
+            throw new IllegalArgumentException("Unknown collection: " + collection.toString());
+        }
+    }
+
+    /**
+     * Serializes all properties of the document into one JSON object.
+     */
+    @SuppressWarnings("unchecked") // JSONObject is a raw Map, put() cannot be type-checked
+    private static String asString(Document doc) {
+        JSONObject obj = new JSONObject();
+        for (String key : doc.keySet()) {
+            obj.put(key, doc.get(key));
+        }
+        return obj.toJSONString();
+    }
+
+    /**
+     * Rebuilds a document from its JSON form. Null, Boolean, Long and String
+     * values are copied as they are, nested JSON objects are converted with
+     * {@link #convertJsonObject(JSONObject)}.
+     *
+     * @param collection the collection used to create the empty document
+     * @param data the JSON text read from the DATA column
+     * @return the populated (not sealed) document
+     * @throws ParseException if <code>data</code> is not valid JSON
+     * @throws RuntimeException if a value has any other type
+     */
+    private <T extends Document> T fromString(Collection<T> collection, String data) throws ParseException {
+        T doc = collection.newDocument(this);
+        // the persisted form is always a JSON object, see asString()
+        @SuppressWarnings("unchecked")
+        Map<String, Object> obj = (Map<String, Object>) new JSONParser().parse(data);
+        for (Map.Entry<String, Object> entry : obj.entrySet()) {
+            String key = entry.getKey();
+            Object value = entry.getValue();
+            if (value == null || value instanceof Boolean || value instanceof Long || value instanceof String) {
+                // null is passed through unchanged (open question in the original code)
+                doc.put(key, value);
+            } else if (value instanceof JSONObject) {
+                doc.put(key, convertJsonObject((JSONObject) value));
+            } else {
+                throw new RuntimeException("unexpected type: " + value.getClass());
+            }
+        }
+        return doc;
+    }
+
+    /**
+     * Converts a nested JSON object into a map keyed by {@link Revision},
+     * sorted with this store's comparator. Values are taken over unchanged.
+     */
+    @Nonnull
+    private Map<Revision, Object> convertJsonObject(@Nonnull JSONObject obj) {
+        Map<Revision, Object> map = new TreeMap<Revision, Object>(comparator);
+        for (Object o : obj.entrySet()) {
+            Map.Entry<?, ?> entry = (Map.Entry<?, ?>) o;
+            // not clear why every persisted map is a revision map
+            map.put(Revision.fromString(entry.getKey().toString()), entry.getValue());
+        }
+        return map;
+    }
+
+    /**
+     * Reads and parses one document.
+     *
+     * @return the document, or <code>null</code> if there is no row with that ID
+     * @throws MicroKernelException if reading or parsing fails
+     */
+    @CheckForNull
+    private <T extends Document> T readDocument(Collection<T> collection, String id) {
+        String tableName = getTable(collection);
+        try {
+            String in = dbRead(connection, tableName, id);
+            return in != null ? fromString(collection, in) : null;
+        } catch (Exception ex) {
+            throw new MicroKernelException(ex);
+        }
+    }
+
+    /**
+     * Deletes the row with the given ID (if any) and commits.
+     *
+     * @throws MicroKernelException if the delete or the commit fails; the
+     *             transaction is rolled back in that case
+     */
+    private <T extends Document> void delete(Collection<T> collection, String id) {
+        String tableName = getTable(collection);
+        try {
+            dbDelete(connection, tableName, id);
+            connection.commit();
+        } catch (SQLException ex) {
+            rollbackQuietly();
+            throw new MicroKernelException(ex);
+        }
+    }
+
+    /**
+     * Serializes the document, inserts or updates its row and commits.
+     *
+     * @param insert <code>true</code> to insert a new row, <code>false</code>
+     *            to update the existing one
+     * @throws MicroKernelException if a JDBC call fails; the transaction is
+     *             rolled back in that case
+     */
+    private <T extends Document> void writeDocument(Collection<T> collection, T document, boolean insert) {
+        String tableName = getTable(collection);
+        try {
+            String data = asString(document);
+            Long modified = (Long) document.get(MODIFIED);
+            if (insert) {
+                dbInsert(connection, tableName, document.getId(), modified, data);
+            } else {
+                dbUpdate(connection, tableName, document.getId(), modified, data);
+            }
+            connection.commit();
+        } catch (SQLException ex) {
+            rollbackQuietly();
+            throw new MicroKernelException(ex);
+        }
+    }
+
+    /**
+     * Rolls back the current transaction so that changes of a failed
+     * operation are not picked up by the commit of a later one. A failure of
+     * the rollback itself is ignored because the caller is about to report
+     * the original problem.
+     */
+    private void rollbackQuietly() {
+        try {
+            if (connection != null) {
+                connection.rollback();
+            }
+        } catch (SQLException ignored) {
+            // the original failure is the one worth reporting
+        }
+    }
+
+    // low level operations
+
+    /**
+     * @return the DATA column of the row with the given ID, or
+     *         <code>null</code> if there is no such row
+     */
+    @CheckForNull
+    private String dbRead(Connection connection, String tableName, String id) throws SQLException {
+        PreparedStatement stmt = connection.prepareStatement("select DATA from " + tableName + " where ID = ?");
+        try {
+            stmt.setString(1, id);
+            ResultSet rs = stmt.executeQuery();
+            return rs.next() ? rs.getString(1) : null;
+        } finally {
+            // closing the statement also closes its result set
+            stmt.close();
+        }
+    }
+
+    /**
+     * Returns the DATA column of all rows with <code>minId &lt; ID &lt;
+     * maxId</code>, ordered by ID. If <code>indexedProperty</code> is not
+     * <code>null</code>, rows with MODIFIED &lt; <code>startValue</code> are
+     * left out. At most <code>limit</code> rows are returned when
+     * <code>limit</code> is positive.
+     */
+    private List<String> dbQuery(Connection connection, String tableName, String minId, String maxId,
+            String indexedProperty, long startValue, int limit) throws SQLException {
+        String t = "select DATA from " + tableName + " where ID > ? and ID < ?";
+        if (indexedProperty != null) {
+            t += " and MODIFIED >= ?";
+        }
+        // a stable order is needed for the limit to select a well-defined prefix
+        t += " order by ID";
+        PreparedStatement stmt = connection.prepareStatement(t);
+        List<String> result = new ArrayList<String>();
+        try {
+            stmt.setString(1, minId);
+            stmt.setString(2, maxId);
+            if (indexedProperty != null) {
+                stmt.setLong(3, startValue);
+            }
+            ResultSet rs = stmt.executeQuery();
+            while (rs.next()) {
+                result.add(rs.getString(1));
+                if (limit > 0 && result.size() >= limit) {
+                    break;
+                }
+            }
+        } finally {
+            stmt.close();
+        }
+        return result;
+    }
+
+    /**
+     * Overwrites MODIFIED and DATA of the row with the given ID. A
+     * <code>null</code> value for <code>modified</code> is stored as SQL NULL.
+     */
+    private void dbUpdate(Connection connection, String tableName, String id, Long modified, String data)
+            throws SQLException {
+        PreparedStatement stmt = connection.prepareStatement("UPDATE " + tableName
+                + " SET MODIFIED = ?, DATA = ? WHERE ID = ?");
+        try {
+            stmt.setObject(1, modified, Types.BIGINT);
+            stmt.setString(2, data);
+            stmt.setString(3, id);
+            stmt.executeUpdate();
+        } finally {
+            stmt.close();
+        }
+    }
+
+    /**
+     * Inserts a new row. A <code>null</code> value for <code>modified</code>
+     * is stored as SQL NULL.
+     */
+    private void dbInsert(Connection connection, String tableName, String id, Long modified, String data)
+            throws SQLException {
+        // name the columns so the statement does not depend on their order in the table
+        PreparedStatement stmt = connection.prepareStatement("INSERT INTO " + tableName
+                + " (ID, MODIFIED, DATA) VALUES(?, ?, ?)");
+        try {
+            stmt.setString(1, id);
+            stmt.setObject(2, modified, Types.BIGINT);
+            stmt.setString(3, data);
+            stmt.executeUpdate();
+        } finally {
+            stmt.close();
+        }
+    }
+
+    /**
+     * Deletes the row with the given ID, if there is one.
+     */
+    private void dbDelete(Connection connection, String tableName, String id) throws SQLException {
+        PreparedStatement stmt = connection.prepareStatement("delete from " + tableName + " where ID = ?");
+        try {
+            stmt.setString(1, id);
+            stmt.executeUpdate();
+        } finally {
+            stmt.close();
+        }
+    }
+}
Propchange:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/sqlpersistence/SQLDocumentStore.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/sqlpersistence/SQLDocumentStore.java
------------------------------------------------------------------------------
svn:executable = *