Author: thomasm
Date: Wed Jun 5 12:08:52 2013
New Revision: 1489832
URL: http://svn.apache.org/r1489832
Log:
OAK-857 MongoMK: support for many child nodes
Added:
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/util/TimingDocumentStoreWrapper.java
Modified:
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Collision.java
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Commit.java
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/DocumentStore.java
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MemoryDocumentStore.java
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoDocumentStore.java
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoMK.java
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Node.java
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Revision.java
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/util/LoggingDocumentStoreWrapper.java
jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/SimpleTest.java
Modified:
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Collision.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Collision.java?rev=1489832&r1=1489831&r2=1489832&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Collision.java
(original)
+++
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Collision.java
Wed Jun 5 12:08:52 2013
@@ -109,11 +109,10 @@ class Collision {
if ("true".equals(value)) {
// already committed
return false;
- } else {
- // node is also commit root, but not yet committed
- // i.e. a branch commit, which is not yet merged
- commitRootPath = p;
}
+ // node is also commit root, but not yet committed
+ // i.e. a branch commit, which is not yet merged
+ commitRootPath = p;
} else {
// next look at commit root
@SuppressWarnings("unchecked")
Modified:
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Commit.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Commit.java?rev=1489832&r1=1489831&r2=1489832&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Commit.java
(original)
+++
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Commit.java
Wed Jun 5 12:08:52 2013
@@ -76,10 +76,20 @@ public class Commit {
if (op == null) {
String id = Utils.getIdFromPath(path);
op = new UpdateOp(path, id, false);
+ setModified(op, revision);
operations.put(path, op);
}
return op;
}
+
+    /**
+     * Stamp an update operation with the modification time of a revision.
+     * The value stored under UpdateOp.MODIFIED is the revision timestamp
+     * converted by getModified(long).
+     *
+     * @param op the update operation to stamp
+     * @param revision the revision that supplies the timestamp
+     */
+    static void setModified(UpdateOp op, Revision revision) {
+        long modified = getModified(revision.getTimestamp());
+        op.set(UpdateOp.MODIFIED, modified);
+    }
+
+    /**
+     * Convert a timestamp to the coarse value that is stored as the
+     * modification time of a document.
+     *
+     * @param timestamp the timestamp (in milliseconds, given the 5 second
+     *            resolution of the result)
+     * @return the timestamp divided by 1000 and then by 5
+     */
+    public static long getModified(long timestamp) {
+        // 5 second resolution
+        long seconds = timestamp / 1000;
+        return seconds / 5;
+    }
public Revision getRevision() {
return revision;
@@ -261,7 +271,7 @@ public class Commit {
* @param store the store
* @param op the operation
*/
- private void createOrUpdateNode(DocumentStore store, UpdateOp op) {
+ public void createOrUpdateNode(DocumentStore store, UpdateOp op) {
Map<String, Object> map = store.createOrUpdate(Collection.NODES, op);
if (baseRevision != null) {
final AtomicReference<List<Revision>> collisions = new
AtomicReference<List<Revision>>();
@@ -400,12 +410,16 @@ public class Commit {
previous++;
}
UpdateOp old = new UpdateOp(path, id + "/" + previous, true);
+ setModified(old, revision);
UpdateOp main = new UpdateOp(path, id, false);
+ setModified(main, revision);
main.set(UpdateOp.PREVIOUS, previous);
for (Entry<String, Object> e : map.entrySet()) {
String key = e.getKey();
if (key.equals(UpdateOp.ID)) {
// ok
+ } else if (key.equals(UpdateOp.MODIFIED)) {
+ // ok
} else if (key.equals(UpdateOp.PREVIOUS)) {
// ok
} else if (key.equals(UpdateOp.LAST_REV)) {
Modified:
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/DocumentStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/DocumentStore.java?rev=1489832&r1=1489831&r2=1489832&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/DocumentStore.java
(original)
+++
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/DocumentStore.java
Wed Jun 5 12:08:52 2013
@@ -105,9 +105,26 @@ public interface DocumentStore {
* @return the list (possibly empty)
*/
@Nonnull
- List<Map<String, Object>> query(Collection collection, String fromKey,
String toKey, int limit);
+ List<Map<String, Object>> query(Collection collection, String fromKey,
+ String toKey, int limit);
/**
+ * Get a list of documents where the key is greater than a start value and
+ * less than an end value, and where the indexed property (if one is given)
+ * is at least the given start value.
+ *
+ * @param collection the collection
+ * @param fromKey the start value (excluding)
+ * @param toKey the end value (excluding)
+ * @param indexedProperty the name of the indexed property (optional)
+ * @param startValue the minimum value of the indexed property
+ * @param limit the maximum number of entries to return
+ * @return the list (possibly empty)
+ */
+ @Nonnull
+ List<Map<String, Object>> query(Collection collection, String fromKey,
+ String toKey, String indexedProperty, long startValue, int limit);
+
+ /**
* Remove a document.
*
* @param collection the collection
Modified:
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MemoryDocumentStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MemoryDocumentStore.java?rev=1489832&r1=1489831&r2=1489832&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MemoryDocumentStore.java
(original)
+++
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MemoryDocumentStore.java
Wed Jun 5 12:08:52 2013
@@ -70,10 +70,23 @@ public class MemoryDocumentStore impleme
@Override
@Nonnull
public List<Map<String, Object>> query(Collection collection, String
fromKey, String toKey, int limit) {
+ // delegate to the extended query; a null property name means that
+ // no filter on an indexed property is applied
+ return query(collection, fromKey, toKey, null, 0, limit);
+ }
+
+ @Override
+ @Nonnull
+ public List<Map<String, Object>> query(Collection collection, String
fromKey,
+ String toKey, String indexedProperty, long startValue, int limit) {
ConcurrentSkipListMap<String, Map<String, Object>> map =
getMap(collection);
ConcurrentNavigableMap<String, Map<String, Object>> sub =
map.subMap(fromKey, toKey);
ArrayList<Map<String, Object>> list = new ArrayList<Map<String,
Object>>();
for (Map<String, Object> n : sub.values()) {
+            if (indexedProperty != null) {
+                // A document that does not have the indexed property cannot
+                // satisfy "property >= startValue". Skip it explicitly: the
+                // comparison below would otherwise unbox null and throw a
+                // NullPointerException. (This presumably also matches the
+                // MongoDB-backed store, whose >= condition is not expected
+                // to match documents that lack the field.)
+                Long value = (Long) n.get(indexedProperty);
+                if (value == null || value < startValue) {
+                    continue;
+                }
+            }
Map<String, Object> copy = Utils.newMap();
synchronized (n) {
Utils.deepCopyMap(n, copy);
@@ -179,7 +192,7 @@ public class MemoryDocumentStore impleme
}
} else {
if (value instanceof Map) {
- Map map = (Map) value;
+ Map<?, ?> map = (Map<?, ?>) value;
if (Boolean.TRUE.equals(op.value)) {
if (!map.containsKey(kv[1])) {
return false;
Modified:
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoDocumentStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoDocumentStore.java?rev=1489832&r1=1489831&r2=1489832&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoDocumentStore.java
(original)
+++
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoDocumentStore.java
Wed Jun 5 12:08:52 2013
@@ -55,7 +55,8 @@ public class MongoDocumentStore implemen
/**
* The number of documents to cache.
*/
- private static final int CACHE_DOCUMENTS =
Integer.getInteger("oak.mongoMK.cacheDocs", 20 * 1024);
+ private static final int CACHE_DOCUMENTS = Integer.getInteger(
+ "oak.mongoMK.cacheDocs", 10 * 1024);
private static final boolean LOG_TIME = false;
@@ -75,13 +76,14 @@ public class MongoDocumentStore implemen
clusterNodes = db.getCollection(
Collection.CLUSTER_NODES.toString());
+ // indexes:
// the _id field is the primary key, so we don't need to define it
- // the following code is just a template in case we need more indexes
- // DBObject index = new BasicDBObject();
- // index.put(KEY_PATH, 1L);
- // DBObject options = new BasicDBObject();
- // options.put("unique", Boolean.TRUE);
- // nodesCollection.ensureIndex(index, options);
+ DBObject index = new BasicDBObject();
+ // modification time (descending)
+ index.put(UpdateOp.MODIFIED, -1L);
+ DBObject options = new BasicDBObject();
+ options.put("unique", Boolean.FALSE);
+ nodes.ensureIndex(index, options);
// TODO expire entries if the parent was changed
nodesCache = CacheBuilder.newBuilder()
@@ -176,11 +178,21 @@ public class MongoDocumentStore implemen
@Override
public List<Map<String, Object>> query(Collection collection,
String fromKey, String toKey, int limit) {
+ // delegate to the extended query; a null property name means that
+ // no condition on an indexed property is added
+ return query(collection, fromKey, toKey, null, 0, limit);
+ }
+
+ @Override
+ public List<Map<String, Object>> query(Collection collection,
+ String fromKey, String toKey, String indexedProperty, long
startValue, int limit) {
log("query", fromKey, toKey, limit);
DBCollection dbCollection = getDBCollection(collection);
QueryBuilder queryBuilder = QueryBuilder.start(UpdateOp.ID);
queryBuilder.greaterThanEquals(fromKey);
queryBuilder.lessThan(toKey);
+ if (indexedProperty != null) {
+ queryBuilder.and(indexedProperty);
+ queryBuilder.greaterThanEquals(startValue);
+ }
DBObject query = queryBuilder.get();
long start = start();
try {
Modified:
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoMK.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoMK.java?rev=1489832&r1=1489831&r2=1489832&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoMK.java
(original)
+++
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoMK.java
Wed Jun 5 12:08:52 2013
@@ -50,6 +50,7 @@ import org.apache.jackrabbit.mongomk.Doc
import org.apache.jackrabbit.mongomk.Node.Children;
import org.apache.jackrabbit.mongomk.Revision.RevisionComparator;
import org.apache.jackrabbit.mongomk.blob.MongoBlobStore;
+import org.apache.jackrabbit.mongomk.util.TimingDocumentStoreWrapper;
import org.apache.jackrabbit.mongomk.util.Utils;
import org.apache.jackrabbit.oak.commons.PathUtils;
import org.slf4j.Logger;
@@ -74,7 +75,13 @@ public class MongoMK implements MicroKer
* The number of nodes to cache.
*/
private static final int CACHE_NODES =
- Integer.getInteger("oak.mongoMK.cacheNodes", 1024);
+ Integer.getInteger("oak.mongoMK.cacheNodes", 5 * 1024);
+
+    /**
+     * The number of diffs to cache. Can be set with the system property
+     * "oak.mongoMK.cacheDiff"; the default is 16.
+     */
+    private static final int CACHE_DIFF =
+            // uses its own property name, so that configuring the node cache
+            // ("oak.mongoMK.cacheNodes") does not also resize the diff cache
+            Integer.getInteger("oak.mongoMK.cacheDiff", 16);
/**
* When trying to access revisions that are older than this many
@@ -90,6 +97,18 @@ public class MongoMK implements MicroKer
System.getProperty("oak.mongoMK.backgroundOps", "true"));
/**
+ * Enable fast diff operations.
+ */
+ private static final boolean FAST_DIFF = Boolean.parseBoolean(
+ System.getProperty("oak.mongoMK.fastDiff", "true"));
+
+ /**
+ * The threshold where special handling for many child nodes starts.
+ */
+ private static final int MANY_CHILDREN_THRESHOLD = Integer.getInteger(
+ "oak.mongoMK.manyChildren", 50);
+
+ /**
* How long to remember the relative order of old revision of all cluster
* nodes, in milliseconds. The default is one hour.
*/
@@ -99,7 +118,6 @@ public class MongoMK implements MicroKer
* The delay for asynchronous operations (delayed commit propagation and
* cache update).
*/
- // TODO test observation with multiple Oak instances
protected int asyncDelay = 1000;
/**
@@ -142,6 +160,11 @@ public class MongoMK implements MicroKer
private final Cache<String, Node.Children> nodeChildrenCache;
/**
+ * Diff cache.
+ */
+ private final Cache<String, String> diffCache;
+
+ /**
* The unsaved last revisions. This contains the parents of all changed
* nodes, once those nodes are committed but the parent node itself wasn't
* committed yet. The parents are not immediately persisted as this would
@@ -185,7 +208,11 @@ public class MongoMK implements MicroKer
private boolean stopBackground;
MongoMK(Builder builder) {
- this.store = builder.getDocumentStore();
+ DocumentStore s = builder.getDocumentStore();
+ if (builder.getTiming()) {
+ s = new TimingDocumentStoreWrapper(s);
+ }
+ this.store = s;
this.blobStore = builder.getBlobStore();
int cid = builder.getClusterId();
cid = Integer.getInteger("oak.mongoMK.clusterId", cid);
@@ -213,6 +240,10 @@ public class MongoMK implements MicroKer
.maximumSize(CACHE_CHILDREN)
.build();
+ diffCache = CacheBuilder.newBuilder()
+ .maximumSize(CACHE_DIFF)
+ .build();
+
init();
// initial reading of the revisions of other cluster nodes
backgroundRead();
@@ -580,20 +611,27 @@ public class MongoMK implements MicroKer
if (children != null) {
nodeChildrenCache.put(key, children);
}
+ } else if (children.hasMore) {
+ if (limit > children.children.size()) {
+ children = readChildren(path, rev, limit);
+ if (children != null) {
+ nodeChildrenCache.put(key, children);
+ }
+ }
}
return children;
}
Node.Children readChildren(String path, Revision rev, int limit) {
- String from = PathUtils.concat(path, "a");
- from = Utils.getIdFromPath(from);
- from = from.substring(0, from.length() - 1);
- String to = PathUtils.concat(path, "z");
- to = Utils.getIdFromPath(to);
- to = to.substring(0, to.length() - 2) + "0";
- List<Map<String, Object>> list =
store.query(DocumentStore.Collection.NODES, from, to, limit);
+ String from = getPathLowerLimit(path);
+ String to = getPathUpperLimit(path);
+ List<Map<String, Object>> list =
store.query(DocumentStore.Collection.NODES,
+ from, to, limit);
Children c = new Children(path, rev);
Set<Revision> validRevisions = new HashSet<Revision>();
+ if (list.size() >= limit) {
+ c.hasMore = true;
+ }
for (Map<String, Object> e : list) {
// filter out deleted children
if (getLiveRevision(e, rev, validRevisions) == null) {
@@ -606,6 +644,20 @@ public class MongoMK implements MicroKer
}
return c;
}
+
+    /**
+     * Get the lower key limit used when querying the children of a path:
+     * the id of the child named "a", without its last character.
+     *
+     * @param path the path of the parent node
+     * @return the lower limit for the document id
+     */
+    private static String getPathLowerLimit(String path) {
+        String id = Utils.getIdFromPath(PathUtils.concat(path, "a"));
+        return id.substring(0, id.length() - 1);
+    }
+
+    /**
+     * Get the upper key limit used when querying the children of a path:
+     * the id of the child named "z", with its last two characters replaced
+     * by "0".
+     *
+     * @param path the path of the parent node
+     * @return the upper limit for the document id
+     */
+    private static String getPathUpperLimit(String path) {
+        String id = Utils.getIdFromPath(PathUtils.concat(path, "z"));
+        int cut = id.length() - 2;
+        return id.substring(0, cut) + "0";
+    }
private Node readNode(String path, Revision rev) {
String id = Utils.getIdFromPath(path);
@@ -652,6 +704,7 @@ public class MongoMK implements MicroKer
Map<String, String> valueMap = (Map<String, String>) v;
if (valueMap != null) {
if (valueMap instanceof TreeMap) {
+ // TODO instanceof should be avoided
// use descending keys (newest first) if map is sorted
valueMap = ((TreeMap<String, String>)
valueMap).descendingMap();
}
@@ -721,7 +774,18 @@ public class MongoMK implements MicroKer
}
@Override
- public String diff(String fromRevisionId, String toRevisionId, String path,
+    public synchronized String diff(String fromRevisionId, String toRevisionId, String path,
+            int depth) throws MicroKernelException {
+        // the cache key is built from all four arguments
+        String key = fromRevisionId + "-" + toRevisionId + "-" + path + "-" + depth;
+        String cached = diffCache.getIfPresent(key);
+        if (cached != null) {
+            return cached;
+        }
+        // not cached yet: compute the diff and remember it
+        String computed = diffImpl(fromRevisionId, toRevisionId, path, depth);
+        diffCache.put(key, computed);
+        return computed;
+    }
+
+ private String diffImpl(String fromRevisionId, String toRevisionId, String
path,
int depth) throws MicroKernelException {
if (fromRevisionId.equals(toRevisionId)) {
return "";
@@ -734,8 +798,11 @@ public class MongoMK implements MicroKer
}
fromRevisionId = stripBranchRevMarker(fromRevisionId);
toRevisionId = stripBranchRevMarker(toRevisionId);
- Node from = getNode(path, Revision.fromString(fromRevisionId));
- Node to = getNode(path, Revision.fromString(toRevisionId));
+ Revision fromRev = Revision.fromString(fromRevisionId);
+ Revision toRev = Revision.fromString(toRevisionId);
+ Node from = getNode(path, fromRev);
+ Node to = getNode(path, toRev);
+
if (from == null || to == null) {
// TODO implement correct behavior if the node doesn't/didn't exist
throw new MicroKernelException("Diff is only supported if the node
exists in both cases");
@@ -760,19 +827,53 @@ public class MongoMK implements MicroKer
w.tag('^').key(p).encodedValue(to.getProperty(p)).newline();
}
}
- Revision fromRev = Revision.fromString(fromRevisionId);
- Revision toRev = Revision.fromString(toRevisionId);
// TODO this does not work well for large child node lists
// use a MongoDB index instead
- Children fromChildren = getChildren(path, fromRev, Integer.MAX_VALUE);
- Children toChildren = getChildren(path, toRev, Integer.MAX_VALUE);
+ int max = MANY_CHILDREN_THRESHOLD;
+ Children fromChildren, toChildren;
+ fromChildren = getChildren(path, fromRev, max);
+ toChildren = getChildren(path, toRev, max);
+ if (!fromChildren.hasMore && !toChildren.hasMore) {
+ diffFewChildren(w, fromChildren, toChildren);
+ } else {
+ if (FAST_DIFF) {
+ diffManyChildren(w, path, fromRev, toRev);
+ } else {
+ max = Integer.MAX_VALUE;
+ fromChildren = getChildren(path, fromRev, max);
+ toChildren = getChildren(path, toRev, max);
+ diffFewChildren(w, fromChildren, toChildren);
+ }
+ }
+ return w.toString();
+ }
+
+    /**
+     * Write the child node changes for a node with many children. Instead of
+     * reading all children, only the child documents whose modification time
+     * is at least that of the earlier of the two revision timestamps are
+     * queried. Each such document that isOlder() does not rule out is
+     * written as a changed ('^') child.
+     *
+     * @param w the writer the changes are appended to
+     * @param path the path of the parent node
+     * @param fromRev the revision to diff from
+     * @param toRev the revision to diff to
+     */
+    private void diffManyChildren(JsopWriter w, String path, Revision fromRev, Revision toRev) {
+        long earliest = Math.min(fromRev.getTimestamp(), toRev.getTimestamp());
+        // presumably the older of the two revisions
+        Revision baseRev;
+        if (isRevisionNewer(fromRev, toRev)) {
+            baseRev = toRev;
+        } else {
+            baseRev = fromRev;
+        }
+        long modifiedSince = Commit.getModified(earliest);
+        List<Map<String, Object>> candidates = store.query(
+                DocumentStore.Collection.NODES,
+                getPathLowerLimit(path), getPathUpperLimit(path),
+                UpdateOp.MODIFIED, modifiedSince, Integer.MAX_VALUE);
+        for (Map<String, Object> doc : candidates) {
+            if (!isOlder(doc, baseRev)) {
+                String childPath = Utils.getPathFromId(doc.get(UpdateOp.ID).toString());
+                w.tag('^').key(childPath).object().endObject().newline();
+            }
+        }
+    }
+
+ private void diffFewChildren(JsopWriter w, Children fromChildren, Children
toChildren) {
Set<String> childrenSet = new HashSet<String>(toChildren.children);
for (String n : fromChildren.children) {
if (!childrenSet.contains(n)) {
w.tag('-').value(n).newline();
} else {
- Node n1 = getNode(n, fromRev);
- Node n2 = getNode(n, toRev);
+ Node n1 = getNode(n, fromChildren.rev);
+ Node n2 = getNode(n, toChildren.rev);
// this is not fully correct:
// a change is detected if the node changed recently,
// even if the revisions are well in the past
@@ -788,7 +889,6 @@ public class MongoMK implements MicroKer
w.tag('+').key(n).object().endObject().newline();
}
}
- return w.toString();
}
@Override
@@ -818,26 +918,26 @@ public class MongoMK implements MicroKer
throw new MicroKernelException("Only depth 0 is supported, depth
is " + depth);
}
revisionId = revisionId != null ? revisionId : headRevision.toString();
- if (revisionId.startsWith("b")) {
- // reading from the branch is reading from the trunk currently
- revisionId = stripBranchRevMarker(revisionId);
- }
+ revisionId = stripBranchRevMarker(revisionId);
Revision rev = Revision.fromString(revisionId);
Node n = getNode(path, rev);
if (n == null) {
return null;
- // throw new MicroKernelException("Node not found at path " +
path);
}
JsopStream json = new JsopStream();
boolean includeId = filter != null && filter.contains(":id");
includeId |= filter != null && filter.contains(":hash");
json.object();
n.append(json, includeId);
+ int max;
if (maxChildNodes == -1) {
+ max = MANY_CHILDREN_THRESHOLD;
maxChildNodes = Integer.MAX_VALUE;
+ } else {
+ // avoid overflow (if maxChildNodes is Integer.MAX_VALUE)
+ max = Math.max(maxChildNodes, maxChildNodes + 1);
}
- // FIXME: must not read all children!
- Children c = getChildren(path, rev, Integer.MAX_VALUE);
+ Children c = getChildren(path, rev, max);
for (long i = offset; i < c.children.size(); i++) {
if (maxChildNodes-- <= 0) {
break;
@@ -845,13 +945,14 @@ public class MongoMK implements MicroKer
String name = PathUtils.getName(c.children.get((int) i));
json.key(name).object().endObject();
}
- json.key(":childNodeCount").value(c.children.size());
+ if (c.hasMore) {
+ // TODO use a better way to notify there are more children
+ json.key(":childNodeCount").value(Integer.MAX_VALUE);
+ } else {
+ json.key(":childNodeCount").value(c.children.size());
+ }
json.endObject();
- String result = json.toString();
- // if (filter != null && filter.contains(":hash")) {
- // result = result.replaceAll("\":id\"", "\":hash\"");
- // }
- return result;
+ return json.toString();
}
@Override
@@ -1070,6 +1171,7 @@ public class MongoMK implements MicroKer
// first, search the newest deleted revision
Revision deletedRev = null;
if (valueMap instanceof TreeMap) {
+ // TODO instanceof should be avoided
// use descending keys (newest first) if map is sorted
valueMap = ((TreeMap<String, String>) valueMap).descendingMap();
}
@@ -1113,6 +1215,34 @@ public class MongoMK implements MicroKer
return liveRev;
}
+    /**
+     * Check whether all revisions recorded in a document are older than the
+     * given revision. The entries for the id, the last revisions, the
+     * previous counter and the modification time are ignored; every other
+     * entry is expected to be a map whose keys are revision strings.
+     *
+     * @param map the document
+     * @param rev the revision to compare against
+     * @return false as soon as a recorded revision is found for which
+     *         isRevisionNewer(rev, revision) does not hold, true otherwise
+     */
+    boolean isOlder(Map<String, Object> map, Revision rev) {
+        for (Map.Entry<String, Object> entry : map.entrySet()) {
+            String key = entry.getKey();
+            // TODO could use just the LAST_REV property? it would be faster
+            boolean ignored = !Utils.isPropertyName(key)
+                    && (key.equals(UpdateOp.ID)
+                    || key.equals(UpdateOp.LAST_REV)
+                    || key.equals(UpdateOp.PREVIOUS)
+                    || key.equals(UpdateOp.MODIFIED));
+            if (ignored) {
+                continue;
+            }
+            @SuppressWarnings("unchecked")
+            Map<String, String> revisions = (Map<String, String>) entry.getValue();
+            for (String r : revisions.keySet()) {
+                if (!isRevisionNewer(rev, Revision.fromString(r))) {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
/**
* Get the revision of the latest change made to this node.
*
@@ -1147,6 +1277,7 @@ public class MongoMK implements MicroKer
// we have seen a previous change from another cluster node
// (which might be conflicting or not) - we need to make
// sure this change is visible from now on
+ // TODO verify this is really needed
publishRevision(propRev, changeRev);
}
if (newestRev == null || isRevisionNewer(propRev, newestRev)) {
@@ -1256,6 +1387,7 @@ public class MongoMK implements MicroKer
// make branch commits visible
UpdateOp op = new UpdateOp("/", Utils.getIdFromPath("/"), false);
Revision revision = Revision.fromString(revisionId);
+ Commit.setModified(op, revision);
Branch b = branches.getBranch(revision);
if (b != null) {
for (Revision rev : b.getCommits()) {
@@ -1412,6 +1544,7 @@ public class MongoMK implements MicroKer
private BlobStore blobStore;
private int clusterId = Integer.getInteger("oak.mongoMK.clusterId",
0);
private int asyncDelay = 1000;
+ private boolean timing;
/**
* Set the MongoDB connection to use. By default an in-memory store is
used.
@@ -1428,6 +1561,21 @@ public class MongoMK implements MicroKer
}
/**
+ * Use the timing document store wrapper.
+ *
+ * @param timing whether to use the timing wrapper.
+ * @return this
+ */
+        public Builder setTiming(boolean timing) {
+            // store the requested value (not a constant), so that the
+            // timing wrapper can also be switched off again
+            this.timing = timing;
+            return this;
+        }
+
+ /**
+ * Whether the timing document store wrapper is to be used. The MongoMK
+ * constructor wraps the document store in a TimingDocumentStoreWrapper
+ * if this returns true.
+ *
+ * @return the value of the timing flag
+ */
+ public boolean getTiming() {
+ return timing;
+ }
+
+ /**
* Set the document store to use. By default an in-memory store is
used.
*
* @param documentStore the document store
Modified:
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Node.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Node.java?rev=1489832&r1=1489831&r2=1489832&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Node.java
(original)
+++
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Node.java
Wed Jun 5 12:08:52 2013
@@ -76,6 +76,7 @@ public class Node {
String id = Utils.getIdFromPath(path);
UpdateOp op = new UpdateOp(path, id, isNew);
op.set(UpdateOp.ID, id);
+ Commit.setModified(op, rev);
op.setMapEntry(UpdateOp.DELETED, rev.toString(), "false");
op.setMapEntry(UpdateOp.LAST_REV, "" + rev.getClusterId(),
rev.toString());
for (String p : properties.keySet()) {
@@ -120,6 +121,8 @@ public class Node {
final ArrayList<String> children = new ArrayList<String>();
+ boolean hasMore;
+
Children(String path, Revision rev) {
this.path = path;
this.rev = rev;
Modified:
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Revision.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Revision.java?rev=1489832&r1=1489831&r2=1489832&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Revision.java
(original)
+++
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Revision.java
Wed Jun 5 12:08:52 2013
@@ -172,6 +172,8 @@ public class Revision {
public boolean equals(Object other) {
if (this == other) {
return true;
+ } else if (other == null) {
+ return false;
} else if (other.getClass() != this.getClass()) {
return false;
}
Modified:
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/util/LoggingDocumentStoreWrapper.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/util/LoggingDocumentStoreWrapper.java?rev=1489832&r1=1489831&r2=1489832&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/util/LoggingDocumentStoreWrapper.java
(original)
+++
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/util/LoggingDocumentStoreWrapper.java
Wed Jun 5 12:08:52 2013
@@ -81,6 +81,19 @@ public class LoggingDocumentStoreWrapper
throw convert(e);
}
}
+
+    /**
+     * Log the call and its arguments, run the query against the wrapped
+     * store, and log the result. An exception is logged and then rethrown
+     * in converted form.
+     */
+    @Override
+    @Nonnull
+    public List<Map<String, Object>> query(Collection collection, String fromKey,
+            String toKey, String indexedProperty, long startValue, int limit) {
+        try {
+            logMethod("query", collection, fromKey, toKey, indexedProperty, startValue, limit);
+            List<Map<String, Object>> result = store.query(
+                    collection, fromKey, toKey, indexedProperty, startValue, limit);
+            return logResult(result);
+        } catch (Exception ex) {
+            logException(ex);
+            throw convert(ex);
+        }
+    }
@Override
public void remove(Collection collection, String key) {
Added:
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/util/TimingDocumentStoreWrapper.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/util/TimingDocumentStoreWrapper.java?rev=1489832&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/util/TimingDocumentStoreWrapper.java
(added)
+++
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/util/TimingDocumentStoreWrapper.java
Wed Jun 5 12:08:52 2013
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.mongomk.util;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+
+import org.apache.jackrabbit.mk.api.MicroKernelException;
+import org.apache.jackrabbit.mongomk.DocumentStore;
+import org.apache.jackrabbit.mongomk.UpdateOp;
+
+/**
+ * A DocumentStore wrapper that times every call made to the wrapped store
+ * and periodically prints per-operation statistics.
+ * <p>
+ * For each operation name the number of calls, the accumulated and the
+ * maximum duration, and the estimated size of the returned data are tracked.
+ * Whenever a call completes and at least two seconds have passed since the
+ * previous report, a summary is printed to {@code System.out}. Printing is
+ * enabled unless the system property {@code base.debug} is set to a value
+ * other than {@code true}.
+ * <p>
+ * Calls that end with an exception are not counted. The statistics are kept
+ * in a plain {@code HashMap} and plain fields without any synchronization.
+ */
+public class TimingDocumentStoreWrapper implements DocumentStore {
+
+    private static final boolean DEBUG = Boolean.parseBoolean(System.getProperty("base.debug", "true"));
+    private static final AtomicInteger NEXT_ID = new AtomicInteger();
+
+    /** Minimum number of milliseconds between two statistics reports. */
+    private static final long LOG_INTERVAL_MILLIS = 2000;
+
+    /** The store all calls are delegated to. */
+    private final DocumentStore base;
+
+    /** Identifies this wrapper instance in the log output. */
+    private final int id = NEXT_ID.getAndIncrement();
+
+    /** Statistics per operation name. */
+    private final Map<String, Count> counts = new HashMap<String, Count>();
+
+    /** Value of {@link #now()} at construction or at the last report. */
+    private long lastLogTime;
+
+    /** Milliseconds between construction and the last report. */
+    private long totalLogTime;
+
+    /**
+     * A class that keeps track of timing data and call counts.
+     */
+    static class Count {
+        /** Number of recorded calls. */
+        public long count;
+        /** Longest single call, in milliseconds. */
+        public long max;
+        /** Sum of all call durations, in milliseconds. */
+        public long total;
+        /** Sum of the parameter sizes passed to {@link #update}. */
+        public long paramSize;
+        /** Sum of the result sizes passed to {@link #update}. */
+        public long resultSize;
+
+        /**
+         * Records one call.
+         *
+         * @param time the duration of the call in milliseconds
+         * @param paramSize the size of the call parameters
+         * @param resultSize the size of the call result
+         */
+        void update(long time, int paramSize, int resultSize) {
+            count++;
+            if (time > max) {
+                max = time;
+            }
+            total += time;
+            this.paramSize += paramSize;
+            this.resultSize += resultSize;
+        }
+    }
+
+    /**
+     * Creates a wrapper around the given store.
+     *
+     * @param base the store that performs the actual work
+     */
+    public TimingDocumentStoreWrapper(DocumentStore base) {
+        this.base = base;
+        lastLogTime = now();
+    }
+
+    @Override
+    @CheckForNull
+    public Map<String, Object> find(Collection collection, String key) {
+        try {
+            long start = now();
+            Map<String, Object> result = base.find(collection, key);
+            updateAndLogTimes("find", start, 0, size(result));
+            return result;
+        } catch (Exception e) {
+            throw convert(e);
+        }
+    }
+
+    @Override
+    @CheckForNull
+    public Map<String, Object> find(Collection collection, String key, int maxCacheAge) {
+        try {
+            long start = now();
+            Map<String, Object> result = base.find(collection, key, maxCacheAge);
+            updateAndLogTimes("find2", start, 0, size(result));
+            return result;
+        } catch (Exception e) {
+            throw convert(e);
+        }
+    }
+
+    @Override
+    @Nonnull
+    public List<Map<String, Object>> query(Collection collection, String fromKey,
+            String toKey, int limit) {
+        try {
+            long start = now();
+            List<Map<String, Object>> result = base.query(collection, fromKey, toKey, limit);
+            updateAndLogTimes("query", start, 0, size(result));
+            return result;
+        } catch (Exception e) {
+            throw convert(e);
+        }
+    }
+
+    @Override
+    @Nonnull
+    public List<Map<String, Object>> query(Collection collection, String fromKey,
+            String toKey, String indexedProperty, long startValue, int limit) {
+        try {
+            long start = now();
+            List<Map<String, Object>> result = base.query(collection, fromKey, toKey, indexedProperty, startValue, limit);
+            updateAndLogTimes("query2", start, 0, size(result));
+            return result;
+        } catch (Exception e) {
+            throw convert(e);
+        }
+    }
+
+    @Override
+    public void remove(Collection collection, String key) {
+        try {
+            long start = now();
+            base.remove(collection, key);
+            updateAndLogTimes("remove", start, 0, 0);
+        } catch (Exception e) {
+            throw convert(e);
+        }
+    }
+
+    @Override
+    public boolean create(Collection collection, List<UpdateOp> updateOps) {
+        try {
+            long start = now();
+            boolean result = base.create(collection, updateOps);
+            updateAndLogTimes("create", start, 0, 0);
+            return result;
+        } catch (Exception e) {
+            throw convert(e);
+        }
+    }
+
+    @Override
+    @Nonnull
+    public Map<String, Object> createOrUpdate(Collection collection, UpdateOp update)
+            throws MicroKernelException {
+        try {
+            long start = now();
+            Map<String, Object> result = base.createOrUpdate(collection, update);
+            updateAndLogTimes("createOrUpdate", start, 0, size(result));
+            return result;
+        } catch (Exception e) {
+            throw convert(e);
+        }
+    }
+
+    @Override
+    @CheckForNull
+    public Map<String, Object> findAndUpdate(Collection collection, UpdateOp update)
+            throws MicroKernelException {
+        try {
+            long start = now();
+            Map<String, Object> result = base.findAndUpdate(collection, update);
+            updateAndLogTimes("findAndUpdate", start, 0, size(result));
+            return result;
+        } catch (Exception e) {
+            throw convert(e);
+        }
+    }
+
+    @Override
+    public void invalidateCache() {
+        try {
+            long start = now();
+            base.invalidateCache();
+            updateAndLogTimes("invalidateCache", start, 0, 0);
+        } catch (Exception e) {
+            throw convert(e);
+        }
+    }
+
+    @Override
+    public void invalidateCache(Collection collection, String key) {
+        try {
+            long start = now();
+            base.invalidateCache(collection, key);
+            updateAndLogTimes("invalidateCache2", start, 0, 0);
+        } catch (Exception e) {
+            throw convert(e);
+        }
+    }
+
+    @Override
+    public void dispose() {
+        try {
+            long start = now();
+            base.dispose();
+            updateAndLogTimes("dispose", start, 0, 0);
+        } catch (Exception e) {
+            throw convert(e);
+        }
+    }
+
+    @Override
+    public boolean isCached(Collection collection, String key) {
+        try {
+            long start = now();
+            boolean result = base.isCached(collection, key);
+            updateAndLogTimes("isCached", start, 0, 0);
+            return result;
+        } catch (Exception e) {
+            throw convert(e);
+        }
+    }
+
+    /**
+     * Returns the exception to throw for a failed call: runtime exceptions
+     * are returned unchanged, anything else is wrapped in a
+     * MicroKernelException that keeps the original as its cause.
+     */
+    private static RuntimeException convert(Exception e) {
+        if (e instanceof RuntimeException) {
+            return (RuntimeException) e;
+        }
+        return new MicroKernelException("Unexpected exception: " + e.toString(), e);
+    }
+
+    /**
+     * Prints the message, prefixed with the id of this wrapper, if debug
+     * output is enabled.
+     */
+    private void log(String message) {
+        if (DEBUG) {
+            System.out.println("[" + id + "] " + message);
+        }
+    }
+
+    /**
+     * Returns the size of a document as estimated by
+     * {@code Utils.estimateMemoryUsage}.
+     */
+    private static int size(Map<String, Object> m) {
+        return Utils.estimateMemoryUsage(m);
+    }
+
+    /**
+     * Returns the sum of the estimated sizes of all documents in the list.
+     */
+    private static int size(List<Map<String, Object>> list) {
+        int result = 0;
+        for (Map<String, Object> m : list) {
+            result += size(m);
+        }
+        return result;
+    }
+
+    /**
+     * Returns a millisecond timestamp that is only meaningful when compared
+     * with another value returned by this method.
+     * <p>
+     * It is derived from {@code System.nanoTime()} rather than the wall
+     * clock, so that adjustments of the system time cannot distort (or make
+     * negative) the measured durations.
+     */
+    private static long now() {
+        return System.nanoTime() / 1000000L;
+    }
+
+    /**
+     * Records a completed call and prints a report if the last one is at
+     * least {@link #LOG_INTERVAL_MILLIS} milliseconds old.
+     * <p>
+     * The report contains one line per operation (call count and its share
+     * of all calls, parameter and result sizes divided by 1024 * 1024, time
+     * and its share of the time of all calls) followed by the totals and the
+     * percentage of the time since construction that was spent in calls.
+     *
+     * @param operation the name under which the call is counted
+     * @param start the value of {@link #now()} taken before the call
+     * @param paramSize the size of the call parameters (every caller in this
+     *            class passes 0)
+     * @param resultSize the size of the call result
+     */
+    private void updateAndLogTimes(String operation, long start, int paramSize, int resultSize) {
+        long now = now();
+        Count c = counts.get(operation);
+        if (c == null) {
+            c = new Count();
+            counts.put(operation, c);
+        }
+        c.update(now - start, paramSize, resultSize);
+        long t = now - lastLogTime;
+        if (t >= LOG_INTERVAL_MILLIS) {
+            totalLogTime += t;
+            lastLogTime = now;
+            long totalCount = 0, totalTime = 0;
+            for (Count count : counts.values()) {
+                totalCount += count.count;
+                totalTime += count.total;
+            }
+            // avoid division by zero in the percentages below
+            totalCount = Math.max(1, totalCount);
+            totalTime = Math.max(1, totalTime);
+            for (Entry<String, Count> e : counts.entrySet()) {
+                Count stats = e.getValue();
+                long count = stats.count;
+                long total = stats.total;
+                long in = stats.paramSize / 1024 / 1024;
+                long out = stats.resultSize / 1024 / 1024;
+                if (count > 0) {
+                    log(e.getKey() +
+                            " count " + count +
+                            " " + (100 * count / totalCount) + "%" +
+                            " in " + in + " out " + out +
+                            " time " + total +
+                            " " + (100 * total / totalTime) + "%");
+                }
+            }
+            // totalLogTime is at least LOG_INTERVAL_MILLIS here, so never zero
+            log("all count " + totalCount + " time " + totalTime + " " +
+                    (100 * totalTime / totalLogTime) + "%");
+            log("------");
+        }
+    }
+
+}
Modified:
jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/SimpleTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/SimpleTest.java?rev=1489832&r1=1489831&r2=1489832&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/SimpleTest.java
(original)
+++
jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/SimpleTest.java
Wed Jun 5 12:08:52 2013
@@ -306,13 +306,13 @@ public class SimpleTest {
jsop.tag('+').key(s).object().key(s).value("x").endObject();
rev = mk.commit("/", jsop.toString(),
null, null);
- nodes = mk.getNodes("/" + s, rev, 0, 0, 10, null);
+ nodes = mk.getNodes("/" + s, rev, 0, 0, 100, null);
jsop = new JsopBuilder();
jsop.object().key(s).value("x").
key(":childNodeCount").value(0).endObject();
String n = jsop.toString();
assertEquals(n, nodes);
- nodes = mk.getNodes("/", rev, 0, 0, 10, null);
+ nodes = mk.getNodes("/", rev, 0, 0, 100, null);
jsop = new JsopBuilder();
jsop.object().key(s).object().endObject().
key(":childNodeCount").value(1).endObject();