Author: thomasm
Date: Thu Oct 24 10:40:56 2013
New Revision: 1535336
URL: http://svn.apache.org/r1535336
Log:
OAK-1080: MongoMK: improved concurrency (fix invalid local revision order that
breaks tests with multiple cluster nodes)
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Revision.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java?rev=1535336&r1=1535335&r2=1535336&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java Thu Oct 24 10:40:56 2013
@@ -849,17 +849,30 @@ public final class MongoNodeStore
if (!ENABLE_BACKGROUND_OPS || stopBackground) {
return;
}
- synchronized (this) {
+ try {
+
+ // does not create new revisions
+ backgroundSplit();
+
+ // we need to protect backgroundRead as well,
+ // as increment set the head revision in the read operation
+ // (the read operation might see changes from other cluster nodes,
+ // and so create a new head revision for the current cluster node,
+ // to order revisions)
+ Lock writeLock = backgroundOperationLock.writeLock();
+ writeLock.lock();
try {
- backgroundSplit();
backgroundWrite();
backgroundRead();
- } catch (RuntimeException e) {
- if (isDisposed.get()) {
- return;
- }
- LOG.warn("Background operation failed: " + e.toString(), e);
+ } finally {
+ writeLock.unlock();
+ }
+
+ } catch (RuntimeException e) {
+ if (isDisposed.get()) {
+ return;
}
+ LOG.warn("Background operation failed: " + e.toString(), e);
}
}
@@ -939,67 +952,61 @@ public final class MongoNodeStore
if (unsavedLastRevisions.getPaths().size() == 0) {
return;
}
- Lock writeLock = backgroundOperationLock.writeLock();
- writeLock.lock();
- try {
- ArrayList<String> paths = new ArrayList<String>(unsavedLastRevisions.getPaths());
- // sort by depth (high depth first), then path
- Collections.sort(paths, new Comparator<String>() {
-
- @Override
- public int compare(String o1, String o2) {
- int d1 = Utils.pathDepth(o1);
- int d2 = Utils.pathDepth(o1);
- if (d1 != d2) {
- return Integer.signum(d1 - d2);
- }
- return o1.compareTo(o2);
- }
-
- });
-
- long now = Revision.getCurrentTimestamp();
- UpdateOp updateOp = null;
- Revision lastRev = null;
- List<String> ids = new ArrayList<String>();
- for (int i = 0; i < paths.size(); i++) {
- String p = paths.get(i);
- Revision r = unsavedLastRevisions.get(p);
- if (r == null) {
- continue;
- }
- // FIXME: with below code fragment the root (and other nodes
- // 'close' to the root) will not be updated in MongoDB when there
- // are frequent changes.
- if (Revision.getTimestampDifference(now, r.getTimestamp()) < asyncDelay) {
- continue;
- }
- int size = ids.size();
- if (updateOp == null) {
- // create UpdateOp
- Commit commit = new Commit(this, null, r);
- commit.touchNode(p);
- updateOp = commit.getUpdateOperationForNode(p);
- lastRev = r;
- ids.add(Utils.getIdFromPath(p));
- } else if (r.equals(lastRev)) {
- // use multi update when possible
- ids.add(Utils.getIdFromPath(p));
- }
- // update if this is the last path or
- // revision is not equal to last revision
- if (i + 1 >= paths.size() || size == ids.size()) {
- store.update(Collection.NODES, ids, updateOp);
- for (String id : ids) {
- unsavedLastRevisions.remove(Utils.getPathFromId(id));
- }
- ids.clear();
- updateOp = null;
- lastRev = null;
- }
+ ArrayList<String> paths = new ArrayList<String>(unsavedLastRevisions.getPaths());
+ // sort by depth (high depth first), then path
+ Collections.sort(paths, new Comparator<String>() {
+
+ @Override
+ public int compare(String o1, String o2) {
+ int d1 = Utils.pathDepth(o1);
+ int d2 = Utils.pathDepth(o1);
+ if (d1 != d2) {
+ return Integer.signum(d1 - d2);
+ }
+ return o1.compareTo(o2);
+ }
+
+ });
+
+ long now = Revision.getCurrentTimestamp();
+ UpdateOp updateOp = null;
+ Revision lastRev = null;
+ List<String> ids = new ArrayList<String>();
+ for (int i = 0; i < paths.size(); i++) {
+ String p = paths.get(i);
+ Revision r = unsavedLastRevisions.get(p);
+ if (r == null) {
+ continue;
+ }
+ // FIXME: with below code fragment the root (and other nodes
+ // 'close' to the root) will not be updated in MongoDB when there
+ // are frequent changes.
+ if (Revision.getTimestampDifference(now, r.getTimestamp()) < asyncDelay) {
+ continue;
+ }
+ int size = ids.size();
+ if (updateOp == null) {
+ // create UpdateOp
+ Commit commit = new Commit(this, null, r);
+ commit.touchNode(p);
+ updateOp = commit.getUpdateOperationForNode(p);
+ lastRev = r;
+ ids.add(Utils.getIdFromPath(p));
+ } else if (r.equals(lastRev)) {
+ // use multi update when possible
+ ids.add(Utils.getIdFromPath(p));
+ }
+ // update if this is the last path or
+ // revision is not equal to last revision
+ if (i + 1 >= paths.size() || size == ids.size()) {
+ store.update(Collection.NODES, ids, updateOp);
+ for (String id : ids) {
+ unsavedLastRevisions.remove(Utils.getPathFromId(id));
+ }
+ ids.clear();
+ updateOp = null;
+ lastRev = null;
}
- } finally {
- writeLock.unlock();
}
}
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Revision.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Revision.java?rev=1535336&r1=1535335&r2=1535336&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Revision.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Revision.java Thu Oct 24 10:40:56 2013
@@ -380,7 +380,9 @@ public class Revision {
return;
}
if (last.revision.compareRevisionTime(r) > 0) {
- throw new IllegalArgumentException("Can not add an earlier revision: " + last.revision + " > " + r);
+ throw new IllegalArgumentException(
+ "Can not add an earlier revision: " + last.revision + " > " + r +
+ "; current cluster node is " + currentClusterNodeId);
}
newList = new ArrayList<RevisionRange>(list);
}