Author: mreutegg
Date: Tue Nov  8 07:37:43 2016
New Revision: 1768635

URL: http://svn.apache.org/viewvc?rev=1768635&view=rev
Log:
OAK-4908: Best-effort prefiltering in ChangeProcessor based on ChangeSet

Reverted revision 1768558

Removed:
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilter.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilterImpl.java
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilterImplTest.java
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/PrefilteringBackgroundObserverTest.java
    
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessorMBean.java
Modified:
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterBuilder.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterProvider.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java
    
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java
    
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
    
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ConsolidatedListenerMBeanImpl.java
    
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/OakEventFilterImpl.java
    
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java
    
jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterBuilder.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterBuilder.java?rev=1768635&r1=1768634&r2=1768635&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterBuilder.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterBuilder.java
 Tue Nov  8 07:37:43 2016
@@ -42,7 +42,6 @@ import com.google.common.collect.Immutab
 import com.google.common.collect.Iterables;
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.plugins.nodetype.TypePredicate;
-import org.apache.jackrabbit.oak.plugins.observation.ChangeSet;
 import 
org.apache.jackrabbit.oak.plugins.observation.filter.UniversalFilter.Selector;
 import org.apache.jackrabbit.oak.plugins.tree.RootFactory;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
@@ -60,13 +59,6 @@ public final class FilterBuilder {
     private boolean includeClusterLocal = true;
     private final List<String> subTrees = newArrayList();
     private Condition condition = includeAll();
-    private ChangeSetFilter changeSetFilter = new ChangeSetFilter() {
-        
-        @Override
-        public boolean excludes(ChangeSet changeSet) {
-            return false;
-        }
-    };
 
     private EventAggregator aggregator;
 
@@ -74,12 +66,6 @@ public final class FilterBuilder {
         @Nonnull
         EventFilter createFilter(@Nonnull NodeState before, @Nonnull NodeState 
after);
     }
-    
-    @Nonnull
-    public FilterBuilder setChangeSetFilter(@Nonnull ChangeSetFilter 
changeSetFilter) {
-        this.changeSetFilter = changeSetFilter;
-        return this;
-    }
 
     /**
      * Adds a path to the set of paths whose subtrees include all events of
@@ -394,7 +380,6 @@ public final class FilterBuilder {
             final EventAggregator aggregator = FilterBuilder.this.aggregator;
             final Iterable<String> subTrees = FilterBuilder.this.getSubTrees();
             final Condition condition = FilterBuilder.this.condition;
-            final ChangeSetFilter changeSetFilter = 
FilterBuilder.this.changeSetFilter;
 
             @Override
             public boolean includeCommit(@Nonnull String sessionId, 
@CheckForNull CommitInfo info) {
@@ -432,11 +417,6 @@ public final class FilterBuilder {
             public EventAggregator getEventAggregator() {
                 return aggregator;
             }
-            
-            @Override
-            public boolean excludes(ChangeSet changeSet) {
-                return changeSetFilter.excludes(changeSet);
-            }
         };
     }
 

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterProvider.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterProvider.java?rev=1768635&r1=1768634&r2=1768635&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterProvider.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterProvider.java
 Tue Nov  8 07:37:43 2016
@@ -28,11 +28,8 @@ import org.apache.jackrabbit.oak.spi.sta
 /**
  * Instance of this class provide a {@link EventFilter} for observation
  * events and a filter for commits.
- * <p>
- * In order to support OAK-4908 a FilterProvider
- * extends ChangeSetFilter
  */
-public interface FilterProvider extends ChangeSetFilter {
+public interface FilterProvider {
 
     /**
      * Filter whole commits. Only commits for which this method returns

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java?rev=1768635&r1=1768634&r2=1768635&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java
 Tue Nov  8 07:37:43 2016
@@ -30,7 +30,6 @@ import java.lang.Thread.UncaughtExceptio
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -314,28 +313,4 @@ public class BackgroundObserver implemen
     private static Logger getLogger(@Nonnull Observer observer) {
         return LoggerFactory.getLogger(checkNotNull(observer).getClass());
     }
-
-    
-    /** FOR TESTING ONLY 
-     * @throws InterruptedException **/
-    boolean waitUntilStopped(int timeout, TimeUnit unit) throws 
InterruptedException {
-        long done = System.currentTimeMillis() + unit.toMillis(timeout);
-        boolean added = false;
-        synchronized(this) {
-            added = queue.offer(STOP);
-            currentTask.onComplete(completionHandler);
-        }
-        while(done > System.currentTimeMillis()) {
-            synchronized(this) {
-                if (!added) {
-                    added = queue.offer(STOP);
-                }
-                if (queue.size() == 0 || (queue.size() == 1 && queue.peek() == 
STOP)) {
-                    return true;
-                }
-                wait(1);
-            }
-        }
-        return false;
-    }
 }

Modified: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java?rev=1768635&r1=1768634&r2=1768635&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java
 Tue Nov  8 07:37:43 2016
@@ -20,50 +20,34 @@
 package org.apache.jackrabbit.oak.spi.commit;
 
 import static java.util.concurrent.Executors.newFixedThreadPool;
-import static java.util.concurrent.Executors.newSingleThreadExecutor;
 import static 
org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
-import java.io.Closeable;
 import java.util.Collections;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Random;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import com.google.common.collect.Lists;
 import org.apache.jackrabbit.oak.api.Type;
-import org.apache.jackrabbit.oak.plugins.observation.Filter;
-import org.apache.jackrabbit.oak.plugins.observation.FilteringAwareObserver;
-import org.apache.jackrabbit.oak.plugins.observation.FilteringObserver;
-import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
-import org.junit.After;
 import org.junit.Test;
 
-import com.google.common.collect.Lists;
-
-import junit.framework.AssertionFailedError;
-
 public class BackgroundObserverTest {
     private static final CommitInfo COMMIT_INFO = new CommitInfo("no-session", 
null);
     public static final int CHANGE_COUNT = 1024;
 
     private final List<Runnable> assertions = Lists.newArrayList();
     private CountDownLatch doneCounter;
-    private final List<Closeable> closeables = Lists.newArrayList();
 
     /**
      * Assert that each observer of many running concurrently sees the same
-     * linearly sequence of commits (i.e. sees the commits in the correct
-     * order).
+     * linearly sequence of commits (i.e. sees the commits in the correct 
order).
      */
     @Test
     public void concurrentObservers() throws InterruptedException {
@@ -115,7 +99,7 @@ public class BackgroundObserverTest {
         return new BackgroundObserver(new Observer() {
             // Need synchronised list here to maintain correct memory barrier
             // when this is passed on to done(List<Runnable>)
-            final List<Runnable> assertions = 
Collections.synchronizedList(Lists.<Runnable> newArrayList());
+            final List<Runnable> assertions = 
Collections.synchronizedList(Lists.<Runnable>newArrayList());
             volatile NodeState previous;
 
             @Override
@@ -141,288 +125,5 @@ public class BackgroundObserverTest {
             }
         }, executor, queueLength);
     }
-    
-    class MyFilter implements Filter {
-
-        private boolean excludeNext;
-
-        void excludeNext(boolean excludeNext) {
-            this.excludeNext = excludeNext;
-        }
-
-        @Override
-        public boolean excludes(NodeState root, CommitInfo info) {
-            final boolean excludes = excludeNext;
-            excludeNext = false;
-            return excludes;
-        }
-        
-    }
-
-    class Recorder implements FilteringAwareObserver {
-
-        List<Pair> includedChanges = new LinkedList<Pair>();
-        private boolean pause;
-        private boolean pausing;
-
-        public Recorder() {
-        }
-        
-        @Override
-        public void contentChanged(NodeState before, NodeState after, 
CommitInfo info) {
-            includedChanges.add(new Pair(before, after));
-            maybePause();
-        }
-
-        public void maybePause() {
-            synchronized (this) {
-                try {
-                    while (pause) {
-                        pausing = true;
-                        this.notifyAll();
-                        try {
-                            this.wait();
-                        } catch (InterruptedException e) {
-                            // should not happen
-                        }
-                    }
-                } finally {
-                    pausing = false;
-                    this.notifyAll();
-                }
-            }
-        }
-
-        public synchronized void pause() {
-            this.pause = true;
-        }
-
-        public synchronized void unpause() {
-            this.pause = false;
-            this.notifyAll();
-        }
-
-        public boolean waitForPausing(int timeout, TimeUnit unit) throws 
InterruptedException {
-            final long done = System.currentTimeMillis() + 
unit.toMillis(timeout);
-            synchronized (this) {
-                while (!pausing && done > System.currentTimeMillis()) {
-                    this.wait();
-                }
-                return pausing;
-            }
-        }
-
-        public boolean waitForUnpausing(int timeout, TimeUnit unit) throws 
InterruptedException {
-            final long done = System.currentTimeMillis() + 
unit.toMillis(timeout);
-            synchronized (this) {
-                while (pausing && done > System.currentTimeMillis()) {
-                    this.wait();
-                }
-                return !pausing;
-            }
-        }
-
-    }
-
-    class Pair {
-        private final NodeState before;
-        private final NodeState after;
-
-        Pair(NodeState before, NodeState after) {
-            this.before = before;
-            this.after = after;
-        }
-
-        @Override
-        public String toString() {
-            return "Pair(before=" + before + ", after=" + after + ")";
-        }
-    }
-
-    class NodeStateGenerator {
-        Random r = new Random(1232131); // seed: repeatable tests
-        NodeBuilder builder = EMPTY_NODE.builder();
-
-        NodeState next() {
-            builder.setProperty("p", r.nextInt());
-            NodeState result = builder.getNodeState();
-            builder = result.builder();
-            return result;
-        }
-    }
-
-    private void assertMatches(String msg, List<Pair> expected, List<Pair> 
actual) {
-        assertEquals("size mismatch. msg=" + msg, expected.size(), 
actual.size());
-        for (int i = 0; i < expected.size(); i++) {
-            assertSame("mismatch of before at pos=" + i + ", msg=" + msg, 
expected.get(i).before, actual.get(i).before);
-            assertSame("mismatch of after at pos=" + i + ", msg=" + msg, 
expected.get(i).after, actual.get(i).after);
-        }
-    }
-
-    @After
-    public void shutDown() throws Exception {
-        for (Closeable closeable : closeables) {
-            try {
-                closeable.close();
-            } catch (Exception e) {
-                throw new AssertionFailedError(e.getMessage());
-            }
-        }
-    }
-
-    @Test
-    public void testExcludedAllCommits() throws Exception {
-        MyFilter filter = new MyFilter();
-        Recorder recorder = new Recorder();
-        ExecutorService executor = newSingleThreadExecutor();
-        FilteringObserver fo = new FilteringObserver(executor, 5, filter, 
recorder);
-        closeables.add(fo);
-        List<Pair> expected = new LinkedList<Pair>();
-        NodeStateGenerator generator = new NodeStateGenerator();
-        NodeState first = generator.next();
-        expected.add(new Pair(null, first));
-        fo.contentChanged(first, CommitInfo.EMPTY);
-        for (int i = 0; i < 100000; i++) {
-            filter.excludeNext(true);
-            fo.contentChanged(generator.next(), CommitInfo.EMPTY);
-        }
-        assertTrue("testExcludedAllCommits", 
fo.getBackgroundObserver().waitUntilStopped(5, TimeUnit.SECONDS));
-        assertMatches("testExcludedAllCommits", expected, 
recorder.includedChanges);
-    }
-
-    @Test
-    public void testNoExcludedCommits() throws Exception {
-        MyFilter filter = new MyFilter();
-        Recorder recorder = new Recorder();
-        ExecutorService executor = newSingleThreadExecutor();
-        FilteringObserver fo = new FilteringObserver(executor, 10002, filter, 
recorder);
-        closeables.add(fo);
-        List<Pair> expected = new LinkedList<Pair>();
-        NodeStateGenerator generator = new NodeStateGenerator();
-        NodeState first = generator.next();
-        expected.add(new Pair(null, first));
-        fo.contentChanged(first, CommitInfo.EMPTY);
-        NodeState previous = first;
-        for (int i = 0; i < 10000; i++) {
-            filter.excludeNext(false);
-            NodeState next = generator.next();
-            expected.add(new Pair(previous, next));
-            previous = next;
-            fo.contentChanged(next, CommitInfo.EMPTY);
-        }
-        assertTrue("testNoExcludedCommits", 
fo.getBackgroundObserver().waitUntilStopped(5, TimeUnit.SECONDS));
-        assertMatches("testNoExcludedCommits", expected, 
recorder.includedChanges);
-    }
-
-    @Test
-    public void testExcludeCommitsWithFullQueue() throws Exception {
-        MyFilter filter = new MyFilter();
-        Recorder recorder = new Recorder();
-        ExecutorService executor = newSingleThreadExecutor();
-        FilteringObserver fo = new FilteringObserver(executor, 2, filter, 
recorder);
-        closeables.add(fo);
-        List<Pair> expected = new LinkedList<Pair>();
-        NodeStateGenerator generator = new NodeStateGenerator();
-        recorder.pause();
-
-        // the first one will directly go to the recorder
-        NodeState firstIncluded = generator.next();
-        expected.add(new Pair(null, firstIncluded));
-        fo.contentChanged(firstIncluded, CommitInfo.EMPTY);
-
-        assertTrue("observer did not get called (yet?)", 
recorder.waitForPausing(5, TimeUnit.SECONDS));
-
-        // this one will be queued as #1
-        NodeState secondIncluded = generator.next();
-        expected.add(new Pair(firstIncluded, secondIncluded));
-        fo.contentChanged(secondIncluded, CommitInfo.EMPTY);
-
-        // this one will be queued as #2
-        NodeState thirdIncluded = generator.next();
-        expected.add(new Pair(secondIncluded, thirdIncluded));
-        fo.contentChanged(thirdIncluded, CommitInfo.EMPTY);
-
-        // this one will cause the queue to 'overflow' (full==true)
-        NodeState forthQueueFull = generator.next();
-        // not adding to expected, as this one ends up in the overflow element
-        fo.contentChanged(forthQueueFull, CommitInfo.EMPTY);
-
-        NodeState next;
-        // exclude when queue is full
-        filter.excludeNext(true);
-        next = generator.next();
-        // if excluded==true and full, hence not adding to expected
-        fo.contentChanged(next, CommitInfo.EMPTY);
-        // include after an exclude when queue was full
-        // => this is not supported. when the queue
-        filter.excludeNext(false);
-        next = generator.next();
-        // excluded==false BUT queue full, hence not adding to expected
-        fo.contentChanged(next, CommitInfo.EMPTY);
-        // let recorder continue
-        recorder.unpause();
-
-        recorder.waitForUnpausing(5, TimeUnit.SECONDS);
-        Thread.sleep(1000); // wait for 1 element to be dequeued at least
-        // exclude when queue is no longer full
-        filter.excludeNext(true);
-        NodeState seventhAfterQueueFull = generator.next();
-        // with the introduction of the FilteringAwareObserver this
-        // 'seventhAfterQueueFull' root will not be forwarded
-        // to the BackgroundObserver - thus entirely filtered
-
-        fo.contentChanged(seventhAfterQueueFull, CommitInfo.EMPTY);
-
-        // but with the introduction of FilteringAwareObserver the delivery
-        // only happens with non-filtered items, so adding yet another one now
-        filter.excludeNext(false);
-        NodeState last = generator.next();
-        // while above the "seventhAfterQueueFull" DOES get filtered, the next 
contentChange
-        // triggers the release of the 'queue full overflow element' (with 
commitInfo==null)
-        // and that we must add as expected()
-        expected.add(new Pair(thirdIncluded, seventhAfterQueueFull)); // 
commitInfo == null
-        expected.add(new Pair(seventhAfterQueueFull, last));
-        fo.contentChanged(last, CommitInfo.EMPTY);
-        
-        assertTrue("testExcludeCommitsWithFullQueue", 
fo.getBackgroundObserver().waitUntilStopped(10, TimeUnit.SECONDS));
-        assertMatches("testExcludeCommitsWithFullQueue", expected, 
recorder.includedChanges);
-    }
-
-    @Test
-    public void testExcludeSomeCommits() throws Exception {
-        ExecutorService executor = newSingleThreadExecutor();
-        for (int i = 0; i < 100; i++) {
-            doTestExcludeSomeCommits(i, executor);
-        }
-        for (int i = 100; i < 10000; i += 50) {
-            doTestExcludeSomeCommits(i, executor);
-        }
-    }
-
-    private void doTestExcludeSomeCommits(int cnt, Executor executor) throws 
Exception {
-        MyFilter filter = new MyFilter();
-        Recorder recorder = new Recorder();
-        FilteringObserver fo = new FilteringObserver(executor, cnt + 2, 
filter, recorder);
-        closeables.add(fo);
-        List<Pair> expected = new LinkedList<Pair>();
-        Random r = new Random(2343242); // seed: repeatable tests
-        NodeStateGenerator generator = new NodeStateGenerator();
-        NodeState first = generator.next();
-        expected.add(new Pair(null, first));
-        fo.contentChanged(first, CommitInfo.EMPTY);
-        NodeState previous = first;
-        for (int i = 0; i < cnt; i++) {
-            boolean excludeNext = r.nextInt(100) < 90;
-            filter.excludeNext(excludeNext);
-            NodeState next = generator.next();
-            if (!excludeNext) {
-                expected.add(new Pair(previous, next));
-            }
-            previous = next;
-            fo.contentChanged(next, CommitInfo.EMPTY);
-        }
-        assertTrue("cnt=" + cnt, 
fo.getBackgroundObserver().waitUntilStopped(5, TimeUnit.SECONDS));
-        assertMatches("cnt=" + cnt, expected, recorder.includedChanges);
-    }
 
 }

Modified: 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java?rev=1768635&r1=1768634&r2=1768635&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java
 Tue Nov  8 07:37:43 2016
@@ -45,7 +45,6 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.name.NamespaceEditorProvider;
 import org.apache.jackrabbit.oak.plugins.nodetype.TypeEditorProvider;
 import org.apache.jackrabbit.oak.plugins.nodetype.write.InitialContent;
-import org.apache.jackrabbit.oak.plugins.observation.ChangeCollectorProvider;
 import org.apache.jackrabbit.oak.plugins.observation.CommitRateLimiter;
 import org.apache.jackrabbit.oak.plugins.version.VersionHook;
 import org.apache.jackrabbit.oak.query.QueryEngineSettings;
@@ -121,7 +120,6 @@ public class Jcr {
             with(new NamespaceEditorProvider());
             with(new TypeEditorProvider());
             with(new ConflictValidatorProvider());
-            with(new ChangeCollectorProvider());
 
             with(new ReferenceEditorProvider());
             with(new ReferenceIndexProvider());

Modified: 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java?rev=1768635&r1=1768634&r2=1768635&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
 Tue Nov  8 07:37:43 2016
@@ -21,7 +21,6 @@ package org.apache.jackrabbit.oak.jcr.ob
 import static com.google.common.base.Preconditions.checkState;
 import static 
org.apache.jackrabbit.api.stats.RepositoryStatistics.Type.OBSERVATION_EVENT_COUNTER;
 import static 
org.apache.jackrabbit.api.stats.RepositoryStatistics.Type.OBSERVATION_EVENT_DURATION;
-import static 
org.apache.jackrabbit.oak.plugins.observation.ChangeCollectorProvider.COMMIT_CONTEXT_OBSERVATION_CHANGESET;
 import static 
org.apache.jackrabbit.oak.plugins.observation.filter.VisibleFilter.VISIBLE_FILTER;
 import static 
org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean;
 import static 
org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerObserver;
@@ -33,46 +32,40 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import javax.jcr.observation.Event;
 import javax.jcr.observation.EventIterator;
 import javax.jcr.observation.EventListener;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Monitor;
+import com.google.common.util.concurrent.Monitor.Guard;
 import org.apache.jackrabbit.api.jmx.EventListenerMBean;
 import org.apache.jackrabbit.commons.observation.ListenerTracker;
 import org.apache.jackrabbit.oak.api.ContentSession;
 import org.apache.jackrabbit.oak.namepath.NamePathMapper;
-import org.apache.jackrabbit.oak.plugins.observation.ChangeSet;
 import org.apache.jackrabbit.oak.plugins.observation.CommitRateLimiter;
-import org.apache.jackrabbit.oak.plugins.observation.Filter;
-import org.apache.jackrabbit.oak.plugins.observation.FilteringAwareObserver;
-import org.apache.jackrabbit.oak.plugins.observation.FilteringDispatcher;
-import org.apache.jackrabbit.oak.plugins.observation.FilteringObserver;
 import org.apache.jackrabbit.oak.plugins.observation.filter.EventFilter;
 import org.apache.jackrabbit.oak.plugins.observation.filter.FilterConfigMBean;
 import org.apache.jackrabbit.oak.plugins.observation.filter.FilterProvider;
 import org.apache.jackrabbit.oak.plugins.observation.filter.Filters;
-import org.apache.jackrabbit.oak.plugins.observation.filter.ChangeSetFilter;
 import org.apache.jackrabbit.oak.spi.commit.BackgroundObserver;
 import org.apache.jackrabbit.oak.spi.commit.BackgroundObserverMBean;
-import org.apache.jackrabbit.oak.spi.commit.CommitContext;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.Observer;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.whiteboard.CompositeRegistration;
 import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
 import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
 import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardExecutor;
-import org.apache.jackrabbit.oak.stats.MeterStats;
 import org.apache.jackrabbit.oak.stats.StatisticManager;
+import org.apache.jackrabbit.oak.stats.MeterStats;
 import org.apache.jackrabbit.oak.stats.TimerStats;
 import org.apache.jackrabbit.oak.util.PerfLogger;
 import org.apache.jackrabbit.stats.TimeSeriesMax;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.Monitor;
-import com.google.common.util.concurrent.Monitor.Guard;
-
 /**
  * A {@code ChangeProcessor} generates observation {@link 
javax.jcr.observation.Event}s
  * based on a {@link FilterProvider filter} and delivers them to an {@link 
EventListener}.
@@ -80,27 +73,11 @@ import com.google.common.util.concurrent
  * After instantiation a {@code ChangeProcessor} must be started in order to 
start
  * delivering observation events and stopped to stop doing so.
  */
-class ChangeProcessor implements FilteringAwareObserver {
+class ChangeProcessor implements Observer {
     private static final Logger LOG = 
LoggerFactory.getLogger(ChangeProcessor.class);
     private static final PerfLogger PERF_LOGGER = new PerfLogger(
             LoggerFactory.getLogger(ChangeProcessor.class.getName() + 
".perf"));
 
-    private static enum FilterResult {
-        /** marks a commit as to be included, ie delivered.
-         * It's okay to falsely mark a commit as included,
-         * since filtering (as part of converting to events)
-         * will be applied at a later stage again. */
-        INCLUDE,
-        /** mark a commit as not of interest to this ChangeProcessor.
-         * Exclusion is definite, ie it's not okay to falsely
-         * mark a commit as excluded */
-        EXCLUDE, 
-        /** mark a commit as included but indicate that this
-         * is not a result of prefiltering but that prefiltering
-         * was skipped/not applicable for some reason */
-        PREFILTERING_SKIPPED
-    }
-    
     /**
      * Fill ratio of the revision queue at which commits should be delayed
      * (conditional of {@code commitRateLimiter} being non {@code null}).
@@ -112,12 +89,7 @@ class ChangeProcessor implements Filteri
      * kicks in.
      */
     public static final int MAX_DELAY;
-    
-    /** The test mode can be used to just verify if prefiltering would have
-     * correctly done its job and warn if that's not the case.
-     */
-    private static final boolean PREFILTERING_TESTMODE;
-    
+
     // OAK-4533: make DELAY_THRESHOLD and MAX_DELAY adjustable - using 
System.properties for now
     static {
         final String delayThresholdStr = 
System.getProperty("oak.commitRateLimiter.delayThreshold");
@@ -142,18 +114,6 @@ class ChangeProcessor implements Filteri
         }
         DELAY_THRESHOLD = delayThreshold;
         MAX_DELAY = maxDelay;
-
-        final String prefilteringTestModeStr = 
System.getProperty("oak.observation.prefilteringTestMode");
-        boolean prefilteringTestModeBool = false; // default is enabled
-        try {
-            if (prefilteringTestModeStr != null && 
prefilteringTestModeStr.length() != 0) {
-                prefilteringTestModeBool = 
Boolean.parseBoolean(prefilteringTestModeStr);
-                LOG.info("<clinit> using oak.observation.prefilteringTestMode 
= " + prefilteringTestModeBool);
-            }
-        } catch(RuntimeException e) {
-            LOG.warn("<clinit> could not parse 
oak.observation.prefilteringTestMode, using default (" + 
prefilteringTestModeBool + "): " + e, e);
-        }
-        PREFILTERING_TESTMODE = prefilteringTestModeBool;
     }
     
     private static final AtomicInteger COUNTER = new AtomicInteger();
@@ -185,22 +145,8 @@ class ChangeProcessor implements Filteri
      */
     private CompositeRegistration registration;
 
-    /**
-     * for statistics: tracks how many times prefiltering excluded a commit
-     */
-    private int prefilterExcludeCount;
-    
-    /**
-     * for statistics: tracks how many times prefiltering included a commit
-     */
-    private int prefilterIncludeCount;
-    
-    /**
-     * for statistics: tracks how many times prefiltering was ignored (not 
evaluated at all),
-     * either because it was disabled, queue too small, CommitInfo null or 
CommitContext null
-     */
-    private int prefilterSkipCount;
-    
+    private volatile NodeState previousRoot;
+
     public ChangeProcessor(
             ContentSession contentSession,
             NamePathMapper namePathMapper,
@@ -234,29 +180,6 @@ class ChangeProcessor implements Filteri
         return filterProvider.get();
     }
 
-    @Nonnull
-    public ChangeProcessorMBean getMBean() {
-        return new ChangeProcessorMBean() {
-
-            @Override
-            public int getPrefilterExcludeCount() {
-                return prefilterExcludeCount;
-            }
-
-            @Override
-            public int getPrefilterIncludeCount() {
-                return prefilterIncludeCount;
-            }
-
-            @Override
-            public int getPrefilterSkipCount() {
-                return prefilterSkipCount;
-            }
-
-        };
-    }
-
-    
     /**
      * Start this change processor
      * @param whiteboard  the whiteboard instance to used for scheduling 
individual
@@ -267,18 +190,16 @@ class ChangeProcessor implements Filteri
         checkState(registration == null, "Change processor started already");
         final WhiteboardExecutor executor = new WhiteboardExecutor();
         executor.start(whiteboard);
-        final FilteringObserver filteringObserver = createObserver(executor);
+        final BackgroundObserver observer = createObserver(executor);
         listenerId = COUNTER.incrementAndGet() + "";
         Map<String, String> attrs = ImmutableMap.of(LISTENER_ID, listenerId);
         String name = tracker.toString();
         registration = new CompositeRegistration(
-            registerObserver(whiteboard, filteringObserver),
+            registerObserver(whiteboard, observer),
             registerMBean(whiteboard, EventListenerMBean.class,
                     tracker.getListenerMBean(), "EventListener", name, attrs),
             registerMBean(whiteboard, BackgroundObserverMBean.class,
-                    filteringObserver.getBackgroundObserver().getMBean(), 
BackgroundObserverMBean.TYPE, name, attrs),
-            registerMBean(whiteboard, ChangeProcessorMBean.class,
-                    getMBean(), ChangeProcessorMBean.TYPE, name, attrs),
+                    observer.getMBean(), BackgroundObserverMBean.TYPE, name, attrs),
             //TODO If FilterProvider gets changed later then MBean would need 
to be
             // re-registered
             registerMBean(whiteboard, FilterConfigMBean.class,
@@ -286,7 +207,7 @@ class ChangeProcessor implements Filteri
             new Registration() {
                 @Override
                 public void unregister() {
-                    filteringObserver.close();
+                    observer.close();
                 }
             },
             new Registration() {
@@ -304,9 +225,8 @@ class ChangeProcessor implements Filteri
         );
     }
 
-    private FilteringObserver createObserver(final WhiteboardExecutor 
executor) {
-        FilteringDispatcher fd = new FilteringDispatcher(this);
-        BackgroundObserver bo = new BackgroundObserver(fd, executor, 
queueLength) {
+    private BackgroundObserver createObserver(final WhiteboardExecutor executor) {
+        return new BackgroundObserver(this, executor, queueLength) {
             private volatile long delay;
             private volatile boolean blocking;
 
@@ -367,43 +287,7 @@ class ChangeProcessor implements Filteri
                 }
             }
 
-            
-            @Override
-            public String toString() {
-                return "Prefiltering BackgroundObserver for 
"+ChangeProcessor.this;
-            }
         };
-        return new FilteringObserver(bo, new Filter() {
-            
-            @Override
-            public boolean excludes(NodeState root, CommitInfo info) {
-                if (PREFILTERING_TESTMODE) {
-                    // then we don't prefilter but only test later
-                    prefilterSkipCount++;
-                    return false;
-                }
-                final FilterResult filterResult = evalPrefilter(root, info, 
getChangeSet(info));
-                switch (filterResult) {
-                case PREFILTERING_SKIPPED: {
-                    prefilterSkipCount++;
-                    return false;
-                }
-                case EXCLUDE: {
-                    prefilterExcludeCount++;
-                    return true;
-                }
-                case INCLUDE: {
-                    prefilterIncludeCount++;
-                    return false;
-                }
-                default: {
-                    LOG.info("isExcluded: unknown/unsupported filter result: " 
+ filterResult);
-                    prefilterSkipCount++;
-                    return false;
-                }
-                }
-            }
-        });
     }
 
     private final Monitor runningMonitor = new Monitor();
@@ -455,48 +339,16 @@ class ChangeProcessor implements Filteri
         }
     }
 
-    /**
-     * Utility method that extracts the ChangeSet from a CommitInfo if 
possible.
-     * @param info
-     * @return
-     */
-    public static ChangeSet getChangeSet(CommitInfo info) {
-        if (info == null) {
-            return null;
-        }
-        CommitContext context = (CommitContext) 
info.getInfo().get(CommitContext.NAME);
-        if (context == null) {
-            return null;
-        }
-        return (ChangeSet) context.get(COMMIT_CONTEXT_OBSERVATION_CHANGESET);
-    }
-
     @Override
-    public void contentChanged(NodeState before, NodeState after, CommitInfo 
info) {
-        FilterResult prefilterTestResult = null;
-        if (PREFILTERING_TESTMODE) {
-            // OAK-4908 test mode: when the ChangeCollectorProvider is enabled
-            // there is the option to have the ChangeProcessors run in 
so-called
-            // 'test mode'. In this test mode the prefiltering is not applied,
-            // but instead verified if it *would have prefiltered correctly*.
-            // that test is therefore done at dequeue-time, hence in
-            // contentChanged
-            // TODO: remove this testing mechanism after a while
-            try {
-                prefilterTestResult = evalPrefilter(after, info, 
getChangeSet(info));
-            } catch (Exception e) {
-                LOG.warn("contentChanged: exception in wouldBeExcludedCommit: 
" + e, e);
-            }
-        }
-        if (before != null) {
+    public void contentChanged(@Nonnull NodeState root, @Nullable CommitInfo info) {
+        if (previousRoot != null) {
             try {
                 long start = PERF_LOGGER.start();
                 FilterProvider provider = filterProvider.get();
-                boolean onEventInvoked = false;
                 // FIXME don't rely on toString for session id
                 if (provider.includeCommit(contentSession.toString(), info)) {
-                    EventFilter filter = provider.getFilter(before, after);
-                    EventIterator events = new EventQueue(namePathMapper, 
info, before, after,
+                    EventFilter filter = provider.getFilter(previousRoot, root);
+                    EventIterator events = new EventQueue(namePathMapper, info, previousRoot, root,
                             provider.getSubTrees(), Filters.all(filter, 
VISIBLE_FILTER), 
                             provider.getEventAggregator());
 
@@ -509,7 +361,6 @@ class ChangeProcessor implements Filteri
                         }
                         try {
                             CountingIterator countingEvents = new 
CountingIterator(events);
-                            onEventInvoked = true;
                             eventListener.onEvent(countingEvents);
                             countingEvents.updateCounters(eventCount, 
eventDuration);
                         } finally {
@@ -520,33 +371,14 @@ class ChangeProcessor implements Filteri
                         }
                     }
                 }
-                if (prefilterTestResult != null) {
-                    // OAK-4908 test mode
-                    if (prefilterTestResult == FilterResult.EXCLUDE && 
onEventInvoked) {
-                        // this is not ok, an event would have gotten
-                        // excluded-by-prefiltering even though
-                        // it actually got an event.
-                        LOG.warn("contentChanged: delivering event which would 
have been prefiltered, "
-                                + "info={}, this={}, listener={}", info, this, 
eventListener);
-                    } else if (prefilterTestResult == FilterResult.INCLUDE && 
!onEventInvoked && info != null
-                            && info != CommitInfo.EMPTY) {
-                        // this can occur arbitrarily frequent. as prefiltering
-                        // is not perfect, it can
-                        // have false negatives - ie it can include even though
-                        // no event is then created
-                        // hence we can only really log at debug here
-                        LOG.debug(
-                                "contentChanged: no event to deliver but not 
prefiltered, info={}, this={}, listener={}",
-                                info, this, eventListener);
-                    }
-                }
                 PERF_LOGGER.end(start, 100,
                         "Generated events (before: {}, after: {})",
-                        before, after);
+                        previousRoot, root);
             } catch (Exception e) {
                 LOG.warn("Error while dispatching observation events for " + tracker, e);
             }
         }
+        previousRoot = root;
     }
 
     private static class CountingIterator implements EventIterator {
@@ -658,45 +490,4 @@ class ChangeProcessor implements Filteri
                 + ", commitRateLimiter=" + commitRateLimiter
                 + ", running=" + running.isSatisfied() + "]";
     }
-
-    /**
-     * Evaluate the prefilter for a given commit.
-     * @param changeSet 
-     * 
-     * @return a FilterResult indicating either inclusion, exclusion or
-     *         inclusion-due-to-skipping. The latter is used to reflect
-     *         prefilter evaluation better in statistics (as it could also have
-     *         been reported just as include)
-     */
-    private FilterResult evalPrefilter(NodeState root, CommitInfo info, 
ChangeSet changeSet) {
-        if (info == null) {
-            return FilterResult.PREFILTERING_SKIPPED;
-        }
-        if (root == null) {
-            // likely only occurs at startup
-            // we can't do any diffing etc, so just not exclude it
-            return FilterResult.PREFILTERING_SKIPPED;
-        }
-
-        final FilterProvider fp = filterProvider.get();
-        // FIXME don't rely on toString for session id
-        if (!fp.includeCommit(contentSession.toString(), info)) {
-            // 'classic' (and cheap pre-) filtering
-            return FilterResult.EXCLUDE;
-        }
-        if (changeSet == null) {
-            // then can't do any prefiltering since it was not
-            // able to complete the sets (within the given boundaries)
-            // (this corresponds to a large commit, which thus can't
-            // go through prefiltering)
-            return FilterResult.PREFILTERING_SKIPPED;
-        }
-
-        final ChangeSetFilter prefilter = fp;
-        if (prefilter.excludes(changeSet)) {
-            return FilterResult.EXCLUDE;
-        } else {
-            return FilterResult.INCLUDE;
-        }
-    }
 }

Modified: jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ConsolidatedListenerMBeanImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ConsolidatedListenerMBeanImpl.java?rev=1768635&r1=1768634&r2=1768635&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ConsolidatedListenerMBeanImpl.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ConsolidatedListenerMBeanImpl.java Tue Nov  8 07:37:43 2016
@@ -84,12 +84,6 @@ import static org.apache.jackrabbit.oak.
                 referenceInterface = BackgroundObserverMBean.class,
                 policy = ReferencePolicy.DYNAMIC,
                 cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE),
-        @Reference(name = "changeProcessorMBean",
-                bind = "bindChangeProcessorMBean",
-                unbind = "unbindChangeProcessorMBean",
-                referenceInterface = ChangeProcessorMBean.class,
-                policy = ReferencePolicy.DYNAMIC,
-                cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE),
         @Reference(name = "filterConfigMBean",
                 bind = "bindFilterConfigMBean",
                 unbind = "unbindFilterConfigMBean",
@@ -102,7 +96,6 @@ public class ConsolidatedListenerMBeanIm
     private final AtomicInteger observerCount = new AtomicInteger();
     private final Map<ObjectName, EventListenerMBean> eventListeners = 
Maps.newConcurrentMap();
     private final Map<ObjectName, BackgroundObserverMBean> bgObservers = 
Maps.newConcurrentMap();
-    private final Map<ObjectName, ChangeProcessorMBean> changeProcessors = 
Maps.newConcurrentMap();
     private final Map<ObjectName, FilterConfigMBean> filterConfigs = 
Maps.newConcurrentMap();
 
     private Registration mbeanReg;
@@ -208,11 +201,6 @@ public class ConsolidatedListenerMBeanIm
                     m.observerMBean = ef.getValue();
                 }
             }
-            for (Map.Entry<ObjectName, ChangeProcessorMBean> ef : 
changeProcessors.entrySet()){
-                if (Objects.equal(getListenerId(ef.getKey()), listenerId)){
-                    m.changeProcessorMBean = ef.getValue();
-                }
-            }
             mbeans.add(m);
         }
         return mbeans;
@@ -261,16 +249,6 @@ public class ConsolidatedListenerMBeanIm
     }
 
     @SuppressWarnings("unused")
-    protected void bindChangeProcessorMBean(ChangeProcessorMBean mbean, 
Map<String, ?> config){
-       changeProcessors.put(getObjectName(config), mbean);
-    }
-
-    @SuppressWarnings("unused")
-    protected void unbindChangeProcessorMBean(ChangeProcessorMBean mbean, 
Map<String, ?> config){
-       changeProcessors.remove(getObjectName(config));
-    }
-
-    @SuppressWarnings("unused")
     protected void bindListenerMBean(EventListenerMBean mbean, Map<String, ?> 
config){
         eventListeners.put(getObjectName(config), mbean);
     }
@@ -302,7 +280,6 @@ public class ConsolidatedListenerMBeanIm
     private static class ListenerMBeans {
         EventListenerMBean eventListenerMBean;
         BackgroundObserverMBean observerMBean;
-        ChangeProcessorMBean changeProcessorMBean;
         FilterConfigMBean filterConfigMBean;
     }
 
@@ -324,9 +301,6 @@ public class ConsolidatedListenerMBeanIm
                 "ratioOfTimeSpentProcessingEvents",
                 "eventConsumerTimeRatio",
                 "queueBacklogMillis",
-                "prefilterSkips",
-                "prefilterExcludes",
-                "prefilterIncludes",
                 "queueSize",
                 "localEventCount",
                 "externalEventCount",
@@ -357,9 +331,6 @@ public class ConsolidatedListenerMBeanIm
                 SimpleType.INTEGER,
                 SimpleType.INTEGER,
                 SimpleType.INTEGER,
-                SimpleType.INTEGER,
-                SimpleType.INTEGER,
-                SimpleType.INTEGER,
                 SimpleType.STRING,
                 SimpleType.BOOLEAN,
                 SimpleType.BOOLEAN,
@@ -405,9 +376,6 @@ public class ConsolidatedListenerMBeanIm
                     
mbeans.eventListenerMBean.getRatioOfTimeSpentProcessingEvents(),
                     mbeans.eventListenerMBean.getEventConsumerTimeRatio(),
                     mbeans.eventListenerMBean.getQueueBacklogMillis(),
-                    mbeans.changeProcessorMBean == null ? -1 : 
mbeans.changeProcessorMBean.getPrefilterSkipCount(),
-                    mbeans.changeProcessorMBean == null ? -1 : 
mbeans.changeProcessorMBean.getPrefilterExcludeCount(),
-                    mbeans.changeProcessorMBean == null ? -1 : 
mbeans.changeProcessorMBean.getPrefilterIncludeCount(),
                     mbeans.observerMBean.getQueueSize(),
                     mbeans.observerMBean.getLocalEventCount(),
                     mbeans.observerMBean.getExternalEventCount(),

Modified: jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/OakEventFilterImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/OakEventFilterImpl.java?rev=1768635&r1=1768634&r2=1768635&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/OakEventFilterImpl.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/OakEventFilterImpl.java Tue Nov  8 07:37:43 2016
@@ -512,21 +512,4 @@ public class OakEventFilterImpl extends
         return this;
     }
 
-    /**
-     * A hook called by the ObservationManagerImpl before creating the 
ChangeSetFilterImpl
-     * which allows this filter to adjust the includePaths according to its
-     * enabled flags.
-     * <p>
-     * This is used to set the includePath to be '/' in case 
includeAncestorRemove
-     * is set. The reason for this is that we must catch parent removals and 
can thus
-     * not apply the normally applied prefilter paths.
-     * @param includePaths the set to adjust depending on filter flags
-     */
-    void adjustPrefilterIncludePaths(Set<String> includePaths) {
-        if (includeAncestorRemove) {
-            includePaths.clear();
-            includePaths.add("/");
-        }
-    }
-
 }

