Author: mreutegg
Date: Tue Feb 11 11:44:07 2014
New Revision: 1567066
URL: http://svn.apache.org/r1567066
Log:
OAK-1406: Background operations block writes
Only acquire exclusive lock when needed. Invalidate cache in background
thread without holding lock.
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BackgroundWriteTest.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java?rev=1567066&r1=1567065&r2=1567066&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
Tue Feb 11 11:44:07 2014
@@ -89,7 +89,7 @@ public class DocumentMK implements Micro
}
void backgroundRead() {
- nodeStore.backgroundRead();
+ nodeStore.backgroundRead(true);
}
void backgroundWrite() {
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1567066&r1=1567065&r2=1567066&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
Tue Feb 11 11:44:07 2014
@@ -26,21 +26,17 @@ import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.lang.ref.WeakReference;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.NavigableSet;
import java.util.Set;
-import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -53,6 +49,8 @@ import com.google.common.cache.Cache;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
import org.apache.jackrabbit.mk.api.MicroKernelException;
import org.apache.jackrabbit.mk.blobs.BlobStore;
import org.apache.jackrabbit.mk.json.JsopStream;
@@ -84,11 +82,6 @@ import org.slf4j.LoggerFactory;
public final class DocumentNodeStore
implements NodeStore, RevisionContext, Observable {
- /**
- * The maximum number of document to update at once in a multi update.
- */
- static final int BACKGROUND_MULTI_UPDATE_LIMIT = 10000;
-
private static final Logger LOG =
LoggerFactory.getLogger(DocumentNodeStore.class);
/**
@@ -325,7 +318,7 @@ public final class DocumentNodeStore
// initialize branchCommits
branches.init(store, this);
// initial reading of the revisions of other cluster nodes
- backgroundRead();
+ backgroundRead(false);
if (headRevision == null) {
// no revision read from other cluster nodes
setHeadRevision(newRevision());
@@ -796,7 +789,7 @@ public final class DocumentNodeStore
if (isNew) {
CacheValue key = childNodeCacheKey(path, rev, null);
Node.Children c = new Node.Children();
- TreeSet<String> set = new TreeSet<String>(added);
+ Set<String> set = Sets.newTreeSet(added);
set.removeAll(removed);
for (String p : added) {
set.add(Utils.unshareString(p));
@@ -824,7 +817,7 @@ public final class DocumentNodeStore
NodeDocument.Children docChildren =
docChildrenCache.getIfPresent(docChildrenKey);
if (docChildren != null) {
int currentSize = docChildren.childNames.size();
- TreeSet<String> names = new TreeSet<String>(docChildren.childNames);
+ NavigableSet<String> names = Sets.newTreeSet(docChildren.childNames);
// incomplete cache entries must not be updated with
// names at the end of the list because there might be
// a next name in DocumentStore smaller than the one added
@@ -922,7 +915,7 @@ public final class DocumentNodeStore
// first revision is the ancestor (tailSet is inclusive)
// do not undo changes for this revision
Revision base = it.next();
- Map<String, UpdateOp> operations = new HashMap<String, UpdateOp>();
+ Map<String, UpdateOp> operations = Maps.newHashMap();
while (it.hasNext()) {
Revision reset = it.next();
getRoot(reset).compareAgainstBaseState(getRoot(base),
@@ -1227,25 +1220,12 @@ public final class DocumentNodeStore
return;
}
try {
-
- // does not create new revisions
+ // split documents (does not create new revisions)
backgroundSplit();
-
- // we need to protect backgroundRead as well,
- // as increment set the head revision in the read operation
- // (the read operation might see changes from other cluster nodes,
- // and so create a new head revision for the current cluster node,
- // to order revisions)
- Lock writeLock = backgroundOperationLock.writeLock();
- writeLock.lock();
- try {
- backgroundWrite();
- backgroundRead();
- dispatcher.contentChanged(getRoot(), null);
- } finally {
- writeLock.unlock();
- }
-
+ // write back pending updates to _lastRev
+ backgroundWrite();
+ // pull in changes from other cluster nodes
+ backgroundRead(true);
} catch (RuntimeException e) {
if (isDisposed.get()) {
return;
@@ -1261,7 +1241,13 @@ public final class DocumentNodeStore
clusterNodeInfo.renewLease(asyncDelay);
}
- void backgroundRead() {
+ /**
+ * Perform a background read and make external changes visible.
+ *
+ * @param dispatchChange whether to dispatch external changes
+ * to {@link #dispatcher}.
+ */
+ void backgroundRead(boolean dispatchChange) {
String id = Utils.getIdFromPath("/");
NodeDocument doc = store.find(Collection.NODES, id, asyncDelay);
if (doc == null) {
@@ -1270,37 +1256,51 @@ public final class DocumentNodeStore
Map<Integer, Revision> lastRevMap = doc.getLastRev();
Revision.RevisionComparator revisionComparator =
getRevisionComparator();
- boolean hasNewRevisions = false;
// the (old) head occurred first
Revision headSeen = Revision.newRevision(0);
// then we saw this new revision (from another cluster node)
Revision otherSeen = Revision.newRevision(0);
+
+ Map<Revision, Revision> externalChanges = Maps.newHashMap();
for (Map.Entry<Integer, Revision> e : lastRevMap.entrySet()) {
int machineId = e.getKey();
if (machineId == clusterId) {
+ // ignore own lastRev
continue;
}
Revision r = e.getValue();
Revision last = lastKnownRevision.get(machineId);
if (last == null || r.compareRevisionTime(last) > 0) {
- if (!hasNewRevisions) {
- // publish our revision once before any foreign revision
-
- // the latest revisions of the current cluster node
- // happened before the latest revisions of other cluster nodes
- revisionComparator.add(newRevision(), headSeen);
- }
- hasNewRevisions = true;
lastKnownRevision.put(machineId, r);
- revisionComparator.add(r, otherSeen);
+ externalChanges.put(r, otherSeen);
}
}
- if (hasNewRevisions) {
+
+ if (!externalChanges.isEmpty()) {
+ // invalidate caches
store.invalidateCache();
// TODO only invalidate affected items
docChildrenCache.invalidateAll();
- // the head revision is after other revisions
- setHeadRevision(newRevision());
+
+ // make sure update to revision comparator is atomic
+ // and no local commit is in progress
+ backgroundOperationLock.writeLock().lock();
+ try {
+ // the latest revisions of the current cluster node
+ // happened before the latest revisions of other cluster nodes
+ revisionComparator.add(newRevision(), headSeen);
+ // then we saw other revisions
+ for (Map.Entry<Revision, Revision> e : externalChanges.entrySet()) {
+ revisionComparator.add(e.getKey(), e.getValue());
+ }
+ // the new head revision is after other revisions
+ setHeadRevision(newRevision());
+ if (dispatchChange) {
+ dispatcher.contentChanged(getRoot(), null);
+ }
+ } finally {
+ backgroundOperationLock.writeLock().unlock();
+ }
}
revisionComparator.purge(Revision.getCurrentTimestamp() -
REMEMBER_REVISION_ORDER_MILLIS);
}
@@ -1330,50 +1330,13 @@ public final class DocumentNodeStore
if (unsavedLastRevisions.getPaths().isEmpty()) {
return;
}
- ArrayList<String> paths = new ArrayList<String>(unsavedLastRevisions.getPaths());
- // sort by depth (high depth first), then path
- Collections.sort(paths, PathComparator.INSTANCE);
-
- UpdateOp updateOp = null;
- Revision lastRev = null;
- List<String> ids = new ArrayList<String>();
- for (int i = 0; i < paths.size(); ) {
- String p = paths.get(i);
- Revision r = unsavedLastRevisions.get(p);
- if (r == null) {
- i++;
- continue;
- }
- int size = ids.size();
- if (updateOp == null) {
- // create UpdateOp
- Commit commit = new Commit(this, null, r);
- commit.touchNode(p);
- updateOp = commit.getUpdateOperationForNode(p);
- lastRev = r;
- ids.add(Utils.getIdFromPath(p));
- i++;
- } else if (r.equals(lastRev)) {
- // use multi update when possible
- ids.add(Utils.getIdFromPath(p));
- i++;
- }
- // call update if any of the following is true:
- // - this is the second-to-last or last path (update last path, the
- // root document, individually)
- // - revision is not equal to last revision (size of ids didn't change)
- // - the update limit is reached
- if (i + 2 > paths.size()
- || size == ids.size()
- || ids.size() >= BACKGROUND_MULTI_UPDATE_LIMIT) {
- store.update(Collection.NODES, ids, updateOp);
- for (String id : ids) {
- unsavedLastRevisions.remove(Utils.getPathFromId(id));
- }
- ids.clear();
- updateOp = null;
- lastRev = null;
- }
+ // write back the pending _lastRevs with an exclusive lock to
+ // ensure consistency
+ backgroundOperationLock.writeLock().lock();
+ try {
+ unsavedLastRevisions.persist(this);
+ } finally {
+ backgroundOperationLock.writeLock().unlock();
}
}
@@ -1435,7 +1398,7 @@ public final class DocumentNodeStore
long minValue = Commit.getModified(minTimestamp);
String fromKey = Utils.getKeyLowerLimit(path);
String toKey = Utils.getKeyUpperLimit(path);
- Set<String> paths = new HashSet<String>();
+ Set<String> paths = Sets.newHashSet();
for (NodeDocument doc : store.query(Collection.NODES, fromKey, toKey,
NodeDocument.MODIFIED, minValue, Integer.MAX_VALUE)) {
paths.add(Utils.getPathFromId(doc.getId()));
@@ -1495,7 +1458,7 @@ public final class DocumentNodeStore
}
private void diffFewChildren(JsopWriter w, Node.Children fromChildren,
Revision fromRev, Node.Children toChildren, Revision toRev) {
- Set<String> childrenSet = new HashSet<String>(toChildren.children);
+ Set<String> childrenSet = Sets.newHashSet(toChildren.children);
for (String n : fromChildren.children) {
if (!childrenSet.contains(n)) {
w.tag('-').value(n).newline();
@@ -1513,7 +1476,7 @@ public final class DocumentNodeStore
}
}
}
- childrenSet = new HashSet<String>(fromChildren.children);
+ childrenSet = Sets.newHashSet(fromChildren.children);
for (String n : toChildren.children) {
if (!childrenSet.contains(n)) {
w.tag('+').key(n).object().endObject().newline();
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java?rev=1567066&r1=1567065&r2=1567066&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
Tue Feb 11 11:44:07 2014
@@ -547,22 +547,6 @@ public class NodeDocument extends Docume
}
/**
- * Returns <code>true</code> if this node is considered deleted at the
- * given <code>readRevision</code>.
- *
- * @param context the revision context.
- * @param readRevision the read revision.
- * @param validRevisions the set of revisions already checked against
- * <code>readRevision</code> and considered valid.
- * @return <code>true</code> if deleted, <code>false</code> otherwise.
- */
- public boolean isDeleted(RevisionContext context,
- Revision readRevision,
- Set<Revision> validRevisions) {
- return getLiveRevision(context, readRevision, validRevisions) == null;
- }
-
- /**
* Get the earliest (oldest) revision where the node was alive at or before
* the provided revision, if the node was alive at the given revision.
*
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java?rev=1567066&r1=1567065&r2=1567066&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java
Tue Feb 11 11:44:07 2014
@@ -16,19 +16,24 @@
*/
package org.apache.jackrabbit.oak.plugins.document;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
/**
* Keeps track of when nodes where last modified. To be persisted later by
@@ -36,6 +41,11 @@ import static com.google.common.base.Pre
*/
class UnsavedModifications {
+ /**
+ * The maximum number of document to update at once in a multi update.
+ */
+ static final int BACKGROUND_MULTI_UPDATE_LIMIT = 10000;
+
private final ConcurrentHashMap<String, Revision> map = new
ConcurrentHashMap<String, Revision>();
/**
@@ -76,11 +86,6 @@ class UnsavedModifications {
return map.get(path);
}
- @CheckForNull
- public Revision remove(String path) {
- return map.remove(path);
- }
-
@Nonnull
public Collection<String> getPaths() {
return map.keySet();
@@ -127,4 +132,63 @@ class UnsavedModifications {
});
}
}
+
+ /**
+ * Persist the pending changes to _lastRev to the given store. This method
+ * does not guarantee consistency when there are concurrent updates on
+ * this instance through {@link #put(String, Revision)}. The caller must
+ * use proper synchronization to ensure no paths are added while this method
+ * is called.
+ *
+ * @param store the document node store.
+ */
+ public void persist(@Nonnull DocumentNodeStore store) {
+ checkNotNull(store);
+
+ ArrayList<String> paths = new ArrayList<String>(getPaths());
+ // sort by depth (high depth first), then path
+ Collections.sort(paths, PathComparator.INSTANCE);
+
+ UpdateOp updateOp = null;
+ Revision lastRev = null;
+ List<String> ids = new ArrayList<String>();
+ for (int i = 0; i < paths.size(); ) {
+ String p = paths.get(i);
+ Revision r = map.get(p);
+ if (r == null) {
+ i++;
+ continue;
+ }
+ int size = ids.size();
+ if (updateOp == null) {
+ // create UpdateOp
+ Commit commit = new Commit(store, null, r);
+ commit.touchNode(p);
+ updateOp = commit.getUpdateOperationForNode(p);
+ lastRev = r;
+ ids.add(Utils.getIdFromPath(p));
+ i++;
+ } else if (r.equals(lastRev)) {
+ // use multi update when possible
+ ids.add(Utils.getIdFromPath(p));
+ i++;
+ }
+ // call update if any of the following is true:
+ // - this is the second-to-last or last path (update last path, the
+ // root document, individually)
+ // - revision is not equal to last revision (size of ids didn't change)
+ // - the update limit is reached
+ if (i + 2 > paths.size()
+ || size == ids.size()
+ || ids.size() >= BACKGROUND_MULTI_UPDATE_LIMIT) {
+ store.getDocumentStore().update(NODES, ids, updateOp);
+ for (String id : ids) {
+ map.remove(Utils.getPathFromId(id));
+ }
+ ids.clear();
+ updateOp = null;
+ lastRev = null;
+ }
+ }
+ }
}
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BackgroundWriteTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BackgroundWriteTest.java?rev=1567066&r1=1567065&r2=1567066&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BackgroundWriteTest.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BackgroundWriteTest.java
Tue Feb 11 11:44:07 2014
@@ -35,7 +35,7 @@ public class BackgroundWriteTest {
new TestStore()).setAsyncDelay(0).open();
List<String> paths = new ArrayList<String>();
StringBuilder sb = new StringBuilder();
- for (int i = 0; paths.size() < DocumentNodeStore.BACKGROUND_MULTI_UPDATE_LIMIT * 2; i++) {
+ for (int i = 0; paths.size() < UnsavedModifications.BACKGROUND_MULTI_UPDATE_LIMIT * 2; i++) {
String child = "node-" + i;
sb.append("+\"").append(child).append("\":{}");
for (int j = 0; j < 1000; j++) {
@@ -62,7 +62,7 @@ public class BackgroundWriteTest {
public <T extends Document> void update(Collection<T> collection,
List<String> keys,
UpdateOp updateOp) {
- assertTrue(keys.size() <= DocumentNodeStore.BACKGROUND_MULTI_UPDATE_LIMIT);
+ assertTrue(keys.size() <= UnsavedModifications.BACKGROUND_MULTI_UPDATE_LIMIT);
super.update(collection, keys, updateOp);
}
}