Author: mreutegg
Date: Thu Jan 24 14:46:29 2013
New Revision: 1438025

URL: http://svn.apache.org/viewvc?rev=1438025&view=rev
Log:
OAK-566: MongoMK throws exception when adding nodes concurrently

Modified:
    jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/impl/action/FetchNodesActionNew.java
    jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/impl/MongoMKConcurrentAddTest.java
    jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/impl/action/FetchNodesActionTest.java
    jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/impl/command/ConcurrentConflictingCommitCommandTest.java

Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/impl/action/FetchNodesActionNew.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/impl/action/FetchNodesActionNew.java?rev=1438025&r1=1438024&r2=1438025&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/impl/action/FetchNodesActionNew.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/impl/action/FetchNodesActionNew.java Thu Jan 24 14:46:29 2013
@@ -21,8 +21,14 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.regex.Pattern;
 
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+
+import org.apache.jackrabbit.mk.api.MicroKernelException;
 import org.apache.jackrabbit.mongomk.impl.MongoNodeStore;
 import org.apache.jackrabbit.mongomk.impl.model.MongoCommit;
 import org.apache.jackrabbit.mongomk.impl.model.MongoNode;
@@ -34,25 +40,34 @@ import com.mongodb.DBCursor;
 import com.mongodb.DBObject;
 import com.mongodb.QueryBuilder;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 /**
- * FIXME - This is same as FetchNodesAction except that it does not require
- * the list of all valid commits upfront. It also has some optimizations on how
- * it fetches nodes. Consolidate the two.
- *
  * An action for fetching nodes.
  */
 public class FetchNodesActionNew extends BaseAction<Map<String, MongoNode>> {
 
+    /**
+     * Trunk commits within this time frame are considered in doubt and are
+     * checked more thoroughly whether they are valid.
+     */
+    private static final long IN_DOUBT_TIME_FRAME = 10000;
+
     public static final int LIMITLESS_DEPTH = -1;
     private static final Logger LOG = 
LoggerFactory.getLogger(FetchNodesActionNew.class);
 
     private final Set<String> paths;
-    private long revisionId = -1;
+    private long revisionId;
 
     private String branchId;
     private int depth = LIMITLESS_DEPTH;
 
     /**
+     * Maps valid commit revisionId to the baseRevId of the commit.
+     */
+    private final SortedMap<Long, Long> validCommits = new TreeMap<Long, 
Long>();
+
+    /**
      * Constructs a new {@code FetchNodesAction} to fetch a node and optionally
      * its descendants under the specified path.
      *
@@ -64,6 +79,7 @@ public class FetchNodesActionNew extends
     public FetchNodesActionNew(MongoNodeStore nodeStore, String path, int 
depth,
             long revisionId) {
         super(nodeStore);
+        checkArgument(revisionId >= 0, "revisionId must be >= 0");
         paths = new HashSet<String>();
         paths.add(path);
         this.depth = depth;
@@ -80,6 +96,7 @@ public class FetchNodesActionNew extends
      */
     public FetchNodesActionNew(MongoNodeStore nodeStore, Set<String> paths, 
long revisionId) {
         super(nodeStore);
+        checkArgument(revisionId >= 0, "revisionId must be >= 0");
         this.paths = paths;
         this.revisionId = revisionId;
     }
@@ -101,7 +118,7 @@ public class FetchNodesActionNew extends
 
         // FIXME - Should deal with multiple paths as long as depth = 0
         if (paths.size() == 1 && depth == 0) {
-            String path = paths.toArray(new String[0])[0];
+            String path = paths.iterator().next();
             MongoNode node = nodeStore.getFromCache(path, branchId, 
revisionId);
             if (node != null) {
                 Map<String, MongoNode> nodes = new HashMap<String, 
MongoNode>();
@@ -123,7 +140,7 @@ public class FetchNodesActionNew extends
         if (paths.size() > 1) {
             queryBuilder = queryBuilder.in(paths);
         } else {
-            String path = paths.toArray(new String[0])[0];
+            String path = paths.iterator().next();
             if (depth == 0) {
                 queryBuilder = queryBuilder.is(path);
             } else {
@@ -133,9 +150,7 @@ public class FetchNodesActionNew extends
         }
 
         // FIXME - This needs to be improved to not fetch all revisions of a 
path.
-        if (revisionId > -1) {
-            queryBuilder = 
queryBuilder.and(MongoNode.KEY_REVISION_ID).lessThanEquals(revisionId);
-        }
+        queryBuilder = 
queryBuilder.and(MongoNode.KEY_REVISION_ID).lessThanEquals(revisionId);
 
         if (branchId == null) {
             DBObject query = new BasicDBObject(MongoNode.KEY_BRANCH_ID, new 
BasicDBObject("$exists", false));
@@ -189,9 +204,15 @@ public class FetchNodesActionNew extends
             numberOfNodesToFetch = 1;
         }
 
-        Map<String, MongoNode> nodes = new HashMap<String, MongoNode>();
-        Map<Long, MongoCommit> commits = new HashMap<Long, MongoCommit>();
+        // make sure we read from a valid commit
+        MongoCommit commit = fetchCommit(revisionId);
+        if (commit != null) {
+            validCommits.put(revisionId, commit.getBaseRevisionId());
+        } else {
+            throw new MicroKernelException("Invalid revision: " + revisionId);
+        }
 
+        Map<String, MongoNode> nodes = new HashMap<String, MongoNode>();
         while (dbCursor.hasNext() && (numberOfNodesToFetch == -1 || 
nodes.size() < numberOfNodesToFetch)) {
             MongoNode node = (MongoNode)dbCursor.next();
             String path = node.getPath();
@@ -200,47 +221,105 @@ public class FetchNodesActionNew extends
             if (nodes.containsKey(path)) {
                 LOG.debug("Converted node @{} with path {} was not put into 
map"
                         + " because a newer version is available", 
node.getRevisionId(), path);
-                continue;
             } else {
                 long revisionId = node.getRevisionId();
-                LOG.debug("Converting node {} (@{})", path, revisionId);
-
-                if (!commits.containsKey(revisionId) && 
nodeStore.getFromCache(revisionId) == null) {
-                    LOG.debug("Fetching commit @{}", revisionId);
-                    FetchCommitAction action = new 
FetchCommitAction(nodeStore, revisionId);
-                    try {
-                        MongoCommit commit = action.execute();
-                        commits.put(revisionId, commit);
-                    } catch (Exception e) {
-                        LOG.debug("Node will not be converted as it is not 
part of a valid commit {} ({})",
-                                path, revisionId);
-                        continue;
-                    }
+                if (isValid(node)) {
+                    nodes.put(path, node);
+                    LOG.debug("Converted node @{} with path {} was put into 
map", revisionId, path);
+                } else {
+                    LOG.debug("Node will not be converted as it is not part of 
a valid commit {} ({})",
+                            path, revisionId);
                 }
-                nodes.put(path, node);
-                LOG.debug("Converted node @{} with path {} was put into map", 
revisionId, path);
             }
+        }
+        dbCursor.close();
+        return nodes;
+    }
 
-            // This is for unordered revision ids.
-            /*
-            MongoNode existingNodeMongo = nodeMongos.get(path);
-            if (existingNodeMongo != null) {
-                long existingRevId = existingNodeMongo.getRevisionId();
-
-                if (revisionId > existingRevId) {
-                    nodeMongos.put(path, nodeMongo);
-                    LOG.debug("Converted nodes was put into map and replaced 
{} ({})", path, revisionId);
+    /**
+     * @param node the node to check.
+     * @return <code>true</code> if the given node is from a valid commit;
+     *         <code>false</code> otherwise.
+     */
+    private boolean isValid(@Nonnull MongoNode node) {
+        long revisionId = node.getRevisionId();
+        if (!validCommits.containsKey(revisionId) && 
nodeStore.getFromCache(revisionId) == null) {
+            if (branchId == null) {
+                // check if the given revisionId is a valid trunk commit
+                return isValidTrunkCommit(revisionId);
+            } else {
+                // for branch commits we only check the failed flag and
+                // assume there are no concurrent branch commits
+                // FIXME: may need to revisit this
+                MongoCommit commit = fetchCommit(revisionId);
+                if (commit != null) {
+                    validCommits.put(revisionId, commit.getBaseRevisionId());
                 } else {
-                    LOG.debug("Converted nodes was not put into map because a 
newer version"
-                            + " is available {} ({})", path, revisionId);
+                    return false;
                 }
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Checks if the given <code>revisionId</code> is from a valid trunk
+     * commit.
+     *
+     * @param revisionId the commit revision.
+     * @return whether the revisionId is valid.
+     */
+    private boolean isValidTrunkCommit(long revisionId) {
+        if (validCommits.containsKey(revisionId)) {
+            return true;
+        }
+        // if there is a lower valid revision than revisionId, we
+        // know it is invalid
+        if (!validCommits.headMap(revisionId).isEmpty()) {
+            return false;
+        }
+        // at this point we know the validCommits does not go
+        // back in history far enough to know if the revisionId is valid.
+        // need to fetch base commit of oldest valid commit
+        long inDoubt = System.currentTimeMillis() - IN_DOUBT_TIME_FRAME;
+        MongoCommit commit;
+        do {
+            // base revision of the oldest known valid commit
+            long baseRev = validCommits.values().iterator().next();
+            commit = fetchCommit(baseRev);
+            if (commit.getBaseRevisionId() != null) {
+                validCommits.put(commit.getRevisionId(), 
commit.getBaseRevisionId());
             } else {
-                nodeMongos.put(path, nodeMongo);
-                LOG.debug("Converted node @{} with path {} was put into map", 
revisionId, path);
+                // end of commit history
+            }
+            if (commit.getRevisionId() == revisionId) {
+                return true;
+            } else if (commit.getRevisionId() < revisionId) {
+                // given revisionId is between two valid revisions -> invalid
+                return false;
             }
-            */
+        } while (commit.getTimestamp() > inDoubt);
+        // revisionId is past in doubt time frame
+        // perform simple check
+        return fetchCommit(revisionId) != null;
+    }
+
+    /**
+     * Fetches the commit with the given revisionId.
+     *
+     * @param revisionId the revisionId of a commit.
+     * @return the commit or <code>null</code> if the commit does not exist or
+     *         is marked as failed.
+     */
+    @CheckForNull
+    private MongoCommit fetchCommit(long revisionId) {
+        LOG.debug("Fetching commit @{}", revisionId);
+        FetchCommitAction action = new FetchCommitAction(nodeStore, 
revisionId);
+        try {
+            return action.execute();
+        } catch (Exception e) {
+            // not a valid commit
         }
-        dbCursor.close();
-        return nodes;
+        return null;
     }
 }
\ No newline at end of file

Modified: jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/impl/MongoMKConcurrentAddTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/impl/MongoMKConcurrentAddTest.java?rev=1438025&r1=1438024&r2=1438025&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/impl/MongoMKConcurrentAddTest.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/impl/MongoMKConcurrentAddTest.java Thu Jan 24 14:46:29 2013
@@ -29,7 +29,6 @@ import org.apache.jackrabbit.mk.api.Micr
 import org.apache.jackrabbit.mongomk.AbstractMongoConnectionTest;
 import org.apache.jackrabbit.mongomk.impl.blob.MongoGridFSBlobStore;
 import org.junit.After;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import com.mongodb.DB;
@@ -67,7 +66,6 @@ public class MongoMKConcurrentAddTest ex
      * threads do not overlap / conflict.
      */
     @Test
-    @Ignore("OAK-566")
     public void testConcurrentAdd() throws Exception {
         // create workers
         List<Callable<String>> cs = new LinkedList<Callable<String>>();

Modified: jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/impl/action/FetchNodesActionTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/impl/action/FetchNodesActionTest.java?rev=1438025&r1=1438024&r2=1438025&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/impl/action/FetchNodesActionTest.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/impl/action/FetchNodesActionTest.java Thu Jan 24 14:46:29 2013
@@ -19,6 +19,7 @@ package org.apache.jackrabbit.mongomk.im
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -28,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.jackrabbit.mk.api.MicroKernelException;
 import org.apache.jackrabbit.mongomk.BaseMongoMicroKernelTest;
 import org.apache.jackrabbit.mongomk.api.model.Commit;
 import org.apache.jackrabbit.mongomk.api.model.Node;
@@ -65,17 +67,17 @@ public class FetchNodesActionTest extend
 
     @Test
     public void invalidLastRevision() throws Exception {
-        Long revisionId1 = addNode("a");
-        Long revisionId2 = addNode("b");
+        addNode("a");
+        addNode("b");
         Long revisionId3 = addNode("c");
 
         invalidateCommit(revisionId3);
-        List<Node> actuals = createAndExecuteQuery(revisionId3);
-
-        String json = String.format("{\"/#%2$s\" : { \"a#%1$s\" : {}, 
\"b#%2$s\" : {} }}",
-                revisionId1, revisionId2);
-        Iterator<Node> expecteds = 
NodeBuilder.build(json).getChildNodeEntries(0, -1);
-        NodeAssert.assertEquals(expecteds, actuals);
+        try {
+            createAndExecuteQuery(revisionId3);
+            fail("Expected MicroKernelException");
+        } catch (MicroKernelException e) {
+            // expected
+        }
     }
 
     @Test

Modified: jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/impl/command/ConcurrentConflictingCommitCommandTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/impl/command/ConcurrentConflictingCommitCommandTest.java?rev=1438025&r1=1438024&r2=1438025&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/impl/command/ConcurrentConflictingCommitCommandTest.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/impl/command/ConcurrentConflictingCommitCommandTest.java Thu Jan 24 14:46:29 2013
@@ -26,7 +26,6 @@ import org.apache.jackrabbit.mongomk.Bas
 import org.apache.jackrabbit.mongomk.api.model.Commit;
 import org.apache.jackrabbit.mongomk.impl.MongoNodeStore;
 import org.apache.jackrabbit.mongomk.impl.model.CommitBuilder;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import static org.junit.Assert.fail;
@@ -166,7 +165,6 @@ public class ConcurrentConflictingCommit
      * a child node with name 'a'" error due to previous commit.
      */
     @Test
-    @Ignore("OAK-566")
     public void leakedInvalidChild() throws Exception {
         mk.commit("/", "+\"c\" : {}", null, null);
         int n = 3;


Reply via email to