Author: mreutegg Date: Thu Jan 24 14:46:29 2013 New Revision: 1438025 URL: http://svn.apache.org/viewvc?rev=1438025&view=rev Log: OAK-566: MongoMK throws exception when adding nodes concurrently
Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/impl/action/FetchNodesActionNew.java jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/impl/MongoMKConcurrentAddTest.java jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/impl/action/FetchNodesActionTest.java jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/impl/command/ConcurrentConflictingCommitCommandTest.java Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/impl/action/FetchNodesActionNew.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/impl/action/FetchNodesActionNew.java?rev=1438025&r1=1438024&r2=1438025&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/impl/action/FetchNodesActionNew.java (original) +++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/impl/action/FetchNodesActionNew.java Thu Jan 24 14:46:29 2013 @@ -21,8 +21,14 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.regex.Pattern; +import javax.annotation.CheckForNull; +import javax.annotation.Nonnull; + +import org.apache.jackrabbit.mk.api.MicroKernelException; import org.apache.jackrabbit.mongomk.impl.MongoNodeStore; import org.apache.jackrabbit.mongomk.impl.model.MongoCommit; import org.apache.jackrabbit.mongomk.impl.model.MongoNode; @@ -34,25 +40,34 @@ import com.mongodb.DBCursor; import com.mongodb.DBObject; import com.mongodb.QueryBuilder; +import static com.google.common.base.Preconditions.checkArgument; + /** - * FIXME - This is same as FetchNodesAction except that it does not require - * the list of all valid commits upfront. It also has some optimizations on how - * it fetches nodes. Consolidate the two. - * * An action for fetching nodes. */ public class FetchNodesActionNew extends BaseAction<Map<String, MongoNode>> { + /** + * Trunk commits within this time frame are considered in doubt and are + * checked more thoroughly whether they are valid. + */ + private static final long IN_DOUBT_TIME_FRAME = 10000; + public static final int LIMITLESS_DEPTH = -1; private static final Logger LOG = LoggerFactory.getLogger(FetchNodesActionNew.class); private final Set<String> paths; - private long revisionId = -1; + private long revisionId; private String branchId; private int depth = LIMITLESS_DEPTH; /** + * Maps valid commit revisionId to the baseRevId of the commit. + */ + private final SortedMap<Long, Long> validCommits = new TreeMap<Long, Long>(); + + /** * Constructs a new {@code FetchNodesAction} to fetch a node and optionally * its descendants under the specified path. * @@ -64,6 +79,7 @@ public class FetchNodesActionNew extends public FetchNodesActionNew(MongoNodeStore nodeStore, String path, int depth, long revisionId) { super(nodeStore); + checkArgument(revisionId >= 0, "revisionId must be >= 0"); paths = new HashSet<String>(); paths.add(path); this.depth = depth; @@ -80,6 +96,7 @@ public class FetchNodesActionNew extends */ public FetchNodesActionNew(MongoNodeStore nodeStore, Set<String> paths, long revisionId) { super(nodeStore); + checkArgument(revisionId >= 0, "revisionId must be >= 0"); this.paths = paths; this.revisionId = revisionId; } @@ -101,7 +118,7 @@ public class FetchNodesActionNew extends // FIXME - Should deal with multiple paths as long as depth = 0 if (paths.size() == 1 && depth == 0) { - String path = paths.toArray(new String[0])[0]; + String path = paths.iterator().next(); MongoNode node = nodeStore.getFromCache(path, branchId, revisionId); if (node != null) { Map<String, MongoNode> nodes = new HashMap<String, MongoNode>(); @@ -123,7 +140,7 @@ public class FetchNodesActionNew extends if (paths.size() > 1) { queryBuilder = queryBuilder.in(paths); } else { - String path = paths.toArray(new String[0])[0]; + String path = paths.iterator().next(); if (depth == 0) { queryBuilder = queryBuilder.is(path); } else { @@ -133,9 +150,7 @@ public class FetchNodesActionNew extends } // FIXME - This needs to be improved to not fetch all revisions of a path. - if (revisionId > -1) { - queryBuilder = queryBuilder.and(MongoNode.KEY_REVISION_ID).lessThanEquals(revisionId); - } + queryBuilder = queryBuilder.and(MongoNode.KEY_REVISION_ID).lessThanEquals(revisionId); if (branchId == null) { DBObject query = new BasicDBObject(MongoNode.KEY_BRANCH_ID, new BasicDBObject("$exists", false)); @@ -189,9 +204,15 @@ public class FetchNodesActionNew extends numberOfNodesToFetch = 1; } - Map<String, MongoNode> nodes = new HashMap<String, MongoNode>(); - Map<Long, MongoCommit> commits = new HashMap<Long, MongoCommit>(); + // make sure we read from a valid commit + MongoCommit commit = fetchCommit(revisionId); + if (commit != null) { + validCommits.put(revisionId, commit.getBaseRevisionId()); + } else { + throw new MicroKernelException("Invalid revision: " + revisionId); + } + Map<String, MongoNode> nodes = new HashMap<String, MongoNode>(); while (dbCursor.hasNext() && (numberOfNodesToFetch == -1 || nodes.size() < numberOfNodesToFetch)) { MongoNode node = (MongoNode)dbCursor.next(); String path = node.getPath(); @@ -200,47 +221,105 @@ public class FetchNodesActionNew extends if (nodes.containsKey(path)) { LOG.debug("Converted node @{} with path {} was not put into map" + " because a newer version is available", node.getRevisionId(), path); - continue; } else { long revisionId = node.getRevisionId(); - LOG.debug("Converting node {} (@{})", path, revisionId); - - if (!commits.containsKey(revisionId) && nodeStore.getFromCache(revisionId) == null) { - LOG.debug("Fetching commit @{}", revisionId); - FetchCommitAction action = new FetchCommitAction(nodeStore, revisionId); - try { - MongoCommit commit = action.execute(); - commits.put(revisionId, commit); - } catch (Exception e) { - LOG.debug("Node will not be converted as it is not part of a valid commit {} ({})", - path, revisionId); - continue; - } + if (isValid(node)) { + nodes.put(path, node); + LOG.debug("Converted node @{} with path {} was put into map", revisionId, path); + } else { + LOG.debug("Node will not be converted as it is not part of a valid commit {} ({})", + path, revisionId); } - nodes.put(path, node); - LOG.debug("Converted node @{} with path {} was put into map", revisionId, path); } + } + dbCursor.close(); + return nodes; + } - // This is for unordered revision ids. - /* - MongoNode existingNodeMongo = nodeMongos.get(path); - if (existingNodeMongo != null) { - long existingRevId = existingNodeMongo.getRevisionId(); - - if (revisionId > existingRevId) { - nodeMongos.put(path, nodeMongo); - LOG.debug("Converted nodes was put into map and replaced {} ({})", path, revisionId); + /** + * @param node the node to check. + * @return <code>true</code> if the given node is from a valid commit; + * <code>false</code> otherwise. + */ + private boolean isValid(@Nonnull MongoNode node) { + long revisionId = node.getRevisionId(); + if (!validCommits.containsKey(revisionId) && nodeStore.getFromCache(revisionId) == null) { + if (branchId == null) { + // check if the given revisionId is a valid trunk commit + return isValidTrunkCommit(revisionId); + } else { + // for branch commits we only check the failed flag and + // assume there are no concurrent branch commits + // FIXME: may need to revisit this + MongoCommit commit = fetchCommit(revisionId); + if (commit != null) { + validCommits.put(revisionId, commit.getBaseRevisionId()); } else { - LOG.debug("Converted nodes was not put into map because a newer version" - + " is available {} ({})", path, revisionId); + return false; } + } + } + return true; + } + + /** + * Checks if the given <code>revisionId</code> is from a valid trunk + * commit. + * + * @param revisionId the commit revision. + * @return whether the revisionId is valid. + */ + private boolean isValidTrunkCommit(long revisionId) { + if (validCommits.containsKey(revisionId)) { + return true; + } + // if there is a lower valid revision than revisionId, we + // know it is invalid + if (!validCommits.headMap(revisionId).isEmpty()) { + return false; + } + // at this point we know the validCommits does not go + // back in history far enough to know if the revisionId is valid. + // need to fetch base commit of oldest valid commit + long inDoubt = System.currentTimeMillis() - IN_DOUBT_TIME_FRAME; + MongoCommit commit; + do { + // base revision of the oldest known valid commit + long baseRev = validCommits.values().iterator().next(); + commit = fetchCommit(baseRev); + if (commit.getBaseRevisionId() != null) { + validCommits.put(commit.getRevisionId(), commit.getBaseRevisionId()); } else { - nodeMongos.put(path, nodeMongo); - LOG.debug("Converted node @{} with path {} was put into map", revisionId, path); + // end of commit history + } + if (commit.getRevisionId() == revisionId) { + return true; + } else if (commit.getRevisionId() < revisionId) { + // given revisionId is between two valid revisions -> invalid + return false; } - */ + } while (commit.getTimestamp() > inDoubt); + // revisionId is past in doubt time frame + // perform simple check + return fetchCommit(revisionId) != null; + } + + /** + * Fetches the commit with the given revisionId. + * + * @param revisionId the revisionId of a commit. + * @return the commit or <code>null</code> if the commit does not exist or + * is marked as failed. + */ + @CheckForNull + private MongoCommit fetchCommit(long revisionId) { + LOG.debug("Fetching commit @{}", revisionId); + FetchCommitAction action = new FetchCommitAction(nodeStore, revisionId); + try { + return action.execute(); + } catch (Exception e) { + // not a valid commit } - dbCursor.close(); - return nodes; + return null; } } \ No newline at end of file Modified: jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/impl/MongoMKConcurrentAddTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/impl/MongoMKConcurrentAddTest.java?rev=1438025&r1=1438024&r2=1438025&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/impl/MongoMKConcurrentAddTest.java (original) +++ jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/impl/MongoMKConcurrentAddTest.java Thu Jan 24 14:46:29 2013 @@ -29,7 +29,6 @@ import org.apache.jackrabbit.mk.api.Micr import org.apache.jackrabbit.mongomk.AbstractMongoConnectionTest; import org.apache.jackrabbit.mongomk.impl.blob.MongoGridFSBlobStore; import org.junit.After; -import org.junit.Ignore; import org.junit.Test; import com.mongodb.DB; @@ -67,7 +66,6 @@ public class MongoMKConcurrentAddTest ex * threads do not overlap / conflict. */ @Test - @Ignore("OAK-566") public void testConcurrentAdd() throws Exception { // create workers List<Callable<String>> cs = new LinkedList<Callable<String>>(); Modified: jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/impl/action/FetchNodesActionTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/impl/action/FetchNodesActionTest.java?rev=1438025&r1=1438024&r2=1438025&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/impl/action/FetchNodesActionTest.java (original) +++ jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/impl/action/FetchNodesActionTest.java Thu Jan 24 14:46:29 2013 @@ -19,6 +19,7 @@ package org.apache.jackrabbit.mongomk.im import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; import java.util.ArrayList; import java.util.Arrays; @@ -28,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.jackrabbit.mk.api.MicroKernelException; import org.apache.jackrabbit.mongomk.BaseMongoMicroKernelTest; import org.apache.jackrabbit.mongomk.api.model.Commit; import org.apache.jackrabbit.mongomk.api.model.Node; @@ -65,17 +67,17 @@ public class FetchNodesActionTest extend @Test public void invalidLastRevision() throws Exception { - Long revisionId1 = addNode("a"); - Long revisionId2 = addNode("b"); + addNode("a"); + addNode("b"); Long revisionId3 = addNode("c"); invalidateCommit(revisionId3); - List<Node> actuals = createAndExecuteQuery(revisionId3); - - String json = String.format("{\"/#%2$s\" : { \"a#%1$s\" : {}, \"b#%2$s\" : {} }}", - revisionId1, revisionId2); - Iterator<Node> expecteds = NodeBuilder.build(json).getChildNodeEntries(0, -1); - NodeAssert.assertEquals(expecteds, actuals); + try { + createAndExecuteQuery(revisionId3); + fail("Expected MicroKernelException"); + } catch (MicroKernelException e) { + // expected + } } @Test Modified: jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/impl/command/ConcurrentConflictingCommitCommandTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/impl/command/ConcurrentConflictingCommitCommandTest.java?rev=1438025&r1=1438024&r2=1438025&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/impl/command/ConcurrentConflictingCommitCommandTest.java (original) +++ jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/impl/command/ConcurrentConflictingCommitCommandTest.java Thu Jan 24 14:46:29 2013 @@ -26,7 +26,6 @@ import org.apache.jackrabbit.mongomk.Bas import org.apache.jackrabbit.mongomk.api.model.Commit; import org.apache.jackrabbit.mongomk.impl.MongoNodeStore; import org.apache.jackrabbit.mongomk.impl.model.CommitBuilder; -import org.junit.Ignore; import org.junit.Test; import static org.junit.Assert.fail; @@ -166,7 +165,6 @@ public class ConcurrentConflictingCommit * a child node with name 'a'" error due to previous commit. */ @Test - @Ignore("OAK-566") public void leakedInvalidChild() throws Exception { mk.commit("/", "+\"c\" : {}", null, null); int n = 3;