Author: mduerig
Date: Thu Mar 13 16:55:38 2014
New Revision: 1577245
URL: http://svn.apache.org/r1577245
Log:
OAK-1539: Implement JackrabbitObservationManager
Initial implementation
Modified:
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java
jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java
Modified:
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java?rev=1577245&r1=1577244&r2=1577245&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java Thu Mar 13 16:55:38 2014
@@ -19,21 +19,25 @@
package org.apache.jackrabbit.oak.jcr.observation;
import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.Iterators.concat;
import static
org.apache.jackrabbit.api.stats.RepositoryStatistics.Type.OBSERVATION_EVENT_COUNTER;
import static
org.apache.jackrabbit.api.stats.RepositoryStatistics.Type.OBSERVATION_EVENT_DURATION;
import static
org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean;
import static
org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerObserver;
import java.util.Iterator;
+import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import javax.jcr.observation.Event;
import javax.jcr.observation.EventListener;
import com.google.common.collect.ForwardingIterator;
+import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Monitor;
import com.google.common.util.concurrent.Monitor.Guard;
import org.apache.jackrabbit.api.jmx.EventListenerMBean;
@@ -75,7 +79,7 @@ class ChangeProcessor implements Observe
private final PermissionProvider permissionProvider;
private final ListenerTracker tracker;
private final EventListener eventListener;
- private final AtomicReference<FilterProvider> filterProvider;
+ private final AtomicReference<List<FilterProvider>> filterProvider;
private final AtomicLong eventCount;
private final AtomicLong eventDuration;
private final int queueLength;
@@ -89,7 +93,7 @@ class ChangeProcessor implements Observe
NamePathMapper namePathMapper,
PermissionProvider permissionProvider,
ListenerTracker tracker,
- FilterProvider filter,
+ List<FilterProvider> filters,
StatisticManager statisticManager,
int queueLength,
CommitRateLimiter commitRateLimiter) {
@@ -98,7 +102,7 @@ class ChangeProcessor implements Observe
this.permissionProvider = permissionProvider;
this.tracker = tracker;
eventListener = tracker.getTrackedListener();
- filterProvider = new AtomicReference<FilterProvider>(filter);
+ filterProvider = new AtomicReference<List<FilterProvider>>(filters);
this.eventCount =
statisticManager.getCounter(OBSERVATION_EVENT_COUNTER);
this.eventDuration =
statisticManager.getCounter(OBSERVATION_EVENT_DURATION);
this.queueLength = queueLength;
@@ -107,10 +111,10 @@ class ChangeProcessor implements Observe
/**
* Set the filter for the events this change processor will generate.
- * @param filter
+ * @param filters
*/
- public void setFilterProvider(FilterProvider filter) {
- filterProvider.set(filter);
+ public void setFilterProvider(List<FilterProvider> filters) {
+ filterProvider.set(filters);
}
/**
@@ -198,22 +202,28 @@ class ChangeProcessor implements Observe
public void contentChanged(@Nonnull NodeState root, @Nullable CommitInfo
info) {
if (previousRoot != null) {
try {
- FilterProvider provider = filterProvider.get();
- // FIXME don't rely on toString for session id
- if (provider.includeCommit(contentSession.toString(), info)) {
- String basePath = provider.getPath();
- EventFilter userFilter = provider.getFilter(previousRoot,
root);
- EventFilter acFilter = new ACFilter(previousRoot, root,
permissionProvider, basePath);
- EventQueue events = new EventQueue(
- namePathMapper, info, previousRoot, root, basePath,
- Filters.all(userFilter, acFilter));
- if (events.hasNext() && runningMonitor.enterIf(running)) {
- try {
- eventListener.onEvent(
- new
EventIteratorAdapter(statisticProvider(events)));
- } finally {
- runningMonitor.leave();
- }
+ List<FilterProvider> providers = filterProvider.get();
+ List<Iterator<Event>> eventQueues = Lists.newArrayList();
+ for (FilterProvider provider : providers) {
+ // FIXME don't rely on toString for session id
+ if (provider.includeCommit(contentSession.toString(),
info)) {
+ String basePath = provider.getPath();
+ EventFilter userFilter =
provider.getFilter(previousRoot, root);
+ EventFilter acFilter = new ACFilter(previousRoot,
root, permissionProvider, basePath);
+ EventQueue events = new EventQueue(
+ namePathMapper, info, previousRoot, root,
basePath,
+ Filters.all(userFilter, acFilter));
+ eventQueues.add(events);
+ }
+ }
+
+ Iterator<Event> events = concat(eventQueues.iterator());
+ if (events.hasNext() && runningMonitor.enterIf(running)) {
+ try {
+ eventListener.onEvent(
+ new
EventIteratorAdapter(statisticProvider(events)));
+ } finally {
+ runningMonitor.leave();
}
}
} catch (Exception e) {
Modified:
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java?rev=1577245&r1=1577244&r2=1577245&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java Thu Mar 13 16:55:38 2014
@@ -23,6 +23,8 @@ import static java.util.concurrent.TimeU
import static
org.apache.jackrabbit.oak.plugins.observation.filter.GlobbingPathFilter.STAR;
import static
org.apache.jackrabbit.oak.plugins.observation.filter.GlobbingPathFilter.STAR_STAR;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -35,8 +37,10 @@ import javax.jcr.nodetype.NoSuchNodeType
import javax.jcr.observation.EventJournal;
import javax.jcr.observation.EventListener;
import javax.jcr.observation.EventListenerIterator;
-import javax.jcr.observation.ObservationManager;
+import com.google.common.collect.Lists;
+import org.apache.jackrabbit.api.observation.JackrabbitEventFilter;
+import org.apache.jackrabbit.api.observation.JackrabbitObservationManager;
import org.apache.jackrabbit.commons.iterator.EventListenerIteratorAdapter;
import org.apache.jackrabbit.commons.observation.ListenerTracker;
import org.apache.jackrabbit.oak.api.ContentSession;
@@ -58,7 +62,7 @@ import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
-public class ObservationManagerImpl implements ObservationManager {
+public class ObservationManagerImpl implements JackrabbitObservationManager {
private static final Logger LOG =
LoggerFactory.getLogger(ObservationManagerImpl.class);
private static final int STOP_TIME_OUT = 1000;
@@ -118,21 +122,30 @@ public class ObservationManagerImpl impl
}
}
- private synchronized void addEventListener(
+ private void addEventListener(
EventListener listener, ListenerTracker tracker, FilterProvider
filterProvider) {
+ addEventListener(listener, tracker,
Collections.singletonList(filterProvider));
+ }
+
+ private synchronized void addEventListener(
+ EventListener listener, ListenerTracker tracker,
List<FilterProvider> filterProviders) {
ChangeProcessor processor = processors.get(listener);
+ if (filterProviders.isEmpty()) {
+ return;
+ }
+
if (processor == null) {
LOG.debug(OBSERVATION,
- "Registering event listener {} with filter {}", listener,
filterProvider);
+ "Registering event listener {} with filter {}", listener,
filterProviders);
processor = new
ChangeProcessor(sessionDelegate.getContentSession(), namePathMapper,
- permissionProvider, tracker, filterProvider,
statisticManager, queueLength,
+ permissionProvider, tracker, filterProviders,
statisticManager, queueLength,
commitRateLimiter);
processors.put(listener, processor);
processor.start(whiteboard);
} else {
LOG.debug(OBSERVATION,
- "Changing event listener {} to filter {}", listener,
filterProvider);
- processor.setFilterProvider(filterProvider);
+ "Changing event listener {} to filter {}", listener,
filterProviders);
+ processor.setFilterProvider(filterProviders);
}
}
@@ -203,6 +216,56 @@ public class ObservationManagerImpl impl
}
@Override
+ public void addEventListener(EventListener listener, JackrabbitEventFilter
filter)
+ throws RepositoryException {
+
+ int eventTypes = filter.getEventTypes();
+ boolean isDeep = filter.getIsDeep();
+ String[] uuids = filter.getIdentifiers();
+ String[] nodeTypeName = filter.getNodeTypes();
+ boolean noLocal = filter.getNoLocal();
+ boolean noExternal = filter.getNoExternal() || listener instanceof
ExcludeExternal;
+ List<String> absPaths =
Lists.newArrayList(filter.getAdditionalPaths());
+ String absPath = filter.getAbsPath();
+ if (absPath != null) {
+ absPaths.add(absPath);
+ }
+
+ ArrayList<FilterProvider> filterProviders = Lists.newArrayList();
+ for (String path : absPaths) {
+ FilterBuilder filterBuilder = new FilterBuilder();
+ filterBuilder
+ .basePath(namePathMapper.getOakPath(path))
+ .includeSessionLocal(!noLocal)
+ .includeClusterExternal(!noExternal)
+ .condition(filterBuilder.all(
+ filterBuilder.deleteSubtree(),
+ filterBuilder.moveSubtree(),
+ filterBuilder.path(isDeep ? STAR_STAR : STAR),
+ filterBuilder.eventType(eventTypes),
+ filterBuilder.uuid(Selectors.PARENT, uuids),
+ filterBuilder.nodeType(Selectors.PARENT,
+ validateNodeTypeNames(nodeTypeName))));
+ filterProviders.add(filterBuilder.build());
+ }
+
+ // FIXME support multiple path in ListenerTracker
+ ListenerTracker tracker = new ListenerTracker(
+ listener, eventTypes, absPath, isDeep, uuids, nodeTypeName,
noLocal) {
+ @Override
+ protected void warn(String message) {
+ LOG.warn(DEPRECATED, message, initStackTrace);
+ }
+ @Override
+ protected void beforeEventDelivery() {
+ sessionDelegate.refreshAtNextAccess();
+ }
+ };
+
+ addEventListener(listener, tracker, filterProviders);
+ }
+
+ @Override
public void removeEventListener(EventListener listener) {
ChangeProcessor processor;
synchronized (this) {
Modified:
jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java?rev=1577245&r1=1577244&r2=1577245&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java Thu Mar 13 16:55:38 2014
@@ -63,6 +63,8 @@ import com.google.common.util.concurrent
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.jackrabbit.JcrConstants;
+import org.apache.jackrabbit.api.observation.JackrabbitEventFilter;
+import org.apache.jackrabbit.api.observation.JackrabbitObservationManager;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.commons.PathUtils;
import org.apache.jackrabbit.oak.jcr.AbstractRepositoryTest;
@@ -583,6 +585,32 @@ public class ObservationTest extends Abs
}
@Test
+ public void disjunctPaths() throws ExecutionException,
InterruptedException, RepositoryException {
+ assumeTrue(observationManager instanceof JackrabbitObservationManager);
+ JackrabbitObservationManager oManager = (JackrabbitObservationManager)
observationManager;
+ ExpectationListener listener = new ExpectationListener();
+ JackrabbitEventFilter filter = new JackrabbitEventFilter()
+ .setAdditionalPaths(TEST_PATH + "/a", TEST_PATH + "/x")
+ .setEventTypes(NODE_ADDED);
+ oManager.addEventListener(listener, filter);
+
+ Node testNode = getNode(TEST_PATH);
+ Node b = testNode.addNode("a").addNode("b");
+ b.addNode("c");
+ Node y = testNode.addNode("x").addNode("y");
+ y.addNode("z");
+
+ listener.expect(b.getPath(), NODE_ADDED);
+ listener.expect(y.getPath(), NODE_ADDED);
+ testNode.getSession().save();
+
+ List<Expectation> missing = listener.getMissing(TIME_OUT,
TimeUnit.SECONDS);
+ assertTrue("Missing events: " + missing, missing.isEmpty());
+ List<Event> unexpected = listener.getUnexpected();
+ assertTrue("Unexpected events: " + unexpected, unexpected.isEmpty());
+ }
+
+ @Test
public void filterPropertyOfParent()
throws RepositoryException, ExecutionException,
InterruptedException {
assumeTrue(observationManager instanceof ObservationManagerImpl);
@@ -630,11 +658,11 @@ public class ObservationTest extends Abs
// Events for all items that have a property "b/c/foo" with value "bar"
builder.condition(builder.property(Selectors.fromThis("b/c"), "foo",
new Predicate<PropertyState>() {
- @Override
- public boolean apply(PropertyState property) {
- return "bar".equals(property.getValue(STRING));
- }
- }));
+ @Override
+ public boolean apply(PropertyState property) {
+ return "bar".equals(property.getValue(STRING));
+ }
+ }));
oManager.addEventListener(listener, builder.build());
Node testNode = getNode(TEST_PATH);
@@ -818,7 +846,7 @@ public class ObservationTest extends Abs
}
public void expectMove(final String src, final String dst) {
- expect(new Expectation(">" + src + ':' + dst){
+ expect(new Expectation('>' + src + ':' + dst){
@Override
public boolean onEvent(Event event) throws Exception {
return event.getType() == NODE_MOVED &&
@@ -839,8 +867,8 @@ public class ObservationTest extends Abs
catch (TimeoutException e) {
long dt = System.nanoTime() - t0;
// TODO remove again once OAK-1491 is fixed
- assertTrue("Spurious wak-up after " + 0,
- dt < 0.8*TimeUnit.NANOSECONDS.convert(time, timeUnit));
+ assertTrue("Spurious wak-up after " + dt,
+ dt > 0.8*TimeUnit.NANOSECONDS.convert(time, timeUnit));
for (Expectation exp : expected) {
if (!exp.isDone()) {
missing.add(exp);