Author: mreutegg
Date: Tue Apr 7 08:20:59 2015
New Revision: 1671757
URL: http://svn.apache.org/r1671757
Log:
OAK-2717: Report maximum observation queue length in ObservationTest benchmark
Report maximum queue length if available
Number of writers and path filter are now configurable
Modified:
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/ObservationTest.java
Modified: jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/ObservationTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/ObservationTest.java?rev=1671757&r1=1671756&r2=1671757&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/ObservationTest.java (original)
+++ jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/ObservationTest.java Tue Apr 7 08:20:59 2015
@@ -27,11 +27,17 @@ import static javax.jcr.observation.Even
import static javax.jcr.observation.Event.PROPERTY_CHANGED;
import static javax.jcr.observation.Event.PROPERTY_REMOVED;
+import java.util.List;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
import javax.jcr.Node;
import javax.jcr.Repository;
import javax.jcr.RepositoryException;
@@ -41,8 +47,17 @@ import javax.jcr.observation.EventIterat
import javax.jcr.observation.EventListener;
import javax.jcr.observation.ObservationManager;
+import com.google.common.collect.Lists;
+
import org.apache.jackrabbit.commons.JcrUtils;
+import org.apache.jackrabbit.oak.Oak;
+import org.apache.jackrabbit.oak.api.jmx.RepositoryStatsMBean;
+import org.apache.jackrabbit.oak.fixture.JcrCreator;
+import org.apache.jackrabbit.oak.fixture.OakRepositoryFixture;
import org.apache.jackrabbit.oak.fixture.RepositoryFixture;
+import org.apache.jackrabbit.oak.jcr.Jcr;
+import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
+import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils;
public class ObservationTest extends Benchmark {
public static final int EVENT_TYPES = NODE_ADDED | NODE_REMOVED |
NODE_MOVED |
@@ -51,6 +66,8 @@ public class ObservationTest extends Ben
private static final int SAVE_INTERVAL =
Integer.getInteger("saveInterval", 100);
private static final int OUTPUT_RESOLUTION = 100;
private static final int LISTENER_COUNT =
Integer.getInteger("listenerCount", 100);
+ private static final int WRITER_COUNT = Integer.getInteger("writerCount", 1);
+ private static final String PATH_FILTER = System.getProperty("pathFilter", "/");
@Override
public void run(Iterable<RepositoryFixture> fixtures) {
@@ -58,9 +75,21 @@ public class ObservationTest extends Ben
if (fixture.isAvailable(1)) {
System.out.format("%s: Observation throughput benchmark%n",
fixture);
try {
- Repository[] cluster = fixture.setUpCluster(1);
+ final AtomicReference<Whiteboard> whiteboardRef = new AtomicReference<Whiteboard>();
+ Repository[] cluster;
+ if (fixture instanceof OakRepositoryFixture) {
+ cluster = ((OakRepositoryFixture) fixture).setUpCluster(1, new JcrCreator() {
+ @Override
+ public Jcr customize(Oak oak) {
+ whiteboardRef.set(oak.getWhiteboard());
+ return new Jcr(oak);
+ }
+ });
+ } else {
+ cluster = fixture.setUpCluster(1);
+ }
try {
- run(cluster[0]);
+ run(cluster[0], whiteboardRef.get());
} finally {
fixture.tearDownCluster();
}
@@ -71,18 +100,20 @@ public class ObservationTest extends Ben
}
}
- private void run(Repository repository) throws RepositoryException,
ExecutionException, InterruptedException {
+ private void run(Repository repository, @Nullable Whiteboard whiteboard)
+ throws RepositoryException, ExecutionException,
InterruptedException {
Session session = createSession(repository);
long t0 = System.currentTimeMillis();
try {
- observationThroughput(repository);
+ observationThroughput(repository, whiteboard);
} finally {
System.out.println("Time elapsed: " + (System.currentTimeMillis()
- t0) + " ms");
session.logout();
}
}
- public void observationThroughput(final Repository repository)
+ public void observationThroughput(final Repository repository,
+ @Nullable Whiteboard whiteboard)
throws RepositoryException, InterruptedException,
ExecutionException {
long t = 0;
final AtomicInteger eventCount = new AtomicInteger();
@@ -91,67 +122,119 @@ public class ObservationTest extends Ben
Session[] sessions = new Session[LISTENER_COUNT];
EventListener[] listeners = new Listener[LISTENER_COUNT];
+ List<String> testPaths = Lists.newArrayList();
+ Session s = createSession(repository);
+ try {
+ Node testRoot = s.getRootNode().addNode("path").addNode("to").addNode("observation").addNode("benchmark");
+ for (int i = 0; i < WRITER_COUNT; i++) {
+ testPaths.add(testRoot.addNode("session-" + i).getPath());
+ }
+ s.save();
+ } finally {
+ s.logout();
+ }
+
+ ExecutorService service = Executors.newFixedThreadPool(WRITER_COUNT);
try {
for (int k = 0; k < LISTENER_COUNT; k++) {
sessions[k] = createSession(repository);
listeners[k] = new Listener(eventCount);
ObservationManager obsMgr =
sessions[k].getWorkspace().getObservationManager();
- obsMgr.addEventListener(listeners[k], EVENT_TYPES, "/", true,
null, null, false);
+ obsMgr.addEventListener(listeners[k], EVENT_TYPES,
PATH_FILTER, true, null, null, false);
}
- Future<?> createNodes =
Executors.newSingleThreadExecutor().submit(new Runnable() {
- private final Session session = repository.login(new
SimpleCredentials("admin", "admin".toCharArray()));
-
- @Override
- public void run() {
- try {
- Node testRoot =
session.getRootNode().addNode("observationBenchmark");
- createChildren(testRoot, 100);
- for (Node m : JcrUtils.getChildNodes(testRoot)) {
- createChildren(m, 100);
- for (Node n : JcrUtils.getChildNodes(m)) {
- createChildren(n, 5);
+ List<Future<Object>> createNodes = Lists.newArrayList();
+ for (final String p : testPaths) {
+ createNodes.add(service.submit(new Callable<Object>() {
+ private final Session session = createSession(repository);
+ private int numNodes = 0;
+
+ @Override
+ public Object call() throws Exception {
+ try {
+ Node testRoot = session.getNode(p);
+ createChildren(testRoot, 100);
+ for (Node m : JcrUtils.getChildNodes(testRoot)) {
+ createChildren(m, 100);
+ for (Node n : JcrUtils.getChildNodes(m)) {
+ createChildren(n, 5);
+ }
}
+ session.save();
+ } finally {
+ session.logout();
}
- session.save();
- } catch (RepositoryException e) {
- throw new RuntimeException(e);
- } finally {
- session.logout();
+ return null;
}
- }
- private void createChildren(Node node, int count) throws
RepositoryException {
- for (int c = 0; c < count; c++) {
- node.addNode("n" + c);
- if (nodeCount.incrementAndGet() % SAVE_INTERVAL == 0) {
- node.getSession().save();
+ private void createChildren(Node node, int count)
+ throws RepositoryException {
+ for (int c = 0; c < count; c++) {
+ node.addNode("n" + c);
+ nodeCount.incrementAndGet();
+ if (++numNodes % SAVE_INTERVAL == 0) {
+ node.getSession().save();
+ }
}
}
- }
- });
+ }));
+ }
- System.out.println("ms #node nodes/s #event event/s event ratio");
- while (!createNodes.isDone() || (eventCount.get() < nodeCount.get() * EVENTS_PER_NODE)) {
+ System.out.println("ms #node nodes/s #event event/s event ratio queue");
+ while (!isDone(createNodes) || (eventCount.get() / LISTENER_COUNT < nodeCount.get() * EVENTS_PER_NODE)) {
long t0 = System.currentTimeMillis();
Thread.sleep(OUTPUT_RESOLUTION);
t += System.currentTimeMillis() - t0;
int nc = nodeCount.get();
int ec = eventCount.get() / LISTENER_COUNT;
+ long ql = getObservationQueueMaxLength(whiteboard);
double nps = (double) nc / t * 1000;
double eps = (double) ec / t * 1000;
double epn = (double) ec / nc / EVENTS_PER_NODE;
- System.out.format("%7d %7d %7.1f %7d %7.1f %1.2f%n", t, nc,
nps, ec, eps, epn);
+ System.out.format(
+ "%7d %7d %7.1f %7d %7.1f %1.2f %7d%n",
+ t, nc, nps, ec, eps, epn, ql);
}
- createNodes.get();
+ get(createNodes);
} finally {
for (int k = 0; k < LISTENER_COUNT; k++) {
sessions[k].getWorkspace().getObservationManager().removeEventListener(listeners[k]);
sessions[k].logout();
}
+ service.shutdown();
+ service.awaitTermination(1, TimeUnit.MINUTES);
+ }
+ }
+
+ private static long getObservationQueueMaxLength(@Nullable Whiteboard whiteboard) {
+ if (whiteboard == null) {
+ return -1;
+ }
+ List<RepositoryStatsMBean> stats = WhiteboardUtils.getServices(
+ whiteboard, RepositoryStatsMBean.class);
+ for (RepositoryStatsMBean bean : stats) {
+ long[] values = (long[]) bean.getObservationQueueMaxLength().get("per second");
+ return values[values.length - 1];
+ }
+ return -1;
+ }
+
+ private static boolean isDone(Iterable<Future<Object>> futures) {
+ for (Future f : futures) {
+ if (!f.isDone()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private static void get(Iterable<Future<Object>> futures)
+ throws ExecutionException, InterruptedException {
+ for (Future f : futures) {
+ f.get();
}
}