Author: thomasm
Date: Thu Apr 4 14:48:46 2013
New Revision: 1464576
URL: http://svn.apache.org/r1464576
Log:
OAK-746 Builder for the MongoMK
Modified:
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/osgi/MongoMicroKernelService.java
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/Commit.java
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/DocumentStore.java
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MemoryDocumentStore.java
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoDocumentStore.java
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/Revision.java
jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/BaseMongoMKTest.java
jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/BlobTest.java
jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/ClusterTest.java
jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/RandomizedClusterTest.java
jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/RandomizedTest.java
jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/SimpleTest.java
Modified:
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/osgi/MongoMicroKernelService.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/osgi/MongoMicroKernelService.java?rev=1464576&r1=1464575&r2=1464576&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/osgi/MongoMicroKernelService.java
(original)
+++
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/osgi/MongoMicroKernelService.java
Thu Apr 4 14:48:46 2013
@@ -65,7 +65,8 @@ public class MongoMicroKernelService {
private MongoMK mk;
@Activate
- private void activate(BundleContext context,Map<String,?> config) throws
Exception {
+ private void activate(BundleContext context, Map<String, ?> config)
+ throws Exception {
String host = PropertiesUtil.toString(config.get(PROP_HOST),
DEFAULT_HOST);
int port = PropertiesUtil.toInteger(config.get(PROP_PORT),
DEFAULT_PORT);
String db = PropertiesUtil.toString(config.get(PROP_DB), DEFAULT_DB);
@@ -78,11 +79,11 @@ public class MongoMicroKernelService {
logger.info("Connected to database {}", mongoDB);
- mk = new MongoMK(mongoDB, 0);
+ mk = new MongoMK.Builder().setMongoDB(mongoDB).open();
Properties props = new Properties();
- props.setProperty("oak.mk.type","mongo");
- reg = context.registerService(MicroKernel.class.getName(),mk,props);
+ props.setProperty("oak.mk.type", "mongo");
+ reg = context.registerService(MicroKernel.class.getName(), mk, props);
}
@Deactivate
Modified:
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/Commit.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/Commit.java?rev=1464576&r1=1464575&r2=1464576&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/Commit.java
(original)
+++
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/Commit.java
Thu Apr 4 14:48:46 2013
@@ -90,6 +90,11 @@ public class Commit {
diff.newline();
}
+ public void touchNode(String path) {
+ UpdateOp op = getUpdateOperationForNode(path);
+ op.setMapEntry(UpdateOp.LAST_REV + "." + revision.getClusterId(),
revision.toString());
+ }
+
void updateProperty(String path, String propertyName, String value) {
UpdateOp op = getUpdateOperationForNode(path);
String key = Utils.escapePropertyName(propertyName);
Modified:
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/DocumentStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/DocumentStore.java?rev=1464576&r1=1464575&r2=1464576&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/DocumentStore.java
(original)
+++
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/DocumentStore.java
Thu Apr 4 14:48:46 2013
@@ -35,15 +35,32 @@ public interface DocumentStore {
enum Collection { NODES }
/**
- * Get a document. The returned map is a clone (the caller
- * can modify it without affecting the stored version).
- *
+ * Get a document.
+ * <p>
+ * The returned map is a clone (the caller can modify it without affecting
+ * the stored version).
+ *
* @param collection the collection
* @param key the key
* @return the map, or null if not found
*/
@CheckForNull
Map<String, Object> find(Collection collection, String key);
+
+ /**
+ * Get a document, ignoring the cache if the cached entry is older than the
+ * specified time.
+ * <p>
+ * The returned map is a clone (the caller can modify it without affecting
+ * the stored version).
+ *
+ * @param collection the collection
+ * @param key the key
+ * @param maxCacheAge the maximum age of the cached document
+ * @return the map, or null if not found
+ */
+ @CheckForNull
+ Map<String, Object> find(Collection collection, String key, int
maxCacheAge);
@Nonnull
List<Map<String, Object>> query(Collection collection, String fromKey,
String toKey, int limit);
@@ -77,7 +94,9 @@ public interface DocumentStore {
@Nonnull
Map<String, Object> createOrUpdate(Collection collection, UpdateOp update)
throws MicroKernelException;
-
+
+ void invalidateCache();
+
void dispose();
}
Modified:
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MemoryDocumentStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MemoryDocumentStore.java?rev=1464576&r1=1464575&r2=1464576&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MemoryDocumentStore.java
(original)
+++
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MemoryDocumentStore.java
Thu Apr 4 14:48:46 2013
@@ -48,9 +48,13 @@ public class MemoryDocumentStore impleme
private ConcurrentSkipListMap<String, Map<String, Object>> nodes =
new ConcurrentSkipListMap<String, Map<String, Object>>();
- public Map<String, Object> find(Collection collection, String path) {
+ public Map<String, Object> find(Collection collection, String key, int
maxCacheAge) {
+ return find(collection, key);
+ }
+
+ public Map<String, Object> find(Collection collection, String key) {
ConcurrentSkipListMap<String, Map<String, Object>> map =
getMap(collection);
- Map<String, Object> n = map.get(path);
+ Map<String, Object> n = map.get(key);
if (n == null) {
return null;
}
@@ -219,6 +223,11 @@ public class MemoryDocumentStore impleme
}
@Override
+ public void invalidateCache() {
+ // there is no cache, so nothing to invalidate
+ }
+
+ @Override
public void dispose() {
// ignore
}
Modified:
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoDocumentStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoDocumentStore.java?rev=1464576&r1=1464575&r2=1464576&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoDocumentStore.java
(original)
+++
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoDocumentStore.java
Thu Apr 4 14:48:46 2013
@@ -18,7 +18,6 @@ package org.apache.jackrabbit.mongomk.pr
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -27,13 +26,13 @@ import java.util.concurrent.ExecutionExc
import javax.annotation.Nonnull;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
import org.apache.jackrabbit.mk.api.MicroKernelException;
import org.apache.jackrabbit.mongomk.prototype.UpdateOp.Operation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
@@ -49,12 +48,6 @@ import com.mongodb.WriteResult;
*/
public class MongoDocumentStore implements DocumentStore {
- /**
- * Marker instance to be used as a value in cache to indicate that no
value exist for given key as Guava
- * cache does not allow null values
- */
- static final Map<String, Object> NULL_VAL = Collections.emptyMap();
-
private static final Logger LOG =
LoggerFactory.getLogger(MongoDocumentStore.class);
private static final boolean LOG_TIME = false;
@@ -63,7 +56,7 @@ public class MongoDocumentStore implemen
private long time;
- private final Cache<String, Map<String, Object>> cache;
+ private final Cache<String, CachedDocument> cache;
public MongoDocumentStore(DB db) {
nodesCollection = db.getCollection(Collection.NODES.toString());
@@ -99,26 +92,44 @@ public class MongoDocumentStore implemen
// oak-jcr doesn't call dispose()
dispose();
}
+
+ @Override
+ public void invalidateCache() {
+ cache.invalidateAll();
+ }
+ public Map<String, Object> find(Collection collection, String path) {
+ return find(collection, path, Integer.MAX_VALUE);
+ }
+
+ /**
+ * Get a document, reloading it once if the cached entry is older than
+ * maxCacheAge milliseconds.
+ * <p>
+ * At most one reload is done per call, so the method also terminates for
+ * maxCacheAge values of zero or less (for which no entry is young enough).
+ *
+ * @param collection the collection
+ * @param path the key of the document
+ * @param maxCacheAge the maximum age of the cached document in
+ * milliseconds; Integer.MAX_VALUE accepts any cached entry
+ * @return the value of the cache entry (null if findUncached returned null)
+ * @throws IllegalStateException if loading the document failed
+ */
@Override
- public Map<String, Object> find(final Collection collection, final String
path) {
+ public Map<String, Object> find(final Collection collection, final String
path, int maxCacheAge) {
try {
- Map<String, Object> returnVal = cache.get(path, new
Callable<Map<String, Object>>() {
- @Override
- public Map<String, Object> call() throws Exception {
- Map<String, Object> result;
- result = loadDocument(collection, path);
- // support caching of null entries
- return result == null ? NULL_VAL : result;
+ CachedDocument doc;
+ // set once the entry was invalidated and fetched again
+ boolean reloaded = false;
+ while (true) {
+ doc = cache.get(path, new Callable<CachedDocument>() {
+ @Override
+ public CachedDocument call() throws Exception {
+ Map<String, Object> map = findUncached(collection,
path);
+ return new CachedDocument(map);
+ }
+ });
+ if (reloaded || maxCacheAge == Integer.MAX_VALUE) {
+ break;
}
- });
- return returnVal == NULL_VAL ? null : returnVal;
+ if (System.currentTimeMillis() - doc.time < maxCacheAge) {
+ break;
+ }
+ // too old: invalidate, try again (only once, otherwise a
+ // maxCacheAge of zero or less would loop forever)
+ cache.invalidate(path);
+ reloaded = true;
+ }
+ return doc.value;
} catch (ExecutionException e) {
throw new IllegalStateException("Failed to load node " + path, e);
}
}
- protected Map<String, Object> loadDocument(Collection collection, String
path) {
+ public Map<String, Object> findUncached(Collection collection, String
path) {
DBCollection dbCollection = getDBCollection(collection);
long start = start();
try {
@@ -150,7 +161,7 @@ public class MongoDocumentStore implemen
DBObject o = cursor.next();
Map<String, Object> map = convertFromDBObject(o);
String key = (String) map.get(UpdateOp.ID);
- cache.put(key, map);
+ cache.put(key, new CachedDocument(map));
list.add(map);
}
return list;
@@ -247,7 +258,7 @@ public class MongoDocumentStore implemen
Utils.deepCopyMap(map, newMap);
String key = updateOp.getKey();
MemoryDocumentStore.applyChanges(newMap, updateOp);
- cache.put(key, newMap);
+ cache.put(key, new CachedDocument(newMap));
log("createOrUpdate returns ", map);
return map;
@@ -306,7 +317,7 @@ public class MongoDocumentStore implemen
}
for (Map<String, Object> map : maps) {
String id = (String) map.get(UpdateOp.ID);
- cache.put(id, map);
+ cache.put(id, new CachedDocument(map));
}
return true;
} catch (MongoException e) {
@@ -364,5 +375,16 @@ public class MongoDocumentStore implemen
LOG.debug(message + argList);
}
}
+
+ /**
+ * A cache entry.
+ */
+ static class CachedDocument {
+ final long time = System.currentTimeMillis();
+ final Map<String, Object> value;
+ CachedDocument(Map<String, Object> value) {
+ this.value = value;
+ }
+ }
}
\ No newline at end of file
Modified:
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java?rev=1464576&r1=1464575&r2=1464576&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java
(original)
+++
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java
Thu Apr 4 14:48:46 2013
@@ -19,14 +19,17 @@ package org.apache.jackrabbit.mongomk.pr
import java.io.InputStream;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -81,7 +84,7 @@ public class MongoMK implements MicroKer
* cache update).
*/
// TODO test observation with multiple Oak instances
- protected static final long ASYNC_DELAY = 1000;
+ protected int asyncDelay = 1000;
/**
* Whether this instance is disposed.
@@ -106,22 +109,32 @@ public class MongoMK implements MicroKer
/**
* The node cache.
*
- * Key: path@rev
- * Value: node
+ * Key: path@rev, value: node
*/
private final Cache<String, Node> nodeCache;
/**
* Child node cache.
+ *
+ * Key: path@rev, value: children
*/
private final Cache<String, Node.Children> nodeChildrenCache;
/**
* The unsaved last revisions.
+ *
* Key: path, value: revision.
*/
private final Map<String, Revision> unsavedLastRevisions =
- new HashMap<String, Revision>();
+ new ConcurrentHashMap<String, Revision>();
+
+ /**
+ * The last known revision for each cluster node.
+ *
+ * Key: the machine id, value: revision.
+ */
+ private final Map<Integer, Revision> lastKnownRevision =
+ new ConcurrentHashMap<Integer, Revision>();
/**
* The last known head revision. This is the last-known revision.
@@ -130,7 +143,7 @@ public class MongoMK implements MicroKer
private Thread backgroundThread;
- private int simpleRevisionCounter;
+ private AtomicInteger simpleRevisionCounter;
/**
* Maps branch commit revision to revision it is based on
@@ -142,7 +155,7 @@ public class MongoMK implements MicroKer
* Create a new in-memory MongoMK used for testing.
*/
public MongoMK() {
- this(new MemoryDocumentStore(), new MemoryBlobStore(), 0);
+ this(new Builder());
}
/**
@@ -152,9 +165,7 @@ public class MongoMK implements MicroKer
* @param clusterId the cluster id (must be unique)
*/
public MongoMK(DB db, int clusterId) {
- this(db == null ? new MemoryDocumentStore() : new
MongoDocumentStore(db),
- db == null ? new MemoryBlobStore() : new MongoBlobStore(db),
- clusterId);
+ this(new Builder().setMongoDB(db).setClusterId(clusterId));
}
/**
@@ -165,9 +176,14 @@ public class MongoMK implements MicroKer
* @param clusterId the cluster id (must be unique)
*/
public MongoMK(DocumentStore store, BlobStore blobStore, int clusterId) {
- this.store = store;
- this.blobStore = blobStore;
- this.clusterId = clusterId;
+ this(new
Builder().setDocumentStore(store).setBlobStore(blobStore).setClusterId(clusterId));
+ }
+
+ MongoMK(Builder builder) {
+ this.store = builder.getDocumentStore();
+ this.blobStore = builder.getBlobStore();
+ this.clusterId = builder.getClusterId();
+ this.asyncDelay = builder.getAsyncDelay();
//TODO Use size based weigher
nodeCache = CacheBuilder.newBuilder()
@@ -177,17 +193,17 @@ public class MongoMK implements MicroKer
nodeChildrenCache = CacheBuilder.newBuilder()
.maximumSize(CACHE_CHILDREN)
.build();
-
+
+ init();
+ }
+
+ void init() {
backgroundThread = new Thread(
new BackgroundOperation(this, isDisposed),
"MongoMK background thread");
backgroundThread.setDaemon(true);
backgroundThread.start();
-
- init();
- }
-
- void init() {
+
headRevision = newRevision();
Node n = readNode("/", headRevision);
if (n == null) {
@@ -221,7 +237,7 @@ public class MongoMK implements MicroKer
* for testing.
*/
void useSimpleRevisions() {
- this.simpleRevisionCounter = 1;
+ this.simpleRevisionCounter = new AtomicInteger(1);
init();
}
@@ -231,14 +247,90 @@ public class MongoMK implements MicroKer
* @return the revision
*/
Revision newRevision() {
- if (simpleRevisionCounter > 0) {
- return new Revision(simpleRevisionCounter++, 0, clusterId);
+ if (simpleRevisionCounter != null) {
+ return new Revision(simpleRevisionCounter.getAndIncrement(), 0,
clusterId);
}
return Revision.newRevision(clusterId);
}
void runBackgroundOperations() {
- // to be implemented
+ if (isDisposed.get()) {
+ return;
+ }
+ if (simpleRevisionCounter != null) {
+ // only when using timestamp
+ return;
+ }
+ try {
+ // backgroundWrite();
+ // backgroundRead();
+ } catch (RuntimeException e) {
+ if (isDisposed.get()) {
+ return;
+ }
+ LOG.warn("Background operation failed: " + e.toString(), e);
+ }
+ }
+
+ /**
+ * Read the last revisions stored on the root document and check whether
+ * another cluster node has written a revision that differs from the last
+ * one known to this instance.
+ * <p>
+ * If so, the document store cache is invalidated, the revision is
+ * remembered, and a new head revision is created.
+ */
+ private void backgroundRead() {
+ String id = Utils.getIdFromPath("/");
+ Map<String, Object> map = store.find(DocumentStore.Collection.NODES,
id, asyncDelay);
+ if (map == null) {
+ // find() may return null (not found): nothing to read
+ return;
+ }
+ @SuppressWarnings("unchecked")
+ Map<String, String> lastRevMap = (Map<String, String>)
map.get(UpdateOp.LAST_REV);
+ if (lastRevMap == null) {
+ // the root document has no last revision entries
+ return;
+ }
+
+ for (Entry<String, String> e : lastRevMap.entrySet()) {
+ int machineId = Integer.parseInt(e.getKey());
+ if (machineId == clusterId) {
+ // own entry: only changes of other cluster nodes are of interest
+ continue;
+ }
+ Revision r = Revision.fromString(e.getValue());
+ Revision last = lastKnownRevision.get(machineId);
+
+ if (last == null || last.compareRevisionTime(r) != 0) {
+ // TODO invalidating the whole cache is not really needed,
+ // instead only those children that are cached could be checked
+
+ store.invalidateCache();
+ lastKnownRevision.put(machineId, r);
+ // add a new revision, so that changes are visible
+ headRevision = Revision.newRevision(clusterId);
+ }
+ }
+ }
+
+ /**
+ * Write the unsaved last revisions that are at least asyncDelay
+ * milliseconds old.
+ * <p>
+ * The paths are processed deepest first, so that a parent (and finally the
+ * root) is touched after its descendants. For each path, a commit that
+ * touches the node is applied.
+ */
+ private void backgroundWrite() {
+ if (unsavedLastRevisions.size() == 0) {
+ return;
+ }
+ ArrayList<String> paths = new
ArrayList<String>(unsavedLastRevisions.keySet());
+ // sort by depth (high depth first), then path
+ Collections.sort(paths, new Comparator<String>() {
+
+ @Override
+ public int compare(String o1, String o2) {
+ int d1 = Utils.pathDepth(o1);
+ int d2 = Utils.pathDepth(o2);
+ if (d1 != d2) {
+ // descending: the deeper path sorts first
+ return Integer.signum(d2 - d1);
+ }
+ return o1.compareTo(o2);
+ }
+
+ });
+ long now = Revision.getCurrentTimestamp();
+ for (String p : paths) {
+ Revision r = unsavedLastRevisions.get(p);
+ if (r == null) {
+ // the entry was removed in the meantime
+ continue;
+ }
+ if (Revision.getTimestampDifference(now, r.getTimestamp()) <
asyncDelay) {
+ // not old enough yet
+ continue;
+ }
+
+ Commit commit = new Commit(this, null, r);
+ commit.touchNode(p);
+ commit.apply();
+ }
}
public void dispose() {
@@ -377,7 +469,8 @@ public class MongoMK implements MicroKer
return false;
}
// get root of commit
- nodeMap = store.find(DocumentStore.Collection.NODES,
Utils.getIdFromPath(commitRootPath));
+ nodeMap = store.find(DocumentStore.Collection.NODES,
+ Utils.getIdFromPath(commitRootPath));
if (nodeMap == null) {
return false;
}
@@ -1075,21 +1168,68 @@ public class MongoMK implements MicroKer
return store;
}
+ public void setAsyncDelay(int delay) {
+ this.asyncDelay = delay;
+ }
+
+ public int getAsyncDelay() {
+ return asyncDelay;
+ }
+
+ /**
+ * Apply the changes of a node to the cache.
+ *
+ * @param rev the revision
+ * @param path the path
+ * @param isNew whether this is a new node
+ * @param isDelete whether the node is deleted
+ * @param isWritten whether the MongoDB document was added / updated
+ * @param added the list of added child nodes
+ * @param removed the list of removed child nodes
+ */
+ public void applyChanges(Revision rev, String path,
+ boolean isNew, boolean isDelete, boolean isWritten,
+ ArrayList<String> added, ArrayList<String> removed) {
+ if (!isWritten) {
+ unsavedLastRevisions.put(path, rev);
+ } else {
+ // the document was updated:
+ // we no longer need to update it in a background process
+ unsavedLastRevisions.remove(path);
+ }
+ Children c = nodeChildrenCache.getIfPresent(path + "@" + rev);
+ if (isNew || (!isDelete && c != null)) {
+ String key = path + "@" + rev;
+ Children c2 = new Children(path, rev);
+ TreeSet<String> set = new TreeSet<String>();
+ if (c != null) {
+ set.addAll(c.children);
+ }
+ set.removeAll(removed);
+ set.addAll(added);
+ c2.children.addAll(set);
+ nodeChildrenCache.put(key, c2);
+ }
+ }
+
/**
* A background thread.
*/
static class BackgroundOperation implements Runnable {
final WeakReference<MongoMK> ref;
private final AtomicBoolean isDisposed;
+ private int delay;
+
BackgroundOperation(MongoMK mk, AtomicBoolean isDisposed) {
ref = new WeakReference<MongoMK>(mk);
+ delay = mk.getAsyncDelay();
this.isDisposed = isDisposed;
}
public void run() {
while (!isDisposed.get()) {
synchronized (isDisposed) {
try {
- isDisposed.wait(ASYNC_DELAY);
+ isDisposed.wait(delay);
} catch (InterruptedException e) {
// ignore
}
@@ -1097,32 +1237,111 @@ public class MongoMK implements MicroKer
MongoMK mk = ref.get();
if (mk != null) {
mk.runBackgroundOperations();
+ delay = mk.getAsyncDelay();
}
}
}
}
- public void applyChanges(Revision rev, String path,
- boolean isNew, boolean isDelete, boolean isWritten,
- ArrayList<String> added, ArrayList<String> removed) {
- if (!isWritten) {
- unsavedLastRevisions.put(path, rev);
- } else {
- unsavedLastRevisions.remove(path);
- }
- Children c = nodeChildrenCache.getIfPresent(path + "@" + rev);
- if (isNew || (!isDelete && c != null)) {
- String key = path + "@" + rev;
- Children c2 = new Children(path, rev);
- TreeSet<String> set = new TreeSet<String>();
- if (c != null) {
- set.addAll(c.children);
- }
- set.removeAll(removed);
- set.addAll(added);
- c2.children.addAll(set);
- nodeChildrenCache.put(key, c2);
+ /**
+ * A builder for a MongoMK instance.
+ */
+ public static class Builder {
+
+ private DocumentStore documentStore;
+ private BlobStore blobStore;
+ private int clusterId;
+ private int asyncDelay = 1000;
+
+ /**
+ * Set the MongoDB connection to use. By default an in-memory store is
used.
+ *
+ * @param db the MongoDB connection
+ * @return this
+ */
+ public Builder setMongoDB(DB db) {
+ if (db != null) {
+ this.documentStore = new MongoDocumentStore(db);
+ this.blobStore = new MongoBlobStore(db);
+ }
+ return this;
+ }
+
+ /**
+ * Set the document store to use. By default an in-memory store is
used.
+ *
+ * @param documentStore the document store
+ * @return this
+ */
+ public Builder setDocumentStore(DocumentStore documentStore) {
+ this.documentStore = documentStore;
+ return this;
+ }
+
+ public DocumentStore getDocumentStore() {
+ if (documentStore == null) {
+ documentStore = new MemoryDocumentStore();
+ }
+ return documentStore;
+ }
+
+ /**
+ * Set the blob store to use. By default an in-memory store is used.
+ *
+ * @param blobStore the blob store
+ * @return this
+ */
+ public Builder setBlobStore(BlobStore blobStore) {
+ this.blobStore = blobStore;
+ return this;
+ }
+
+ public BlobStore getBlobStore() {
+ if (blobStore == null) {
+ blobStore = new MemoryBlobStore();
+ }
+ return blobStore;
+ }
+
+ /**
+ * Set the cluster id to use. By default, 0 is used.
+ *
+ * @param clusterId the cluster id
+ * @return this
+ */
+ public Builder setClusterId(int clusterId) {
+ this.clusterId = clusterId;
+ return this;
+ }
+
+ public int getClusterId() {
+ return clusterId;
+ }
+
+ /**
+ * Set the maximum delay to write the last revision to the root node.
By
+ * default 1000 (meaning 1 second) is used.
+ *
+ * @param asyncDelay in milliseconds
+ * @return this
+ */
+ public Builder setAsyncDelay(int asyncDelay) {
+ this.asyncDelay = asyncDelay;
+ return this;
+ }
+
+ public int getAsyncDelay() {
+ return asyncDelay;
+ }
+
+ /**
+ * Open the MongoMK instance using the configured options.
+ *
+ * @return the MongoMK instance
+ */
+ public MongoMK open() {
+ return new MongoMK(this);
}
}
-
+
}
Modified:
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/Revision.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/Revision.java?rev=1464576&r1=1464575&r2=1464576&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/Revision.java
(original)
+++
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/Revision.java
Thu Apr 4 14:48:46 2013
@@ -68,7 +68,7 @@ public class Revision {
* @return the unique revision id
*/
static Revision newRevision(int clusterId) {
- long timestamp = System.currentTimeMillis() / 100 - timestampOffset;
+ long timestamp = getCurrentTimestamp();
int c;
synchronized (Revision.class) {
if (timestamp > lastTimestamp) {
@@ -89,6 +89,26 @@ public class Revision {
return new Revision(timestamp, c, clusterId);
}
+ /**
+ * Get the timestamp value of the current date and time.
+ *
+ * @return the timestamp
+ */
+ public static long getCurrentTimestamp() {
+ return System.currentTimeMillis() / 100 - timestampOffset;
+ }
+
+ /**
+ * Get the difference between two timestamps (a - b) in milliseconds.
+ *
+ * @param a the first timestamp
+ * @param b the second timestamp
+ * @return the difference in milliseconds
+ */
+ public static long getTimestampDifference(long a, long b) {
+ return (a - b) * 100;
+ }
+
public static Revision fromString(String rev) {
if (!rev.startsWith("r")) {
throw new IllegalArgumentException(rev);
Modified:
jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/BaseMongoMKTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/BaseMongoMKTest.java?rev=1464576&r1=1464575&r2=1464576&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/BaseMongoMKTest.java
(original)
+++
jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/BaseMongoMKTest.java
Thu Apr 4 14:48:46 2013
@@ -30,7 +30,7 @@ public class BaseMongoMKTest extends Bas
@Override
public void setUp() throws Exception {
DB db = mongoConnection.getDB();
- mk = new MongoMK(db, 0);
+ mk = new MongoMK.Builder().setMongoDB(db).open();
}
@Override
Modified:
jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/BlobTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/BlobTest.java?rev=1464576&r1=1464575&r2=1464576&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/BlobTest.java
(original)
+++
jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/BlobTest.java
Thu Apr 4 14:48:46 2013
@@ -50,7 +50,8 @@ public class BlobTest {
@Test
public void addBlobs() throws Exception {
- MongoMK mk = new MongoMK(openMongoConnection(), 0);
+ MongoMK mk = new MongoMK.Builder().
+ setMongoDB(openMongoConnection()).open();
long blobSize = TOTAL_SIZE / DOCUMENT_COUNT;
ArrayList<String> blobIds = new ArrayList<String>();
// use a new seed each time, to allow running the test multiple times
Modified:
jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/ClusterTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/ClusterTest.java?rev=1464576&r1=1464575&r2=1464576&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/ClusterTest.java
(original)
+++
jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/ClusterTest.java
Thu Apr 4 14:48:46 2013
@@ -16,10 +16,12 @@
*/
package org.apache.jackrabbit.mongomk.prototype;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import org.apache.jackrabbit.mk.api.MicroKernelException;
import org.apache.jackrabbit.mk.blobs.MemoryBlobStore;
+import org.junit.Ignore;
import org.junit.Test;
import com.mongodb.DB;
@@ -56,6 +58,35 @@ public class ClusterTest {
}
@Test
+ @Ignore
+ public void revisionVisibility() throws InterruptedException {
+ MongoMK mk1 = createMK(1);
+ MongoMK mk2 = createMK(2);
+
+ String m2h;
+ m2h = mk2.getNodes("/", mk2.getHeadRevision(), 0, 0, 2, null);
+ assertEquals("{\":childNodeCount\":0}", m2h);
+
+ mk1.commit("/", "+\"test\":{}", null, null);
+ String m1h = mk1.getNodes("/", mk1.getHeadRevision(), 0, 0, 1, null);
+ assertEquals("{\"test\":{},\":childNodeCount\":1}", m1h);
+
+ m2h = mk2.getNodes("/", mk2.getHeadRevision(), 0, 0, 2, null);
+ // not available yet...
+ assertEquals("{\":childNodeCount\":0}", m2h);
+
+ // the delay is 10 ms
+ Thread.sleep(100);
+
+ // so now it should be available
+ m2h = mk2.getNodes("/", mk2.getHeadRevision(), 0, 0, 5, null);
+ assertEquals("{\"test\":{},\":childNodeCount\":1}", m2h);
+
+ mk1.dispose();
+ mk2.dispose();
+ }
+
+ @Test
public void rollbackAfterConflict() {
MongoMK mk1 = createMK(1);
MongoMK mk2 = createMK(2);
@@ -78,18 +109,22 @@ public class ClusterTest {
private MongoMK createMK(int clusterId) {
+ MongoMK.Builder builder = new MongoMK.Builder();
if (MONGO_DB) {
DB db = MongoUtils.getConnection().getDB();
MongoUtils.dropCollections(db);
- return new MongoMK(db, clusterId);
- }
- if (ds == null) {
- ds = new MemoryDocumentStore();
- }
- if (bs == null) {
- bs = new MemoryBlobStore();
+ builder.setMongoDB(db);
+ } else {
+ if (ds == null) {
+ ds = new MemoryDocumentStore();
+ }
+ if (bs == null) {
+ bs = new MemoryBlobStore();
+ }
+ builder.setDocumentStore(ds).setBlobStore(bs);
}
- return new MongoMK(ds, bs, clusterId);
+ builder.setAsyncDelay(10);
+ return builder.setClusterId(clusterId).open();
}
}
Modified:
jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/RandomizedClusterTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/RandomizedClusterTest.java?rev=1464576&r1=1464575&r2=1464576&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/RandomizedClusterTest.java
(original)
+++
jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/RandomizedClusterTest.java
Thu Apr 4 14:48:46 2013
@@ -236,18 +236,21 @@ System.out.print(msg);
}
private MongoMK createMK(int clusterId) {
+ MongoMK.Builder builder = new MongoMK.Builder();
if (MONGO_DB) {
DB db = MongoUtils.getConnection().getDB();
MongoUtils.dropCollections(db);
- return new MongoMK(db, clusterId);
+ builder.setMongoDB(db);
+ } else {
+ if (ds == null) {
+ ds = new MemoryDocumentStore();
+ }
+ if (bs == null) {
+ bs = new MemoryBlobStore();
+ }
+ builder.setDocumentStore(ds).setBlobStore(bs);
}
- if (ds == null) {
- ds = new MemoryDocumentStore();
- }
- if (bs == null) {
- bs = new MemoryBlobStore();
- }
- return new MongoMK(ds, bs, clusterId);
+ return builder.setClusterId(clusterId).open();
}
/**
Modified:
jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/RandomizedTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/RandomizedTest.java?rev=1464576&r1=1464575&r2=1464576&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/RandomizedTest.java
(original)
+++
jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/RandomizedTest.java
Thu Apr 4 14:48:46 2013
@@ -237,12 +237,13 @@ public class RandomizedTest {
}
private static MongoMK createMK() {
+ MongoMK.Builder builder = new MongoMK.Builder();
if (MONGO_DB) {
DB db = MongoUtils.getConnection().getDB();
MongoUtils.dropCollections(db);
- return new MongoMK(db, 0);
+ builder.setMongoDB(db);
}
- return new MongoMK();
+ return builder.open();
}
}
Modified:
jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/SimpleTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/SimpleTest.java?rev=1464576&r1=1464575&r2=1464576&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/SimpleTest.java
(original)
+++
jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/SimpleTest.java
Thu Apr 4 14:48:46 2013
@@ -42,7 +42,7 @@ public class SimpleTest {
@Test
public void test() {
- MongoMK mk = new MongoMK();
+ MongoMK mk = new MongoMK.Builder().open();
mk.dispose();
}
@@ -80,7 +80,7 @@ public class SimpleTest {
@Test
public void addNodeGetNode() {
- MongoMK mk = new MongoMK();
+ MongoMK mk = new MongoMK.Builder().open();
Revision rev = mk.newRevision();
Node n = new Node("/test", rev);
n.setProperty("name", "Hello");
@@ -351,12 +351,13 @@ public class SimpleTest {
}
private static MongoMK createMK() {
+ MongoMK.Builder builder = new MongoMK.Builder();
if (MONGO_DB) {
DB db = MongoUtils.getConnection().getDB();
MongoUtils.dropCollections(db);
- return new MongoMK(db, 0);
+ builder.setMongoDB(db);
}
- return new MongoMK();
+ return builder.open();
}
}