Modified: 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java?rev=1768635&r1=1768634&r2=1768635&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java
 Tue Nov  8 07:37:43 2016
@@ -62,7 +62,6 @@ import org.apache.jackrabbit.oak.plugins
 import 
org.apache.jackrabbit.oak.plugins.observation.filter.UniversalFilter.Selector;
 import org.apache.jackrabbit.oak.plugins.observation.filter.FilterProvider;
 import 
org.apache.jackrabbit.oak.plugins.observation.filter.PermissionProviderFactory;
-import 
org.apache.jackrabbit.oak.plugins.observation.filter.ChangeSetFilterImpl;
 import org.apache.jackrabbit.oak.plugins.observation.filter.Selectors;
 import org.apache.jackrabbit.oak.spi.commit.Observable;
 import 
org.apache.jackrabbit.oak.spi.security.authorization.AuthorizationConfiguration;
@@ -279,7 +278,6 @@ public class ObservationManagerImpl impl
 
         List<Condition> excludeConditions = createExclusions(filterBuilder, 
excludedPaths);
 
-        final String[] validatedNodeTypeNames = 
validateNodeTypeNames(nodeTypeName);
         Selector nodeTypeSelector = Selectors.PARENT;
         boolean deleteSubtree = true;
         if (oakEventFilter != null) {
@@ -306,7 +304,7 @@ public class ObservationManagerImpl impl
                     filterBuilder.moveSubtree(),
                     filterBuilder.eventType(eventTypes),
                     filterBuilder.uuid(Selectors.PARENT, uuids),
-                    filterBuilder.nodeType(nodeTypeSelector, 
validatedNodeTypeNames),
+                    filterBuilder.nodeType(nodeTypeSelector, validateNodeTypeNames(nodeTypeName)),
                     filterBuilder.accessControl(permissionProviderFactory));
         if (oakEventFilter != null) {
             condition = oakEventFilter.wrapMainCondition(condition, 
filterBuilder, permissionProviderFactory);
@@ -321,16 +319,6 @@ public class ObservationManagerImpl impl
         ListenerTracker tracker = new WarningListenerTracker(
                 !noExternal, listener, eventTypes, absPath, isDeep, uuids, 
nodeTypeName, noLocal);
 
-        if (oakEventFilter != null) {
-            oakEventFilter.adjustPrefilterIncludePaths(includePaths);
-        }
-        
-        // OAK-4908 : prefiltering support. here we have explicit yes/no/maybe 
filtering
-        // for things like propertyNames/nodeTypes/nodeNames/paths which 
cannot be 
-        // applied on the full-fledged filterBuilder above but requires an 
explicit 'prefilter' for that.
-        filterBuilder.setChangeSetFilter(new ChangeSetFilterImpl(includePaths, 
isDeep, excludedPaths, null,
-                validatedNodeTypeNames == null ? null : 
newHashSet(validatedNodeTypeNames), null));
-        
         addEventListener(listener, tracker, filterBuilder.build());
     }
 

