Author: thomasm
Date: Thu Oct 24 10:40:56 2013
New Revision: 1535336

URL: http://svn.apache.org/r1535336
Log:
OAK-1080: MongoMK: improved concurrency (fix invalid local revision order that 
breaks tests with multiple cluster nodes)

Modified:
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Revision.java

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java?rev=1535336&r1=1535335&r2=1535336&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java Thu Oct 24 10:40:56 2013
@@ -849,17 +849,30 @@ public final class MongoNodeStore
         if (!ENABLE_BACKGROUND_OPS || stopBackground) {
             return;
         }
-        synchronized (this) {
+        try {
+            
+            // does not create new revisions
+            backgroundSplit();
+            
+            // we need to protect backgroundRead as well,
+            // as increment set the head revision in the read operation
+            // (the read operation might see changes from other cluster nodes,
+            // and so create a new head revision for the current cluster node,
+            // to order revisions)
+            Lock writeLock = backgroundOperationLock.writeLock();
+            writeLock.lock();
             try {
-                backgroundSplit();
                 backgroundWrite();
                 backgroundRead();
-            } catch (RuntimeException e) {
-                if (isDisposed.get()) {
-                    return;
-                }
-                LOG.warn("Background operation failed: " + e.toString(), e);
+            } finally {
+                writeLock.unlock();
+            }
+            
+        } catch (RuntimeException e) {
+            if (isDisposed.get()) {
+                return;
             }
+            LOG.warn("Background operation failed: " + e.toString(), e);
         }
     }
 
@@ -939,67 +952,61 @@ public final class MongoNodeStore
         if (unsavedLastRevisions.getPaths().size() == 0) {
             return;
         }
-        Lock writeLock = backgroundOperationLock.writeLock();
-        writeLock.lock();
-        try {
-            ArrayList<String> paths = new ArrayList<String>(unsavedLastRevisions.getPaths());
-            // sort by depth (high depth first), then path
-            Collections.sort(paths, new Comparator<String>() {
-
-                @Override
-                public int compare(String o1, String o2) {
-                    int d1 = Utils.pathDepth(o1);
-                    int d2 = Utils.pathDepth(o1);
-                    if (d1 != d2) {
-                        return Integer.signum(d1 - d2);
-                    }
-                    return o1.compareTo(o2);
-                }
-
-            });
-
-            long now = Revision.getCurrentTimestamp();
-            UpdateOp updateOp = null;
-            Revision lastRev = null;
-            List<String> ids = new ArrayList<String>();
-            for (int i = 0; i < paths.size(); i++) {
-                String p = paths.get(i);
-                Revision r = unsavedLastRevisions.get(p);
-                if (r == null) {
-                    continue;
-                }
-                // FIXME: with below code fragment the root (and other nodes
-                // 'close' to the root) will not be updated in MongoDB when there
-                // are frequent changes.
-                if (Revision.getTimestampDifference(now, r.getTimestamp()) < asyncDelay) {
-                    continue;
-                }
-                int size = ids.size();
-                if (updateOp == null) {
-                    // create UpdateOp
-                    Commit commit = new Commit(this, null, r);
-                    commit.touchNode(p);
-                    updateOp = commit.getUpdateOperationForNode(p);
-                    lastRev = r;
-                    ids.add(Utils.getIdFromPath(p));
-                } else if (r.equals(lastRev)) {
-                    // use multi update when possible
-                    ids.add(Utils.getIdFromPath(p));
-                }
-                // update if this is the last path or
-                // revision is not equal to last revision
-                if (i + 1 >= paths.size() || size == ids.size()) {
-                    store.update(Collection.NODES, ids, updateOp);
-                    for (String id : ids) {
-                        unsavedLastRevisions.remove(Utils.getPathFromId(id));
-                    }
-                    ids.clear();
-                    updateOp = null;
-                    lastRev = null;
-                }
+        ArrayList<String> paths = new ArrayList<String>(unsavedLastRevisions.getPaths());
+        // sort by depth (high depth first), then path
+        Collections.sort(paths, new Comparator<String>() {
+
+            @Override
+            public int compare(String o1, String o2) {
+                int d1 = Utils.pathDepth(o1);
+                int d2 = Utils.pathDepth(o1);
+                if (d1 != d2) {
+                    return Integer.signum(d1 - d2);
+                }
+                return o1.compareTo(o2);
+            }
+
+        });
+
+        long now = Revision.getCurrentTimestamp();
+        UpdateOp updateOp = null;
+        Revision lastRev = null;
+        List<String> ids = new ArrayList<String>();
+        for (int i = 0; i < paths.size(); i++) {
+            String p = paths.get(i);
+            Revision r = unsavedLastRevisions.get(p);
+            if (r == null) {
+                continue;
+            }
+            // FIXME: with below code fragment the root (and other nodes
+            // 'close' to the root) will not be updated in MongoDB when there
+            // are frequent changes.
+            if (Revision.getTimestampDifference(now, r.getTimestamp()) < asyncDelay) {
+                continue;
+            }
+            int size = ids.size();
+            if (updateOp == null) {
+                // create UpdateOp
+                Commit commit = new Commit(this, null, r);
+                commit.touchNode(p);
+                updateOp = commit.getUpdateOperationForNode(p);
+                lastRev = r;
+                ids.add(Utils.getIdFromPath(p));
+            } else if (r.equals(lastRev)) {
+                // use multi update when possible
+                ids.add(Utils.getIdFromPath(p));
+            }
+            // update if this is the last path or
+            // revision is not equal to last revision
+            if (i + 1 >= paths.size() || size == ids.size()) {
+                store.update(Collection.NODES, ids, updateOp);
+                for (String id : ids) {
+                    unsavedLastRevisions.remove(Utils.getPathFromId(id));
+                }
+                ids.clear();
+                updateOp = null;
+                lastRev = null;
             }
-        } finally {
-            writeLock.unlock();
         }
     }
 

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Revision.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Revision.java?rev=1535336&r1=1535335&r2=1535336&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Revision.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Revision.java Thu Oct 24 10:40:56 2013
@@ -380,7 +380,9 @@ public class Revision {
                         return;
                     }
                     if (last.revision.compareRevisionTime(r) > 0) {
-                        throw new IllegalArgumentException("Can not add an earlier revision: " + last.revision + " > " + r);
+                        throw new IllegalArgumentException(
+                                "Can not add an earlier revision: " + last.revision + " > " + r +
+                                "; current cluster node is " + currentClusterNodeId);
                     }
                     newList = new ArrayList<RevisionRange>(list);
                 }


Reply via email to