Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java?rev=1695297&r1=1695296&r2=1695297&view=diff ============================================================================== --- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java (original) +++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java Tue Aug 11 12:55:41 2015 @@ -22,6 +22,8 @@ package org.apache.jackrabbit.oak.plugin import static com.google.common.collect.ImmutableList.of; import static com.google.common.collect.Iterables.filter; import static com.google.common.collect.Iterables.mergeSorted; +import static java.util.Collections.singletonList; +import static org.apache.jackrabbit.oak.plugins.document.Collection.JOURNAL; import java.io.IOException; import java.util.Iterator; @@ -126,6 +128,8 @@ public class LastRevRecoveryAgent { //Map of known last rev of checked paths UnsavedModifications knownLastRevs = new UnsavedModifications(); closer.register(knownLastRevs); + final DocumentStore docStore = nodeStore.getDocumentStore(); + final JournalEntry changes = JOURNAL.newDocument(docStore); while (suspects.hasNext()) { NodeDocument doc = suspects.next(); @@ -153,6 +157,7 @@ public class LastRevRecoveryAgent { //2. Update lastRev for parent paths aka rollup if (lastRevForParents != null) { String path = doc.getPath(); + changes.modified(path); // track all changes while (true) { if (PathUtils.denotesRoot(path)) { break; @@ -176,6 +181,9 @@ public class LastRevRecoveryAgent { } } + // take the root's lastRev + final Revision lastRootRev = unsaved.get("/"); + //Note the size before persist as persist operation //would empty the internal state int size = unsaved.getPaths().size(); @@ -184,7 +192,41 @@ public class LastRevRecoveryAgent { //UnsavedModifications is designed to be used in concurrent //access mode. For recovery case there is no concurrent access //involve so just pass a new lock instance - unsaved.persist(nodeStore, new ReentrantLock()); + + // the lock uses to do the persisting is a plain reentrant lock + // thus it doesn't matter, where exactly the check is done + // as to whether the recovered lastRev has already been + // written to the journal. + unsaved.persist(nodeStore, new UnsavedModifications.Snapshot() { + + @Override + public void acquiring() { + if (lastRootRev == null) { + // this should never happen - when unsaved has no changes + // that is reflected in the 'map' to be empty - in that + // case 'persist()' quits early and never calls + // acquiring() here. + // + // but even if it would occur - if we have no lastRootRev + // then we cannot and probably don't have to persist anything + return; + } + + final String id = JournalEntry.asId(lastRootRev); // lastRootRev never null at this point + final JournalEntry existingEntry = docStore.find(Collection.JOURNAL, id); + if (existingEntry != null) { + // then the journal entry was already written - as can happen if + // someone else (or the original instance itself) wrote the + // journal entry, then died. + // in this case, don't write it again. + // hence: nothing to be done here. return. + return; + } + + // otherwise store a new journal entry now + docStore.create(JOURNAL, singletonList(changes.asUpdateOp(lastRootRev))); + } + }, new ReentrantLock()); log.info("Updated lastRev of [{}] documents while performing lastRev recovery for " + "cluster node [{}]: {}", size, clusterId, updates);
Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LocalDiffCache.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LocalDiffCache.java?rev=1695297&r1=1695296&r2=1695297&view=diff ============================================================================== --- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LocalDiffCache.java (original) +++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LocalDiffCache.java Tue Aug 11 12:55:41 2015 @@ -73,7 +73,8 @@ public class LocalDiffCache implements D @Nonnull @Override public Entry newEntry(final @Nonnull Revision from, - final @Nonnull Revision to) { + final @Nonnull Revision to, + boolean local /*ignored*/) { return new Entry() { private final Map<String, String> changesPerPath = Maps.newHashMap(); private int size; Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MemoryDiffCache.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MemoryDiffCache.java?rev=1695297&r1=1695296&r2=1695297&view=diff ============================================================================== --- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MemoryDiffCache.java (original) +++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MemoryDiffCache.java Tue Aug 11 12:55:41 2015 @@ -80,7 +80,8 @@ public class MemoryDiffCache implements @Nonnull @Override public Entry newEntry(@Nonnull Revision from, - @Nonnull Revision to) { + @Nonnull Revision to, + boolean local /*ignored*/) { return new MemoryEntry(from, to); } Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MergeCommit.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MergeCommit.java?rev=1695297&r1=1695296&r2=1695297&view=diff ============================================================================== --- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MergeCommit.java (original) +++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MergeCommit.java Tue Aug 11 12:55:41 2015 @@ -16,8 +16,13 @@ */ package org.apache.jackrabbit.oak.plugins.document; +import java.util.Set; import java.util.SortedSet; +import javax.annotation.Nonnull; + +import com.google.common.collect.Sets; + /** * A merge commit containing multiple commit revisions. One for each branch * commit to merge. @@ -25,6 +30,7 @@ import java.util.SortedSet; class MergeCommit extends Commit { private final SortedSet<Revision> mergeRevs; + private final Set<Revision> branchCommits = Sets.newHashSet(); MergeCommit(DocumentNodeStore nodeStore, Revision baseRevision, @@ -37,8 +43,18 @@ class MergeCommit extends Commit { return mergeRevs; } + void addBranchCommits(@Nonnull Branch branch) { + for (Revision r : branch.getCommits()) { + if (!branch.getCommit(r).isRebase()) { + branchCommits.add(r); + } + } + } + @Override public void applyToCache(Revision before, boolean isBranchCommit) { - // do nothing for a merge commit + // do nothing for a merge commit, only notify node + // store about merged revisions + nodeStore.revisionsMerged(branchCommits); } } Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/TieredDiffCache.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/TieredDiffCache.java?rev=1695297&r1=1695296&r2=1695297&view=diff ============================================================================== --- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/TieredDiffCache.java (original) +++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/TieredDiffCache.java Tue Aug 11 12:55:41 2015 @@ -29,8 +29,8 @@ import org.apache.jackrabbit.oak.cache.C */ class TieredDiffCache implements DiffCache { - private final LocalDiffCache localCache; - private final MemoryDiffCache memoryCache; + private final DiffCache localCache; + private final DiffCache memoryCache; TieredDiffCache(DocumentMK.Builder builder) { this.localCache = new LocalDiffCache(builder); @@ -51,7 +51,8 @@ class TieredDiffCache implements DiffCac } /** - * Creates a new entry in the {@link LocalDiffCache} only! + * Creates a new entry in the {@link LocalDiffCache} for local changes + * and {@link MemoryDiffCache} for external changes * * @param from the from revision. * @param to the to revision. @@ -59,8 +60,12 @@ class TieredDiffCache implements DiffCac */ @Nonnull @Override - public Entry newEntry(@Nonnull Revision from, @Nonnull Revision to) { - return localCache.newEntry(from, to); + public Entry newEntry(@Nonnull Revision from, @Nonnull Revision to, boolean local) { + if (local) { + return localCache.newEntry(from, to, true); + } else { + return memoryCache.newEntry(from, to, false); + } } @Nonnull Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java?rev=1695297&r1=1695296&r2=1695297&view=diff ============================================================================== --- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java (original) +++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java Tue Aug 11 12:55:41 2015 @@ -162,11 +162,14 @@ class UnsavedModifications implements Cl * lock for a short period of time. * * @param store the document node store. + * @param snapshot callback when the snapshot of the pending changes is + * acquired. * @param lock the lock to acquire to get a consistent snapshot of the * revisions to write back. * @return stats about the write operation. */ public BackgroundWriteStats persist(@Nonnull DocumentNodeStore store, + @Nonnull Snapshot snapshot, @Nonnull Lock lock) { BackgroundWriteStats stats = new BackgroundWriteStats(); if (map.size() == 0) { @@ -178,13 +181,14 @@ class UnsavedModifications implements Cl Clock clock = store.getClock(); long time = clock.getTime(); - // get a copy of the map while holding the lock + // get a copy of the map while holding the lock lock.lock(); MapFactory tmpFactory = null; Map<String, Revision> pending; try { stats.lock = clock.getTime() - time; time = clock.getTime(); + snapshot.acquiring(); if (map.size() > IN_MEMORY_SIZE_LIMIT) { tmpFactory = MapFactory.createFactory(); pending = tmpFactory.create(PathComparator.INSTANCE); @@ -265,4 +269,15 @@ class UnsavedModifications implements Cl public String toString() { return map.toString(); } + + public interface Snapshot { + + Snapshot IGNORE = new Snapshot() { + @Override + public void acquiring() { + } + }; + + void acquiring(); + } } Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java?rev=1695297&r1=1695296&r2=1695297&view=diff ============================================================================== --- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java (original) +++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java Tue Aug 11 12:55:41 2015 @@ -35,6 +35,7 @@ import org.apache.jackrabbit.oak.plugins import org.apache.jackrabbit.oak.plugins.document.Document; import org.apache.jackrabbit.oak.plugins.document.DocumentStore; import org.apache.jackrabbit.oak.plugins.document.DocumentStoreException; +import org.apache.jackrabbit.oak.plugins.document.JournalEntry; import org.apache.jackrabbit.oak.plugins.document.NodeDocument; import org.apache.jackrabbit.oak.plugins.document.Revision; import org.apache.jackrabbit.oak.plugins.document.StableRevisionComparator; @@ -73,6 +74,12 @@ public class MemoryDocumentStore impleme private ConcurrentSkipListMap<String, Document> settings = new ConcurrentSkipListMap<String, Document>(); + /** + * The 'externalChanges' collection. + */ + private ConcurrentSkipListMap<String, JournalEntry> externalChanges = + new ConcurrentSkipListMap<String, JournalEntry>(); + private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); /** @@ -226,8 +233,10 @@ public class MemoryDocumentStore impleme return (ConcurrentSkipListMap<String, T>) nodes; } else if (collection == Collection.CLUSTER_NODES) { return (ConcurrentSkipListMap<String, T>) clusterNodes; - }else if (collection == Collection.SETTINGS) { + } else if (collection == Collection.SETTINGS) { return (ConcurrentSkipListMap<String, T>) settings; + } else if (collection == Collection.JOURNAL) { + return (ConcurrentSkipListMap<String, T>) externalChanges; } else { throw new IllegalArgumentException( "Unknown collection: " + collection.toString()); @@ -329,6 +338,11 @@ public class MemoryDocumentStore impleme } @Override + public CacheInvalidationStats invalidateCache(Iterable<String> keys) { + return null; + } + + @Override public void dispose() { // ignore } Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidator.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidator.java?rev=1695297&r1=1695296&r2=1695297&view=diff ============================================================================== --- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidator.java (original) +++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidator.java Tue Aug 11 12:55:41 2015 @@ -195,7 +195,7 @@ abstract class CacheInvalidator { PeekingIterator<TreeNode> pitr = Iterators.peekingIterator(treeItr); Map<String, TreeNode> sameLevelNodes = Maps.newHashMap(); - // Fetch only the lastRev map and id + // Fetch only the modCount and id final BasicDBObject keys = new BasicDBObject(Document.ID, 1); keys.put(Document.MOD_COUNT, 1); @@ -228,7 +228,7 @@ abstract class CacheInvalidator { QueryBuilder query = QueryBuilder.start(Document.ID) .in(idBatch); - // Fetch lastRev and modCount for each such nodes + // Fetch modCount for each such nodes DBCursor cursor = nodes.find(query.get(), keys); cursor.setReadPreference(ReadPreference.primary()); LOG.debug( Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCache.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCache.java?rev=1695297&r1=1695296&r2=1695297&view=diff ============================================================================== --- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCache.java (original) +++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCache.java Tue Aug 11 12:55:41 2015 @@ -91,7 +91,7 @@ public class MongoDiffCache extends Memo if (changes == null && loader != null) { changes = loader.call(); // put into memory cache - super.newEntry(from, to).append(path, changes); + super.newEntry(from, to, false).append(path, changes); } return changes; } finally { @@ -102,7 +102,8 @@ public class MongoDiffCache extends Memo @Nonnull @Override public Entry newEntry(@Nonnull final Revision from, - @Nonnull final Revision to) { + @Nonnull final Revision to, + boolean local /*ignored*/) { return new MemoryEntry(from, to) { private Diff commit = new Diff(from, to); @@ -172,7 +173,7 @@ public class MongoDiffCache extends Memo // diff is complete LOG.debug("Built diff from {} commits", numCommits); // apply to diff cache and serve later requests from cache - d.applyToEntry(super.newEntry(from, to)).done(); + d.applyToEntry(super.newEntry(from, to, false)).done(); // return changes return d.getChanges(path); } Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java?rev=1695297&r1=1695296&r2=1695297&view=diff ============================================================================== --- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java (original) +++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java Tue Aug 11 12:55:41 2015 @@ -66,6 +66,7 @@ import org.apache.jackrabbit.oak.plugins import org.apache.jackrabbit.oak.plugins.document.cache.ForwardingListener; import org.apache.jackrabbit.oak.plugins.document.cache.NodeDocOffHeapCache; import org.apache.jackrabbit.oak.plugins.document.cache.OffHeapCache; +import org.apache.jackrabbit.oak.plugins.document.mongo.CacheInvalidator.InvalidationResult; import org.apache.jackrabbit.oak.plugins.document.util.StringValue; import org.apache.jackrabbit.oak.plugins.document.util.Utils; import org.apache.jackrabbit.oak.stats.Clock; @@ -115,6 +116,7 @@ public class MongoDocumentStore implemen private final DBCollection nodes; private final DBCollection clusterNodes; private final DBCollection settings; + private final DBCollection journal; private final Cache<CacheValue, NodeDocument> nodesCache; private final CacheStats cacheStats; @@ -192,12 +194,10 @@ public class MongoDocumentStore implemen .put("version", version) .build(); - nodes = db.getCollection( - Collection.NODES.toString()); - clusterNodes = db.getCollection( - Collection.CLUSTER_NODES.toString()); - settings = db.getCollection( - Collection.SETTINGS.toString()); + nodes = db.getCollection(Collection.NODES.toString()); + clusterNodes = db.getCollection(Collection.CLUSTER_NODES.toString()); + settings = db.getCollection(Collection.SETTINGS.toString()); + journal = db.getCollection(Collection.JOURNAL.toString()); maxReplicationLagMillis = builder.getMaxReplicationLagMillis(); @@ -295,6 +295,59 @@ public class MongoDocumentStore implemen //that would lead to lesser number of queries return CacheInvalidator.createHierarchicalInvalidator(this).invalidateCache(); } + + @Override + public CacheInvalidationStats invalidateCache(Iterable<String> keys) { + LOG.debug("invalidateCache: start"); + final InvalidationResult result = new InvalidationResult(); + int size = 0; + + final Iterator<String> it = keys.iterator(); + while(it.hasNext()) { + // read chunks of documents only + final List<String> ids = new ArrayList<String>(IN_CLAUSE_BATCH_SIZE); + while(it.hasNext() && ids.size() < IN_CLAUSE_BATCH_SIZE) { + final String id = it.next(); + if (getCachedNodeDoc(id) != null) { + // only add those that we actually do have cached + ids.add(id); + } + } + size += ids.size(); + if (LOG.isTraceEnabled()) { + LOG.trace("invalidateCache: batch size: {} of total so far {}", + ids.size(), size); + } + + QueryBuilder query = QueryBuilder.start(Document.ID).in(ids); + // Fetch only the modCount and id + final BasicDBObject fields = new BasicDBObject(Document.ID, 1); + fields.put(Document.MOD_COUNT, 1); + + DBCursor cursor = nodes.find(query.get(), fields); + cursor.setReadPreference(ReadPreference.primary()); + result.queryCount++; + + for (DBObject obj : cursor) { + result.cacheEntriesProcessedCount++; + String id = (String) obj.get(Document.ID); + Number modCount = (Number) obj.get(Document.MOD_COUNT); + + CachedNodeDocument cachedDoc = getCachedNodeDoc(id); + if (cachedDoc != null + && !Objects.equal(cachedDoc.getModCount(), modCount)) { + invalidateCache(Collection.NODES, id); + result.invalidationCount++; + } else { + result.upToDateCount++; + } + } + } + + result.cacheSize = size; + LOG.trace("invalidateCache: end. total: {}", size); + return result; + } @Override public <T extends Document> void invalidateCache(Collection<T> collection, String key) { @@ -360,29 +413,30 @@ public class MongoDocumentStore implemen try { TreeLock lock = acquire(key); try { - if (maxCacheAge == 0) { - invalidateCache(collection, key); - } - while (true) { - doc = nodesCache.get(cacheKey, new Callable<NodeDocument>() { - @Override - public NodeDocument call() throws Exception { - NodeDocument doc = (NodeDocument) findUncached(collection, key, getReadPreference(maxCacheAge)); - if (doc == null) { - doc = NodeDocument.NULL; + if (maxCacheAge > 0 || preferCached) { + // try again some other thread may have populated + // the cache by now + doc = nodesCache.getIfPresent(cacheKey); + if (doc != null) { + if (preferCached || + getTime() - doc.getCreated() < maxCacheAge) { + if (doc == NodeDocument.NULL) { + return null; } - return doc; + return (T) doc; } - }); - if (maxCacheAge == 0 || preferCached) { - break; - } - if (getTime() - doc.getCreated() < maxCacheAge) { - break; } - // too old: invalidate, try again - invalidateCache(collection, key); } + final NodeDocument d = (NodeDocument) findUncached( + collection, key, + getReadPreference(maxCacheAge)); + invalidateCache(collection, key); + doc = nodesCache.get(cacheKey, new Callable<NodeDocument>() { + @Override + public NodeDocument call() throws Exception { + return d == null ? NodeDocument.NULL : d; + } + }); } finally { lock.unlock(); } @@ -393,6 +447,8 @@ public class MongoDocumentStore implemen } } catch (ExecutionException e) { t = e.getCause(); + } catch (RuntimeException e) { + t = e; } throw new DocumentStoreException("Failed to load document with " + key, t); } @@ -514,9 +570,13 @@ public class MongoDocumentStore implemen } DBObject query = queryBuilder.get(); String parentId = Utils.getParentIdFromLowerLimit(fromKey); + long lockTime = -1; final long start = PERFLOG.start(); - TreeLock lock = withLock ? acquireExclusive(parentId != null ? parentId : "") : null; + TreeLock lock = acquireExclusive(parentId != null ? parentId : ""); try { + if (start != -1) { + lockTime = System.currentTimeMillis() - start; + } DBCursor cursor = dbCollection.find(query).sort(BY_ID_ASC); if (!disableIndexHint) { cursor.hint(hint); @@ -574,7 +634,7 @@ public class MongoDocumentStore implemen if (lock != null) { lock.unlock(); } - PERFLOG.end(start, 1, "query for children from [{}] to [{}]", fromKey, toKey); + PERFLOG.end(start, 1, "query for children from [{}] to [{}], lock:{}", fromKey, toKey, lockTime); } } @@ -968,7 +1028,9 @@ public class MongoDocumentStore implemen return clusterNodes; } else if (collection == Collection.SETTINGS) { return settings; - }else { + } else if (collection == Collection.JOURNAL) { + return journal; + } else { throw new IllegalArgumentException( "Unknown collection: " + collection.toString()); } Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java?rev=1695297&r1=1695296&r2=1695297&view=diff ============================================================================== --- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java (original) +++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java Tue Aug 11 12:55:41 2015 @@ -282,6 +282,12 @@ public class RDBDocumentStore implements } return null; } + + @Override + public CacheInvalidationStats invalidateCache(Iterable<String> keys) { + //TODO: optimize me + return invalidateCache(); + } @Override public <T extends Document> void invalidateCache(Collection<T> collection, String id) { @@ -783,7 +789,7 @@ public class RDBDocumentStore implements private Set<String> tablesToBeDropped = new HashSet<String>(); // table names - private String tnNodes, tnClusterNodes, tnSettings; + private String tnNodes, tnClusterNodes, tnSettings, tnJournal; // ratio between Java characters and UTF-8 encoding // a) single characters will fit into 3 bytes @@ -825,6 +831,7 @@ public class RDBDocumentStore implements this.tnNodes = RDBJDBCTools.createTableName(options.getTablePrefix(), TABLEMAP.get(Collection.NODES)); this.tnClusterNodes = RDBJDBCTools.createTableName(options.getTablePrefix(), TABLEMAP.get(Collection.CLUSTER_NODES)); this.tnSettings = RDBJDBCTools.createTableName(options.getTablePrefix(), TABLEMAP.get(Collection.SETTINGS)); + this.tnJournal = RDBJDBCTools.createTableName(options.getTablePrefix(), "JOURNAL"); this.ch = new RDBConnectionHandler(ds); this.callStack = LOG.isDebugEnabled() ? new Exception("call stack of RDBDocumentStore creation") : null; @@ -878,6 +885,7 @@ public class RDBDocumentStore implements createTableFor(con, Collection.CLUSTER_NODES, tablesCreated, tablesPresent, tableDiags); createTableFor(con, Collection.NODES, tablesCreated, tablesPresent, tableDiags); createTableFor(con, Collection.SETTINGS, tablesCreated, tablesPresent, tableDiags); + createTableFor(con, Collection.JOURNAL, tablesCreated, tablesPresent, tableDiags); } finally { con.commit(); con.close(); @@ -1314,6 +1322,8 @@ public class RDBDocumentStore implements return this.tnNodes; } else if (collection == Collection.SETTINGS) { return this.tnSettings; + } else if (collection == Collection.JOURNAL) { + return this.tnJournal; } else { throw new IllegalArgumentException("Unknown collection: " + collection.toString()); } Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java?rev=1695297&r1=1695296&r2=1695297&view=diff ============================================================================== --- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java (original) +++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java Tue Aug 11 12:55:41 2015 @@ -251,6 +251,17 @@ public class LoggingDocumentStoreWrapper throw convert(e); } } + + @Override + public CacheInvalidationStats invalidateCache(Iterable<String> keys) { + try { + logMethod("invalidateCache", keys); + return store.invalidateCache(keys); + } catch (Exception e) { + logException(e); + throw convert(e); + } + } @Override public <T extends Document> void invalidateCache(Collection<T> collection, String key) { Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java?rev=1695297&r1=1695296&r2=1695297&view=diff ============================================================================== --- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java (original) +++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java Tue Aug 11 12:55:41 2015 @@ -107,6 +107,11 @@ public class SynchronizingDocumentStoreW } @Override + public synchronized CacheInvalidationStats invalidateCache(Iterable<String> keys) { + return store.invalidateCache(keys); + } + + @Override public synchronized <T extends Document> void invalidateCache(Collection<T> collection, String key) { store.invalidateCache(collection, key); } Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java?rev=1695297&r1=1695296&r2=1695297&view=diff ============================================================================== --- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java (original) +++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java Tue Aug 11 12:55:41 2015 @@ -282,6 +282,18 @@ public class TimingDocumentStoreWrapper throw convert(e); } } + + @Override + public CacheInvalidationStats invalidateCache(Iterable<String> keys) { + try { + long start = now(); + CacheInvalidationStats result = base.invalidateCache(keys); + updateAndLogTimes("invalidateCache3", start, 0, 0); + return result; + } catch (Exception e) { + throw convert(e); + } + } @Override public <T extends Document> void invalidateCache(Collection<T> collection, String key) { Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java?rev=1695297&r1=1695296&r2=1695297&view=diff ============================================================================== --- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java (original) +++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java Tue Aug 11 12:55:41 2015 @@ -34,6 +34,7 @@ import javax.annotation.CheckForNull; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import com.google.common.base.Function; import com.google.common.collect.AbstractIterator; import com.mongodb.BasicDBObject; @@ -49,6 +50,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.collect.Iterables.transform; /** * Utility methods. @@ -557,4 +559,31 @@ public class Utils { public static boolean isHiddenPath(@Nonnull String path) { return path.contains("/:"); } + + /** + * Transforms the given {@link Iterable} from {@link String} to + * {@link StringValue} elements. The {@link Iterable} must no have + * {@code null} values. + */ + public static Iterable<StringValue> asStringValueIterable( + @Nonnull Iterable<String> values) { + return transform(values, new Function<String, StringValue>() { + @Override + public StringValue apply(String input) { + return new StringValue(input); + } + }); + } + + /** + * Transforms the given paths into ids using {@link #getIdFromPath(String)}. + */ + public static Iterable<String> pathToId(@Nonnull Iterable<String> paths) { + return transform(paths, new Function<String, String>() { + @Override + public String apply(String input) { + return getIdFromPath(input); + } + }); + } } Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/NodeObserver.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/NodeObserver.java?rev=1695297&r1=1695296&r2=1695297&view=diff ============================================================================== --- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/NodeObserver.java (original) +++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/NodeObserver.java Tue Aug 11 12:55:41 2015 @@ -161,7 +161,7 @@ public abstract class NodeObserver imple while (!generator.isDone()) { generator.generate(); } - PERF_LOGGER.end(start, 10, + PERF_LOGGER.end(start, 100, "Generated events (before: {}, after: {})", previousRoot, root); } catch (Exception e) { Added: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractJournalTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractJournalTest.java?rev=1695297&view=auto ============================================================================== --- jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractJournalTest.java (added) +++ jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractJournalTest.java Tue Aug 11 12:55:41 2015 @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; +import java.util.Set; + +import com.google.common.collect.Lists; + +import org.apache.jackrabbit.oak.api.CommitFailedException; +import org.apache.jackrabbit.oak.plugins.document.util.Utils; +import org.apache.jackrabbit.oak.spi.blob.BlobStore; +import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.commit.EmptyHook; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.junit.After; +import org.junit.Before; + +import static java.util.Arrays.asList; +import static org.junit.Assert.fail; + +/** + * Base class for journal related tests. + */ +public abstract class AbstractJournalTest { + + protected TestBuilder builder; + protected List<DocumentMK> mks = Lists.newArrayList(); + protected Random random; + + @Before + public void setup() { + random = new Random(); + } + + @Before + @After + public void clear() { + for (DocumentMK mk : mks) { + mk.dispose(); + } + mks.clear(); + } + + protected static void invalidateDocChildrenCache(DocumentNodeStore store) { + store.invalidateDocChildrenCache(); + } + + protected static void renewClusterIdLease(DocumentNodeStore store) { + store.renewClusterIdLease(); + } + + protected Set<String> choose(List<String> paths, int howMany) { + final Set<String> result = new HashSet<String>(); + while(result.size()<howMany) { + result.add(paths.get(random.nextInt(paths.size()))); + } + return result; + } + + protected List<String> createRandomPaths(int depth, int avgChildrenPerLevel, int num) { + final Set<String> result = new HashSet<String>(); + while(result.size()<num) { + result.add(createRandomPath(depth, avgChildrenPerLevel)); + } + return new ArrayList<String>(result); + } + + protected String createRandomPath(int depth, int avgChildrenPerLevel) { + StringBuilder sb = new StringBuilder(); + for(int i=0; i<depth; i++) { + sb.append("/"); + sb.append("r").append(random.nextInt(avgChildrenPerLevel)); + } + return sb.toString(); + } + + protected void assertDocCache(DocumentNodeStore ns, boolean expected, String path) { + String id = Utils.getIdFromPath(path); + boolean exists = ns.getDocumentStore().getIfCached(Collection.NODES, id)!=null; + if (exists!=expected) { + if (expected) { + fail("assertDocCache: did not find in cache even though expected: "+path); + } else { + fail("assertDocCache: found in cache even though not expected: "+path); + } + } + } + + protected void setProperty(DocumentNodeStore ns, String path, String key, String value, boolean runBgOpsAfterCreation) throws + CommitFailedException { + NodeBuilder rootBuilder = ns.getRoot().builder(); + doGetOrCreate(rootBuilder, path).setProperty(key, value); + ns.merge(rootBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY); + if (runBgOpsAfterCreation) { + ns.runBackgroundOperations(); + } + } + + protected void getOrCreate(DocumentNodeStore ns, List<String> paths, boolean runBgOpsAfterCreation) throws CommitFailedException { + NodeBuilder rootBuilder = ns.getRoot().builder(); + for(String path:paths) { + doGetOrCreate(rootBuilder, path); + } + ns.merge(rootBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY); + if (runBgOpsAfterCreation) { + ns.runBackgroundOperations(); + } + } + + protected void getOrCreate(DocumentNodeStore ns, String path, boolean runBgOpsAfterCreation) throws CommitFailedException { + NodeBuilder rootBuilder = ns.getRoot().builder(); + doGetOrCreate(rootBuilder, path); + ns.merge(rootBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY); + if (runBgOpsAfterCreation) { + ns.runBackgroundOperations(); + } + } + + protected NodeBuilder doGetOrCreate(NodeBuilder builder, String path) { + String[] parts = path.split("/"); + for(int i=1; i<parts.length; i++) { + builder = builder.child(parts[i]); + } + return builder; + } + + protected void assertJournalEntries(DocumentNodeStore ds, String... expectedChanges) { + List<String> exp = new LinkedList<String>(asList(expectedChanges)); + for(boolean branch : new Boolean[]{false, true}) { + String fromKey = JournalEntry.asId(new Revision(0, 0, ds.getClusterId(), branch)); + String toKey = JournalEntry.asId(new Revision(System.currentTimeMillis()+1000, 0, ds.getClusterId(), branch)); + List<JournalEntry> entries = ds.getDocumentStore().query(Collection.JOURNAL, fromKey, toKey, expectedChanges.length+5); + if (entries.size()>0) { + for (JournalEntry journalEntry : entries) { + if (!exp.remove(journalEntry.get("_c"))) { + fail("Found an unexpected change: " + journalEntry.get("_c") + ", while all I expected was: " + asList(expectedChanges)); + } + } + } + } + if (exp.size()>0) { + fail("Did not find all expected changes, left over: "+exp+" (from original list which is: "+asList(expectedChanges)+")"); + } + } + + protected int countJournalEntries(DocumentNodeStore ds, int max) { + int total = 0; + for(boolean branch : new Boolean[]{false, true}) { + String fromKey = JournalEntry.asId(new Revision(0, 0, ds.getClusterId(), branch)); + String toKey = JournalEntry.asId(new Revision(System.currentTimeMillis()+1000, 0, ds.getClusterId(), branch)); + List<JournalEntry> entries = ds.getDocumentStore().query(Collection.JOURNAL, fromKey, toKey, max); + total+=entries.size(); + } + return total; + } + + protected NodeDocument getDocument(DocumentNodeStore nodeStore, String path) { + return nodeStore.getDocumentStore().find(Collection.NODES, Utils.getIdFromPath(path)); + } + + protected TestBuilder newDocumentMKBuilder() { + return new TestBuilder(); + } + + protected DocumentMK createMK(int clusterId, int asyncDelay, + DocumentStore ds, BlobStore bs) { + builder = newDocumentMKBuilder(); + return register(builder.setDocumentStore(ds) + .setBlobStore(bs).setClusterId(clusterId) + .setAsyncDelay(asyncDelay).open()); + } + + protected DocumentMK register(DocumentMK mk) { + mks.add(mk); + return mk; + } + + protected final class TestBuilder extends DocumentMK.Builder { + CountingDocumentStore actualStore; + CountingTieredDiffCache actualDiffCache; + + @Override + public DocumentStore getDocumentStore() { + if (actualStore==null) { + actualStore = new CountingDocumentStore(super.getDocumentStore()); + } + return actualStore; + } + + @Override + public DiffCache getDiffCache() { + if (actualDiffCache==null) { + actualDiffCache = new CountingTieredDiffCache(this); + } + return actualDiffCache; + } + } +} Propchange: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractJournalTest.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AmnesiaDiffCache.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AmnesiaDiffCache.java?rev=1695297&r1=1695296&r2=1695297&view=diff ============================================================================== --- jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AmnesiaDiffCache.java (original) +++ jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AmnesiaDiffCache.java Tue Aug 11 12:55:41 2015 @@ -47,7 +47,7 @@ class AmnesiaDiffCache implements DiffCa @Nonnull @Override - public Entry newEntry(@Nonnull Revision from, @Nonnull Revision to) { + public Entry newEntry(@Nonnull Revision from, @Nonnull Revision to, boolean local) { return new Entry() { @Override public void append(@Nonnull String path, @Nonnull String changes) { Modified: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterTest.java?rev=1695297&r1=1695296&r2=1695297&view=diff ============================================================================== --- jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterTest.java (original) +++ jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterTest.java Tue Aug 11 12:55:41 2015 @@ -370,6 +370,10 @@ public class ClusterTest { rootStates2.add((DocumentNodeState) root); } }); + + ns1.runBackgroundOperations(); + ns2.runBackgroundOperations(); + rootStates1.clear(); rootStates2.clear(); Added: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java?rev=1695297&view=auto ============================================================================== --- jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java (added) +++ jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java Tue Aug 11 12:55:41 2015 @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.annotation.Nonnull; + +import org.apache.jackrabbit.oak.cache.CacheStats; +import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Condition; +import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key; +import org.apache.jackrabbit.oak.plugins.document.cache.CacheInvalidationStats; + +public class CountingDocumentStore implements DocumentStore { + + private DocumentStore delegate; + + //TODO: remove mec + boolean printStacks; + + class Stats { + + private int numFindCalls; + private int numQueryCalls; + private int numRemoveCalls; + private int numCreateOrUpdateCalls; + + } + + private Map<Collection, Stats> collectionStats = new HashMap<Collection, Stats>(); + + public CountingDocumentStore(DocumentStore delegate) { + this.delegate = delegate; + } + + public void resetCounters() { + collectionStats.clear(); + } + + public int getNumFindCalls(Collection collection) { + return getStats(collection).numFindCalls; + } + + public int getNumQueryCalls(Collection collection) { + return getStats(collection).numQueryCalls; + } + + public int getNumRemoveCalls(Collection collection) { + return getStats(collection).numRemoveCalls; + } + + public int getNumCreateOrUpdateCalls(Collection collection) { + return getStats(collection).numCreateOrUpdateCalls; + } + + private Stats getStats(Collection collection) { + if (!collectionStats.containsKey(collection)) { + Stats s = new Stats(); + collectionStats.put(collection, s); + return s; + } else { + return collectionStats.get(collection); + } + } + + @Override + public <T extends Document> T find(Collection<T> collection, String key) { + getStats(collection).numFindCalls++; + if (printStacks) { + new Exception("find [" + getStats(collection).numFindCalls + "] (" + collection + ") " + key).printStackTrace(); + } + return delegate.find(collection, key); + } + + @Override + public <T extends Document> T find(Collection<T> collection, + String key, + int maxCacheAge) { + getStats(collection).numFindCalls++; + if (printStacks) { + new Exception("find [" + getStats(collection).numFindCalls + "] (" + collection + ") " + key + " [max: " + maxCacheAge + "]").printStackTrace(); + } + return delegate.find(collection, key, maxCacheAge); + } + + @Nonnull + @Override + public <T extends Document> List<T> query(Collection<T> collection, + String fromKey, + String toKey, + int limit) { + getStats(collection).numQueryCalls++; + if (printStacks) { + new Exception("query1 [" + getStats(collection).numQueryCalls + "] (" + collection + ") " + fromKey + ", to " + toKey + ". limit " + limit).printStackTrace(); + } + return delegate.query(collection, fromKey, toKey, limit); + } + + @Nonnull + @Override + public <T extends Document> List<T> query(Collection<T> collection, + String fromKey, + String toKey, + String indexedProperty, + long startValue, + int limit) { + getStats(collection).numQueryCalls++; + if (printStacks) { + new Exception("query2 [" + getStats(collection).numQueryCalls + "] (" + collection + ") " + fromKey + ", to " + toKey + ". limit " + limit).printStackTrace(); + } + return delegate.query(collection, fromKey, toKey, indexedProperty, startValue, limit); + } + + @Override + public <T extends Document> void remove(Collection<T> collection, + String key) { + getStats(collection).numRemoveCalls++; + delegate.remove(collection, key); + } + + @Override + public <T extends Document> void remove(Collection<T> collection, + List<String> keys) { + getStats(collection).numRemoveCalls++; + delegate.remove(collection, keys); + } + + @Override + public <T extends Document> int remove(Collection<T> collection, + Map<String, Map<Key, Condition>> toRemove) { + getStats(collection).numRemoveCalls++; + return delegate.remove(collection, toRemove); + } + + @Override + public <T extends Document> boolean create(Collection<T> collection, + List<UpdateOp> updateOps) { + getStats(collection).numCreateOrUpdateCalls++; + return delegate.create(collection, updateOps); + } + + @Override + public <T extends Document> void update(Collection<T> collection, + List<String> keys, + UpdateOp updateOp) { + getStats(collection).numCreateOrUpdateCalls++; + delegate.update(collection, keys, updateOp); + } + + @Override + public <T extends Document> T createOrUpdate(Collection<T> collection, + UpdateOp update) { + getStats(collection).numCreateOrUpdateCalls++; + return delegate.createOrUpdate(collection, update); + } + + @Override + public <T extends Document> T findAndUpdate(Collection<T> collection, + UpdateOp update) { + getStats(collection).numCreateOrUpdateCalls++; + return delegate.findAndUpdate(collection, update); + } + + @Override + public CacheInvalidationStats invalidateCache() { + return delegate.invalidateCache(); + } + + @Override + public CacheInvalidationStats invalidateCache(Iterable<String> keys) { + return delegate.invalidateCache(keys); + } + + @Override + public <T extends Document> void invalidateCache(Collection<T> collection, + String key) { + delegate.invalidateCache(collection, key); + } + + @Override + public void dispose() { + delegate.dispose(); + } + + @Override + public <T extends Document> T getIfCached(Collection<T> collection, + String key) { + return delegate.getIfCached(collection, key); + } + + @Override + public void setReadWriteMode(String readWriteMode) { + delegate.setReadWriteMode(readWriteMode); + } + + @Override + public CacheStats getCacheStats() { + return delegate.getCacheStats(); + } + + @Override + public Map<String, String> getMetadata() { + return delegate.getMetadata(); + } + +} Propchange: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java ------------------------------------------------------------------------------ svn:eol-style = native Added: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingTieredDiffCache.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingTieredDiffCache.java?rev=1695297&view=auto ============================================================================== --- jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingTieredDiffCache.java (added) +++ jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingTieredDiffCache.java Tue Aug 11 12:55:41 2015 @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +public class CountingTieredDiffCache extends TieredDiffCache { + + class CountingLoader implements Loader { + + private Loader delegate; + + CountingLoader(Loader delegate) { + this.delegate = delegate; + } + + @Override + public String call() { + incLoadCount(); + return delegate.call(); + } + + } + + private int loadCount; + + public CountingTieredDiffCache(DocumentMK.Builder builder) { + super(builder); + } + + private void incLoadCount() { + loadCount++; + } + + public int getLoadCount() { + return loadCount; + } + + public void resetLoadCounter() { + loadCount = 0; + } + + @Override + public String getChanges(@Nonnull Revision from, + @Nonnull Revision to, + @Nonnull String path, + @Nullable Loader loader) { + return super.getChanges(from, to, path, new CountingLoader(loader)); + } +} Propchange: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingTieredDiffCache.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java?rev=1695297&r1=1695296&r2=1695297&view=diff ============================================================================== --- jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java (original) +++ jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java Tue Aug 11 12:55:41 2015 @@ -60,6 +60,7 @@ import com.google.common.base.Throwables import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; + import org.apache.jackrabbit.oak.api.CommitFailedException; import org.apache.jackrabbit.oak.api.PropertyState; import org.apache.jackrabbit.oak.api.Type; @@ -102,8 +103,8 @@ public class DocumentNodeStoreTest { DocumentStore docStore = new MemoryDocumentStore(); DocumentStore testStore = new TimingDocumentStoreWrapper(docStore) { @Override - public CacheInvalidationStats invalidateCache() { - super.invalidateCache(); + public CacheInvalidationStats invalidateCache(Iterable<String> keys) { + super.invalidateCache(keys); semaphore.acquireUninterruptibly(); semaphore.release(); return null; @@ -1667,7 +1668,7 @@ public class DocumentNodeStoreTest { merge(ns, builder); Revision to = ns.getHeadRevision(); - DiffCache.Entry entry = ns.getDiffCache().newEntry(from, to); + DiffCache.Entry entry = ns.getDiffCache().newEntry(from, to, true); entry.append("/", "-\"foo\""); entry.done(); Added: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java?rev=1695297&view=auto ============================================================================== --- jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java (added) +++ jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java Tue Aug 11 12:55:41 2015 @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.Set; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +import org.apache.jackrabbit.oak.commons.PathUtils; +import org.apache.jackrabbit.oak.commons.json.JsopReader; +import org.apache.jackrabbit.oak.commons.json.JsopTokenizer; +import org.apache.jackrabbit.oak.commons.sort.StringSort; +import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore; +import org.junit.Test; + +import static org.apache.jackrabbit.oak.plugins.document.Collection.JOURNAL; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for {@link JournalEntry}. + */ +public class JournalEntryTest { + + @Test + public void applyTo() throws Exception { + DiffCache cache = new MemoryDiffCache(new DocumentMK.Builder()); + List<String> paths = Lists.newArrayList(); + addRandomPaths(paths); + StringSort sort = JournalEntry.newSorter(); + add(sort, paths); + Revision from = new Revision(1, 0, 1); + Revision to = new Revision(2, 0, 1); + sort.sort(); + JournalEntry.applyTo(sort, cache, from, to); + + for (String p : paths) { + String changes = cache.getChanges(from, to, p, null); + assertNotNull("missing changes for " + p, changes); + for (String c : getChildren(changes)) { + assertTrue(paths.contains(PathUtils.concat(p, c))); + } + } + sort.close(); + } + + @Test + public void fillExternalChanges() throws Exception { + DocumentStore store = new MemoryDocumentStore(); + JournalEntry entry = JOURNAL.newDocument(store); + Set<String> paths = Sets.newHashSet(); + addRandomPaths(paths); + entry.modified(paths); + Revision r1 = new Revision(1, 0, 1); + Revision r2 = new Revision(2, 0, 1); + Revision r3 = new Revision(3, 0, 1); + UpdateOp op = entry.asUpdateOp(r2); + assertTrue(store.create(JOURNAL, Collections.singletonList(op))); + + StringSort sort = JournalEntry.newSorter(); + JournalEntry.fillExternalChanges(sort, r2, r3, store); + assertEquals(0, sort.getSize()); + + JournalEntry.fillExternalChanges(sort, r1, r2, store); + assertEquals(paths.size(), sort.getSize()); + sort.close(); + + sort = JournalEntry.newSorter(); + JournalEntry.fillExternalChanges(sort, r1, r3, store); + assertEquals(paths.size(), sort.getSize()); + sort.close(); + } + + @Test + public void getRevisionTimestamp() throws Exception { + DocumentStore store = new MemoryDocumentStore(); + JournalEntry entry = JOURNAL.newDocument(store); + entry.modified("/foo"); + Revision r = Revision.newRevision(1); + assertTrue(store.create(JOURNAL, + Collections.singletonList(entry.asUpdateOp(r)))); + entry = store.find(JOURNAL, JournalEntry.asId(r)); + assertEquals(r.getTimestamp(), entry.getRevisionTimestamp()); + } + + private static void addRandomPaths(java.util.Collection<String> paths) throws IOException { + paths.add("/"); + Random random = new Random(42); + for (int i = 0; i < 1000; i++) { + String path = "/"; + int depth = random.nextInt(6); + for (int j = 0; j < depth; j++) { + char name = (char) ('a' + random.nextInt(26)); + path = PathUtils.concat(path, String.valueOf(name)); + paths.add(path); + } + } + } + + private static void add(StringSort sort, List<String> paths) + throws IOException { + for (String p : paths) { + sort.add(p); + } + } + + private static List<String> getChildren(String diff) { + List<String> children = Lists.newArrayList(); + JsopTokenizer t = new JsopTokenizer(diff); + for (;;) { + int r = t.read(); + switch (r) { + case '^': { + children.add(t.readString()); + t.read(':'); + t.read('{'); + t.read('}'); + break; + } + case JsopReader.END: { + return children; + } + default: + fail("Unexpected token: " + r); + } + } + } +} Propchange: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java ------------------------------------------------------------------------------ svn:eol-style = native Added: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java?rev=1695297&view=auto ============================================================================== --- jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java (added) +++ jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java Tue Aug 11 12:55:41 2015 @@ -0,0 +1,440 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; + +import org.apache.jackrabbit.oak.api.PropertyState; +import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore; +import org.apache.jackrabbit.oak.spi.blob.MemoryBlobStore; +import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.commit.EmptyHook; +import org.apache.jackrabbit.oak.spi.commit.Observer; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.apache.jackrabbit.oak.spi.state.NodeState; +import org.apache.jackrabbit.oak.spi.state.NodeStateDiff; +import org.junit.Test; + +import static java.util.Collections.synchronizedList; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class JournalTest extends AbstractJournalTest { + + private MemoryDocumentStore ds; + private MemoryBlobStore bs; + + class DiffingObserver implements Observer, Runnable, NodeStateDiff { + + final List<DocumentNodeState> incomingRootStates1 = Lists.newArrayList(); + final List<DocumentNodeState> diffedRootStates1 = Lists.newArrayList(); + + DocumentNodeState oldRoot = null; + + DiffingObserver(boolean startInBackground) { + if (startInBackground) { + // start the diffing in the background - so as to not + // interfere with the contentChanged call + Thread th = new Thread(this); + th.setDaemon(true); + th.start(); + } + } + + public void clear() { + synchronized(incomingRootStates1) { + incomingRootStates1.clear(); + diffedRootStates1.clear(); + } + } + + @Override + public void contentChanged(NodeState root, CommitInfo info) { + synchronized(incomingRootStates1) { + incomingRootStates1.add((DocumentNodeState) root); + incomingRootStates1.notifyAll(); + } + } + + public void processAll() { + while(processOne()) { + // continue + } + } + + public boolean processOne() { + DocumentNodeState newRoot; + synchronized(incomingRootStates1) { + if (incomingRootStates1.size()==0) { + return false; + } + newRoot = incomingRootStates1.remove(0); + } + if (oldRoot!=null) { + newRoot.compareAgainstBaseState(oldRoot, this); + } + oldRoot = newRoot; + synchronized(incomingRootStates1) { + diffedRootStates1.add(newRoot); + } + return true; + } + + @Override + public void run() { + while(true) { + DocumentNodeState newRoot; + synchronized(incomingRootStates1) { + while(incomingRootStates1.size()==0) { + try { + incomingRootStates1.wait(); + } catch (InterruptedException e) { + // ignore + } + } + newRoot = incomingRootStates1.remove(0); + } + if (oldRoot!=null) { + newRoot.compareAgainstBaseState(oldRoot, this); + } + oldRoot = newRoot; + synchronized(incomingRootStates1) { + diffedRootStates1.add(newRoot); + } + } + } + + @Override + public boolean propertyAdded(PropertyState after) { + return true; + } + + @Override + public boolean propertyChanged(PropertyState before, PropertyState after) { + return true; + } + + @Override + public boolean propertyDeleted(PropertyState before) { + return true; + } + + @Override + public boolean childNodeAdded(String name, NodeState after) { + return true; + } + + @Override + public boolean childNodeChanged(String name, NodeState before, + NodeState after) { + return true; + } + + @Override + public boolean childNodeDeleted(String name, NodeState before) { + return true; + } + + public int getTotal() { + synchronized(incomingRootStates1) { + return incomingRootStates1.size() + diffedRootStates1.size(); + } + } + + } + + @Test + public void cleanupTest() throws Exception { + DocumentMK mk1 = createMK(0 /* clusterId: 0 => uses clusterNodes collection */, 0); + DocumentNodeStore ns1 = mk1.getNodeStore(); + // make sure we're visible and marked as active + ns1.renewClusterIdLease(); + JournalGarbageCollector gc = new JournalGarbageCollector(ns1); + // first clean up + gc.gc(1, TimeUnit.MILLISECONDS); + Thread.sleep(100); // sleep just quickly + assertEquals(0, gc.gc(1, TimeUnit.DAYS)); + assertEquals(0, gc.gc(6, TimeUnit.HOURS)); + assertEquals(0, gc.gc(1, TimeUnit.HOURS)); + assertEquals(0, gc.gc(10, TimeUnit.MINUTES)); + assertEquals(0, gc.gc(1, TimeUnit.MINUTES)); + assertEquals(0, gc.gc(1, TimeUnit.SECONDS)); + assertEquals(0, gc.gc(1, TimeUnit.MILLISECONDS)); + + // create some entries that can be deleted thereupon + mk1.commit("/", "+\"regular1\": {}", null, null); + mk1.commit("/", "+\"regular2\": {}", null, null); + mk1.commit("/", "+\"regular3\": {}", null, null); + mk1.commit("/regular2", "+\"regular4\": {}", null, null); + Thread.sleep(100); // sleep 100millis + assertEquals(0, gc.gc(5, TimeUnit.SECONDS)); + assertEquals(0, gc.gc(1, TimeUnit.MILLISECONDS)); + ns1.runBackgroundOperations(); + mk1.commit("/", "+\"regular5\": {}", null, null); + ns1.runBackgroundOperations(); + mk1.commit("/", "+\"regular6\": {}", null, null); + ns1.runBackgroundOperations(); + Thread.sleep(100); // sleep 100millis + assertEquals(0, gc.gc(5, TimeUnit.SECONDS)); + assertEquals(3, gc.gc(1, TimeUnit.MILLISECONDS)); + } + + @Test + public void journalTest() throws Exception { + DocumentMK mk1 = createMK(1, 0); + DocumentNodeStore ns1 = mk1.getNodeStore(); + CountingDocumentStore countingDocStore1 = builder.actualStore; + CountingTieredDiffCache countingDiffCache1 = builder.actualDiffCache; + + DocumentMK mk2 = createMK(2, 0); + DocumentNodeStore ns2 = mk2.getNodeStore(); + CountingDocumentStore countingDocStore2 = builder.actualStore; + CountingTieredDiffCache countingDiffCache2 = builder.actualDiffCache; + + final DiffingObserver observer = new DiffingObserver(false); + ns1.addObserver(observer); + + ns1.runBackgroundOperations(); + ns2.runBackgroundOperations(); + observer.processAll(); // to make sure we have an 'oldRoot' + observer.clear(); + countingDocStore1.resetCounters(); + countingDocStore2.resetCounters(); + // countingDocStore1.printStacks = true; + countingDiffCache1.resetLoadCounter(); + countingDiffCache2.resetLoadCounter(); + + mk2.commit("/", "+\"regular1\": {}", null, null); + mk2.commit("/", "+\"regular2\": {}", null, null); + mk2.commit("/", "+\"regular3\": {}", null, null); + mk2.commit("/regular2", "+\"regular4\": {}", null, null); + // flush to journal + ns2.runBackgroundOperations(); + + // nothing notified yet + assertEquals(0, observer.getTotal()); + assertEquals(0, countingDocStore1.getNumFindCalls(Collection.NODES)); + assertEquals(0, countingDocStore1.getNumQueryCalls(Collection.NODES)); + assertEquals(0, countingDocStore1.getNumRemoveCalls(Collection.NODES)); + assertEquals(0, countingDocStore1.getNumCreateOrUpdateCalls(Collection.NODES)); + assertEquals(0, countingDiffCache1.getLoadCount()); + + // let node 1 read those changes + // System.err.println("run background ops"); + ns1.runBackgroundOperations(); + mk2.commit("/", "+\"regular5\": {}", null, null); + ns2.runBackgroundOperations(); + ns1.runBackgroundOperations(); + // and let the observer process everything + observer.processAll(); + countingDocStore1.printStacks = false; + + // now expect 1 entry in rootStates + assertEquals(2, observer.getTotal()); + assertEquals(0, countingDiffCache1.getLoadCount()); + assertEquals(0, countingDocStore1.getNumRemoveCalls(Collection.NODES)); + assertEquals(0, countingDocStore1.getNumCreateOrUpdateCalls(Collection.NODES)); + assertEquals(0, countingDocStore1.getNumQueryCalls(Collection.NODES)); +// assertEquals(0, countingDocStore1.getNumFindCalls(Collection.NODES)); + } + + @Test + public void externalBranchChange() throws Exception { + DocumentMK mk1 = createMK(1, 0); + DocumentNodeStore ns1 = mk1.getNodeStore(); + DocumentMK mk2 = createMK(2, 0); + DocumentNodeStore ns2 = mk2.getNodeStore(); + + ns1.runBackgroundOperations(); + ns2.runBackgroundOperations(); + + mk1.commit("/", "+\"regular1\": {}", null, null); + // flush to journal + ns1.runBackgroundOperations(); + mk1.commit("/regular1", "+\"regular1child\": {}", null, null); + // flush to journal + ns1.runBackgroundOperations(); + mk1.commit("/", "+\"regular2\": {}", null, null); + // flush to journal + ns1.runBackgroundOperations(); + mk1.commit("/", "+\"regular3\": {}", null, null); + // flush to journal + ns1.runBackgroundOperations(); + mk1.commit("/", "+\"regular4\": {}", null, null); + // flush to journal + ns1.runBackgroundOperations(); + mk1.commit("/", "+\"regular5\": {}", null, null); + // flush to journal + ns1.runBackgroundOperations(); + String b1 = mk1.branch(null); + b1 = mk1.commit("/", "+\"branchVisible\": {}", b1, null); + mk1.merge(b1, null); + + // to flush the branch commit either dispose of mk1 + // or run the background operations explicitly + // (as that will propagate the lastRev to the root) + ns1.runBackgroundOperations(); + ns2.runBackgroundOperations(); + + String nodes = mk2.getNodes("/", null, 0, 0, 100, null); + assertEquals("{\"branchVisible\":{},\"regular1\":{},\"regular2\":{},\"regular3\":{},\"regular4\":{},\"regular5\":{},\":childNodeCount\":6}", nodes); + } + + /** Inspired by LastRevRecoveryTest.testRecover() - simplified and extended with journal related asserts **/ + @Test + public void lastRevRecoveryJournalTest() throws Exception { + doLastRevRecoveryJournalTest(false); + } + + /** Inspired by LastRevRecoveryTest.testRecover() - simplified and extended with journal related asserts **/ + @Test + public void lastRevRecoveryJournalTestWithConcurrency() throws Exception { + doLastRevRecoveryJournalTest(true); + } + + private void doLastRevRecoveryJournalTest(boolean testConcurrency) throws Exception { + DocumentMK mk1 = createMK(0 /*clusterId via clusterNodes collection*/, 0); + DocumentNodeStore ds1 = mk1.getNodeStore(); + int c1Id = ds1.getClusterId(); + DocumentMK mk2 = createMK(0 /*clusterId via clusterNodes collection*/, 0); + DocumentNodeStore ds2 = mk2.getNodeStore(); + final int c2Id = ds2.getClusterId(); + + // should have 1 each with just the root changed + assertJournalEntries(ds1, "{}"); + assertJournalEntries(ds2, "{}"); + assertEquals(1, countJournalEntries(ds1, 10)); + assertEquals(1, countJournalEntries(ds2, 10)); + + //1. Create base structure /x/y + NodeBuilder b1 = ds1.getRoot().builder(); + b1.child("x").child("y"); + ds1.merge(b1, EmptyHook.INSTANCE, CommitInfo.EMPTY); + ds1.runBackgroundOperations(); + + //lastRev are persisted directly for new nodes. In case of + // updates they are persisted via background jobs + + //1.2 Get last rev populated for root node for ds2 + ds2.runBackgroundOperations(); + NodeBuilder b2 = ds2.getRoot().builder(); + b2.child("x").setProperty("f1","b1"); + ds2.merge(b2, EmptyHook.INSTANCE, CommitInfo.EMPTY); + ds2.runBackgroundOperations(); + + //2. Add a new node /x/y/z + b2 = ds2.getRoot().builder(); + b2.child("x").child("y").child("z").setProperty("foo", "bar"); + ds2.merge(b2, EmptyHook.INSTANCE, CommitInfo.EMPTY); + + //Refresh DS1 + ds1.runBackgroundOperations(); + + final NodeDocument z1 = getDocument(ds1, "/x/y/z"); + NodeDocument y1 = getDocument(ds1, "/x/y"); + final NodeDocument x1 = getDocument(ds1, "/x"); + + Revision head2 = ds2.getHeadRevision(); + + //lastRev should not be updated for C #2 + assertNull(y1.getLastRev().get(c2Id)); + + final LastRevRecoveryAgent recovery = new LastRevRecoveryAgent(ds1); + + // besides the former root change, now 1 also has + final String change1 = "{\"x\":{\"y\":{}}}"; + assertJournalEntries(ds1, "{}", change1); + final String change2 = "{\"x\":{}}"; + assertJournalEntries(ds2, "{}", change2); + + + String change2b = "{\"x\":{\"y\":{\"z\":{}}}}"; + + if (!testConcurrency) { + //Do not pass y1 but still y1 should be updated + recovery.recover(Iterators.forArray(x1,z1), c2Id); + + //Post recovery the lastRev should be updated for /x/y and /x + assertEquals(head2, getDocument(ds1, "/x/y").getLastRev().get(c2Id)); + assertEquals(head2, getDocument(ds1, "/x").getLastRev().get(c2Id)); + assertEquals(head2, getDocument(ds1, "/").getLastRev().get(c2Id)); + + // now 1 is unchanged, but 2 was recovered now, so has one more: + assertJournalEntries(ds1, "{}", change1); // unchanged + assertJournalEntries(ds2, "{}", change2, change2b); + + // just some no-ops: + recovery.recover(c2Id); + recovery.recover(Iterators.<NodeDocument>emptyIterator(), c2Id); + assertJournalEntries(ds1, "{}", change1); // unchanged + assertJournalEntries(ds2, "{}", change2, change2b); + + } else { + + // do some concurrency testing as well to check if + final int NUM_THREADS = 200; + final CountDownLatch ready = new CountDownLatch(NUM_THREADS); + final CountDownLatch start = new CountDownLatch(1); + final CountDownLatch end = new CountDownLatch(NUM_THREADS); + final List<Exception> exceptions = synchronizedList(new ArrayList<Exception>()); + for (int i = 0; i < NUM_THREADS; i++) { + Thread th = new Thread(new Runnable() { + + @Override + public void run() { + try { + ready.countDown(); + start.await(); + recovery.recover(Iterators.forArray(x1,z1), c2Id); + } catch (Exception e) { + exceptions.add(e); + } finally { + end.countDown(); + } + } + + }); + th.start(); + } + ready.await(5, TimeUnit.SECONDS); + start.countDown(); + assertTrue(end.await(20, TimeUnit.SECONDS)); + assertJournalEntries(ds1, "{}", change1); // unchanged + assertJournalEntries(ds2, "{}", change2, change2b); + for (Exception ex : exceptions) { + throw ex; + } + } + } + + private DocumentMK createMK(int clusterId, int asyncDelay) { + if (ds == null) { + ds = new MemoryDocumentStore(); + } + if (bs == null) { + bs = new MemoryBlobStore(); + } + return createMK(clusterId, asyncDelay, ds, bs); + } +} Propchange: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java ------------------------------------------------------------------------------ svn:eol-style = native
