Author: mreutegg
Date: Tue Apr  7 08:20:59 2015
New Revision: 1671757

URL: http://svn.apache.org/r1671757
Log:
OAK-2717: Report maximum observation queue length in ObservationTest benchmark

Report maximum queue length if available
Number of writers and path filter is now configurable

Modified:
    
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/ObservationTest.java

Modified: 
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/ObservationTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/ObservationTest.java?rev=1671757&r1=1671756&r2=1671757&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/ObservationTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/ObservationTest.java
 Tue Apr  7 08:20:59 2015
@@ -27,11 +27,17 @@ import static javax.jcr.observation.Even
 import static javax.jcr.observation.Event.PROPERTY_CHANGED;
 import static javax.jcr.observation.Event.PROPERTY_REMOVED;
 
+import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
+import javax.annotation.Nullable;
 import javax.jcr.Node;
 import javax.jcr.Repository;
 import javax.jcr.RepositoryException;
@@ -41,8 +47,17 @@ import javax.jcr.observation.EventIterat
 import javax.jcr.observation.EventListener;
 import javax.jcr.observation.ObservationManager;
 
+import com.google.common.collect.Lists;
+
 import org.apache.jackrabbit.commons.JcrUtils;
+import org.apache.jackrabbit.oak.Oak;
+import org.apache.jackrabbit.oak.api.jmx.RepositoryStatsMBean;
+import org.apache.jackrabbit.oak.fixture.JcrCreator;
+import org.apache.jackrabbit.oak.fixture.OakRepositoryFixture;
 import org.apache.jackrabbit.oak.fixture.RepositoryFixture;
