Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java?rev=1688649&r1=1688648&r2=1688649&view=diff ============================================================================== --- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java (original) +++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java Wed Jul 1 13:37:35 2015 @@ -35,6 +35,7 @@ import org.apache.jackrabbit.oak.plugins import org.apache.jackrabbit.oak.plugins.document.Document; import org.apache.jackrabbit.oak.plugins.document.DocumentStore; import org.apache.jackrabbit.oak.plugins.document.DocumentStoreException; +import org.apache.jackrabbit.oak.plugins.document.JournalEntry; import org.apache.jackrabbit.oak.plugins.document.NodeDocument; import org.apache.jackrabbit.oak.plugins.document.Revision; import org.apache.jackrabbit.oak.plugins.document.StableRevisionComparator; @@ -73,6 +74,12 @@ public class MemoryDocumentStore impleme private ConcurrentSkipListMap<String, Document> settings = new ConcurrentSkipListMap<String, Document>(); + /** + * The 'externalChanges' collection. + */ + private ConcurrentSkipListMap<String, JournalEntry> externalChanges = + new ConcurrentSkipListMap<String, JournalEntry>(); + private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); /** @@ -226,8 +233,10 @@ public class MemoryDocumentStore impleme return (ConcurrentSkipListMap<String, T>) nodes; } else if (collection == Collection.CLUSTER_NODES) { return (ConcurrentSkipListMap<String, T>) clusterNodes; - }else if (collection == Collection.SETTINGS) { + } else if (collection == Collection.SETTINGS) { return (ConcurrentSkipListMap<String, T>) settings; + } else if (collection == Collection.JOURNAL) { + return (ConcurrentSkipListMap<String, T>) externalChanges; } else { throw new IllegalArgumentException( "Unknown collection: " + collection.toString()); @@ -329,6 +338,11 @@ public class MemoryDocumentStore impleme } @Override + public CacheInvalidationStats invalidateCache(Iterable<String> keys) { + return null; + } + + @Override public void dispose() { // ignore }
Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidator.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidator.java?rev=1688649&r1=1688648&r2=1688649&view=diff ============================================================================== --- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidator.java (original) +++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidator.java Wed Jul 1 13:37:35 2015 @@ -195,7 +195,7 @@ abstract class CacheInvalidator { PeekingIterator<TreeNode> pitr = Iterators.peekingIterator(treeItr); Map<String, TreeNode> sameLevelNodes = Maps.newHashMap(); - // Fetch only the lastRev map and id + // Fetch only the modCount and id final BasicDBObject keys = new BasicDBObject(Document.ID, 1); keys.put(Document.MOD_COUNT, 1); @@ -228,7 +228,7 @@ abstract class CacheInvalidator { QueryBuilder query = QueryBuilder.start(Document.ID) .in(idBatch); - // Fetch lastRev and modCount for each such nodes + // Fetch modCount for each such nodes DBCursor cursor = nodes.find(query.get(), keys); cursor.setReadPreference(ReadPreference.primary()); LOG.debug( Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCache.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCache.java?rev=1688649&r1=1688648&r2=1688649&view=diff ============================================================================== --- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCache.java (original) +++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCache.java Wed Jul 1 13:37:35 2015 @@ -91,7 +91,7 @@ public class MongoDiffCache extends Memo if (changes == null && loader != null) { changes = loader.call(); // put into memory cache - super.newEntry(from, to).append(path, changes); + super.newEntry(from, to, false).append(path, changes); } return changes; } finally { @@ -102,7 +102,8 @@ public class MongoDiffCache extends Memo @Nonnull @Override public Entry newEntry(@Nonnull final Revision from, - @Nonnull final Revision to) { + @Nonnull final Revision to, + boolean local /*ignored*/) { return new MemoryEntry(from, to) { private Diff commit = new Diff(from, to); @@ -172,7 +173,7 @@ public class MongoDiffCache extends Memo // diff is complete LOG.debug("Built diff from {} commits", numCommits); // apply to diff cache and serve later requests from cache - d.applyToEntry(super.newEntry(from, to)).done(); + d.applyToEntry(super.newEntry(from, to, false)).done(); // return changes return d.getChanges(path); } Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java?rev=1688649&r1=1688648&r2=1688649&view=diff ============================================================================== --- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java (original) +++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java Wed Jul 1 13:37:35 2015 @@ -67,6 +67,7 @@ import org.apache.jackrabbit.oak.plugins import org.apache.jackrabbit.oak.plugins.document.cache.ForwardingListener; import org.apache.jackrabbit.oak.plugins.document.cache.NodeDocOffHeapCache; import org.apache.jackrabbit.oak.plugins.document.cache.OffHeapCache; +import org.apache.jackrabbit.oak.plugins.document.mongo.CacheInvalidator.InvalidationResult; import org.apache.jackrabbit.oak.plugins.document.util.StringValue; import org.apache.jackrabbit.oak.plugins.document.util.Utils; import org.apache.jackrabbit.oak.stats.Clock; @@ -117,6 +118,7 @@ public class MongoDocumentStore implemen private final DBCollection nodes; private final DBCollection clusterNodes; private final DBCollection settings; + private final DBCollection journal; private final Cache<CacheValue, NodeDocument> nodesCache; private final CacheStats cacheStats; @@ -196,12 +198,10 @@ public class MongoDocumentStore implemen .put("version", version) .build(); - nodes = db.getCollection( - Collection.NODES.toString()); - clusterNodes = db.getCollection( - Collection.CLUSTER_NODES.toString()); - settings = db.getCollection( - Collection.SETTINGS.toString()); + nodes = db.getCollection(Collection.NODES.toString()); + clusterNodes = db.getCollection(Collection.CLUSTER_NODES.toString()); + settings = db.getCollection(Collection.SETTINGS.toString()); + journal = db.getCollection(Collection.JOURNAL.toString()); maxReplicationLagMillis = builder.getMaxReplicationLagMillis(); @@ -299,6 +299,59 @@ public class MongoDocumentStore implemen //that would lead to lesser number of queries return CacheInvalidator.createHierarchicalInvalidator(this).invalidateCache(); } + + @Override + public CacheInvalidationStats invalidateCache(Iterable<String> keys) { + LOG.debug("invalidateCache: start"); + final InvalidationResult result = new InvalidationResult(); + int size = 0; + + final Iterator<String> it = keys.iterator(); + while(it.hasNext()) { + // read chunks of documents only + final List<String> ids = new ArrayList<String>(IN_CLAUSE_BATCH_SIZE); + while(it.hasNext() && ids.size() < IN_CLAUSE_BATCH_SIZE) { + final String id = it.next(); + if (getCachedNodeDoc(id) != null) { + // only add those that we actually do have cached + ids.add(id); + } + } + size += ids.size(); + if (LOG.isTraceEnabled()) { + LOG.trace("invalidateCache: batch size: {} of total so far {}", + ids.size(), size); + } + + QueryBuilder query = QueryBuilder.start(Document.ID).in(ids); + // Fetch only the modCount and id + final BasicDBObject fields = new BasicDBObject(Document.ID, 1); + fields.put(Document.MOD_COUNT, 1); + + DBCursor cursor = nodes.find(query.get(), fields); + cursor.setReadPreference(ReadPreference.primary()); + result.queryCount++; + + for (DBObject obj : cursor) { + result.cacheEntriesProcessedCount++; + String id = (String) obj.get(Document.ID); + Number modCount = (Number) obj.get(Document.MOD_COUNT); + + CachedNodeDocument cachedDoc = getCachedNodeDoc(id); + if (cachedDoc != null + && !Objects.equal(cachedDoc.getModCount(), modCount)) { + invalidateCache(Collection.NODES, id); + result.invalidationCount++; + } else { + result.upToDateCount++; + } + } + } + + result.cacheSize = size; + LOG.trace("invalidateCache: end. total: {}", size); + return result; + } @Override public <T extends Document> void invalidateCache(Collection<T> collection, String key) { @@ -365,31 +418,30 @@ public class MongoDocumentStore implemen try { TreeLock lock = acquire(key); try { - if (maxCacheAge == 0) { - invalidateCache(collection, key); - } - while (true) { - doc = nodesCache.get(cacheKey, new Callable<NodeDocument>() { - @Override - public NodeDocument call() throws Exception { - NodeDocument doc = (NodeDocument) findUncachedWithRetry( - collection, key, - getReadPreference(maxCacheAge), 2); - if (doc == null) { - doc = NodeDocument.NULL; + if (maxCacheAge > 0 || preferCached) { + // try again some other thread may have populated + // the cache by now + doc = nodesCache.getIfPresent(cacheKey); + if (doc != null) { + if (preferCached || + getTime() - doc.getCreated() < maxCacheAge) { + if (doc == NodeDocument.NULL) { + return null; } - return doc; + return (T) doc; } - }); - if (maxCacheAge == 0 || preferCached) { - break; - } - if (getTime() - doc.getCreated() < maxCacheAge) { - break; } - // too old: invalidate, try again - invalidateCache(collection, key); } + final NodeDocument d = (NodeDocument) findUncachedWithRetry( + collection, key, + getReadPreference(maxCacheAge), 2); + invalidateCache(collection, key); + doc = nodesCache.get(cacheKey, new Callable<NodeDocument>() { + @Override + public NodeDocument call() throws Exception { + return d == null ? NodeDocument.NULL : d; + } + }); } finally { lock.unlock(); } @@ -402,6 +454,8 @@ public class MongoDocumentStore implemen t = e.getCause(); } catch (ExecutionException e) { t = e.getCause(); + } catch (RuntimeException e) { + t = e; } throw new DocumentStoreException("Failed to load document with " + key, t); } @@ -423,6 +477,9 @@ public class MongoDocumentStore implemen DocumentReadPreference docReadPref, int retries) { checkArgument(retries >= 0, "retries must not be negative"); + if (key.equals("0:/")) { + LOG.trace("root node"); + } int numAttempts = retries + 1; MongoException ex = null; for (int i = 0; i < numAttempts; i++) { @@ -560,9 +617,13 @@ public class MongoDocumentStore implemen } DBObject query = queryBuilder.get(); String parentId = Utils.getParentIdFromLowerLimit(fromKey); + long lockTime = -1; final long start = PERFLOG.start(); - TreeLock lock = withLock ? acquireExclusive(parentId != null ? parentId : "") : null; + TreeLock lock = acquireExclusive(parentId != null ? parentId : ""); try { + if (start != -1) { + lockTime = System.currentTimeMillis() - start; + } DBCursor cursor = dbCollection.find(query).sort(BY_ID_ASC); if (!disableIndexHint) { cursor.hint(hint); @@ -620,7 +681,7 @@ public class MongoDocumentStore implemen if (lock != null) { lock.unlock(); } - PERFLOG.end(start, 1, "query for children from [{}] to [{}]", fromKey, toKey); + PERFLOG.end(start, 1, "query for children from [{}] to [{}], lock:{}", fromKey, toKey, lockTime); } } @@ -1014,7 +1075,9 @@ public class MongoDocumentStore implemen return clusterNodes; } else if (collection == Collection.SETTINGS) { return settings; - }else { + } else if (collection == Collection.JOURNAL) { + return journal; + } else { throw new IllegalArgumentException( "Unknown collection: " + collection.toString()); } Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java?rev=1688649&r1=1688648&r2=1688649&view=diff ============================================================================== --- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java (original) +++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java Wed Jul 1 13:37:35 2015 @@ -282,6 +282,12 @@ public class RDBDocumentStore implements } return null; } + + @Override + public CacheInvalidationStats invalidateCache(Iterable<String> keys) { + //TODO: optimize me + return invalidateCache(); + } @Override public <T extends Document> void invalidateCache(Collection<T> collection, String id) { @@ -783,7 +789,7 @@ public class RDBDocumentStore implements private Set<String> tablesToBeDropped = new HashSet<String>(); // table names - private String tnNodes, tnClusterNodes, tnSettings; + private String tnNodes, tnClusterNodes, tnSettings, tnJournal; // ratio between Java characters and UTF-8 encoding // a) single characters will fit into 3 bytes @@ -825,6 +831,7 @@ public class RDBDocumentStore implements this.tnNodes = RDBJDBCTools.createTableName(options.getTablePrefix(), TABLEMAP.get(Collection.NODES)); this.tnClusterNodes = RDBJDBCTools.createTableName(options.getTablePrefix(), TABLEMAP.get(Collection.CLUSTER_NODES)); this.tnSettings = RDBJDBCTools.createTableName(options.getTablePrefix(), TABLEMAP.get(Collection.SETTINGS)); + this.tnJournal = RDBJDBCTools.createTableName(options.getTablePrefix(), "JOURNAL"); this.ch = new RDBConnectionHandler(ds); this.callStack = LOG.isDebugEnabled() ? new Exception("call stack of RDBDocumentStore creation") : null; @@ -878,6 +885,7 @@ public class RDBDocumentStore implements createTableFor(con, Collection.CLUSTER_NODES, tablesCreated, tablesPresent, tableDiags); createTableFor(con, Collection.NODES, tablesCreated, tablesPresent, tableDiags); createTableFor(con, Collection.SETTINGS, tablesCreated, tablesPresent, tableDiags); + createTableFor(con, Collection.JOURNAL, tablesCreated, tablesPresent, tableDiags); } finally { con.commit(); con.close(); @@ -1316,6 +1324,8 @@ public class RDBDocumentStore implements return this.tnNodes; } else if (collection == Collection.SETTINGS) { return this.tnSettings; + } else if (collection == Collection.JOURNAL) { + return this.tnJournal; } else { throw new IllegalArgumentException("Unknown collection: " + collection.toString()); } Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java?rev=1688649&r1=1688648&r2=1688649&view=diff ============================================================================== --- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java (original) +++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java Wed Jul 1 13:37:35 2015 @@ -251,6 +251,17 @@ public class LoggingDocumentStoreWrapper throw convert(e); } } + + @Override + public CacheInvalidationStats invalidateCache(Iterable<String> keys) { + try { + logMethod("invalidateCache", keys); + return store.invalidateCache(keys); + } catch (Exception e) { + logException(e); + throw convert(e); + } + } @Override public <T extends Document> void invalidateCache(Collection<T> collection, String key) { Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java?rev=1688649&r1=1688648&r2=1688649&view=diff ============================================================================== --- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java (original) +++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java Wed Jul 1 13:37:35 2015 @@ -107,6 +107,11 @@ public class SynchronizingDocumentStoreW } @Override + public synchronized CacheInvalidationStats invalidateCache(Iterable<String> keys) { + return store.invalidateCache(keys); + } + + @Override public synchronized <T extends Document> void invalidateCache(Collection<T> collection, String key) { store.invalidateCache(collection, key); } Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java?rev=1688649&r1=1688648&r2=1688649&view=diff ============================================================================== --- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java (original) +++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java Wed Jul 1 13:37:35 2015 @@ -282,6 +282,18 @@ public class TimingDocumentStoreWrapper throw convert(e); } } + + @Override + public CacheInvalidationStats invalidateCache(Iterable<String> keys) { + try { + long start = now(); + CacheInvalidationStats result = base.invalidateCache(keys); + updateAndLogTimes("invalidateCache3", start, 0, 0); + return result; + } catch (Exception e) { + throw convert(e); + } + } @Override public <T extends Document> void invalidateCache(Collection<T> collection, String key) { Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java?rev=1688649&r1=1688648&r2=1688649&view=diff ============================================================================== --- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java (original) +++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java Wed Jul 1 13:37:35 2015 @@ -34,6 +34,7 @@ import javax.annotation.CheckForNull; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import com.google.common.base.Function; import com.google.common.base.Predicate; import com.google.common.collect.AbstractIterator; import com.mongodb.BasicDBObject; @@ -51,6 +52,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.collect.Iterables.transform; import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.isDeletedEntry; /** @@ -589,4 +591,31 @@ public class Utils { public static boolean isHiddenPath(@Nonnull String path) { return path.contains("/:"); } + + /** + * Transforms the given {@link Iterable} from {@link String} to + * {@link StringValue} elements. The {@link Iterable} must no have + * {@code null} values. + */ + public static Iterable<StringValue> asStringValueIterable( + @Nonnull Iterable<String> values) { + return transform(values, new Function<String, StringValue>() { + @Override + public StringValue apply(String input) { + return new StringValue(input); + } + }); + } + + /** + * Transforms the given paths into ids using {@link #getIdFromPath(String)}. + */ + public static Iterable<String> pathToId(@Nonnull Iterable<String> paths) { + return transform(paths, new Function<String, String>() { + @Override + public String apply(String input) { + return getIdFromPath(input); + } + }); + } } Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/NodeObserver.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/NodeObserver.java?rev=1688649&r1=1688648&r2=1688649&view=diff ============================================================================== --- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/NodeObserver.java (original) +++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/NodeObserver.java Wed Jul 1 13:37:35 2015 @@ -161,7 +161,7 @@ public abstract class NodeObserver imple while (!generator.isDone()) { generator.generate(); } - PERF_LOGGER.end(start, 10, + PERF_LOGGER.end(start, 100, "Generated events (before: {}, after: {})", previousRoot, root); } catch (Exception e) { Modified: jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AmnesiaDiffCache.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AmnesiaDiffCache.java?rev=1688649&r1=1688648&r2=1688649&view=diff ============================================================================== --- jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AmnesiaDiffCache.java (original) +++ jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AmnesiaDiffCache.java Wed Jul 1 13:37:35 2015 @@ -47,7 +47,7 @@ class AmnesiaDiffCache implements DiffCa @Nonnull @Override - public Entry newEntry(@Nonnull Revision from, @Nonnull Revision to) { + public Entry newEntry(@Nonnull Revision from, @Nonnull Revision to, boolean local) { return new Entry() { @Override public void append(@Nonnull String path, @Nonnull String changes) { Modified: jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterTest.java?rev=1688649&r1=1688648&r2=1688649&view=diff ============================================================================== --- jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterTest.java (original) +++ jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterTest.java Wed Jul 1 13:37:35 2015 @@ -370,6 +370,10 @@ public class ClusterTest { rootStates2.add((DocumentNodeState) root); } }); + + ns1.runBackgroundOperations(); + ns2.runBackgroundOperations(); + rootStates1.clear(); rootStates2.clear(); Copied: jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java (from r1684820, jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java) URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java?p2=jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java&p1=jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java&r1=1684820&r2=1688649&rev=1688649&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java (original) +++ jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java Wed Jul 1 13:37:35 2015 @@ -20,186 +20,203 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import javax.annotation.Nonnull; + import org.apache.jackrabbit.oak.cache.CacheStats; import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Condition; import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key; import org.apache.jackrabbit.oak.plugins.document.cache.CacheInvalidationStats; public class CountingDocumentStore implements DocumentStore { - - private DocumentStore delegate; - - //TODO: remove mec - boolean printStacks; - - class Stats { - - private int numFindCalls; - private int numQueryCalls; - private int numRemoveCalls; - private int numCreateOrUpdateCalls; - - } - - private Map<Collection, Stats> collectionStats = new HashMap<Collection, Stats>(); - - public CountingDocumentStore(DocumentStore delegate) { - this.delegate = delegate; - } - - public void resetCounters() { - collectionStats.clear(); - } - - public int getNumFindCalls(Collection collection) { - return getStats(collection).numFindCalls; - } - - public int getNumQueryCalls(Collection collection) { - return getStats(collection).numQueryCalls; - } - - public int getNumRemoveCalls(Collection collection) { - return getStats(collection).numRemoveCalls; - } - - public int getNumCreateOrUpdateCalls(Collection collection) { - return getStats(collection).numCreateOrUpdateCalls; - } - - private Stats getStats(Collection collection) { - if (!collectionStats.containsKey(collection)) { - Stats s = new Stats(); - collectionStats.put(collection, s); - return s; - } else { - return collectionStats.get(collection); - } - } - - @Override - public <T extends Document> T find(Collection<T> collection, String key) { - getStats(collection).numFindCalls++; - if (printStacks) { - new Exception("find ["+getStats(collection).numFindCalls+"] ("+collection+") "+key).printStackTrace(); - } - return delegate.find(collection, key); - } - - @Override - public <T extends Document> T find(Collection<T> collection, String key, - int maxCacheAge) { - getStats(collection).numFindCalls++; - if (printStacks) { - new Exception("find ["+getStats(collection).numFindCalls+"] ("+collection+") "+key+" [max: "+maxCacheAge+"]").printStackTrace(); - } - return delegate.find(collection, key, maxCacheAge); - } - - @Override - public <T extends Document> List<T> query(Collection<T> collection, - String fromKey, String toKey, int limit) { - getStats(collection).numQueryCalls++; - if (printStacks) { - new Exception("query1 ["+getStats(collection).numQueryCalls+"] ("+collection+") "+fromKey+", to "+toKey+". limit "+limit).printStackTrace(); - } - return delegate.query(collection, fromKey, toKey, limit); - } - - @Override - public <T extends Document> List<T> query(Collection<T> collection, - String fromKey, String toKey, String indexedProperty, - long startValue, int limit) { - getStats(collection).numQueryCalls++; - if (printStacks) { - new Exception("query2 ["+getStats(collection).numQueryCalls+"] ("+collection+") "+fromKey+", to "+toKey+". limit "+limit).printStackTrace(); - } - return delegate.query(collection, fromKey, toKey, indexedProperty, startValue, limit); - } - - @Override - public <T extends Document> void remove(Collection<T> collection, String key) { - getStats(collection).numRemoveCalls++; - delegate.remove(collection, key); - } - - @Override - public <T extends Document> void remove(Collection<T> collection, - List<String> keys) { - getStats(collection).numRemoveCalls++; - delegate.remove(collection, keys); - } - - @Override - public <T extends Document> int remove(Collection<T> collection, - Map<String, Map<Key, Condition>> toRemove) { - getStats(collection).numRemoveCalls++; - return delegate.remove(collection, toRemove); - } - - @Override - public <T extends Document> boolean create(Collection<T> collection, - List<UpdateOp> updateOps) { - getStats(collection).numCreateOrUpdateCalls++; - return delegate.create(collection, updateOps); - } - - @Override - public <T extends Document> void update(Collection<T> collection, - List<String> keys, UpdateOp updateOp) { - getStats(collection).numCreateOrUpdateCalls++; - delegate.update(collection, keys, updateOp); - } - - @Override - public <T extends Document> T createOrUpdate(Collection<T> collection, - UpdateOp update) { - getStats(collection).numCreateOrUpdateCalls++; - return delegate.createOrUpdate(collection, update); - } - - @Override - public <T extends Document> T findAndUpdate(Collection<T> collection, - UpdateOp update) { - getStats(collection).numCreateOrUpdateCalls++; - return delegate.findAndUpdate(collection, update); - } - - @Override - public CacheInvalidationStats invalidateCache() { - return delegate.invalidateCache(); - } - - @Override - public <T extends Document> void invalidateCache(Collection<T> collection, - String key) { - delegate.invalidateCache(collection, key); - } - - @Override - public void dispose() { - delegate.dispose(); - } - - @Override - public <T extends Document> T getIfCached(Collection<T> collection, - String key) { - return delegate.getIfCached(collection, key); - } - - @Override - public void setReadWriteMode(String readWriteMode) { - delegate.setReadWriteMode(readWriteMode); - } - - @Override - public CacheStats getCacheStats() { - return delegate.getCacheStats(); - } - - @Override - public Map<String, String> getMetadata() { - return delegate.getMetadata(); - } + + private DocumentStore delegate; + + //TODO: remove mec + boolean printStacks; + + class Stats { + + private int numFindCalls; + private int numQueryCalls; + private int numRemoveCalls; + private int numCreateOrUpdateCalls; + + } + + private Map<Collection, Stats> collectionStats = new HashMap<Collection, Stats>(); + + public CountingDocumentStore(DocumentStore delegate) { + this.delegate = delegate; + } + + public void resetCounters() { + collectionStats.clear(); + } + + public int getNumFindCalls(Collection collection) { + return getStats(collection).numFindCalls; + } + + public int getNumQueryCalls(Collection collection) { + return getStats(collection).numQueryCalls; + } + + public int getNumRemoveCalls(Collection collection) { + return getStats(collection).numRemoveCalls; + } + + public int getNumCreateOrUpdateCalls(Collection collection) { + return getStats(collection).numCreateOrUpdateCalls; + } + + private Stats getStats(Collection collection) { + if (!collectionStats.containsKey(collection)) { + Stats s = new Stats(); + collectionStats.put(collection, s); + return s; + } else { + return collectionStats.get(collection); + } + } + + @Override + public <T extends Document> T find(Collection<T> collection, String key) { + getStats(collection).numFindCalls++; + if (printStacks) { + new Exception("find [" + getStats(collection).numFindCalls + "] (" + collection + ") " + key).printStackTrace(); + } + return delegate.find(collection, key); + } + + @Override + public <T extends Document> T find(Collection<T> collection, + String key, + int maxCacheAge) { + getStats(collection).numFindCalls++; + if (printStacks) { + new Exception("find [" + getStats(collection).numFindCalls + "] (" + collection + ") " + key + " [max: " + maxCacheAge + "]").printStackTrace(); + } + return delegate.find(collection, key, maxCacheAge); + } + + @Nonnull + @Override + public <T extends Document> List<T> query(Collection<T> collection, + String fromKey, + String toKey, + int limit) { + getStats(collection).numQueryCalls++; + if (printStacks) { + new Exception("query1 [" + getStats(collection).numQueryCalls + "] (" + collection + ") " + fromKey + ", to " + toKey + ". limit " + limit).printStackTrace(); + } + return delegate.query(collection, fromKey, toKey, limit); + } + + @Nonnull + @Override + public <T extends Document> List<T> query(Collection<T> collection, + String fromKey, + String toKey, + String indexedProperty, + long startValue, + int limit) { + getStats(collection).numQueryCalls++; + if (printStacks) { + new Exception("query2 [" + getStats(collection).numQueryCalls + "] (" + collection + ") " + fromKey + ", to " + toKey + ". limit " + limit).printStackTrace(); + } + return delegate.query(collection, fromKey, toKey, indexedProperty, startValue, limit); + } + + @Override + public <T extends Document> void remove(Collection<T> collection, + String key) { + getStats(collection).numRemoveCalls++; + delegate.remove(collection, key); + } + + @Override + public <T extends Document> void remove(Collection<T> collection, + List<String> keys) { + getStats(collection).numRemoveCalls++; + delegate.remove(collection, keys); + } + + @Override + public <T extends Document> int remove(Collection<T> collection, + Map<String, Map<Key, Condition>> toRemove) { + getStats(collection).numRemoveCalls++; + return delegate.remove(collection, toRemove); + } + + @Override + public <T extends Document> boolean create(Collection<T> collection, + List<UpdateOp> updateOps) { + getStats(collection).numCreateOrUpdateCalls++; + return delegate.create(collection, updateOps); + } + + @Override + public <T extends Document> void update(Collection<T> collection, + List<String> keys, + UpdateOp updateOp) { + getStats(collection).numCreateOrUpdateCalls++; + delegate.update(collection, keys, updateOp); + } + + @Override + public <T extends Document> T createOrUpdate(Collection<T> collection, + UpdateOp update) { + getStats(collection).numCreateOrUpdateCalls++; + return delegate.createOrUpdate(collection, update); + } + + @Override + public <T extends Document> T findAndUpdate(Collection<T> collection, + UpdateOp update) { + getStats(collection).numCreateOrUpdateCalls++; + return delegate.findAndUpdate(collection, update); + } + + @Override + public CacheInvalidationStats invalidateCache() { + return delegate.invalidateCache(); + } + + @Override + public CacheInvalidationStats invalidateCache(Iterable<String> keys) { + return delegate.invalidateCache(keys); + } + + @Override + public <T extends Document> void invalidateCache(Collection<T> collection, + String key) { + delegate.invalidateCache(collection, key); + } + + @Override + public void dispose() { + delegate.dispose(); + } + + @Override + public <T extends Document> T getIfCached(Collection<T> collection, + String key) { + return delegate.getIfCached(collection, key); + } + + @Override + public void setReadWriteMode(String readWriteMode) { + delegate.setReadWriteMode(readWriteMode); + } + + @Override + public CacheStats getCacheStats() { + return delegate.getCacheStats(); + } + + @Override + public Map<String, String> getMetadata() { + return delegate.getMetadata(); + } } Copied: jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingTieredDiffCache.java (from r1684820, jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingTieredDiffCache.java) URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingTieredDiffCache.java?p2=jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingTieredDiffCache.java&p1=jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingTieredDiffCache.java&r1=1684820&r2=1688649&rev=1688649&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingTieredDiffCache.java (original) +++ jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingTieredDiffCache.java Wed Jul 1 13:37:35 2015 @@ -16,45 +16,50 @@ */ package org.apache.jackrabbit.oak.plugins.document; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + public class CountingTieredDiffCache extends TieredDiffCache { - class CountingLoader implements Loader { + class CountingLoader implements Loader { + + private Loader delegate; + + CountingLoader(Loader delegate) { + this.delegate = delegate; + } + + @Override + public String call() { + incLoadCount(); + return delegate.call(); + } + + } + + private int loadCount; + + public CountingTieredDiffCache(DocumentMK.Builder builder) { + super(builder); + } + + private void incLoadCount() { + loadCount++; + } - private Loader delegate; + public int getLoadCount() { + return loadCount; + } - CountingLoader(Loader delegate) { - this.delegate = delegate; - } - - @Override - public String call() { - incLoadCount(); - return delegate.call(); - } - - } - - private int loadCount; - - public CountingTieredDiffCache(DocumentMK.Builder builder) { - super(builder); - } - - private void incLoadCount() { - loadCount++; - } - - public int getLoadCount() { - return loadCount; - } - - public void resetLoadCounter() { - loadCount = 0; - } - - @Override - public String getChanges(Revision from, Revision to, String path, - Loader loader) { - return super.getChanges(from, to, path, new CountingLoader(loader)); - } + public void resetLoadCounter() { + loadCount = 0; + } + + @Override + public String getChanges(@Nonnull Revision from, + @Nonnull Revision to, + @Nonnull String path, + @Nullable Loader loader) { + return super.getChanges(from, to, path, new CountingLoader(loader)); + } } Modified: jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java?rev=1688649&r1=1688648&r2=1688649&view=diff ============================================================================== --- jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java (original) +++ jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java Wed Jul 1 13:37:35 2015 @@ -60,6 +60,7 @@ import com.google.common.base.Throwables import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; + import org.apache.jackrabbit.oak.api.CommitFailedException; import org.apache.jackrabbit.oak.api.PropertyState; import org.apache.jackrabbit.oak.api.Type; @@ -103,8 +104,8 @@ public class DocumentNodeStoreTest { DocumentStore docStore = new MemoryDocumentStore(); DocumentStore testStore = new TimingDocumentStoreWrapper(docStore) { @Override - public CacheInvalidationStats invalidateCache() { - super.invalidateCache(); + public CacheInvalidationStats invalidateCache(Iterable<String> keys) { + super.invalidateCache(keys); semaphore.acquireUninterruptibly(); semaphore.release(); return null; @@ -1662,7 +1663,7 @@ public class DocumentNodeStoreTest { merge(ns, builder); Revision to = ns.getHeadRevision(); - DiffCache.Entry entry = ns.getDiffCache().newEntry(from, to); + DiffCache.Entry entry = ns.getDiffCache().newEntry(from, to, true); entry.append("/", "-\"foo\""); entry.done(); Copied: jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java (from r1685977, jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java) URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java?p2=jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java&p1=jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java&r1=1685977&r2=1688649&rev=1688649&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java (original) +++ jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java Wed Jul 1 13:37:35 2015 @@ -52,11 +52,12 @@ public class JournalEntryTest { add(sort, paths); Revision from = new Revision(1, 0, 1); Revision to = new Revision(2, 0, 1); + sort.sort(); JournalEntry.applyTo(sort, cache, from, to); for (String p : paths) { String changes = cache.getChanges(from, to, p, null); - assertNotNull(changes); + assertNotNull("missing changes for " + p, changes); for (String c : getChildren(changes)) { assertTrue(paths.contains(PathUtils.concat(p, c))); } Copied: jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java (from r1684820, jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java) URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java?p2=jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java&p1=jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java&r1=1684820&r2=1688649&rev=1688649&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java (original) +++ jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java Wed Jul 1 13:37:35 2015 @@ -16,23 +16,16 @@ */ package org.apache.jackrabbit.oak.plugins.document; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.util.Arrays; -import java.util.Iterator; -import java.util.LinkedList; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; import org.apache.jackrabbit.oak.api.PropertyState; import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore; -import org.apache.jackrabbit.oak.plugins.document.util.Utils; -import org.apache.jackrabbit.oak.spi.blob.BlobStore; import org.apache.jackrabbit.oak.spi.blob.MemoryBlobStore; import org.apache.jackrabbit.oak.spi.commit.CommitInfo; import org.apache.jackrabbit.oak.spi.commit.EmptyHook; @@ -40,26 +33,18 @@ import org.apache.jackrabbit.oak.spi.com import org.apache.jackrabbit.oak.spi.state.NodeBuilder; import org.apache.jackrabbit.oak.spi.state.NodeState; import org.apache.jackrabbit.oak.spi.state.NodeStateDiff; -import org.junit.After; -import org.junit.Before; import org.junit.Test; -import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; -import com.mongodb.DB; - -public class JournalTest { +import static java.util.Collections.synchronizedList; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; - private static final boolean MONGO_DB = false; -// private static final boolean MONGO_DB = true; - - private TestBuilder builder; +public class JournalTest extends AbstractJournalTest { private MemoryDocumentStore ds; private MemoryBlobStore bs; - private List<DocumentMK> mks = Lists.newArrayList(); - class DiffingObserver implements Observer, Runnable, NodeStateDiff { final List<DocumentNodeState> incomingRootStates1 = Lists.newArrayList(); @@ -126,7 +111,6 @@ public class JournalTest { incomingRootStates1.wait(); } catch (InterruptedException e) { // ignore - continue; } } newRoot = incomingRootStates1.remove(0); @@ -237,7 +221,7 @@ public class JournalTest { observer.clear(); countingDocStore1.resetCounters(); countingDocStore2.resetCounters(); - countingDocStore1.printStacks = true; + // countingDocStore1.printStacks = true; countingDiffCache1.resetLoadCounter(); countingDiffCache2.resetLoadCounter(); @@ -257,7 +241,7 @@ public class JournalTest { assertEquals(0, countingDiffCache1.getLoadCount()); // let node 1 read those changes - System.err.println("run background ops"); + // System.err.println("run background ops"); ns1.runBackgroundOperations(); mk2.commit("/", "+\"regular5\": {}", null, null); ns2.runBackgroundOperations(); @@ -329,7 +313,7 @@ public class JournalTest { doLastRevRecoveryJournalTest(true); } - void doLastRevRecoveryJournalTest(boolean testConcurrency) throws Exception { + private void doLastRevRecoveryJournalTest(boolean testConcurrency) throws Exception { DocumentMK mk1 = createMK(0 /*clusterId via clusterNodes collection*/, 0); DocumentNodeStore ds1 = mk1.getNodeStore(); int c1Id = ds1.getClusterId(); @@ -405,8 +389,7 @@ public class JournalTest { // just some no-ops: recovery.recover(c2Id); - List<NodeDocument> emptyList = new LinkedList<NodeDocument>(); - recovery.recover(emptyList.iterator(), c2Id); + recovery.recover(Iterators.<NodeDocument>emptyIterator(), c2Id); assertJournalEntries(ds1, "{}", change1); // unchanged assertJournalEntries(ds2, "{}", change2, change2b); @@ -417,8 +400,8 @@ public class JournalTest { final CountDownLatch ready = new CountDownLatch(NUM_THREADS); final CountDownLatch start = new CountDownLatch(1); final CountDownLatch end = new CountDownLatch(NUM_THREADS); - for(int i=0; i<NUM_THREADS; i++) { - final List<Throwable> throwables = new LinkedList<Throwable>(); + final List<Exception> exceptions = synchronizedList(new ArrayList<Exception>()); + for (int i = 0; i < NUM_THREADS; i++) { Thread th = new Thread(new Runnable() { @Override @@ -427,10 +410,8 @@ public class JournalTest { ready.countDown(); start.await(); recovery.recover(Iterators.forArray(x1,z1), c2Id); - } catch (Throwable e) { - synchronized(throwables) { - throwables.add(e); - } + } catch (Exception e) { + exceptions.add(e); } finally { end.countDown(); } @@ -444,118 +425,19 @@ public class JournalTest { assertTrue(end.await(20, TimeUnit.SECONDS)); assertJournalEntries(ds1, "{}", change1); // unchanged assertJournalEntries(ds2, "{}", change2, change2b); - } - } - - void assertJournalEntries(DocumentNodeStore ds, String... expectedChanges) { - List<String> exp = new LinkedList<String>(Arrays.asList(expectedChanges)); - for(boolean branch : new Boolean[]{false, true}) { - String fromKey = JournalEntry.asId(new Revision(0, 0, ds.getClusterId(), branch)); - String toKey = JournalEntry.asId(new Revision(System.currentTimeMillis()+1000, 0, ds.getClusterId(), branch)); - List<JournalEntry> entries = ds.getDocumentStore().query(Collection.JOURNAL, fromKey, toKey, expectedChanges.length+5); - if (entries.size()>0) { - for (Iterator<JournalEntry> it = entries.iterator(); it.hasNext();) { - JournalEntry journalEntry = it.next(); - if (!exp.remove(journalEntry.get("_c"))) { - fail("Found an unexpected change: "+journalEntry.get("_c")+", while all I expected was: "+expectedChanges); - } - } + for (Exception ex : exceptions) { + throw ex; } } - if (exp.size()>0) { - fail("Did not find all expected changes, left over: "+exp+" (from original list which is: "+expectedChanges+")"); - } - } - - int countJournalEntries(DocumentNodeStore ds, int max) { - int total = 0; - for(boolean branch : new Boolean[]{false, true}) { - String fromKey = JournalEntry.asId(new Revision(0, 0, ds.getClusterId(), branch)); - String toKey = JournalEntry.asId(new Revision(System.currentTimeMillis()+1000, 0, ds.getClusterId(), branch)); - List<JournalEntry> entries = ds.getDocumentStore().query(Collection.JOURNAL, fromKey, toKey, max); - total+=entries.size(); - } - return total; - } - - private NodeDocument getDocument(DocumentNodeStore nodeStore, String path) { - return nodeStore.getDocumentStore().find(Collection.NODES, Utils.getIdFromPath(path)); - } - - @Before - @After - public void clear() { - for (DocumentMK mk : mks) { - mk.dispose(); - } - mks.clear(); - if (MONGO_DB) { - DB db = MongoUtils.getConnection().getDB(); - MongoUtils.dropCollections(db); - } - } - - private final class TestBuilder extends DocumentMK.Builder { - private CountingDocumentStore actualStore; - private CountingTieredDiffCache actualDiffCache; - - @Override - public DocumentStore getDocumentStore() { - if (actualStore==null) { - actualStore = new CountingDocumentStore(super.getDocumentStore()); - } - return actualStore; - } - - @Override - public DiffCache getDiffCache() { - if (actualDiffCache==null) { - actualDiffCache = new CountingTieredDiffCache(this); - } - return actualDiffCache; - } } private DocumentMK createMK(int clusterId, int asyncDelay) { - if (MONGO_DB) { - DB db = MongoUtils.getConnection(/*"oak-observation"*/).getDB(); - builder = newDocumentMKBuilder(); - return register(builder.setMongoDB(db) - .setClusterId(clusterId).setAsyncDelay(asyncDelay).open()); - } else { - if (ds == null) { - ds = new MemoryDocumentStore(); - } - if (bs == null) { - bs = new MemoryBlobStore(); - } - return createMK(clusterId, asyncDelay, ds, bs); + if (ds == null) { + ds = new MemoryDocumentStore(); } - } - - private TestBuilder newDocumentMKBuilder() { - return new TestBuilder(); - } - - private DocumentMK createMK(int clusterId, int asyncDelay, - DocumentStore ds, BlobStore bs) { - builder = newDocumentMKBuilder(); - return register(builder.setDocumentStore(ds) - .setBlobStore(bs).setClusterId(clusterId) - .setAsyncDelay(asyncDelay).open()); - } - - private DocumentMK register(DocumentMK mk) { - mks.add(mk); - return mk; - } - - private void disposeMK(DocumentMK mk) { - mk.dispose(); - for (int i = 0; i < mks.size(); i++) { - if (mks.get(i) == mk) { - mks.remove(i); - } + if (bs == null) { + bs = new MemoryBlobStore(); } + return createMK(clusterId, asyncDelay, ds, bs); } } Modified: jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCacheTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCacheTest.java?rev=1688649&r1=1688648&r2=1688649&view=diff ============================================================================== --- jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCacheTest.java (original) +++ jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCacheTest.java Wed Jul 1 13:37:35 2015 @@ -88,7 +88,7 @@ public class MongoDiffCacheTest { MongoDiffCache diffCache = new MongoDiffCache(db, 32, new DocumentMK.Builder()); DiffCache.Entry entry = diffCache.newEntry( - new Revision(1, 0, 1), new Revision(2, 0, 1)); + new Revision(1, 0, 1), new Revision(2, 0, 1), false); for (int i = 0; i < 100; i++) { for (int j = 0; j < 100; j++) { for (int k = 0; k < 64; k++) { Modified: jackrabbit/oak/branches/1.2/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java?rev=1688649&r1=1688648&r2=1688649&view=diff ============================================================================== --- jackrabbit/oak/branches/1.2/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java (original) +++ jackrabbit/oak/branches/1.2/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java Wed Jul 1 13:37:35 2015 @@ -60,6 +60,7 @@ import org.apache.jackrabbit.oak.spi.whi import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard; import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardExecutor; import org.apache.jackrabbit.oak.stats.StatisticManager; +import org.apache.jackrabbit.oak.util.PerfLogger; import org.apache.jackrabbit.stats.TimeSeriesMax; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,6 +74,8 @@ import org.slf4j.LoggerFactory; */ class ChangeProcessor implements Observer { private static final Logger LOG = LoggerFactory.getLogger(ChangeProcessor.class); + private static final PerfLogger PERF_LOGGER = new PerfLogger( + LoggerFactory.getLogger(ChangeProcessor.class.getName() + ".perf")); /** * Fill ratio of the revision queue at which commits should be delayed @@ -289,6 +292,7 @@ class ChangeProcessor implements Observe public void contentChanged(@Nonnull NodeState root, @Nullable CommitInfo info) { if (previousRoot != null) { try { + long start = PERF_LOGGER.start(); FilterProvider provider = filterProvider.get(); // FIXME don't rely on toString for session id if (provider.includeCommit(contentSession.toString(), info)) { @@ -306,6 +310,9 @@ class ChangeProcessor implements Observe } } } + PERF_LOGGER.end(start, 100, + "Generated events (before: {}, after: {})", + previousRoot, root); } catch (Exception e) { LOG.warn("Error while dispatching observation events for " + tracker, e); } Modified: jackrabbit/oak/branches/1.2/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/ObservationTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/ObservationTest.java?rev=1688649&r1=1688648&r2=1688649&view=diff ============================================================================== --- jackrabbit/oak/branches/1.2/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/ObservationTest.java (original) +++ jackrabbit/oak/branches/1.2/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/ObservationTest.java Wed Jul 1 13:37:35 2015 @@ -26,6 +26,7 @@ import static javax.jcr.observation.Even import static javax.jcr.observation.Event.PROPERTY_ADDED; import static javax.jcr.observation.Event.PROPERTY_CHANGED; import static javax.jcr.observation.Event.PROPERTY_REMOVED; +import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.getServices; import java.util.List; import java.util.concurrent.Callable; @@ -51,13 +52,12 @@ import com.google.common.collect.Lists; import org.apache.jackrabbit.commons.JcrUtils; import org.apache.jackrabbit.oak.Oak; -import org.apache.jackrabbit.oak.api.jmx.RepositoryStatsMBean; import org.apache.jackrabbit.oak.fixture.JcrCreator; import org.apache.jackrabbit.oak.fixture.OakRepositoryFixture; import org.apache.jackrabbit.oak.fixture.RepositoryFixture; import org.apache.jackrabbit.oak.jcr.Jcr; +import org.apache.jackrabbit.oak.spi.commit.BackgroundObserverMBean; import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard; -import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils; public class ObservationTest extends Benchmark { public static final int EVENT_TYPES = NODE_ADDED | NODE_REMOVED | NODE_MOVED | @@ -67,7 +67,7 @@ public class ObservationTest extends Ben private static final int OUTPUT_RESOLUTION = 100; private static final int LISTENER_COUNT = Integer.getInteger("listenerCount", 100); private static final int WRITER_COUNT = Integer.getInteger("writerCount", 1); - private static final String PATH_FILTER = System.getProperty("pathFilter", "/"); + private static final String PATH_FILTER = System.getProperty("pathFilter"); @Override public void run(Iterable<RepositoryFixture> fixtures) { @@ -119,13 +119,14 @@ public class ObservationTest extends Ben final AtomicInteger eventCount = new AtomicInteger(); final AtomicInteger nodeCount = new AtomicInteger(); - Session[] sessions = new Session[LISTENER_COUNT]; - EventListener[] listeners = new Listener[LISTENER_COUNT]; + List<Session> sessions = Lists.newArrayList(); + List<EventListener> listeners = Lists.newArrayList(); List<String> testPaths = Lists.newArrayList(); Session s = createSession(repository); + String path = "/path/to/observation/benchmark-" + AbstractTest.TEST_ID; try { - Node testRoot = s.getRootNode().addNode("path").addNode("to").addNode("observation").addNode("benchmark"); + Node testRoot = JcrUtils.getOrCreateByPath(path, null, s); for (int i = 0; i < WRITER_COUNT; i++) { testPaths.add(testRoot.addNode("session-" + i).getPath()); } @@ -134,14 +135,18 @@ public class ObservationTest extends Ben s.logout(); } + String pathFilter = PATH_FILTER == null ? path : PATH_FILTER; + System.out.println("Path filter for event listener: " + pathFilter); ExecutorService service = Executors.newFixedThreadPool(WRITER_COUNT); try { for (int k = 0; k < LISTENER_COUNT; k++) { - sessions[k] = createSession(repository); - listeners[k] = new Listener(eventCount); - ObservationManager obsMgr = sessions[k].getWorkspace().getObservationManager(); - obsMgr.addEventListener(listeners[k], EVENT_TYPES, PATH_FILTER, true, null, null, false); + sessions.add(createSession(repository)); + listeners.add(new Listener(eventCount)); + ObservationManager obsMgr = sessions.get(k).getWorkspace().getObservationManager(); + obsMgr.addEventListener(listeners.get(k), EVENT_TYPES, pathFilter, true, null, null, false); } + // also add a listener on the root node + addRootListener(repository, sessions, listeners); List<Future<Object>> createNodes = Lists.newArrayList(); for (final String p : testPaths) { @@ -155,7 +160,7 @@ public class ObservationTest extends Ben Node testRoot = session.getNode(p); createChildren(testRoot, 100); for (Node m : JcrUtils.getChildNodes(testRoot)) { - createChildren(m, 100); + createChildren(m, 100 / WRITER_COUNT); for (Node n : JcrUtils.getChildNodes(m)) { createChildren(n, 5); } @@ -180,7 +185,7 @@ public class ObservationTest extends Ben })); } - System.out.println("ms #node nodes/s #event event/s event ratio queue"); + System.out.println("ms #node nodes/s #event event/s event-ratio queue external"); while (!isDone(createNodes) || (eventCount.get() / LISTENER_COUNT < nodeCount.get() * EVENTS_PER_NODE)) { long t0 = System.currentTimeMillis(); Thread.sleep(OUTPUT_RESOLUTION); @@ -188,38 +193,51 @@ public class ObservationTest extends Ben int nc = nodeCount.get(); int ec = eventCount.get() / LISTENER_COUNT; - long ql = getObservationQueueMaxLength(whiteboard); + int[] ql = getObservationQueueLength(whiteboard); double nps = (double) nc / t * 1000; double eps = (double) ec / t * 1000; double epn = (double) ec / nc / EVENTS_PER_NODE; System.out.format( - "%7d %7d %7.1f %7d %7.1f %1.2f %7d%n", - t, nc, nps, ec, eps, epn, ql); + "%7d %7d %7.1f %7d %7.1f %7.2f %7d %7d%n", + t, nc, nps, ec, eps, epn, ql[0], ql[1]); } get(createNodes); } finally { - for (int k = 0; k < LISTENER_COUNT; k++) { - sessions[k].getWorkspace().getObservationManager().removeEventListener(listeners[k]); - sessions[k].logout(); + for (int k = 0; k < sessions.size(); k++) { + sessions.get(k).getWorkspace().getObservationManager() + .removeEventListener(listeners.get(k)); + sessions.get(k).logout(); } service.shutdown(); service.awaitTermination(1, TimeUnit.MINUTES); } } - private static long getObservationQueueMaxLength(@Nullable Whiteboard whiteboard) { - if (whiteboard == null) { - return -1; - } - List<RepositoryStatsMBean> stats = WhiteboardUtils.getServices( - whiteboard, RepositoryStatsMBean.class); - for (RepositoryStatsMBean bean : stats) { - long[] values = (long[]) bean.getObservationQueueMaxLength().get("per second"); - return values[values.length - 1]; + private void addRootListener(Repository repository, + List<Session> sessions, + List<EventListener> listeners) + throws RepositoryException { + Session s = createSession(repository); + sessions.add(s); + Listener listener = new Listener(new AtomicInteger()); + ObservationManager obsMgr = s.getWorkspace().getObservationManager(); + obsMgr.addEventListener(listener, EVENT_TYPES, "/", true, null, null, false); + listeners.add(listener); + } + + private static int[] getObservationQueueLength(@Nullable Whiteboard wb) { + if (wb == null) { + return new int[]{-1, -1}; + } + int len = -1; + int ext = -1; + for (BackgroundObserverMBean bean : getServices(wb, BackgroundObserverMBean.class)) { + len = Math.max(bean.getQueueSize(), len); + ext = Math.max(bean.getExternalEventCount(), ext); } - return -1; + return new int[]{len, ext}; } private static boolean isDone(Iterable<Future<Object>> futures) {
