Modified: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModificationsTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModificationsTest.java?rev=1695297&r1=1695296&r2=1695297&view=diff ============================================================================== --- jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModificationsTest.java (original) +++ jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModificationsTest.java Tue Aug 11 12:55:41 2015 @@ -35,6 +35,7 @@ import org.apache.jackrabbit.oak.plugins import org.junit.Ignore; import org.junit.Test; +import static org.apache.jackrabbit.oak.plugins.document.UnsavedModifications.Snapshot.IGNORE; import static org.junit.Assert.assertEquals; public class UnsavedModificationsTest { @@ -96,7 +97,7 @@ public class UnsavedModificationsTest { public void run() { while (exceptions.isEmpty()) { try { - mod.persist(ns, new ReentrantLock()); + mod.persist(ns, IGNORE, new ReentrantLock()); Thread.sleep(10); } catch (Exception e) { exceptions.add(e); @@ -169,7 +170,7 @@ public class UnsavedModificationsTest { paths.clear(); } if (random.nextFloat() < 0.00005) { - pending.persist(ns, new ReentrantLock()); + pending.persist(ns, IGNORE, new ReentrantLock()); } } } @@ -220,7 +221,7 @@ public class UnsavedModificationsTest { } // drain pending, this will force it back to in-memory - pending.persist(ns, new ReentrantLock()); + pending.persist(ns, IGNORE, new ReentrantLock()); // loop over remaining paths while (paths.hasNext()) { @@ -257,7 +258,7 @@ public class UnsavedModificationsTest { } // drain pending, this will force it back to in-memory - pending.persist(ns, new ReentrantLock()); + pending.persist(ns, IGNORE, new ReentrantLock()); // loop over remaining paths while 
(paths.hasNext()) {
Added: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/JournalIT.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/JournalIT.java?rev=1695297&view=auto ============================================================================== --- jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/JournalIT.java (added) +++ jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/JournalIT.java Tue Aug 11 12:55:41 2015 @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.jackrabbit.oak.plugins.document.mongo; + +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import com.mongodb.DB; + +import org.apache.jackrabbit.oak.cache.CacheStats; +import org.apache.jackrabbit.oak.plugins.document.AbstractJournalTest; +import org.apache.jackrabbit.oak.plugins.document.DocumentMK; +import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore; +import org.apache.jackrabbit.oak.plugins.document.DocumentStore; +import org.apache.jackrabbit.oak.plugins.document.JournalGarbageCollector; +import org.apache.jackrabbit.oak.plugins.document.MongoUtils; +import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore; +import org.apache.jackrabbit.oak.plugins.document.util.MongoConnection; +import org.apache.jackrabbit.oak.spi.blob.MemoryBlobStore; +import org.apache.jackrabbit.oak.stats.Clock; +import org.junit.After; +import org.junit.Assume; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.assertEquals; + +public class JournalIT extends AbstractJournalTest { + + private static final Logger LOG = LoggerFactory.getLogger(JournalIT.class); + + @BeforeClass + public static void checkMongoDbAvailable() { + Assume.assumeNotNull(MongoUtils.getConnection()); + } + + @Before + @After + public void dropCollections() throws Exception { + MongoConnection mongoConnection = MongoUtils.getConnection(); + MongoUtils.dropCollections(mongoConnection.getDB()); + mongoConnection.close(); + } + + @Test + public void cacheInvalidationTest() throws Exception { + final DocumentNodeStore ns1 = createMK(1, 0).getNodeStore(); + final DocumentNodeStore ns2 = createMK(2, 0).getNodeStore(); + LOG.info("cache size 1: "+(ns1.getDocumentStore().getCacheStats()==null ? 
"null" : ns1.getDocumentStore().getCacheStats().getElementCount())); + + // invalidate both caches under test first + invalidateDocChildrenCache(ns1); + ns1.getDocumentStore().invalidateCache(); + + { + DocumentStore s = ns1.getDocumentStore(); + CacheStats cacheStats = s.getCacheStats(); + LOG.info("m.size="+(cacheStats==null ? "null" : cacheStats.getElementCount())); + } + LOG.info("cache size 2: "+(ns1.getDocumentStore().getCacheStats()==null ? "null" : ns1.getDocumentStore().getCacheStats().getElementCount())); + + // first create child node in instance 1 + final List<String> paths = createRandomPaths(1, 5000000, 1000); + int i=0; + for(String path : paths) { + if (i++%100==0) { + LOG.info("at "+i); + } + getOrCreate(ns1, path, false); + } + final List<String> paths2 = createRandomPaths(20, 2345, 100); + getOrCreate(ns1, paths2, false); + ns1.runBackgroundOperations(); + for(String path : paths) { + assertDocCache(ns1, true, path); + } + + { + DocumentStore s = ns1.getDocumentStore(); + CacheStats cacheStats = s.getCacheStats(); + LOG.info("m.size="+(cacheStats==null ? "null" : cacheStats.getElementCount())); + } + + LOG.info("cache size 2: "+(ns1.getDocumentStore().getCacheStats()==null ? 
"null" : ns1.getDocumentStore().getCacheStats().getElementCount())); + long time = System.currentTimeMillis(); + for(int j=0; j<100; j++) { + long now = System.currentTimeMillis(); + LOG.info("loop "+j+", "+(now-time)+"ms"); + time = now; + final Set<String> electedPaths = choose(paths2, random.nextInt(30)); + { + // choose a random few from above created paths and modify them + final long t1 = System.currentTimeMillis(); + ns2.runBackgroundOperations(); // make sure ns2 has the latest from ns1 + final long t2 = System.currentTimeMillis(); + LOG.info("ns2 background took "+(t2-t1)+"ms"); + + for(String electedPath : electedPaths) { + // modify /child in another instance 2 + setProperty(ns2, electedPath, "p", "ns2"+System.currentTimeMillis(), false); + } + final long t3 = System.currentTimeMillis(); + LOG.info("setting props "+(t3-t2)+"ms"); + + ns2.runBackgroundOperations(); + final long t4 = System.currentTimeMillis(); + LOG.info("ns2 background took2 "+(t4-t3)+"ms"); + } + + // that should not have changed the fact that we have it cached in 'ns1' + for(String electedPath : electedPaths) { + assertDocCache(ns1, true, electedPath); + } + + // doing a backgroundOp now should trigger invalidation + // which thx to the external modification will remove the entry from the cache: + ns1.runBackgroundOperations(); + for(String electedPath : electedPaths) { + assertDocCache(ns1, false, electedPath); + } + + // when I access it again with 'ns1', then it gets cached again: + for(String electedPath : electedPaths) { + getOrCreate(ns1, electedPath, false); + assertDocCache(ns1, true, electedPath); + } + } + } + + @Test + public void largeCleanupTest() throws Exception { + // create more than DELETE_BATCH_SIZE of entries and clean them up + // should make sure to loop in JournalGarbageCollector.gc such + // that it would find issue described here: + // 
https://issues.apache.org/jira/browse/OAK-2829?focusedCommentId=14585733&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14585733 + + doLargeCleanupTest(0, 100); + doLargeCleanupTest(200, 1000);// using offset so as to make sure to always create new entries + doLargeCleanupTest(2000, 10000); + doLargeCleanupTest(20000, 30000); // using 'size' much larger than 30k will be tremendously slow due to ordered node + } + + @Test + public void simpleCacheInvalidationTest() throws Exception { + final DocumentNodeStore ns1 = createMK(1, 0).getNodeStore(); + final DocumentNodeStore ns2 = createMK(2, 0).getNodeStore(); + + // invalidate both caches under test first + invalidateDocChildrenCache(ns1); + ns1.getDocumentStore().invalidateCache(); + + // first create child node in instance 1 + getOrCreate(ns1, "/child", true); + assertDocCache(ns1, true, "/child"); + + { + // modify /child in another instance 2 + ns2.runBackgroundOperations(); // read latest changes from ns1 + setProperty(ns2, "/child", "p", "ns2"+System.currentTimeMillis(), true); + } + // that should not have changed the fact that we have it cached in 'ns1' + assertDocCache(ns1, true, "/child"); + + // doing a backgroundOp now should trigger invalidation + // which thx to the external modification will remove the entry from the cache: + ns1.runBackgroundOperations(); + assertDocCache(ns1, false, "/child"); + + // when I access it again with 'ns1', then it gets cached again: + getOrCreate(ns1, "/child", false); + assertDocCache(ns1, true, "/child"); + } + + private void doLargeCleanupTest(int offset, int size) throws Exception { + Clock clock = new Clock.Virtual(); + DocumentMK mk1 = createMK(0 /* clusterId: 0 => uses clusterNodes collection */, 0, + new MemoryDocumentStore(), new MemoryBlobStore()); + DocumentNodeStore ns1 = mk1.getNodeStore(); + // make sure we're visible and marked as active + renewClusterIdLease(ns1); + JournalGarbageCollector gc = new JournalGarbageCollector(ns1); 
+ clock.getTimeIncreasing(); + clock.getTimeIncreasing(); + gc.gc(0, TimeUnit.MILLISECONDS); // cleanup everything that might still be there + + // create entries as parametrized: + for(int i=offset; i<size+offset; i++) { + mk1.commit("/", "+\"regular"+i+"\": {}", null, null); + // always run background ops to 'flush' the change + // into the journal: + ns1.runBackgroundOperations(); + } + Thread.sleep(100); // sleep 100millis + assertEquals(size, gc.gc(0, TimeUnit.MILLISECONDS)); // should now be able to clean up everything + } + + protected DocumentMK createMK(int clusterId, int asyncDelay) { + DB db = MongoUtils.getConnection().getDB(); + builder = newDocumentMKBuilder(); + return register(builder.setMongoDB(db) + .setClusterId(clusterId).setAsyncDelay(asyncDelay).open()); + } + +} Propchange: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/JournalIT.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCacheTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCacheTest.java?rev=1695297&r1=1695296&r2=1695297&view=diff ============================================================================== --- jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCacheTest.java (original) +++ jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCacheTest.java Tue Aug 11 12:55:41 2015 @@ -86,7 +86,7 @@ public class MongoDiffCacheTest { MongoDiffCache diffCache = new MongoDiffCache(db, 32, new DocumentMK.Builder()); DiffCache.Entry entry = diffCache.newEntry( - new Revision(1, 0, 1), new Revision(2, 0, 1)); + new Revision(1, 0, 1), new Revision(2, 
0, 1), false); for (int i = 0; i < 100; i++) { for (int j = 0; j < 100; j++) { for (int k = 0; k < 64; k++) { Modified: jackrabbit/oak/branches/1.0/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java?rev=1695297&r1=1695296&r2=1695297&view=diff ============================================================================== --- jackrabbit/oak/branches/1.0/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java (original) +++ jackrabbit/oak/branches/1.0/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java Tue Aug 11 12:55:41 2015 @@ -60,6 +60,7 @@ import org.apache.jackrabbit.oak.spi.whi import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardExecutor; import org.apache.jackrabbit.oak.stats.StatisticManager; import org.apache.jackrabbit.oak.stats.TimeSeriesMax; +import org.apache.jackrabbit.oak.util.PerfLogger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,6 +73,8 @@ import org.slf4j.LoggerFactory; */ class ChangeProcessor implements Observer { private static final Logger LOG = LoggerFactory.getLogger(ChangeProcessor.class); + private static final PerfLogger PERF_LOGGER = new PerfLogger( + LoggerFactory.getLogger(ChangeProcessor.class.getName() + ".perf")); /** * Fill ratio of the revision queue at which commits should be delayed @@ -280,6 +283,7 @@ class ChangeProcessor implements Observe public void contentChanged(@Nonnull NodeState root, @Nullable CommitInfo info) { if (previousRoot != null) { try { + long start = PERF_LOGGER.start(); FilterProvider provider = filterProvider.get(); // FIXME don't rely on toString for session id if (provider.includeCommit(contentSession.toString(), info)) { @@ -297,6 +301,9 @@ class ChangeProcessor implements Observe } } } + PERF_LOGGER.end(start, 100, + "Generated events 
(before: {}, after: {})", + previousRoot, root); } catch (Exception e) { LOG.warn("Error while dispatching observation events for " + tracker, e); } Modified: jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/ObservationTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/ObservationTest.java?rev=1695297&r1=1695296&r2=1695297&view=diff ============================================================================== --- jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/ObservationTest.java (original) +++ jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/ObservationTest.java Tue Aug 11 12:55:41 2015 @@ -26,12 +26,19 @@ import static javax.jcr.observation.Even import static javax.jcr.observation.Event.PROPERTY_ADDED; import static javax.jcr.observation.Event.PROPERTY_CHANGED; import static javax.jcr.observation.Event.PROPERTY_REMOVED; +import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.getServices; +import java.util.List; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nullable; import javax.jcr.Node; import javax.jcr.Repository; import javax.jcr.RepositoryException; @@ -41,8 +48,16 @@ import javax.jcr.observation.EventIterat import javax.jcr.observation.EventListener; import javax.jcr.observation.ObservationManager; +import com.google.common.collect.Lists; + import org.apache.jackrabbit.commons.JcrUtils; +import org.apache.jackrabbit.oak.Oak; +import org.apache.jackrabbit.oak.fixture.JcrCreator; +import 
org.apache.jackrabbit.oak.fixture.OakRepositoryFixture; import org.apache.jackrabbit.oak.fixture.RepositoryFixture; +import org.apache.jackrabbit.oak.jcr.Jcr; +import org.apache.jackrabbit.oak.spi.commit.BackgroundObserverMBean; +import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard; public class ObservationTest extends Benchmark { public static final int EVENT_TYPES = NODE_ADDED | NODE_REMOVED | NODE_MOVED | @@ -51,6 +66,8 @@ public class ObservationTest extends Ben private static final int SAVE_INTERVAL = Integer.getInteger("saveInterval", 100); private static final int OUTPUT_RESOLUTION = 100; private static final int LISTENER_COUNT = Integer.getInteger("listenerCount", 100); + private static final int WRITER_COUNT = Integer.getInteger("writerCount", 1); + private static final String PATH_FILTER = System.getProperty("pathFilter"); @Override public void run(Iterable<RepositoryFixture> fixtures) { @@ -58,9 +75,21 @@ public class ObservationTest extends Ben if (fixture.isAvailable(1)) { System.out.format("%s: Observation throughput benchmark%n", fixture); try { - Repository[] cluster = fixture.setUpCluster(1); + final AtomicReference<Whiteboard> whiteboardRef = new AtomicReference<Whiteboard>(); + Repository[] cluster; + if (fixture instanceof OakRepositoryFixture) { + cluster = ((OakRepositoryFixture) fixture).setUpCluster(1, new JcrCreator() { + @Override + public Jcr customize(Oak oak) { + whiteboardRef.set(oak.getWhiteboard()); + return new Jcr(oak); + } + }); + } else { + cluster = fixture.setUpCluster(1); + } try { - run(cluster[0]); + run(cluster[0], whiteboardRef.get()); } finally { fixture.tearDownCluster(); } @@ -71,88 +100,160 @@ public class ObservationTest extends Ben } } - private void run(Repository repository) throws RepositoryException, ExecutionException, InterruptedException { + private void run(Repository repository, @Nullable Whiteboard whiteboard) + throws RepositoryException, ExecutionException, InterruptedException { Session session = 
createSession(repository); long t0 = System.currentTimeMillis(); try { - observationThroughput(repository); + observationThroughput(repository, whiteboard); } finally { System.out.println("Time elapsed: " + (System.currentTimeMillis() - t0) + " ms"); session.logout(); } } - public void observationThroughput(final Repository repository) + public void observationThroughput(final Repository repository, + @Nullable Whiteboard whiteboard) throws RepositoryException, InterruptedException, ExecutionException { long t = 0; final AtomicInteger eventCount = new AtomicInteger(); final AtomicInteger nodeCount = new AtomicInteger(); - Session[] sessions = new Session[LISTENER_COUNT]; - EventListener[] listeners = new Listener[LISTENER_COUNT]; + List<Session> sessions = Lists.newArrayList(); + List<EventListener> listeners = Lists.newArrayList(); + List<String> testPaths = Lists.newArrayList(); + Session s = createSession(repository); + String path = "/path/to/observation/benchmark-" + AbstractTest.TEST_ID; try { - for (int k = 0; k < LISTENER_COUNT; k++) { - sessions[k] = createSession(repository); - listeners[k] = new Listener(eventCount); - ObservationManager obsMgr = sessions[k].getWorkspace().getObservationManager(); - obsMgr.addEventListener(listeners[k], EVENT_TYPES, "/", true, null, null, false); + Node testRoot = JcrUtils.getOrCreateByPath(path, null, s); + for (int i = 0; i < WRITER_COUNT; i++) { + testPaths.add(testRoot.addNode("session-" + i).getPath()); } + s.save(); + } finally { + s.logout(); + } - Future<?> createNodes = Executors.newSingleThreadExecutor().submit(new Runnable() { - private final Session session = repository.login(new SimpleCredentials("admin", "admin".toCharArray())); + String pathFilter = PATH_FILTER == null ? 
path : PATH_FILTER; + System.out.println("Path filter for event listener: " + pathFilter); + ExecutorService service = Executors.newFixedThreadPool(WRITER_COUNT); + try { + for (int k = 0; k < LISTENER_COUNT; k++) { + sessions.add(createSession(repository)); + listeners.add(new Listener(eventCount)); + ObservationManager obsMgr = sessions.get(k).getWorkspace().getObservationManager(); + obsMgr.addEventListener(listeners.get(k), EVENT_TYPES, pathFilter, true, null, null, false); + } + // also add a listener on the root node + addRootListener(repository, sessions, listeners); - @Override - public void run() { - try { - Node testRoot = session.getRootNode().addNode("observationBenchmark"); - createChildren(testRoot, 100); - for (Node m : JcrUtils.getChildNodes(testRoot)) { - createChildren(m, 100); - for (Node n : JcrUtils.getChildNodes(m)) { - createChildren(n, 5); + List<Future<Object>> createNodes = Lists.newArrayList(); + for (final String p : testPaths) { + createNodes.add(service.submit(new Callable<Object>() { + private final Session session = createSession(repository); + private int numNodes = 0; + + @Override + public Object call() throws Exception { + try { + Node testRoot = session.getNode(p); + createChildren(testRoot, 100); + for (Node m : JcrUtils.getChildNodes(testRoot)) { + createChildren(m, 100 / WRITER_COUNT); + for (Node n : JcrUtils.getChildNodes(m)) { + createChildren(n, 5); + } } + session.save(); + } finally { + session.logout(); } - session.save(); - } catch (RepositoryException e) { - throw new RuntimeException(e); - } finally { - session.logout(); + return null; } - } - private void createChildren(Node node, int count) throws RepositoryException { - for (int c = 0; c < count; c++) { - node.addNode("n" + c); - if (nodeCount.incrementAndGet() % SAVE_INTERVAL == 0) { - node.getSession().save(); + private void createChildren(Node node, int count) + throws RepositoryException { + for (int c = 0; c < count; c++) { + node.addNode("n" + c); + 
nodeCount.incrementAndGet(); + if (++numNodes % SAVE_INTERVAL == 0) { + node.getSession().save(); + } } } - } - }); + })); + } - System.out.println("ms #node nodes/s #event event/s event ratio"); - while (!createNodes.isDone() || (eventCount.get() < nodeCount.get() * EVENTS_PER_NODE)) { + System.out.println("ms #node nodes/s #event event/s event-ratio queue external"); + while (!isDone(createNodes) || (eventCount.get() / LISTENER_COUNT < nodeCount.get() * EVENTS_PER_NODE)) { long t0 = System.currentTimeMillis(); Thread.sleep(OUTPUT_RESOLUTION); t += System.currentTimeMillis() - t0; int nc = nodeCount.get(); int ec = eventCount.get() / LISTENER_COUNT; + int[] ql = getObservationQueueLength(whiteboard); double nps = (double) nc / t * 1000; double eps = (double) ec / t * 1000; double epn = (double) ec / nc / EVENTS_PER_NODE; - System.out.format("%7d %7d %7.1f %7d %7.1f %1.2f%n", t, nc, nps, ec, eps, epn); + System.out.format( + "%7d %7d %7.1f %7d %7.1f %7.2f %7d %7d%n", + t, nc, nps, ec, eps, epn, ql[0], ql[1]); } - createNodes.get(); + get(createNodes); } finally { - for (int k = 0; k < LISTENER_COUNT; k++) { - sessions[k].getWorkspace().getObservationManager().removeEventListener(listeners[k]); - sessions[k].logout(); + for (int k = 0; k < sessions.size(); k++) { + sessions.get(k).getWorkspace().getObservationManager() + .removeEventListener(listeners.get(k)); + sessions.get(k).logout(); + } + service.shutdown(); + service.awaitTermination(1, TimeUnit.MINUTES); + } + } + + private void addRootListener(Repository repository, + List<Session> sessions, + List<EventListener> listeners) + throws RepositoryException { + Session s = createSession(repository); + sessions.add(s); + Listener listener = new Listener(new AtomicInteger()); + ObservationManager obsMgr = s.getWorkspace().getObservationManager(); + obsMgr.addEventListener(listener, EVENT_TYPES, "/", true, null, null, false); + listeners.add(listener); + } + + private static int[] getObservationQueueLength(@Nullable 
Whiteboard wb) { + if (wb == null) { + return new int[]{-1, -1}; + } + int len = -1; + int ext = -1; + for (BackgroundObserverMBean bean : getServices(wb, BackgroundObserverMBean.class)) { + len = Math.max(bean.getQueueSize(), len); + ext = Math.max(bean.getExternalEventCount(), ext); + } + return new int[]{len, ext}; + } + + private static boolean isDone(Iterable<Future<Object>> futures) { + for (Future f : futures) { + if (!f.isDone()) { + return false; } } + return true; + } + + private static void get(Iterable<Future<Object>> futures) + throws ExecutionException, InterruptedException { + for (Future f : futures) { + f.get(); + } } private static Session createSession(Repository repository) Added: jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/JcrCreator.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/JcrCreator.java?rev=1695297&view=auto ============================================================================== --- jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/JcrCreator.java (added) +++ jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/JcrCreator.java Tue Aug 11 12:55:41 2015 @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.fixture; + +import org.apache.jackrabbit.oak.Oak; +import org.apache.jackrabbit.oak.jcr.Jcr; + +public interface JcrCreator { + JcrCreator DEFAULT = new JcrCreator() { + @Override + public Jcr customize(Oak oak) { + return new Jcr(oak); + } + }; + + public Jcr customize(Oak oak); +} \ No newline at end of file Propchange: jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/JcrCreator.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/OakRepositoryFixture.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/OakRepositoryFixture.java?rev=1695297&r1=1695296&r2=1695297&view=diff ============================================================================== --- jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/OakRepositoryFixture.java (original) +++ jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/OakRepositoryFixture.java Tue Aug 11 12:55:41 2015 @@ -17,6 +17,7 @@ package org.apache.jackrabbit.oak.fixture; import java.io.File; + import javax.jcr.Repository; import org.apache.jackrabbit.api.JackrabbitRepository; @@ -90,10 +91,14 @@ public class OakRepositoryFixture implem @Override public final Repository[] setUpCluster(int n) throws Exception { + return setUpCluster(n, JcrCreator.DEFAULT); + } + 
+ public Repository[] setUpCluster(int n, JcrCreator customizer) throws Exception { Oak[] oaks = oakFixture.setUpCluster(n); cluster = new Repository[oaks.length]; for (int i = 0; i < oaks.length; i++) { - cluster[i] = new Jcr(oaks[i]).createRepository();; + cluster[i] = customizer.customize(oaks[i]).createRepository(); } return cluster; }