+import org.apache.jackrabbit.oak.jcr.Jcr;
+import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
+import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils;
 
 public class ObservationTest extends Benchmark {
     public static final int EVENT_TYPES = NODE_ADDED | NODE_REMOVED | 
NODE_MOVED |
@@ -51,6 +66,8 @@ public class ObservationTest extends Ben
     private static final int SAVE_INTERVAL = 
Integer.getInteger("saveInterval", 100);
     private static final int OUTPUT_RESOLUTION = 100;
     private static final int LISTENER_COUNT = 
Integer.getInteger("listenerCount", 100);
+    private static final int WRITER_COUNT = Integer.getInteger("writerCount", 
1);
+    private static final String PATH_FILTER = System.getProperty("pathFilter", 
"/");
 
     @Override
     public void run(Iterable<RepositoryFixture> fixtures) {
@@ -58,9 +75,21 @@ public class ObservationTest extends Ben
             if (fixture.isAvailable(1)) {
                 System.out.format("%s: Observation throughput benchmark%n", 
fixture);
                 try {
-                    Repository[] cluster = fixture.setUpCluster(1);
+                    final AtomicReference<Whiteboard> whiteboardRef = new 
AtomicReference<Whiteboard>();
+                    Repository[] cluster;
+                    if (fixture instanceof OakRepositoryFixture) {
+                        cluster = ((OakRepositoryFixture) 
fixture).setUpCluster(1, new JcrCreator() {
+                            @Override
+                            public Jcr customize(Oak oak) {
+                                whiteboardRef.set(oak.getWhiteboard());
+                                return new Jcr(oak);
+                            }
+                        });
+                    } else {
+                        cluster = fixture.setUpCluster(1);
+                    }
                     try {
-                        run(cluster[0]);
+                        run(cluster[0], whiteboardRef.get());
                     } finally {
                         fixture.tearDownCluster();
                     }
@@ -71,18 +100,20 @@ public class ObservationTest extends Ben
         }
     }
 
-    private void run(Repository repository) throws RepositoryException, 
ExecutionException, InterruptedException {
+    private void run(Repository repository, @Nullable Whiteboard whiteboard)
+            throws RepositoryException, ExecutionException, 
InterruptedException {
         Session session = createSession(repository);
         long t0 = System.currentTimeMillis();
         try {
-            observationThroughput(repository);
+            observationThroughput(repository, whiteboard);
         } finally {
             System.out.println("Time elapsed: " + (System.currentTimeMillis() 
- t0) + " ms");
             session.logout();
         }
     }
 
-    public void observationThroughput(final Repository repository)
+    public void observationThroughput(final Repository repository,
+                                      @Nullable Whiteboard whiteboard)
             throws RepositoryException, InterruptedException, 
ExecutionException {
         long t = 0;
         final AtomicInteger eventCount = new AtomicInteger();
@@ -91,67 +122,119 @@ public class ObservationTest extends Ben
         Session[] sessions = new Session[LISTENER_COUNT];
         EventListener[] listeners = new Listener[LISTENER_COUNT];
 
+        List<String> testPaths = Lists.newArrayList();
+        Session s = createSession(repository);
+        try {
+            Node testRoot = 
s.getRootNode().addNode("path").addNode("to").addNode("observation").addNode("benchmark");
+            for (int i = 0; i < WRITER_COUNT; i++) {
+                testPaths.add(testRoot.addNode("session-" + i).getPath());
+            }
+            s.save();
+        } finally {
+            s.logout();
+        }
+
+        ExecutorService service = Executors.newFixedThreadPool(WRITER_COUNT);
         try {
             for (int k = 0; k < LISTENER_COUNT; k++) {
                 sessions[k] = createSession(repository);
                 listeners[k] = new Listener(eventCount);
                 ObservationManager obsMgr = 
sessions[k].getWorkspace().getObservationManager();
-                obsMgr.addEventListener(listeners[k], EVENT_TYPES, "/", true, 
null, null, false);
+                obsMgr.addEventListener(listeners[k], EVENT_TYPES, 
PATH_FILTER, true, null, null, false);
             }
 
-            Future<?> createNodes = 
Executors.newSingleThreadExecutor().submit(new Runnable() {
-                private final Session session = repository.login(new 
SimpleCredentials("admin", "admin".toCharArray()));
-
-                @Override
-                public void run() {
-                    try {
-                        Node testRoot = 
session.getRootNode().addNode("observationBenchmark");
-                        createChildren(testRoot, 100);
-                        for (Node m : JcrUtils.getChildNodes(testRoot)) {
-                            createChildren(m, 100);
-                            for (Node n : JcrUtils.getChildNodes(m)) {
-                                createChildren(n, 5);
+            List<Future<Object>> createNodes = Lists.newArrayList();
+            for (final String p : testPaths) {
+                createNodes.add(service.submit(new Callable<Object>() {
+                    private final Session session = createSession(repository);
+                    private int numNodes = 0;
+
+                    @Override
+                    public Object call() throws Exception {
+                        try {
+                            Node testRoot = session.getNode(p);
+                            createChildren(testRoot, 100);
+                            for (Node m : JcrUtils.getChildNodes(testRoot)) {
+                                createChildren(m, 100);
+                                for (Node n : JcrUtils.getChildNodes(m)) {
+                                    createChildren(n, 5);
+                                }
                             }
+                            session.save();
+                        } finally {
+                            session.logout();
                         }
-                        session.save();
-                    } catch (RepositoryException e) {
-                        throw new RuntimeException(e);
-                    } finally {
-                        session.logout();
+                        return null;
                     }
-                }
 
-                private void createChildren(Node node, int count) throws 
RepositoryException {
-                    for (int c = 0; c < count; c++) {
-                        node.addNode("n" + c);
-                        if (nodeCount.incrementAndGet() % SAVE_INTERVAL == 0) {
-                            node.getSession().save();
+                    private void createChildren(Node node, int count)
+                            throws RepositoryException {
+                        for (int c = 0; c < count; c++) {
+                            node.addNode("n" + c);
+                            nodeCount.incrementAndGet();
+                            if (++numNodes % SAVE_INTERVAL == 0) {
+                                node.getSession().save();
+                            }
                         }
                     }
-                }
-            });
+                }));
+            }
 
-            System.out.println("ms      #node   nodes/s #event  event/s event 
ratio");
-            while (!createNodes.isDone() || (eventCount.get() < 
nodeCount.get() * EVENTS_PER_NODE)) {
+            System.out.println("ms      #node   nodes/s #event  event/s event 
ratio queue");
+            while (!isDone(createNodes) || (eventCount.get() / LISTENER_COUNT 
< nodeCount.get() * EVENTS_PER_NODE)) {
                 long t0 = System.currentTimeMillis();
                 Thread.sleep(OUTPUT_RESOLUTION);
                 t += System.currentTimeMillis() - t0;
 
                 int nc = nodeCount.get();
                 int ec = eventCount.get() / LISTENER_COUNT;
+                long ql = getObservationQueueMaxLength(whiteboard);
 
                 double nps = (double) nc / t * 1000;
                 double eps = (double) ec / t * 1000;
                 double epn = (double) ec / nc / EVENTS_PER_NODE;
 
-                System.out.format("%7d %7d %7.1f %7d %7.1f %1.2f%n", t, nc, 
nps, ec, eps, epn);
+                System.out.format(
+                        "%7d %7d %7.1f %7d %7.1f %1.2f %7d%n",
+                           t, nc,  nps, ec,  eps,  epn, ql);
             }
-            createNodes.get();
+            get(createNodes);
         } finally {
             for (int k = 0; k < LISTENER_COUNT; k++) {
                 
sessions[k].getWorkspace().getObservationManager().removeEventListener(listeners[k]);
                 sessions[k].logout();
             }
+            service.shutdown();
+            service.awaitTermination(1, TimeUnit.MINUTES);
+        }
+    }
+
+    private static long getObservationQueueMaxLength(@Nullable Whiteboard 
whiteboard) {
+        if (whiteboard == null) {
+            return -1;
+        }
+        List<RepositoryStatsMBean> stats = WhiteboardUtils.getServices(
+                whiteboard, RepositoryStatsMBean.class);
+        for (RepositoryStatsMBean bean : stats) {
+            long[] values = (long[]) 
bean.getObservationQueueMaxLength().get("per second");
+            return values[values.length - 1];
+        }
+        return -1;
+    }
+
+    private static boolean isDone(Iterable<Future<Object>> futures) {
+        for (Future f : futures) {
+            if (!f.isDone()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private static void get(Iterable<Future<Object>> futures)
+            throws ExecutionException, InterruptedException {
+        for (Future f : futures) {
+            f.get();
         }
     }
 


Reply via email to