Author: mreutegg
Date: Wed Aug 21 07:55:51 2013
New Revision: 1516107
URL: http://svn.apache.org/r1516107
Log:
OAK-926: MongoMK: split documents when they are too large
- Encapsulate access to REVISIONS, COMMIT_ROOT and other fields in NodeDocument
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Collision.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/UnmergedBranches.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/SimpleTest.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Collision.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Collision.java?rev=1516107&r1=1516106&r2=1516107&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Collision.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Collision.java Wed Aug 21 07:55:51 2013
@@ -16,13 +16,10 @@
*/
package org.apache.jackrabbit.oak.plugins.mongomk;
-import java.util.Map;
-
import javax.annotation.Nonnull;
import org.apache.jackrabbit.mk.api.MicroKernelException;
import org.apache.jackrabbit.oak.plugins.mongomk.util.Utils;
-import org.apache.jackrabbit.oak.commons.PathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -112,16 +109,8 @@ class Collision {
commitRootPath = p;
} else {
// next look at commit root
- @SuppressWarnings("unchecked")
- Map<String, Integer> commitRoots = (Map<String, Integer>) document.get(NodeDocument.COMMIT_ROOT);
- if (commitRoots != null) {
- Integer depth = commitRoots.get(revision);
- if (depth != null) {
- commitRootPath = PathUtils.getAncestorPath(p, PathUtils.getDepth(p) - depth);
- } else {
- throwNoCommitRootException(revision, document);
- }
- } else {
+ commitRootPath = document.getCommitRootPath(revision);
+ if (commitRootPath == null) {
throwNoCommitRootException(revision, document);
}
}
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java?rev=1516107&r1=1516106&r2=1516107&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java Wed Aug 21 07:55:51 2013
@@ -129,8 +129,7 @@ public class Commit {
public void touchNode(String path) {
UpdateOp op = getUpdateOperationForNode(path);
- op.setMapEntry(NodeDocument.LAST_REV,
- String.valueOf(revision.getClusterId()), revision.toString());
+ NodeDocument.setLastRev(op, revision);
}
void updateProperty(String path, String propertyName, String value) {
@@ -219,8 +218,7 @@ public class Commit {
if (baseBranchRevision == null) {
// only apply _lastRev for trunk commits, _lastRev for
// branch commits only become visible on merge
- op.setMapEntry(NodeDocument.LAST_REV,
- String.valueOf(revision.getClusterId()), revision.toString());
+ NodeDocument.setLastRev(op, revision);
}
if (op.isNew) {
op.setMapEntry(NodeDocument.DELETED, revision.toString(),
"false");
@@ -228,7 +226,7 @@ public class Commit {
if (op == commitRoot) {
// apply at the end
} else {
- op.setMapEntry(NodeDocument.COMMIT_ROOT, revision.toString(), commitRootDepth);
+ NodeDocument.setCommitRoot(op, revision, commitRootDepth);
if (op.isNew()) {
newNodes.add(op);
} else {
@@ -240,7 +238,7 @@ public class Commit {
// no updates and root of commit is also new. that is,
// it is the root of a subtree added in a commit.
// so we try to add the root like all other nodes
- commitRoot.setMapEntry(NodeDocument.REVISIONS, revision.toString(), commitValue);
+ NodeDocument.setRevision(commitRoot, revision, commitValue);
newNodes.add(commitRoot);
}
try {
@@ -253,7 +251,7 @@ public class Commit {
if (op == commitRoot) {
// don't write the commit root just yet
// (because there might be a conflict)
- commitRoot.unsetMapEntry(NodeDocument.REVISIONS, revision.toString());
+ NodeDocument.unsetRevision(commitRoot, revision);
}
changedNodes.add(op);
}
@@ -262,7 +260,7 @@ public class Commit {
}
for (UpdateOp op : changedNodes) {
// set commit root on changed nodes
- op.setMapEntry(NodeDocument.COMMIT_ROOT, revision.toString(), commitRootDepth);
+ NodeDocument.setCommitRoot(op, revision, commitRootDepth);
opLog.add(op);
createOrUpdateNode(store, op);
}
@@ -271,7 +269,7 @@ public class Commit {
// first to check if there was a conflict, and only then to commit
// the revision, with the revision property set)
if (changedNodes.size() > 0 || !commitRoot.isNew) {
- commitRoot.setMapEntry(NodeDocument.REVISIONS, revision.toString(), commitValue);
+ NodeDocument.setRevision(commitRoot, revision, commitValue);
opLog.add(commitRoot);
createOrUpdateNode(store, commitRoot);
operations.put(commitRootPath, commitRoot);
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java?rev=1516107&r1=1516106&r2=1516107&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java Wed Aug 21 07:55:51 2013
@@ -281,7 +281,7 @@ public class MongoMK implements MicroKer
commit.applyToDocumentStore();
} else {
// initialize branchCommits
- branches.init(store, clusterId);
+ branches.init(store, this);
}
backgroundThread = new Thread(
new BackgroundOperation(this, isDisposed),
@@ -1089,7 +1089,7 @@ public class MongoMK implements MicroKer
NodeDocument.setModified(op, mergeCommit);
if (b != null) {
for (Revision rev : b.getCommits()) {
- op.setMapEntry(NodeDocument.REVISIONS, rev.toString(), "c-" + mergeCommit.toString());
+ NodeDocument.setRevision(op, rev, "c-" + mergeCommit.toString());
op.containsMapEntry(NodeDocument.COLLISIONS, rev.toString(),
false);
}
if (store.findAndUpdate(Collection.NODES, op) != null) {
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java?rev=1516107&r1=1516106&r2=1516107&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java Wed Aug 21 07:55:51 2013
@@ -21,6 +21,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
+import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
@@ -29,7 +30,6 @@ import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import org.apache.jackrabbit.oak.cache.CacheValue;
import org.apache.jackrabbit.oak.commons.PathUtils;
import org.apache.jackrabbit.oak.plugins.mongomk.util.Utils;
import org.slf4j.Logger;
@@ -55,7 +55,7 @@ public class NodeDocument extends Docume
* The list of revision to root commit depth mappings to find out if a
* revision is actually committed.
*/
- static final String COMMIT_ROOT = "_commitRoot";
+ private static final String COMMIT_ROOT = "_commitRoot";
/**
* The number of previous documents (documents that contain old revisions of
@@ -87,12 +87,12 @@ public class NodeDocument extends Docume
* root of the commit. Key: revision, value: true or the base revision of an
* un-merged branch commit.
*/
- static final String REVISIONS = "_revisions";
+ private static final String REVISIONS = "_revisions";
/**
* The last revision. Key: machine id, value: revision.
*/
- static final String LAST_REV = "_lastRev";
+ private static final String LAST_REV = "_lastRev";
private final long time = System.currentTimeMillis();
@@ -153,6 +153,58 @@ public class NodeDocument extends Docume
}
/**
+ * Gets a sorted map of uncommitted revisions of this document with the
+ * local cluster node id as returned by the {@link RevisionContext}. These
+ * are the {@link #REVISIONS} entries where {@link Utils#isCommitted(String)}
+ * returns false.
+ *
+ * @param context the revision context.
+ * @return the uncommitted revisions of this document.
+ */
+ public SortedMap<Revision, Revision> getUncommittedRevisions(RevisionContext context) {
+ @SuppressWarnings("unchecked")
+ Map<String, String> valueMap = (Map<String, String>) get(NodeDocument.REVISIONS);
+ SortedMap<Revision, Revision> revisions =
+ new TreeMap<Revision, Revision>(context.getRevisionComparator());
+ if (valueMap != null) {
+ for (Map.Entry<String, String> commit : valueMap.entrySet()) {
+ if (!Utils.isCommitted(commit.getValue())) {
+ Revision r = Revision.fromString(commit.getKey());
+ if (r.getClusterId() == context.getClusterId()) {
+ Revision b = Revision.fromString(commit.getValue());
+ revisions.put(r, b);
+ }
+ }
+ }
+ }
+ return revisions;
+ }
+
+ /**
+ * Returns the commit root path for the given <code>revision</code> or
+ * <code>null</code> if this document does not have a commit root entry for
+ * the given <code>revision</code>.
+ *
+ * @param revision a revision.
+ * @return the commit root path or <code>null</code>.
+ */
+ @CheckForNull
+ public String getCommitRootPath(String revision) {
+ @SuppressWarnings("unchecked")
+ Map<String, Integer> valueMap = (Map<String, Integer>) get(COMMIT_ROOT);
+ if (valueMap == null) {
+ return null;
+ }
+ Integer depth = valueMap.get(revision);
+ if (depth != null) {
+ String p = Utils.getPathFromId(getId());
+ return PathUtils.getAncestorPath(p, PathUtils.getDepth(p) - depth);
+ } else {
+ return null;
+ }
+ }
+
+ /**
* Get the revision of the latest change made to this node.
*
* @param changeRev the revision of the current change
@@ -238,7 +290,7 @@ public class NodeDocument extends Docume
return true;
}
@SuppressWarnings("unchecked")
- Map<String, String> revisions = (Map<String, String>) get(NodeDocument.REVISIONS);
+ Map<String, String> revisions = (Map<String, String>) get(REVISIONS);
if (isCommitted(context, rev, readRevision, revisions)) {
validRevisions.add(rev);
return true;
@@ -249,7 +301,7 @@ public class NodeDocument extends Docume
}
// check commit root
@SuppressWarnings("unchecked")
- Map<String, Integer> commitRoot = (Map<String, Integer>) get(NodeDocument.COMMIT_ROOT);
+ Map<String, Integer> commitRoot = (Map<String, Integer>) get(COMMIT_ROOT);
String commitRootPath = null;
if (commitRoot != null) {
Integer depth = commitRoot.get(rev.toString());
@@ -272,7 +324,7 @@ public class NodeDocument extends Docume
return false;
}
@SuppressWarnings("unchecked")
- Map<String, String> rootRevisions = (Map<String, String>) doc.get(NodeDocument.REVISIONS);
+ Map<String, String> rootRevisions = (Map<String, String>) doc.get(REVISIONS);
if (isCommitted(context, rev, readRevision, rootRevisions)) {
validRevisions.add(rev);
return true;
@@ -501,8 +553,38 @@ public class NodeDocument extends Docume
return new UpdateOp[]{old, main};
}
- public static void setModified(UpdateOp op, Revision revision) {
- op.set(MODIFIED, Commit.getModified(revision.getTimestamp()));
+ //-------------------------< UpdateOp modifiers >---------------------------
+
+ public static void setModified(@Nonnull UpdateOp op,
+ @Nonnull Revision revision) {
+ checkNotNull(op).set(MODIFIED, Commit.getModified(checkNotNull(revision).getTimestamp()));
+ }
+
+ public static void setRevision(@Nonnull UpdateOp op,
+ @Nonnull Revision revision,
+ @Nonnull String commitValue) {
+ checkNotNull(op).setMapEntry(REVISIONS,
+ checkNotNull(revision).toString(), checkNotNull(commitValue));
+ }
+
+ public static void unsetRevision(@Nonnull UpdateOp op,
+ @Nonnull Revision revision) {
+ checkNotNull(op).unsetMapEntry(REVISIONS,
+ checkNotNull(revision).toString());
+ }
+
+ public static void setLastRev(@Nonnull UpdateOp op,
+ @Nonnull Revision revision) {
+ checkNotNull(op).setMapEntry(LAST_REV,
+ String.valueOf(checkNotNull(revision).getClusterId()),
+ revision.toString());
+ }
+
+ public static void setCommitRoot(@Nonnull UpdateOp op,
+ @Nonnull Revision revision,
+ int commitRootDepth) {
+ checkNotNull(op).setMapEntry(COMMIT_ROOT,
+ checkNotNull(revision).toString(), commitRootDepth);
}
//----------------------------< internal >----------------------------------
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/UnmergedBranches.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/UnmergedBranches.java?rev=1516107&r1=1516106&r2=1516107&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/UnmergedBranches.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/UnmergedBranches.java Wed Aug 21 07:55:51 2013
@@ -18,10 +18,8 @@ package org.apache.jackrabbit.oak.plugin
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.SortedMap;
import java.util.SortedSet;
-import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -62,38 +60,24 @@ class UnmergedBranches {
* <code>clusterId</code>.
*
* @param store the document store.
- * @param clusterId the cluster node id of the local MongoMK.
+ * @param context the revision context.
*/
- void init(DocumentStore store, int clusterId) {
+ void init(DocumentStore store, RevisionContext context) {
if (!initialized.compareAndSet(false, true)) {
throw new IllegalStateException("already initialized");
}
NodeDocument doc = store.find(Collection.NODES,
Utils.getIdFromPath("/"));
- @SuppressWarnings("unchecked")
- Map<String, String> valueMap = (Map<String, String>) doc.get(NodeDocument.REVISIONS);
- if (valueMap != null) {
- SortedMap<Revision, Revision> tmp =
- new TreeMap<Revision, Revision>(comparator);
- for (Map.Entry<String, String> commit : valueMap.entrySet()) {
- if (!Utils.isCommitted(commit.getValue())) {
- Revision r = Revision.fromString(commit.getKey());
- if (r.getClusterId() == clusterId) {
- Revision b = Revision.fromString(commit.getValue());
- tmp.put(r, b);
- }
- }
- }
- while (!tmp.isEmpty()) {
- SortedSet<Revision> commits = new TreeSet<Revision>(comparator);
- Revision head = tmp.lastKey();
- commits.add(head);
- Revision base = tmp.remove(head);
- while (tmp.containsKey(base)) {
- commits.add(base);
- base = tmp.remove(base);
- }
- branches.add(new Branch(commits, base, comparator));
+ SortedMap<Revision, Revision> revisions = doc.getUncommittedRevisions(context);
+ while (!revisions.isEmpty()) {
+ SortedSet<Revision> commits = new TreeSet<Revision>(comparator);
+ Revision head = revisions.lastKey();
+ commits.add(head);
+ Revision base = revisions.remove(head);
+ while (revisions.containsKey(base)) {
+ commits.add(base);
+ base = revisions.remove(base);
}
+ branches.add(new Branch(commits, base, comparator));
}
}
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/SimpleTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/SimpleTest.java?rev=1516107&r1=1516106&r2=1516107&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/SimpleTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/SimpleTest.java Wed Aug 21 07:55:51 2013
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertNul
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import java.util.Map;
import java.util.Random;
import org.apache.jackrabbit.mk.api.MicroKernelException;
@@ -76,7 +75,7 @@ public class SimpleTest {
n.setProperty("name", "Hello");
UpdateOp op = n.asOperation(true);
// mark as commit root
- op.setMapEntry(NodeDocument.REVISIONS, rev.toString(), "true");
+ NodeDocument.setRevision(op, rev, "c");
DocumentStore s = mk.getDocumentStore();
assertTrue(s.create(Collection.NODES, Lists.newArrayList(op)));
Node n2 = mk.getNode("/test", rev);
@@ -394,22 +393,22 @@ public class SimpleTest {
// foo must not have head in revisions and must refer to test
// as commit root (depth = 1)
NodeDocument foo = store.find(Collection.NODES, "2:/test/foo");
- assertTrue(foo.get(NodeDocument.REVISIONS) == null);
- assertEquals(1, ((Map<?, ?>) foo.get(NodeDocument.COMMIT_ROOT)).get(head));
+ assertFalse(foo.containsRevision(head));
+ assertEquals("/test", foo.getCommitRootPath(head));
head = mk.commit("", "+\"/bar\":{}+\"/test/foo/bar\":{}", head,
null);
// root node is root of commit
rootDoc = store.find(Collection.NODES, "0:/");
- assertTrue(((Map<?, ?>) rootDoc.get(NodeDocument.REVISIONS)).containsKey(head));
+ assertTrue(rootDoc.containsRevision(head));
// /bar refers to root nodes a commit root
NodeDocument bar = store.find(Collection.NODES, "1:/bar");
- assertEquals(0, ((Map<?, ?>) bar.get(NodeDocument.COMMIT_ROOT)).get(head));
+ assertEquals("/", bar.getCommitRootPath(head));
// /test/foo/bar refers to root nodes a commit root
bar = store.find(Collection.NODES, "3:/test/foo/bar");
- assertEquals(0, ((Map<?, ?>) bar.get(NodeDocument.COMMIT_ROOT)).get(head));
+ assertEquals("/", bar.getCommitRootPath(head));
} finally {
mk.dispose();