Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java?rev=1532757&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java Wed Oct 16 13:23:58 2013
@@ -0,0 +1,1000 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.mongomk;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.ref.WeakReference;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+
+import org.apache.jackrabbit.mk.api.MicroKernelException;
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.cache.CacheStats;
+import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.plugins.mongomk.util.LoggingDocumentStoreWrapper;
+import org.apache.jackrabbit.oak.plugins.mongomk.util.TimingDocumentStoreWrapper;
+import org.apache.jackrabbit.oak.plugins.mongomk.util.Utils;
+import org.apache.jackrabbit.oak.spi.commit.CommitHook;
+import org.apache.jackrabbit.oak.spi.commit.PostCommitHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.cache.Cache;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Implementation of a NodeStore on MongoDB.
+ */
+public final class MongoNodeStore implements NodeStore, RevisionContext {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MongoNodeStore.class);
+
+ /**
+ * Do not cache more than this number of children for a document.
+ */
+    private static final int NUM_CHILDREN_CACHE_LIMIT =
+            Integer.getInteger("oak.mongoMK.childrenCacheLimit", 16 * 1024);
+
+ /**
+ * When trying to access revisions that are older than this many
+ * milliseconds, a warning is logged. The default is one minute.
+ */
+ private static final int WARN_REVISION_AGE =
+ Integer.getInteger("oak.mongoMK.revisionAge", 60 * 1000);
+
+ /**
+ * Enable background operations
+ */
+    private static final boolean ENABLE_BACKGROUND_OPS =
+            Boolean.parseBoolean(System.getProperty("oak.mongoMK.backgroundOps", "true"));
+
+ /**
+     * How long to remember the relative order of old revisions of all cluster
+ * nodes, in milliseconds. The default is one hour.
+ */
+ private static final int REMEMBER_REVISION_ORDER_MILLIS = 60 * 60 * 1000;
+
+ /**
+ * The MongoDB store (might be used by multiple MongoMKs).
+ */
+ protected final DocumentStore store;
+
+ /**
+ * Whether this instance is disposed.
+ */
+ private final AtomicBoolean isDisposed = new AtomicBoolean();
+
+ /**
+ * The delay for asynchronous operations (delayed commit propagation and
+ * cache update).
+ */
+ protected int asyncDelay = 1000;
+
+ /**
+ * The cluster instance info.
+ */
+ private final ClusterNodeInfo clusterNodeInfo;
+
+ /**
+ * The unique cluster id, similar to the unique machine id in MongoDB.
+ */
+ private final int clusterId;
+
+ /**
+ * The comparator for revisions.
+ */
+ private final Revision.RevisionComparator revisionComparator;
+
+ /**
+ * Unmerged branches of this MongoNodeStore instance.
+ */
+ // TODO at some point, open (unmerged) branches
+ // need to be garbage collected (in-memory and on disk)
+ private final UnmergedBranches branches;
+
+ /**
+ * The unsaved last revisions. This contains the parents of all changed
+ * nodes, once those nodes are committed but the parent node itself wasn't
+ * committed yet. The parents are not immediately persisted as this would
+ * cause each commit to change all parents (including the root node), which
+ * would limit write scalability.
+ *
+ * Key: path, value: revision.
+ */
+    private final UnsavedModifications unsavedLastRevisions = new UnsavedModifications();
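+    // Example: a commit that changes /a/b/c updates the document for /a/b/c
+    // right away, while the pending last revisions for /a/b, /a and / are
+    // queued here and persisted later by backgroundWrite().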
+
+ /**
+ * Set of IDs for documents that may need to be split.
+ */
+    private final Map<String, String> splitCandidates = Maps.newConcurrentMap();
+
+ /**
+ * The splitting point in milliseconds. If a document is split, revisions
+     * older than this number of milliseconds are moved to a different document.
+ * The default is 0, meaning documents are never split. Revisions that are
+ * newer than this are kept in the newest document.
+ */
+ private final long splitDocumentAgeMillis;
+
+ /**
+ * The last known revision for each cluster instance.
+ *
+ * Key: the machine id, value: revision.
+ */
+ private final Map<Integer, Revision> lastKnownRevision =
+ new ConcurrentHashMap<Integer, Revision>();
+
+ /**
+     * The last known head revision.
+ */
+ private volatile Revision headRevision;
+
+ private Thread backgroundThread;
+
+ /**
+ * Enable using simple revisions (just a counter). This feature is useful
+ * for testing.
+ */
+ private AtomicInteger simpleRevisionCounter;
+
+ private boolean stopBackground;
+
+ /**
+ * The node cache.
+ *
+ * Key: path@rev, value: node
+ */
+ private final Cache<String, Node> nodeCache;
+ private final CacheStats nodeCacheStats;
+
+ /**
+ * Child node cache.
+ *
+ * Key: path@rev, value: children
+ */
+ private final Cache<String, Node.Children> nodeChildrenCache;
+ private final CacheStats nodeChildrenCacheStats;
+
+ /**
+ * Child doc cache.
+ */
+ private final Cache<String, NodeDocument.Children> docChildrenCache;
+ private final CacheStats docChildrenCacheStats;
+
+ public MongoNodeStore(MongoMK.Builder builder) {
+ if (builder.isUseSimpleRevision()) {
+ this.simpleRevisionCounter = new AtomicInteger(0);
+ }
+ DocumentStore s = builder.getDocumentStore();
+ if (builder.getTiming()) {
+ s = new TimingDocumentStoreWrapper(s);
+ }
+ if (builder.getLogging()) {
+ s = new LoggingDocumentStoreWrapper(s);
+ }
+ this.store = s;
+ int cid = builder.getClusterId();
+ cid = Integer.getInteger("oak.mongoMK.clusterId", cid);
+ if (cid == 0) {
+ clusterNodeInfo = ClusterNodeInfo.getInstance(store);
+ // TODO we should ensure revisions generated from now on
+ // are never "older" than revisions already in the repository for
+ // this cluster id
+ cid = clusterNodeInfo.getId();
+ } else {
+ clusterNodeInfo = null;
+ }
+ this.clusterId = cid;
+ this.revisionComparator = new Revision.RevisionComparator(clusterId);
+ this.branches = new UnmergedBranches(getRevisionComparator());
+ this.splitDocumentAgeMillis = builder.getSplitDocumentAgeMillis();
+ this.asyncDelay = builder.getAsyncDelay();
+
+        //TODO Make stats collection configurable as it adds slight overhead
+
+ nodeCache = builder.buildCache(builder.getNodeCacheSize());
+ nodeCacheStats = new CacheStats(nodeCache, "MongoMk-Node",
+ builder.getWeigher(), builder.getNodeCacheSize());
+
+ nodeChildrenCache = builder.buildCache(builder.getChildrenCacheSize());
+        nodeChildrenCacheStats = new CacheStats(nodeChildrenCache, "MongoMk-NodeChildren",
+ builder.getWeigher(), builder.getChildrenCacheSize());
+
+        docChildrenCache = builder.buildCache(builder.getDocChildrenCacheSize());
+        docChildrenCacheStats = new CacheStats(docChildrenCache, "MongoMk-DocChildren",
+ builder.getWeigher(), builder.getDocChildrenCacheSize());
+
+ init();
+ // initial reading of the revisions of other cluster nodes
+ backgroundRead();
+ getRevisionComparator().add(headRevision, Revision.newRevision(0));
+ headRevision = newRevision();
+ }
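+
+    // Construction normally goes through MongoMK.Builder. A minimal sketch,
+    // assuming the builder defaults and the setters used by the tests below:
+    //
+    //   MongoNodeStore ns = new MongoNodeStore(
+    //           new MongoMK.Builder().setDocumentStore(ds).setClusterId(1));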
+
+ void init() {
+ headRevision = newRevision();
+ Node n = readNode("/", headRevision);
+ if (n == null) {
+ // root node is missing: repository is not initialized
+ Commit commit = new Commit(this, null, headRevision);
+ n = new Node("/", headRevision);
+ commit.addNode(n);
+ commit.applyToDocumentStore();
+ } else {
+ // initialize branchCommits
+ branches.init(store, this);
+ }
+ backgroundThread = new Thread(
+ new BackgroundOperation(this, isDisposed),
+ "MongoMK background thread");
+ backgroundThread.setDaemon(true);
+ backgroundThread.start();
+ }
+
+ void dispose() {
+        // force background write (with asyncDelay > 0, the root wouldn't be written)
+ // TODO make this more obvious / explicit
+ // TODO tests should also work if this is not done
+ asyncDelay = 0;
+ runBackgroundOperations();
+ if (!isDisposed.getAndSet(true)) {
+ synchronized (isDisposed) {
+ isDisposed.notifyAll();
+ }
+ try {
+ backgroundThread.join();
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ if (clusterNodeInfo != null) {
+ clusterNodeInfo.dispose();
+ }
+ store.dispose();
+ LOG.info("Disposed MongoMK with clusterNodeId: {}", clusterId);
+ }
+ }
+
+ public void stopBackground() {
+ stopBackground = true;
+ }
+
+ @Nonnull
+ Revision getHeadRevision() {
+ return headRevision;
+ }
+
+ Revision setHeadRevision(Revision newHead) {
+ Revision previous = headRevision;
+ headRevision = newHead;
+ return previous;
+ }
+
+ @Nonnull
+ DocumentStore getDocumentStore() {
+ return store;
+ }
+
+ /**
+ * Create a new revision.
+ *
+ * @return the revision
+ */
+ @Nonnull
+ Revision newRevision() {
+ if (simpleRevisionCounter != null) {
+            return new Revision(simpleRevisionCounter.getAndIncrement(), 0, clusterId);
+ }
+ return Revision.newRevision(clusterId);
+ }
+
+ public void setAsyncDelay(int delay) {
+ this.asyncDelay = delay;
+ }
+
+ public int getAsyncDelay() {
+ return asyncDelay;
+ }
+
+ public ClusterNodeInfo getClusterInfo() {
+ return clusterNodeInfo;
+ }
+
+ public CacheStats getNodeCacheStats() {
+ return nodeCacheStats;
+ }
+
+ public CacheStats getNodeChildrenCacheStats() {
+ return nodeChildrenCacheStats;
+ }
+
+ public CacheStats getDocChildrenCacheStats() {
+ return docChildrenCacheStats;
+ }
+
+ public int getPendingWriteCount() {
+ return unsavedLastRevisions.getPaths().size();
+ }
+
+ public long getSplitDocumentAgeMillis() {
+ return this.splitDocumentAgeMillis;
+ }
+
+ /**
+     * Checks whether revision x is newer than another revision.
+ *
+ * @param x the revision to check
+ * @param previous the presumed earlier revision
+ * @return true if x is newer
+ */
+ boolean isRevisionNewer(@Nonnull Revision x, @Nonnull Revision previous) {
+ return getRevisionComparator().compare(x, previous) > 0;
+ }
+
+ /**
+ * Enqueue the document with the given id as a split candidate.
+ *
+ * @param id the id of the document to check if it needs to be split.
+ */
+ void addSplitCandidate(String id) {
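+        // the map is used as a concurrent set; entries are drained again
+        // by backgroundSplit()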
+ splitCandidates.put(id, id);
+ }
+
+ void copyNode(String sourcePath, String targetPath, Commit commit) {
+ moveOrCopyNode(false, sourcePath, targetPath, commit);
+ }
+
+ void moveNode(String sourcePath, String targetPath, Commit commit) {
+ moveOrCopyNode(true, sourcePath, targetPath, commit);
+ }
+
+ void markAsDeleted(String path, Commit commit, boolean subTreeAlso) {
+ Revision rev = commit.getBaseRevision();
+ checkState(rev != null, "Base revision of commit must not be null");
+ commit.removeNode(path);
+
+ if (subTreeAlso) {
+ // recurse down the tree
+ // TODO causes issue with large number of children
+ Node n = getNode(path, rev);
+
+ if (n != null) {
+ Node.Children c = getChildren(path, rev, Integer.MAX_VALUE);
+ for (String childPath : c.children) {
+ markAsDeleted(childPath, commit, true);
+ }
+ }
+ }
+ }
+
+ /**
+     * Get the node for the given path and revision. The returned object must
+     * not be modified directly.
+ *
+ * @param path the path of the node.
+ * @param rev the read revision.
+ * @return the node or <code>null</code> if the node does not exist at the
+ * given revision.
+ */
+ @CheckForNull
+ Node getNode(final @Nonnull String path, final @Nonnull Revision rev) {
+ checkRevisionAge(checkNotNull(rev), checkNotNull(path));
+ try {
+ String key = path + "@" + rev;
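+            // the cache loader must not return null, so nodes that do not
+            // exist at this revision are cached as a Node.MISSING sentinel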
+ Node node = nodeCache.get(key, new Callable<Node>() {
+ @Override
+ public Node call() throws Exception {
+ Node n = readNode(path, rev);
+ if (n == null) {
+ n = Node.MISSING;
+ }
+ return n;
+ }
+ });
+ return node == Node.MISSING ? null : node;
+ } catch (ExecutionException e) {
+ throw new MicroKernelException(e);
+ }
+ }
+
+    public Node.Children getChildren(final String path, final Revision rev, final int limit) throws
+            MicroKernelException {
+ checkRevisionAge(rev, path);
+ String key = path + "@" + rev;
+ Node.Children children;
+ try {
+            children = nodeChildrenCache.get(key, new Callable<Node.Children>() {
+ @Override
+ public Node.Children call() throws Exception {
+ return readChildren(path, rev, limit);
+ }
+ });
+ } catch (ExecutionException e) {
+ throw new MicroKernelException("Error occurred while fetching
children nodes for path "+path, e);
+ }
+
+        // if the requested limit exceeds the number of cached children and
+        // more child nodes are available, refresh the cache
+ if (children.hasMore) {
+ if (limit > children.children.size()) {
+ children = readChildren(path, rev, limit);
+ if (children != null) {
+ nodeChildrenCache.put(key, children);
+ }
+ }
+ }
+ return children;
+ }
+
+ Node.Children readChildren(String path, Revision rev, int limit) {
+ // TODO use offset, to avoid O(n^2) and running out of memory
+        // to do that, use the *name* of the last entry of the previous batch of children
+ // as the starting point
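+        // deleted children are filtered out after reading, so a single query
+        // may return fewer than 'limit' live children; rawLimit is therefore
+        // doubled on each round until enough live children are found or the
+        // store runs out of documents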
+ Iterable<NodeDocument> docs;
+ Node.Children c = new Node.Children();
+ int rawLimit = limit;
+ Set<Revision> validRevisions = new HashSet<Revision>();
+ do {
+ c.children.clear();
+ c.hasMore = true;
+ docs = readChildren(path, rawLimit);
+ int numReturned = 0;
+ for (NodeDocument doc : docs) {
+ numReturned++;
+ // filter out deleted children
+ if (doc.isDeleted(this, rev, validRevisions)) {
+ continue;
+ }
+ String p = Utils.getPathFromId(doc.getId());
+ if (c.children.size() < limit) {
+ // add to children until limit is reached
+ c.children.add(p);
+ }
+ }
+ if (numReturned < rawLimit) {
+ // fewer documents returned than requested
+ // -> no more documents
+ c.hasMore = false;
+ }
+ // double rawLimit for next round
+            rawLimit = (int) Math.min(((long) rawLimit) * 2, Integer.MAX_VALUE);
+ } while (c.children.size() < limit && c.hasMore);
+ return c;
+ }
+
+ @Nonnull
+ Iterable<NodeDocument> readChildren(final String path, int limit) {
+ String from = Utils.getKeyLowerLimit(path);
+ String to = Utils.getKeyUpperLimit(path);
+ if (limit > NUM_CHILDREN_CACHE_LIMIT) {
+ // do not use cache
+ return store.query(Collection.NODES, from, to, limit);
+ }
+ // check cache
+ NodeDocument.Children c = docChildrenCache.getIfPresent(path);
+ if (c == null) {
+ c = new NodeDocument.Children();
+            List<NodeDocument> docs = store.query(Collection.NODES, from, to, limit);
+ for (NodeDocument doc : docs) {
+ String p = Utils.getPathFromId(doc.getId());
+ c.childNames.add(PathUtils.getName(p));
+ }
+ c.isComplete = docs.size() < limit;
+ docChildrenCache.put(path, c);
+ } else if (c.childNames.size() < limit && !c.isComplete) {
+ // fetch more and update cache
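+            // the update is applied to a clone so concurrent readers of the
+            // cached entry never see a partially extended child name list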
+ String lastName = c.childNames.get(c.childNames.size() - 1);
+ String lastPath = PathUtils.concat(path, lastName);
+ from = Utils.getIdFromPath(lastPath);
+ int remainingLimit = limit - c.childNames.size();
+ List<NodeDocument> docs = store.query(Collection.NODES,
+ from, to, remainingLimit);
+ NodeDocument.Children clone = c.clone();
+ for (NodeDocument doc : docs) {
+ String p = Utils.getPathFromId(doc.getId());
+ clone.childNames.add(PathUtils.getName(p));
+ }
+ clone.isComplete = docs.size() < remainingLimit;
+ docChildrenCache.put(path, clone);
+ c = clone;
+ }
+        return Iterables.transform(c.childNames, new Function<String, NodeDocument>() {
+ @Override
+ public NodeDocument apply(String name) {
+ String p = PathUtils.concat(path, name);
+ return store.find(Collection.NODES, Utils.getIdFromPath(p));
+ }
+ });
+ }
+
+ @CheckForNull
+ private Node readNode(String path, Revision readRevision) {
+ String id = Utils.getIdFromPath(path);
+ Revision lastRevision = getPendingModifications().get(path);
+ NodeDocument doc = store.find(Collection.NODES, id);
+ if (doc == null) {
+ return null;
+ }
+ return doc.getNodeAtRevision(this, readRevision, lastRevision);
+ }
+
+ /**
+ * Apply the changes of a node to the cache.
+ *
+ * @param rev the revision
+ * @param path the path
+ * @param isNew whether this is a new node
+ * @param isDelete whether the node is deleted
+     * @param isWritten whether the MongoDB document was added / updated
+ * @param isBranchCommit whether this is from a branch commit
+ * @param added the list of added child nodes
+ * @param removed the list of removed child nodes
+ *
+ */
+ public void applyChanges(Revision rev, String path,
+                             boolean isNew, boolean isDelete, boolean isWritten,
+ boolean isBranchCommit, ArrayList<String> added,
+ ArrayList<String> removed) {
+ UnsavedModifications unsaved = unsavedLastRevisions;
+ if (isBranchCommit) {
+ Revision branchRev = rev.asBranchRevision();
+            unsaved = branches.getBranch(branchRev).getModifications(branchRev);
+ }
+ // track unsaved modifications of nodes that were not
+ // written in the commit (implicitly modified parent)
+ // or any modification if this is a branch commit
+ if (!isWritten || isBranchCommit) {
+ Revision prev = unsaved.put(path, rev);
+ if (prev != null) {
+ if (isRevisionNewer(prev, rev)) {
+ // revert
+ unsaved.put(path, prev);
+ String msg = String.format("Attempt to update " +
+ "unsavedLastRevision for %s with %s, which is " +
+ "older than current %s.",
+ path, rev, prev);
+ throw new MicroKernelException(msg);
+ }
+ }
+ } else {
+ // the document was updated:
+ // we no longer need to update it in a background process
+ unsaved.remove(path);
+ }
+ String key = path + "@" + rev;
+ Node.Children c = nodeChildrenCache.getIfPresent(key);
+ if (isNew || (!isDelete && c != null)) {
+ Node.Children c2 = new Node.Children();
+ TreeSet<String> set = new TreeSet<String>();
+ if (c != null) {
+ set.addAll(c.children);
+ }
+ set.removeAll(removed);
+ for (String name : added) {
+ // make sure the name string does not contain
+ // unnecessary baggage
+ set.add(new String(name));
+ }
+ c2.children.addAll(set);
+ nodeChildrenCache.put(key, c2);
+ }
+ if (!added.isEmpty()) {
+            NodeDocument.Children docChildren = docChildrenCache.getIfPresent(path);
+ if (docChildren != null) {
+ int currentSize = docChildren.childNames.size();
+                TreeSet<String> names = new TreeSet<String>(docChildren.childNames);
+ // incomplete cache entries must not be updated with
+ // names at the end of the list because there might be
+ // a next name in MongoDB smaller than the one added
+ if (!docChildren.isComplete) {
+ for (String childPath : added) {
+ String name = PathUtils.getName(childPath);
+ if (names.higher(name) != null) {
+ // make sure the name string does not contain
+ // unnecessary baggage
+ names.add(new String(name));
+ }
+ }
+ } else {
+ // add all
+ for (String childPath : added) {
+ // make sure the name string does not contain
+ // unnecessary baggage
+ names.add(new String(PathUtils.getName(childPath)));
+ }
+ }
+ // any changes?
+ if (names.size() != currentSize) {
+ // create new cache entry with updated names
+ boolean complete = docChildren.isComplete;
+ docChildren = new NodeDocument.Children();
+ docChildren.isComplete = complete;
+ docChildren.childNames.addAll(names);
+ docChildrenCache.put(path, docChildren);
+ }
+ }
+ }
+ }
+
+    //-------------------------< NodeStore >------------------------------------
+
+ @Nonnull
+ @Override
+ public NodeState getRoot() {
+ // TODO: implement
+ return null;
+ }
+
+ @Nonnull
+ @Override
+ public NodeState merge(@Nonnull NodeBuilder builder,
+ @Nonnull CommitHook commitHook,
+ PostCommitHook committed)
+ throws CommitFailedException {
+ // TODO: implement
+ return null;
+ }
+
+ @Nonnull
+ @Override
+ public NodeState rebase(@Nonnull NodeBuilder builder) {
+ // TODO: implement
+ return null;
+ }
+
+ @Override
+ public NodeState reset(@Nonnull NodeBuilder builder) {
+ // TODO: implement
+ return null;
+ }
+
+ @Override
+ public Blob createBlob(InputStream inputStream) throws IOException {
+ // TODO: implement
+ return null;
+ }
+
+ @Nonnull
+ @Override
+ public String checkpoint(long lifetime) {
+ // TODO: implement
+ return null;
+ }
+
+ @CheckForNull
+ @Override
+ public NodeState retrieve(@Nonnull String checkpoint) {
+ // TODO: implement
+ return null;
+ }
+
+    //------------------------< RevisionContext >-------------------------------
+
+ @Override
+ public UnmergedBranches getBranches() {
+ return branches;
+ }
+
+ @Override
+ public UnsavedModifications getPendingModifications() {
+ return unsavedLastRevisions;
+ }
+
+ @Override
+ public Revision.RevisionComparator getRevisionComparator() {
+ return revisionComparator;
+ }
+
+ @Override
+    public void publishRevision(Revision foreignRevision, Revision changeRevision) {
+        Revision.RevisionComparator revisionComparator = getRevisionComparator();
+ if (revisionComparator.compare(headRevision, foreignRevision) >= 0) {
+ // already visible
+ return;
+ }
+ int clusterNodeId = foreignRevision.getClusterId();
+ if (clusterNodeId == this.clusterId) {
+ return;
+ }
+ // the (old) head occurred first
+ Revision headSeen = Revision.newRevision(0);
+ // then we saw this new revision (from another cluster node)
+ Revision otherSeen = Revision.newRevision(0);
+ // and after that, the current change
+ Revision changeSeen = Revision.newRevision(0);
+ revisionComparator.add(foreignRevision, otherSeen);
+ // TODO invalidating the whole cache is not really needed,
+ // but how to ensure we invalidate the right part of the cache?
+ // possibly simply wait for the background thread to pick
+ // up the changes, but this depends on how often this method is called
+ store.invalidateCache();
+ // the latest revisions of the current cluster node
+ // happened before the latest revisions of other cluster nodes
+ revisionComparator.add(headRevision, headSeen);
+ revisionComparator.add(changeRevision, changeSeen);
+ // the head revision is after other revisions
+ headRevision = Revision.newRevision(clusterId);
+ }
+
+ @Override
+ public int getClusterId() {
+ return clusterId;
+ }
+
+    //----------------------< background operations >---------------------------
+
+ void runBackgroundOperations() {
+ if (isDisposed.get()) {
+ return;
+ }
+ backgroundRenewClusterIdLease();
+ if (simpleRevisionCounter != null) {
+ // only when using timestamp
+ return;
+ }
+ if (!ENABLE_BACKGROUND_OPS || stopBackground) {
+ return;
+ }
+ synchronized (this) {
+ try {
+ backgroundSplit();
+ backgroundWrite();
+ backgroundRead();
+ } catch (RuntimeException e) {
+ if (isDisposed.get()) {
+ return;
+ }
+ LOG.warn("Background operation failed: " + e.toString(), e);
+ }
+ }
+ }
+
+ private void backgroundRenewClusterIdLease() {
+ if (clusterNodeInfo == null) {
+ return;
+ }
+ clusterNodeInfo.renewLease(asyncDelay);
+ }
+
+ void backgroundRead() {
+ String id = Utils.getIdFromPath("/");
+ NodeDocument doc = store.find(Collection.NODES, id, asyncDelay);
+ if (doc == null) {
+ return;
+ }
+ Map<Integer, Revision> lastRevMap = doc.getLastRev();
+
+        Revision.RevisionComparator revisionComparator = getRevisionComparator();
+ boolean hasNewRevisions = false;
+ // the (old) head occurred first
+ Revision headSeen = Revision.newRevision(0);
+ // then we saw this new revision (from another cluster node)
+ Revision otherSeen = Revision.newRevision(0);
+ for (Map.Entry<Integer, Revision> e : lastRevMap.entrySet()) {
+ int machineId = e.getKey();
+ if (machineId == clusterId) {
+ continue;
+ }
+ Revision r = e.getValue();
+ Revision last = lastKnownRevision.get(machineId);
+ if (last == null || r.compareRevisionTime(last) > 0) {
+ lastKnownRevision.put(machineId, r);
+ hasNewRevisions = true;
+ revisionComparator.add(r, otherSeen);
+ }
+ }
+ if (hasNewRevisions) {
+ // TODO invalidating the whole cache is not really needed,
+ // instead only those children that are cached could be checked
+ store.invalidateCache();
+ // TODO only invalidate affected items
+ docChildrenCache.invalidateAll();
+ // add a new revision, so that changes are visible
+ Revision r = Revision.newRevision(clusterId);
+ // the latest revisions of the current cluster node
+ // happened before the latest revisions of other cluster nodes
+ revisionComparator.add(r, headSeen);
+ // the head revision is after other revisions
+ headRevision = Revision.newRevision(clusterId);
+ }
+        revisionComparator.purge(Revision.getCurrentTimestamp() - REMEMBER_REVISION_ORDER_MILLIS);
+ }
+
+ private void backgroundSplit() {
+        for (Iterator<String> it = splitCandidates.keySet().iterator(); it.hasNext();) {
+ String id = it.next();
+ NodeDocument doc = store.find(Collection.NODES, id);
+ if (doc == null) {
+ continue;
+ }
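+            // doc.split() computes the update operations that move revisions
+            // older than the configured split age into a separate document;
+            // an empty result means no split is needed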
+ for (UpdateOp op : doc.split(this)) {
+                NodeDocument before = store.createOrUpdate(Collection.NODES, op);
+ if (before != null) {
+                    NodeDocument after = store.find(Collection.NODES, op.getId());
+ if (after != null) {
+ LOG.info("Split operation on {}. Size before: {},
after: {}",
+ new Object[]{id, before.getMemory(),
after.getMemory()});
+ }
+ }
+ }
+ it.remove();
+ }
+ }
+
+ void backgroundWrite() {
+ if (unsavedLastRevisions.getPaths().size() == 0) {
+ return;
+ }
+        ArrayList<String> paths = new ArrayList<String>(unsavedLastRevisions.getPaths());
+ // sort by depth (high depth first), then path
+ Collections.sort(paths, new Comparator<String>() {
+
+ @Override
+ public int compare(String o1, String o2) {
+ int d1 = Utils.pathDepth(o1);
+                int d2 = Utils.pathDepth(o2);
+ if (d1 != d2) {
+                    return Integer.signum(d2 - d1);
+ }
+ return o1.compareTo(o2);
+ }
+
+ });
+
+ long now = Revision.getCurrentTimestamp();
+ UpdateOp updateOp = null;
+ Revision lastRev = null;
+ List<String> ids = new ArrayList<String>();
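+        // paths that share the same pending revision are batched in 'ids'
+        // and persisted with a single multi-document update below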
+ for (int i = 0; i < paths.size(); i++) {
+ String p = paths.get(i);
+ Revision r = unsavedLastRevisions.get(p);
+ if (r == null) {
+ continue;
+ }
+            // FIXME: with the check below, the root (and other nodes
+            // 'close' to the root) will not be updated in MongoDB when
+            // there are frequent changes.
+            if (Revision.getTimestampDifference(now, r.getTimestamp()) < asyncDelay) {
+ continue;
+ }
+ int size = ids.size();
+ if (updateOp == null) {
+ // create UpdateOp
+ Commit commit = new Commit(this, null, r);
+ commit.touchNode(p);
+ updateOp = commit.getUpdateOperationForNode(p);
+ lastRev = r;
+ ids.add(Utils.getIdFromPath(p));
+ } else if (r.equals(lastRev)) {
+ // use multi update when possible
+ ids.add(Utils.getIdFromPath(p));
+ }
+ // update if this is the last path or
+ // revision is not equal to last revision
+ if (i + 1 >= paths.size() || size == ids.size()) {
+ store.update(Collection.NODES, ids, updateOp);
+ for (String id : ids) {
+ unsavedLastRevisions.remove(Utils.getPathFromId(id));
+ }
+ ids.clear();
+ updateOp = null;
+ lastRev = null;
+ }
+ }
+ }
+
+    //-----------------------------< internal >---------------------------------
+
+ private void moveOrCopyNode(boolean move,
+ String sourcePath,
+ String targetPath,
+ Commit commit) {
+        // TODO Optimize - the move logic would not work well for moves of
+        // very large subtrees. At minimum we can optimize by traversing
+        // breadth-wise, collecting node ids, and fetching them via '$in' queries
+
+        // TODO Transient Node - the current logic does not account for
+        // operations that are part of this commit, i.e. transient nodes.
+        // If required, this would need to be looked into
+
+ Node n = getNode(sourcePath, commit.getBaseRevision());
+
+ // Node might be deleted already
+ if (n == null) {
+ return;
+ }
+
+ Node newNode = new Node(targetPath, commit.getRevision());
+ n.copyTo(newNode);
+
+ commit.addNode(newNode);
+ if (move) {
+ markAsDeleted(sourcePath, commit, false);
+ }
+        Node.Children c = getChildren(sourcePath, commit.getBaseRevision(), Integer.MAX_VALUE);
+ for (String srcChildPath : c.children) {
+ String childName = PathUtils.getName(srcChildPath);
+ String destChildPath = PathUtils.concat(targetPath, childName);
+ moveOrCopyNode(move, srcChildPath, destChildPath, commit);
+ }
+ }
+
+ private void checkRevisionAge(Revision r, String path) {
+        // TODO only log if there are new revisions available for the given node
+ if (LOG.isDebugEnabled()) {
+            if (headRevision.getTimestamp() - r.getTimestamp() > WARN_REVISION_AGE) {
+                LOG.debug("Requesting an old revision for path " + path + ", " +
+                        ((headRevision.getTimestamp() - r.getTimestamp()) / 1000) + " seconds old");
+ }
+ }
+ }
+
+ /**
+ * A background thread.
+ */
+ static class BackgroundOperation implements Runnable {
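+        // only a weak reference to the store is kept, so an otherwise
+        // unreferenced MongoNodeStore can still be garbage collected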
+ final WeakReference<MongoNodeStore> ref;
+ private final AtomicBoolean isDisposed;
+ private int delay;
+
+        BackgroundOperation(MongoNodeStore nodeStore, AtomicBoolean isDisposed) {
+ ref = new WeakReference<MongoNodeStore>(nodeStore);
+ delay = nodeStore.getAsyncDelay();
+ this.isDisposed = isDisposed;
+ }
+
+ @Override
+ public void run() {
+ while (delay != 0 && !isDisposed.get()) {
+ synchronized (isDisposed) {
+ try {
+ isDisposed.wait(delay);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ MongoNodeStore nodeStore = ref.get();
+ if (nodeStore != null) {
+ nodeStore.runBackgroundOperations();
+ delay = nodeStore.getAsyncDelay();
+ }
+ }
+ }
+ }
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision Rev URL
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/DocumentSplitTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/DocumentSplitTest.java?rev=1532757&r1=1532756&r2=1532757&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/DocumentSplitTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/DocumentSplitTest.java Wed Oct 16 13:23:58 2013
@@ -38,6 +38,7 @@ public class DocumentSplitTest extends B
@Test
public void splitRevisions() throws Exception {
DocumentStore store = mk.getDocumentStore();
+ MongoNodeStore ns = mk.getNodeStore();
Set<Revision> revisions = Sets.newHashSet();
    NodeDocument doc = store.find(Collection.NODES, Utils.getIdFromPath("/"));
assertNotNull(doc);
@@ -48,7 +49,7 @@ public class DocumentSplitTest extends B
        revisions.add(Revision.fromString(mk.commit("/", "+\"foo/node-" + revisions.size() + "\":{}" +
                "+\"bar/node-" + revisions.size() + "\":{}", null, null)));
}
- mk.runBackgroundOperations();
+ ns.runBackgroundOperations();
String head = mk.getHeadRevision();
doc = store.find(Collection.NODES, Utils.getIdFromPath("/"));
assertNotNull(doc);
@@ -60,15 +61,16 @@ public class DocumentSplitTest extends B
assertTrue(doc.isCommitted(rev));
}
// check if document is still there
- assertNotNull(mk.getNode("/", Revision.fromString(head)));
+ assertNotNull(ns.getNode("/", Revision.fromString(head)));
mk.commit("/", "+\"baz\":{}", null, null);
- mk.setAsyncDelay(0);
+ ns.setAsyncDelay(0);
mk.backgroundWrite();
}
@Test
public void splitDeleted() throws Exception {
DocumentStore store = mk.getDocumentStore();
+ MongoNodeStore ns = mk.getNodeStore();
Set<Revision> revisions = Sets.newHashSet();
mk.commit("/", "+\"foo\":{}", null, null);
    NodeDocument doc = store.find(Collection.NODES, Utils.getIdFromPath("/foo"));
@@ -83,7 +85,7 @@ public class DocumentSplitTest extends B
}
create = !create;
}
- mk.runBackgroundOperations();
+ ns.runBackgroundOperations();
String head = mk.getHeadRevision();
doc = store.find(Collection.NODES, Utils.getIdFromPath("/foo"));
assertNotNull(doc);
@@ -94,7 +96,7 @@ public class DocumentSplitTest extends B
assertTrue(doc.containsRevision(rev));
assertTrue(doc.isCommitted(rev));
}
- Node node = mk.getNode("/foo", Revision.fromString(head));
+ Node node = ns.getNode("/foo", Revision.fromString(head));
// check status of node
if (create) {
assertNull(node);
@@ -106,6 +108,7 @@ public class DocumentSplitTest extends B
@Test
public void splitCommitRoot() throws Exception {
DocumentStore store = mk.getDocumentStore();
+ MongoNodeStore ns = mk.getNodeStore();
mk.commit("/", "+\"foo\":{}+\"bar\":{}", null, null);
    NodeDocument doc = store.find(Collection.NODES, Utils.getIdFromPath("/foo"));
assertNotNull(doc);
@@ -116,7 +119,7 @@ public class DocumentSplitTest extends B
        commitRoots.add(Revision.fromString(mk.commit("/", "^\"foo/prop\":" +
                commitRoots.size() + "^\"bar/prop\":" + commitRoots.size(), null, null)));
}
- mk.runBackgroundOperations();
+ ns.runBackgroundOperations();
doc = store.find(Collection.NODES, Utils.getIdFromPath("/foo"));
assertNotNull(doc);
Map<Revision, String> commits = doc.getLocalCommitRoot();
@@ -130,6 +133,7 @@ public class DocumentSplitTest extends B
@Test
public void splitPropertyRevisions() throws Exception {
DocumentStore store = mk.getDocumentStore();
+ MongoNodeStore ns = mk.getNodeStore();
mk.commit("/", "+\"foo\":{}", null, null);
    NodeDocument doc = store.find(Collection.NODES, Utils.getIdFromPath("/foo"));
assertNotNull(doc);
@@ -139,7 +143,7 @@ public class DocumentSplitTest extends B
        revisions.add(Revision.fromString(mk.commit("/", "^\"foo/prop\":" + revisions.size(), null, null)));
}
- mk.runBackgroundOperations();
+ ns.runBackgroundOperations();
doc = store.find(Collection.NODES, Utils.getIdFromPath("/foo"));
assertNotNull(doc);
Map<Revision, String> localRevs = doc.getLocalRevisions();
@@ -165,6 +169,7 @@ public class DocumentSplitTest extends B
builder = new MongoMK.Builder();
builder.setDocumentStore(ds).setBlobStore(bs).setAsyncDelay(0);
MongoMK mk1 = builder.setClusterId(1).open();
+ MongoNodeStore ns1 = mk1.getNodeStore();
mk1.commit("/", "+\"test\":{\"prop1\":0}", null, null);
// make sure the new node is visible to other MongoMK instances
@@ -173,9 +178,11 @@ public class DocumentSplitTest extends B
builder = new MongoMK.Builder();
builder.setDocumentStore(ds).setBlobStore(bs).setAsyncDelay(0);
MongoMK mk2 = builder.setClusterId(2).open();
+ MongoNodeStore ns2 = mk2.getNodeStore();
builder = new MongoMK.Builder();
builder.setDocumentStore(ds).setBlobStore(bs).setAsyncDelay(0);
MongoMK mk3 = builder.setClusterId(3).open();
+ MongoNodeStore ns3 = mk3.getNodeStore();
for (int i = 0; i < NodeDocument.REVISIONS_SPLIT_OFF_SIZE; i++) {
mk1.commit("/", "^\"test/prop1\":" + i, null, null);
@@ -183,9 +190,9 @@ public class DocumentSplitTest extends B
mk3.commit("/", "^\"test/prop3\":" + i, null, null);
}
- mk1.runBackgroundOperations();
- mk2.runBackgroundOperations();
- mk3.runBackgroundOperations();
+ ns1.runBackgroundOperations();
+ ns2.runBackgroundOperations();
+ ns3.runBackgroundOperations();
    NodeDocument doc = ds.find(Collection.NODES, Utils.getIdFromPath("/test"));
assertNotNull(doc);
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/SimpleTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/SimpleTest.java?rev=1532757&r1=1532756&r2=1532757&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/SimpleTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/SimpleTest.java Wed Oct 16 13:23:58 2013
@@ -78,8 +78,9 @@ public class SimpleTest {
// mark as commit root
NodeDocument.setRevision(op, rev, "c");
DocumentStore s = mk.getDocumentStore();
+ MongoNodeStore ns = mk.getNodeStore();
assertTrue(s.create(Collection.NODES, Lists.newArrayList(op)));
- Node n2 = mk.getNode("/test", rev);
+ Node n2 = ns.getNode("/test", rev);
assertEquals("Hello", n2.getProperty("name"));
mk.dispose();
}
@@ -234,6 +235,7 @@ public class SimpleTest {
@Test
public void commit() {
MongoMK mk = createMK();
+ MongoNodeStore ns = mk.getNodeStore();
    String rev = mk.commit("/", "+\"test\":{\"name\": \"Hello\"}", null, null);
String test = mk.getNodes("/test", rev, 0, 0, Integer.MAX_VALUE, null);
@@ -243,9 +245,9 @@ public class SimpleTest {
String r1 = mk.commit("/test", "+\"b\":{\"name\": \"!\"}", null, null);
test = mk.getNodes("/test", r0, 0, 0, Integer.MAX_VALUE, null);
Children c;
- c = mk.getChildren("/", Revision.fromString(r0), Integer.MAX_VALUE);
+ c = ns.getChildren("/", Revision.fromString(r0), Integer.MAX_VALUE);
assertEquals("[/test]", c.toString());
- c = mk.getChildren("/test", Revision.fromString(r1),
Integer.MAX_VALUE);
+ c = ns.getChildren("/test", Revision.fromString(r1),
Integer.MAX_VALUE);
assertEquals("[/test/a, /test/b]", c.toString());
rev = mk.commit("", "^\"/test\":1", null, null);
@@ -259,23 +261,24 @@ public class SimpleTest {
@Test
public void delete() {
MongoMK mk = createMK();
+ MongoNodeStore ns = mk.getNodeStore();
mk.commit("/", "+\"testDel\":{\"name\": \"Hello\"}", null, null);
mk.commit("/testDel", "+\"a\":{\"name\": \"World\"}", null, null);
mk.commit("/testDel", "+\"b\":{\"name\": \"!\"}", null, null);
String r1 = mk.commit("/testDel", "+\"c\":{\"name\": \"!\"}", null,
null);
- Children c = mk.getChildren("/testDel", Revision.fromString(r1),
+ Children c = ns.getChildren("/testDel", Revision.fromString(r1),
Integer.MAX_VALUE);
assertEquals(3, c.children.size());
String r2 = mk.commit("/testDel", "-\"c\"", null, null);
- c = mk.getChildren("/testDel", Revision.fromString(r2),
+ c = ns.getChildren("/testDel", Revision.fromString(r2),
Integer.MAX_VALUE);
assertEquals(2, c.children.size());
String r3 = mk.commit("/", "-\"testDel\"", null, null);
- Node n = mk.getNode("/testDel", Revision.fromString(r3));
+ Node n = ns.getNode("/testDel", Revision.fromString(r3));
assertNull(n);
}