Author: mduerig
Date: Thu Mar 13 16:55:38 2014
New Revision: 1577245

URL: http://svn.apache.org/r1577245
Log:
OAK-1539: Implement JackrabbitObservationManager
Initial implementation

Modified:
    
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
    
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java
    
jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java

Modified: 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java?rev=1577245&r1=1577244&r2=1577245&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
 Thu Mar 13 16:55:38 2014
@@ -19,21 +19,25 @@
 package org.apache.jackrabbit.oak.jcr.observation;
 
 import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.Iterators.concat;
 import static 
org.apache.jackrabbit.api.stats.RepositoryStatistics.Type.OBSERVATION_EVENT_COUNTER;
 import static 
org.apache.jackrabbit.api.stats.RepositoryStatistics.Type.OBSERVATION_EVENT_DURATION;
 import static 
org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean;
 import static 
org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerObserver;
 
 import java.util.Iterator;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import javax.jcr.observation.Event;
 import javax.jcr.observation.EventListener;
 
 import com.google.common.collect.ForwardingIterator;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Monitor;
 import com.google.common.util.concurrent.Monitor.Guard;
 import org.apache.jackrabbit.api.jmx.EventListenerMBean;
@@ -75,7 +79,7 @@ class ChangeProcessor implements Observe
     private final PermissionProvider permissionProvider;
     private final ListenerTracker tracker;
     private final EventListener eventListener;
-    private final AtomicReference<FilterProvider> filterProvider;
+    private final AtomicReference<List<FilterProvider>> filterProvider;
     private final AtomicLong eventCount;
     private final AtomicLong eventDuration;
     private final int queueLength;
@@ -89,7 +93,7 @@ class ChangeProcessor implements Observe
             NamePathMapper namePathMapper,
             PermissionProvider permissionProvider,
             ListenerTracker tracker,