Modified: jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java?rev=1768635&r1=1768634&r2=1768635&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java Tue Nov  8 07:37:43 2016
@@ -436,25 +436,7 @@ public class ObservationTest extends Abs
             observationManager.removeEventListener(listener);
         }
     }
-    
-    @Test
-    public void propertyFilter() throws Exception {
-        Node root = getNode("/");
-        ExpectationListener listener = new ExpectationListener();
-        observationManager.addEventListener(listener, PROPERTY_ADDED, "/a/b", 
false, null, null, false);
-        Node a = root.addNode("a");
-        Node b = a.addNode("b");
-        listener.expect("/a/b/jcr:primaryType", PROPERTY_ADDED);
 
-        listener.expectAdd(b.setProperty("propName", 1));
-       root.getSession().save();
-
-       List<Expectation> missing = listener.getMissing(TIME_OUT, 
TimeUnit.SECONDS);
-        assertTrue("Missing events: " + missing, missing.isEmpty());
-        List<Event> unexpected = listener.getUnexpected();
-        assertTrue("Unexpected events: " + unexpected, unexpected.isEmpty());
-    }
-    
     @Test
     public void pathFilter() throws Exception {
         final String path = "/events/only/here";
@@ -1533,7 +1515,7 @@ public class ObservationTest extends Abs
 
         filter = new JackrabbitEventFilter();
         filter.setEventTypes(ALL_EVENTS);
-        filter = FilterFactory.wrap(filter).withIncludeGlobPaths(TEST_PATH + 
"/a3/**/y");
+        filter = FilterFactory.wrap(filter).withIncludeGlobPaths(TEST_PATH + "/a3/**/y/*");
         oManager.addEventListener(listener, filter);
         cp = oManager.getChangeProcessor(listener);
         assertNotNull(cp);


Reply via email to