-            FilterProvider filter,
+            List<FilterProvider> filters,
             StatisticManager statisticManager,
             int queueLength,
             CommitRateLimiter commitRateLimiter) {
@@ -98,7 +102,7 @@ class ChangeProcessor implements Observe
         this.permissionProvider = permissionProvider;
         this.tracker = tracker;
         eventListener = tracker.getTrackedListener();
-        filterProvider = new AtomicReference<FilterProvider>(filter);
+        filterProvider = new AtomicReference<List<FilterProvider>>(filters);
         this.eventCount = 
statisticManager.getCounter(OBSERVATION_EVENT_COUNTER);
         this.eventDuration = 
statisticManager.getCounter(OBSERVATION_EVENT_DURATION);
         this.queueLength = queueLength;
@@ -107,10 +111,10 @@ class ChangeProcessor implements Observe
 
     /**
      * Set the filter for the events this change processor will generate.
-     * @param filter
+     * @param filters
      */
-    public void setFilterProvider(FilterProvider filter) {
-        filterProvider.set(filter);
+    public void setFilterProvider(List<FilterProvider> filters) {
+        filterProvider.set(filters);
     }
 
     /**
@@ -198,22 +202,28 @@ class ChangeProcessor implements Observe
     public void contentChanged(@Nonnull NodeState root, @Nullable CommitInfo 
info) {
         if (previousRoot != null) {
             try {
-                FilterProvider provider = filterProvider.get();
-                // FIXME don't rely on toString for session id
-                if (provider.includeCommit(contentSession.toString(), info)) {
-                    String basePath = provider.getPath();
-                    EventFilter userFilter = provider.getFilter(previousRoot, 
root);
-                    EventFilter acFilter = new ACFilter(previousRoot, root, 
permissionProvider, basePath);
-                    EventQueue events = new EventQueue(
-                            namePathMapper, info, previousRoot, root, basePath,
-                            Filters.all(userFilter, acFilter));
-                    if (events.hasNext() && runningMonitor.enterIf(running)) {
-                        try {
-                            eventListener.onEvent(
-                                    new 
EventIteratorAdapter(statisticProvider(events)));
-                        } finally {
-                            runningMonitor.leave();
-                        }
+                List<FilterProvider> providers = filterProvider.get();
+                List<Iterator<Event>> eventQueues = Lists.newArrayList();
+                for (FilterProvider provider : providers) {
+                    // FIXME don't rely on toString for session id
+                    if (provider.includeCommit(contentSession.toString(), 
info)) {
+                        String basePath = provider.getPath();
+                        EventFilter userFilter = 
provider.getFilter(previousRoot, root);
+                        EventFilter acFilter = new ACFilter(previousRoot, 
root, permissionProvider, basePath);
+                        EventQueue events = new EventQueue(
+                                namePathMapper, info, previousRoot, root, 
basePath,
+                                Filters.all(userFilter, acFilter));
+                        eventQueues.add(events);
+                    }
+                }
+
+                Iterator<Event> events = concat(eventQueues.iterator());
+                if (events.hasNext() && runningMonitor.enterIf(running)) {
+                    try {
+                        eventListener.onEvent(
+                                new 
EventIteratorAdapter(statisticProvider(events)));
+                    } finally {
+                        runningMonitor.leave();
                     }
                 }
             } catch (Exception e) {

Modified: 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java?rev=1577245&r1=1577244&r2=1577245&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java
 Thu Mar 13 16:55:38 2014
@@ -23,6 +23,8 @@ import static java.util.concurrent.TimeU
 import static 
org.apache.jackrabbit.oak.plugins.observation.filter.GlobbingPathFilter.STAR;
 import static 
org.apache.jackrabbit.oak.plugins.observation.filter.GlobbingPathFilter.STAR_STAR;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -35,8 +37,10 @@ import javax.jcr.nodetype.NoSuchNodeType
 import javax.jcr.observation.EventJournal;
 import javax.jcr.observation.EventListener;
 import javax.jcr.observation.EventListenerIterator;
-import javax.jcr.observation.ObservationManager;
 
+import com.google.common.collect.Lists;
+import org.apache.jackrabbit.api.observation.JackrabbitEventFilter;
+import org.apache.jackrabbit.api.observation.JackrabbitObservationManager;
 import org.apache.jackrabbit.commons.iterator.EventListenerIteratorAdapter;
 import org.apache.jackrabbit.commons.observation.ListenerTracker;
 import org.apache.jackrabbit.oak.api.ContentSession;
@@ -58,7 +62,7 @@ import org.slf4j.LoggerFactory;
 import org.slf4j.Marker;
 import org.slf4j.MarkerFactory;
 
-public class ObservationManagerImpl implements ObservationManager {
+public class ObservationManagerImpl implements JackrabbitObservationManager {
     private static final Logger LOG = 
LoggerFactory.getLogger(ObservationManagerImpl.class);
     private static final int STOP_TIME_OUT = 1000;
 
@@ -118,21 +122,30 @@ public class ObservationManagerImpl impl
         }
     }
 
-    private synchronized void addEventListener(
+    private void addEventListener(
             EventListener listener, ListenerTracker tracker, FilterProvider 
filterProvider) {
+        addEventListener(listener, tracker, 
Collections.singletonList(filterProvider));
+    }
+
+    private synchronized void addEventListener(
+            EventListener listener, ListenerTracker tracker, 
List<FilterProvider> filterProviders) {
         ChangeProcessor processor = processors.get(listener);
+        if (filterProviders.isEmpty()) {
+            return;
+        }
+
         if (processor == null) {
             LOG.debug(OBSERVATION,
-                    "Registering event listener {} with filter {}", listener, 
filterProvider);
+                    "Registering event listener {} with filter {}", listener, 
filterProviders);
             processor = new 
ChangeProcessor(sessionDelegate.getContentSession(), namePathMapper,
-                    permissionProvider, tracker, filterProvider, 
statisticManager, queueLength,
+                    permissionProvider, tracker, filterProviders, 
statisticManager, queueLength,
                     commitRateLimiter);
             processors.put(listener, processor);
             processor.start(whiteboard);
         } else {
             LOG.debug(OBSERVATION,
-                    "Changing event listener {} to filter {}", listener, 
filterProvider);
-            processor.setFilterProvider(filterProvider);
+                    "Changing event listener {} to filter {}", listener, 
filterProviders);
+            processor.setFilterProvider(filterProviders);
         }
     }
 
@@ -203,6 +216,56 @@ public class ObservationManagerImpl impl
     }
 
     @Override
+    public void addEventListener(EventListener listener, JackrabbitEventFilter 
filter)
+            throws RepositoryException {
+
+        int eventTypes = filter.getEventTypes();
+        boolean isDeep = filter.getIsDeep();
+        String[] uuids = filter.getIdentifiers();
+        String[] nodeTypeName = filter.getNodeTypes();
+        boolean noLocal = filter.getNoLocal();
+        boolean noExternal = filter.getNoExternal() || listener instanceof 
ExcludeExternal;
+        List<String> absPaths = 
Lists.newArrayList(filter.getAdditionalPaths());
+        String absPath = filter.getAbsPath();
+        if (absPath != null) {
+            absPaths.add(absPath);
+        }
+
+        ArrayList<FilterProvider> filterProviders = Lists.newArrayList();
+        for (String path : absPaths) {
+            FilterBuilder filterBuilder = new FilterBuilder();
+            filterBuilder
+                    .basePath(namePathMapper.getOakPath(path))
+                    .includeSessionLocal(!noLocal)
+                    .includeClusterExternal(!noExternal)
+                    .condition(filterBuilder.all(
+                            filterBuilder.deleteSubtree(),
+                            filterBuilder.moveSubtree(),
+                            filterBuilder.path(isDeep ? STAR_STAR : STAR),
+                            filterBuilder.eventType(eventTypes),
+                            filterBuilder.uuid(Selectors.PARENT, uuids),
+                            filterBuilder.nodeType(Selectors.PARENT,
+                                    validateNodeTypeNames(nodeTypeName))));
+            filterProviders.add(filterBuilder.build());
+        }
+
+        // FIXME support multiple paths in ListenerTracker (only absPath is passed below)
+        ListenerTracker tracker = new ListenerTracker(
+                listener, eventTypes, absPath, isDeep, uuids, nodeTypeName, 
noLocal) {
+            @Override
+            protected void warn(String message) {
+                LOG.warn(DEPRECATED, message, initStackTrace);
+            }
+            @Override
+            protected void beforeEventDelivery() {
+                sessionDelegate.refreshAtNextAccess();
+            }
+        };
+
+        addEventListener(listener, tracker, filterProviders);
+    }
+
+    @Override
     public void removeEventListener(EventListener listener) {
         ChangeProcessor processor;
         synchronized (this) {

Modified: 
jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java?rev=1577245&r1=1577244&r2=1577245&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java
 Thu Mar 13 16:55:38 2014
@@ -63,6 +63,8 @@ import com.google.common.util.concurrent
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import org.apache.jackrabbit.JcrConstants;
+import org.apache.jackrabbit.api.observation.JackrabbitEventFilter;
+import org.apache.jackrabbit.api.observation.JackrabbitObservationManager;
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.commons.PathUtils;
 import org.apache.jackrabbit.oak.jcr.AbstractRepositoryTest;
@@ -583,6 +585,32 @@ public class ObservationTest extends Abs
     }
 
     @Test
+    public void disjunctPaths() throws ExecutionException, 
InterruptedException, RepositoryException {
+        assumeTrue(observationManager instanceof JackrabbitObservationManager);
+        JackrabbitObservationManager oManager = (JackrabbitObservationManager) 
observationManager;
+        ExpectationListener listener = new ExpectationListener();
+        JackrabbitEventFilter filter = new JackrabbitEventFilter()
+                .setAdditionalPaths(TEST_PATH + "/a", TEST_PATH + "/x")
+                .setEventTypes(NODE_ADDED);
+        oManager.addEventListener(listener, filter);
+
+        Node testNode = getNode(TEST_PATH);
+        Node b = testNode.addNode("a").addNode("b");
+        b.addNode("c");
+        Node y = testNode.addNode("x").addNode("y");
+        y.addNode("z");
+
+        listener.expect(b.getPath(), NODE_ADDED);
+        listener.expect(y.getPath(), NODE_ADDED);
+        testNode.getSession().save();
+
+        List<Expectation> missing = listener.getMissing(TIME_OUT, 
TimeUnit.SECONDS);
+        assertTrue("Missing events: " + missing, missing.isEmpty());
+        List<Event> unexpected = listener.getUnexpected();
+        assertTrue("Unexpected events: " + unexpected, unexpected.isEmpty());
+    }
+
+    @Test
     public void filterPropertyOfParent()
             throws RepositoryException, ExecutionException, 
InterruptedException {
         assumeTrue(observationManager instanceof ObservationManagerImpl);
@@ -630,11 +658,11 @@ public class ObservationTest extends Abs
         // Events for all items that have a property "b/c/foo" with value "bar"
         builder.condition(builder.property(Selectors.fromThis("b/c"), "foo",
                 new Predicate<PropertyState>() {
-            @Override
-            public boolean apply(PropertyState property) {
-                return "bar".equals(property.getValue(STRING));
-            }
-        }));
+                    @Override
+                    public boolean apply(PropertyState property) {
+                        return "bar".equals(property.getValue(STRING));
+                    }
+                }));
         oManager.addEventListener(listener, builder.build());
 
         Node testNode = getNode(TEST_PATH);
@@ -818,7 +846,7 @@ public class ObservationTest extends Abs
         }
 
         public void expectMove(final String src, final String dst) {
-            expect(new Expectation(">" + src + ':' + dst){
+            expect(new Expectation('>' + src + ':' + dst){
                 @Override
                 public boolean onEvent(Event event) throws Exception {
                     return event.getType() == NODE_MOVED &&
@@ -839,8 +867,8 @@ public class ObservationTest extends Abs
             catch (TimeoutException e) {
                 long dt = System.nanoTime() - t0;
                 // TODO remove again once OAK-1491 is fixed
-                assertTrue("Spurious wak-up after " + 0,
-                        dt < 0.8*TimeUnit.NANOSECONDS.convert(time, timeUnit));
+                assertTrue("Spurious wak-up after " + dt,
+                        dt > 0.8*TimeUnit.NANOSECONDS.convert(time, timeUnit));
                 for (Expectation exp : expected) {
                     if (!exp.isDone()) {
                         missing.add(exp);


Reply via email to