Author: mreutegg
Date: Tue Nov 8 07:37:43 2016
New Revision: 1768635
URL: http://svn.apache.org/viewvc?rev=1768635&view=rev
Log:
OAK-4908: Best-effort prefiltering in ChangeProcessor based on ChangeSet
Reverted revision 1768558
Removed:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilter.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilterImpl.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilterImplTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/PrefilteringBackgroundObserverTest.java
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessorMBean.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterBuilder.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterProvider.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ConsolidatedListenerMBeanImpl.java
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/OakEventFilterImpl.java
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java
jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterBuilder.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterBuilder.java?rev=1768635&r1=1768634&r2=1768635&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterBuilder.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterBuilder.java
Tue Nov 8 07:37:43 2016
@@ -42,7 +42,6 @@ import com.google.common.collect.Immutab
import com.google.common.collect.Iterables;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.plugins.nodetype.TypePredicate;
-import org.apache.jackrabbit.oak.plugins.observation.ChangeSet;
import
org.apache.jackrabbit.oak.plugins.observation.filter.UniversalFilter.Selector;
import org.apache.jackrabbit.oak.plugins.tree.RootFactory;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
@@ -60,13 +59,6 @@ public final class FilterBuilder {
private boolean includeClusterLocal = true;
private final List<String> subTrees = newArrayList();
private Condition condition = includeAll();
- private ChangeSetFilter changeSetFilter = new ChangeSetFilter() {
-
- @Override
- public boolean excludes(ChangeSet changeSet) {
- return false;
- }
- };
private EventAggregator aggregator;
@@ -74,12 +66,6 @@ public final class FilterBuilder {
@Nonnull
EventFilter createFilter(@Nonnull NodeState before, @Nonnull NodeState
after);
}
-
- @Nonnull
- public FilterBuilder setChangeSetFilter(@Nonnull ChangeSetFilter
changeSetFilter) {
- this.changeSetFilter = changeSetFilter;
- return this;
- }
/**
* Adds a path to the set of paths whose subtrees include all events of
@@ -394,7 +380,6 @@ public final class FilterBuilder {
final EventAggregator aggregator = FilterBuilder.this.aggregator;
final Iterable<String> subTrees = FilterBuilder.this.getSubTrees();
final Condition condition = FilterBuilder.this.condition;
- final ChangeSetFilter changeSetFilter =
FilterBuilder.this.changeSetFilter;
@Override
public boolean includeCommit(@Nonnull String sessionId,
@CheckForNull CommitInfo info) {
@@ -432,11 +417,6 @@ public final class FilterBuilder {
public EventAggregator getEventAggregator() {
return aggregator;
}
-
- @Override
- public boolean excludes(ChangeSet changeSet) {
- return changeSetFilter.excludes(changeSet);
- }
};
}
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterProvider.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterProvider.java?rev=1768635&r1=1768634&r2=1768635&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterProvider.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterProvider.java
Tue Nov 8 07:37:43 2016
@@ -28,11 +28,8 @@ import org.apache.jackrabbit.oak.spi.sta
/**
* Instance of this class provide a {@link EventFilter} for observation
* events and a filter for commits.
- * <p>
- * In order to support OAK-4908 a FilterProvider
- * extends ChangeSetFilter
*/
-public interface FilterProvider extends ChangeSetFilter {
+public interface FilterProvider {
/**
* Filter whole commits. Only commits for which this method returns
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java?rev=1768635&r1=1768634&r2=1768635&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java
Tue Nov 8 07:37:43 2016
@@ -30,7 +30,6 @@ import java.lang.Thread.UncaughtExceptio
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -314,28 +313,4 @@ public class BackgroundObserver implemen
private static Logger getLogger(@Nonnull Observer observer) {
return LoggerFactory.getLogger(checkNotNull(observer).getClass());
}
-
-
- /** FOR TESTING ONLY
- * @throws InterruptedException **/
- boolean waitUntilStopped(int timeout, TimeUnit unit) throws
InterruptedException {
- long done = System.currentTimeMillis() + unit.toMillis(timeout);
- boolean added = false;
- synchronized(this) {
- added = queue.offer(STOP);
- currentTask.onComplete(completionHandler);
- }
- while(done > System.currentTimeMillis()) {
- synchronized(this) {
- if (!added) {
- added = queue.offer(STOP);
- }
- if (queue.size() == 0 || (queue.size() == 1 && queue.peek() ==
STOP)) {
- return true;
- }
- wait(1);
- }
- }
- return false;
- }
}
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java?rev=1768635&r1=1768634&r2=1768635&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java
Tue Nov 8 07:37:43 2016
@@ -20,50 +20,34 @@
package org.apache.jackrabbit.oak.spi.commit;
import static java.util.concurrent.Executors.newFixedThreadPool;
-import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static
org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
-import java.io.Closeable;
import java.util.Collections;
-import java.util.LinkedList;
import java.util.List;
-import java.util.Random;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import com.google.common.collect.Lists;
import org.apache.jackrabbit.oak.api.Type;
-import org.apache.jackrabbit.oak.plugins.observation.Filter;
-import org.apache.jackrabbit.oak.plugins.observation.FilteringAwareObserver;
-import org.apache.jackrabbit.oak.plugins.observation.FilteringObserver;
-import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
-import org.junit.After;
import org.junit.Test;
-import com.google.common.collect.Lists;
-
-import junit.framework.AssertionFailedError;
-
public class BackgroundObserverTest {
private static final CommitInfo COMMIT_INFO = new CommitInfo("no-session",
null);
public static final int CHANGE_COUNT = 1024;
private final List<Runnable> assertions = Lists.newArrayList();
private CountDownLatch doneCounter;
- private final List<Closeable> closeables = Lists.newArrayList();
/**
* Assert that each observer of many running concurrently sees the same
- * linearly sequence of commits (i.e. sees the commits in the correct
- * order).
+ * linearly sequence of commits (i.e. sees the commits in the correct
order).
*/
@Test
public void concurrentObservers() throws InterruptedException {
@@ -115,7 +99,7 @@ public class BackgroundObserverTest {
return new BackgroundObserver(new Observer() {
// Need synchronised list here to maintain correct memory barrier
// when this is passed on to done(List<Runnable>)
- final List<Runnable> assertions =
Collections.synchronizedList(Lists.<Runnable> newArrayList());
+ final List<Runnable> assertions =
Collections.synchronizedList(Lists.<Runnable>newArrayList());
volatile NodeState previous;
@Override
@@ -141,288 +125,5 @@ public class BackgroundObserverTest {
}
}, executor, queueLength);
}
-
- class MyFilter implements Filter {
-
- private boolean excludeNext;
-
- void excludeNext(boolean excludeNext) {
- this.excludeNext = excludeNext;
- }
-
- @Override
- public boolean excludes(NodeState root, CommitInfo info) {
- final boolean excludes = excludeNext;
- excludeNext = false;
- return excludes;
- }
-
- }
-
- class Recorder implements FilteringAwareObserver {
-
- List<Pair> includedChanges = new LinkedList<Pair>();
- private boolean pause;
- private boolean pausing;
-
- public Recorder() {
- }
-
- @Override
- public void contentChanged(NodeState before, NodeState after,
CommitInfo info) {
- includedChanges.add(new Pair(before, after));
- maybePause();
- }
-
- public void maybePause() {
- synchronized (this) {
- try {
- while (pause) {
- pausing = true;
- this.notifyAll();
- try {
- this.wait();
- } catch (InterruptedException e) {
- // should not happen
- }
- }
- } finally {
- pausing = false;
- this.notifyAll();
- }
- }
- }
-
- public synchronized void pause() {
- this.pause = true;
- }
-
- public synchronized void unpause() {
- this.pause = false;
- this.notifyAll();
- }
-
- public boolean waitForPausing(int timeout, TimeUnit unit) throws
InterruptedException {
- final long done = System.currentTimeMillis() +
unit.toMillis(timeout);
- synchronized (this) {
- while (!pausing && done > System.currentTimeMillis()) {
- this.wait();
- }
- return pausing;
- }
- }
-
- public boolean waitForUnpausing(int timeout, TimeUnit unit) throws
InterruptedException {
- final long done = System.currentTimeMillis() +
unit.toMillis(timeout);
- synchronized (this) {
- while (pausing && done > System.currentTimeMillis()) {
- this.wait();
- }
- return !pausing;
- }
- }
-
- }
-
- class Pair {
- private final NodeState before;
- private final NodeState after;
-
- Pair(NodeState before, NodeState after) {
- this.before = before;
- this.after = after;
- }
-
- @Override
- public String toString() {
- return "Pair(before=" + before + ", after=" + after + ")";
- }
- }
-
- class NodeStateGenerator {
- Random r = new Random(1232131); // seed: repeatable tests
- NodeBuilder builder = EMPTY_NODE.builder();
-
- NodeState next() {
- builder.setProperty("p", r.nextInt());
- NodeState result = builder.getNodeState();
- builder = result.builder();
- return result;
- }
- }
-
- private void assertMatches(String msg, List<Pair> expected, List<Pair>
actual) {
- assertEquals("size mismatch. msg=" + msg, expected.size(),
actual.size());
- for (int i = 0; i < expected.size(); i++) {
- assertSame("mismatch of before at pos=" + i + ", msg=" + msg,
expected.get(i).before, actual.get(i).before);
- assertSame("mismatch of after at pos=" + i + ", msg=" + msg,
expected.get(i).after, actual.get(i).after);
- }
- }
-
- @After
- public void shutDown() throws Exception {
- for (Closeable closeable : closeables) {
- try {
- closeable.close();
- } catch (Exception e) {
- throw new AssertionFailedError(e.getMessage());
- }
- }
- }
-
- @Test
- public void testExcludedAllCommits() throws Exception {
- MyFilter filter = new MyFilter();
- Recorder recorder = new Recorder();
- ExecutorService executor = newSingleThreadExecutor();
- FilteringObserver fo = new FilteringObserver(executor, 5, filter,
recorder);
- closeables.add(fo);
- List<Pair> expected = new LinkedList<Pair>();
- NodeStateGenerator generator = new NodeStateGenerator();
- NodeState first = generator.next();
- expected.add(new Pair(null, first));
- fo.contentChanged(first, CommitInfo.EMPTY);
- for (int i = 0; i < 100000; i++) {
- filter.excludeNext(true);
- fo.contentChanged(generator.next(), CommitInfo.EMPTY);
- }
- assertTrue("testExcludedAllCommits",
fo.getBackgroundObserver().waitUntilStopped(5, TimeUnit.SECONDS));
- assertMatches("testExcludedAllCommits", expected,
recorder.includedChanges);
- }
-
- @Test
- public void testNoExcludedCommits() throws Exception {
- MyFilter filter = new MyFilter();
- Recorder recorder = new Recorder();
- ExecutorService executor = newSingleThreadExecutor();
- FilteringObserver fo = new FilteringObserver(executor, 10002, filter,
recorder);
- closeables.add(fo);
- List<Pair> expected = new LinkedList<Pair>();
- NodeStateGenerator generator = new NodeStateGenerator();
- NodeState first = generator.next();
- expected.add(new Pair(null, first));
- fo.contentChanged(first, CommitInfo.EMPTY);
- NodeState previous = first;
- for (int i = 0; i < 10000; i++) {
- filter.excludeNext(false);
- NodeState next = generator.next();
- expected.add(new Pair(previous, next));
- previous = next;
- fo.contentChanged(next, CommitInfo.EMPTY);
- }
- assertTrue("testNoExcludedCommits",
fo.getBackgroundObserver().waitUntilStopped(5, TimeUnit.SECONDS));
- assertMatches("testNoExcludedCommits", expected,
recorder.includedChanges);
- }
-
- @Test
- public void testExcludeCommitsWithFullQueue() throws Exception {
- MyFilter filter = new MyFilter();
- Recorder recorder = new Recorder();
- ExecutorService executor = newSingleThreadExecutor();
- FilteringObserver fo = new FilteringObserver(executor, 2, filter,
recorder);
- closeables.add(fo);
- List<Pair> expected = new LinkedList<Pair>();
- NodeStateGenerator generator = new NodeStateGenerator();
- recorder.pause();
-
- // the first one will directly go to the recorder
- NodeState firstIncluded = generator.next();
- expected.add(new Pair(null, firstIncluded));
- fo.contentChanged(firstIncluded, CommitInfo.EMPTY);
-
- assertTrue("observer did not get called (yet?)",
recorder.waitForPausing(5, TimeUnit.SECONDS));
-
- // this one will be queued as #1
- NodeState secondIncluded = generator.next();
- expected.add(new Pair(firstIncluded, secondIncluded));
- fo.contentChanged(secondIncluded, CommitInfo.EMPTY);
-
- // this one will be queued as #2
- NodeState thirdIncluded = generator.next();
- expected.add(new Pair(secondIncluded, thirdIncluded));
- fo.contentChanged(thirdIncluded, CommitInfo.EMPTY);
-
- // this one will cause the queue to 'overflow' (full==true)
- NodeState forthQueueFull = generator.next();
- // not adding to expected, as this one ends up in the overflow element
- fo.contentChanged(forthQueueFull, CommitInfo.EMPTY);
-
- NodeState next;
- // exclude when queue is full
- filter.excludeNext(true);
- next = generator.next();
- // if excluded==true and full, hence not adding to expected
- fo.contentChanged(next, CommitInfo.EMPTY);
- // include after an exclude when queue was full
- // => this is not supported. when the queue
- filter.excludeNext(false);
- next = generator.next();
- // excluded==false BUT queue full, hence not adding to expected
- fo.contentChanged(next, CommitInfo.EMPTY);
- // let recorder continue
- recorder.unpause();
-
- recorder.waitForUnpausing(5, TimeUnit.SECONDS);
- Thread.sleep(1000); // wait for 1 element to be dequeued at least
- // exclude when queue is no longer full
- filter.excludeNext(true);
- NodeState seventhAfterQueueFull = generator.next();
- // with the introduction of the FilteringAwareObserver this
- // 'seventhAfterQueueFull' root will not be forwarded
- // to the BackgroundObserver - thus entirely filtered
-
- fo.contentChanged(seventhAfterQueueFull, CommitInfo.EMPTY);
-
- // but with the introduction of FilteringAwareObserver the delivery
- // only happens with non-filtered items, so adding yet another one now
- filter.excludeNext(false);
- NodeState last = generator.next();
- // while above the "seventhAfterQueueFull" DOES get filtered, the next
contentChange
- // triggers the release of the 'queue full overflow element' (with
commitInfo==null)
- // and that we must add as expected()
- expected.add(new Pair(thirdIncluded, seventhAfterQueueFull)); //
commitInfo == null
- expected.add(new Pair(seventhAfterQueueFull, last));
- fo.contentChanged(last, CommitInfo.EMPTY);
-
- assertTrue("testExcludeCommitsWithFullQueue",
fo.getBackgroundObserver().waitUntilStopped(10, TimeUnit.SECONDS));
- assertMatches("testExcludeCommitsWithFullQueue", expected,
recorder.includedChanges);
- }
-
- @Test
- public void testExcludeSomeCommits() throws Exception {
- ExecutorService executor = newSingleThreadExecutor();
- for (int i = 0; i < 100; i++) {
- doTestExcludeSomeCommits(i, executor);
- }
- for (int i = 100; i < 10000; i += 50) {
- doTestExcludeSomeCommits(i, executor);
- }
- }
-
- private void doTestExcludeSomeCommits(int cnt, Executor executor) throws
Exception {
- MyFilter filter = new MyFilter();
- Recorder recorder = new Recorder();
- FilteringObserver fo = new FilteringObserver(executor, cnt + 2,
filter, recorder);
- closeables.add(fo);
- List<Pair> expected = new LinkedList<Pair>();
- Random r = new Random(2343242); // seed: repeatable tests
- NodeStateGenerator generator = new NodeStateGenerator();
- NodeState first = generator.next();
- expected.add(new Pair(null, first));
- fo.contentChanged(first, CommitInfo.EMPTY);
- NodeState previous = first;
- for (int i = 0; i < cnt; i++) {
- boolean excludeNext = r.nextInt(100) < 90;
- filter.excludeNext(excludeNext);
- NodeState next = generator.next();
- if (!excludeNext) {
- expected.add(new Pair(previous, next));
- }
- previous = next;
- fo.contentChanged(next, CommitInfo.EMPTY);
- }
- assertTrue("cnt=" + cnt,
fo.getBackgroundObserver().waitUntilStopped(5, TimeUnit.SECONDS));
- assertMatches("cnt=" + cnt, expected, recorder.includedChanges);
- }
}
Modified:
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java?rev=1768635&r1=1768634&r2=1768635&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java
(original)
+++
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java
Tue Nov 8 07:37:43 2016
@@ -45,7 +45,6 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.name.NamespaceEditorProvider;
import org.apache.jackrabbit.oak.plugins.nodetype.TypeEditorProvider;
import org.apache.jackrabbit.oak.plugins.nodetype.write.InitialContent;
-import org.apache.jackrabbit.oak.plugins.observation.ChangeCollectorProvider;
import org.apache.jackrabbit.oak.plugins.observation.CommitRateLimiter;
import org.apache.jackrabbit.oak.plugins.version.VersionHook;
import org.apache.jackrabbit.oak.query.QueryEngineSettings;
@@ -121,7 +120,6 @@ public class Jcr {
with(new NamespaceEditorProvider());
with(new TypeEditorProvider());
with(new ConflictValidatorProvider());
- with(new ChangeCollectorProvider());
with(new ReferenceEditorProvider());
with(new ReferenceIndexProvider());
Modified:
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java?rev=1768635&r1=1768634&r2=1768635&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
(original)
+++
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
Tue Nov 8 07:37:43 2016
@@ -21,7 +21,6 @@ package org.apache.jackrabbit.oak.jcr.ob
import static com.google.common.base.Preconditions.checkState;
import static
org.apache.jackrabbit.api.stats.RepositoryStatistics.Type.OBSERVATION_EVENT_COUNTER;
import static
org.apache.jackrabbit.api.stats.RepositoryStatistics.Type.OBSERVATION_EVENT_DURATION;
-import static
org.apache.jackrabbit.oak.plugins.observation.ChangeCollectorProvider.COMMIT_CONTEXT_OBSERVATION_CHANGESET;
import static
org.apache.jackrabbit.oak.plugins.observation.filter.VisibleFilter.VISIBLE_FILTER;
import static
org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean;
import static
org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerObserver;
@@ -33,46 +32,40 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import javax.jcr.observation.Event;
import javax.jcr.observation.EventIterator;
import javax.jcr.observation.EventListener;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Monitor;
+import com.google.common.util.concurrent.Monitor.Guard;
import org.apache.jackrabbit.api.jmx.EventListenerMBean;
import org.apache.jackrabbit.commons.observation.ListenerTracker;
import org.apache.jackrabbit.oak.api.ContentSession;
import org.apache.jackrabbit.oak.namepath.NamePathMapper;
-import org.apache.jackrabbit.oak.plugins.observation.ChangeSet;
import org.apache.jackrabbit.oak.plugins.observation.CommitRateLimiter;
-import org.apache.jackrabbit.oak.plugins.observation.Filter;
-import org.apache.jackrabbit.oak.plugins.observation.FilteringAwareObserver;
-import org.apache.jackrabbit.oak.plugins.observation.FilteringDispatcher;
-import org.apache.jackrabbit.oak.plugins.observation.FilteringObserver;
import org.apache.jackrabbit.oak.plugins.observation.filter.EventFilter;
import org.apache.jackrabbit.oak.plugins.observation.filter.FilterConfigMBean;
import org.apache.jackrabbit.oak.plugins.observation.filter.FilterProvider;
import org.apache.jackrabbit.oak.plugins.observation.filter.Filters;
-import org.apache.jackrabbit.oak.plugins.observation.filter.ChangeSetFilter;
import org.apache.jackrabbit.oak.spi.commit.BackgroundObserver;
import org.apache.jackrabbit.oak.spi.commit.BackgroundObserverMBean;
-import org.apache.jackrabbit.oak.spi.commit.CommitContext;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.Observer;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.whiteboard.CompositeRegistration;
import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardExecutor;
-import org.apache.jackrabbit.oak.stats.MeterStats;
import org.apache.jackrabbit.oak.stats.StatisticManager;
+import org.apache.jackrabbit.oak.stats.MeterStats;
import org.apache.jackrabbit.oak.stats.TimerStats;
import org.apache.jackrabbit.oak.util.PerfLogger;
import org.apache.jackrabbit.stats.TimeSeriesMax;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.Monitor;
-import com.google.common.util.concurrent.Monitor.Guard;
-
/**
* A {@code ChangeProcessor} generates observation {@link
javax.jcr.observation.Event}s
* based on a {@link FilterProvider filter} and delivers them to an {@link
EventListener}.
@@ -80,27 +73,11 @@ import com.google.common.util.concurrent
* After instantiation a {@code ChangeProcessor} must be started in order to
start
* delivering observation events and stopped to stop doing so.
*/
-class ChangeProcessor implements FilteringAwareObserver {
+class ChangeProcessor implements Observer {
private static final Logger LOG =
LoggerFactory.getLogger(ChangeProcessor.class);
private static final PerfLogger PERF_LOGGER = new PerfLogger(
LoggerFactory.getLogger(ChangeProcessor.class.getName() +
".perf"));
- private static enum FilterResult {
- /** marks a commit as to be included, ie delivered.
- * It's okay to falsely mark a commit as included,
- * since filtering (as part of converting to events)
- * will be applied at a later stage again. */
- INCLUDE,
- /** mark a commit as not of interest to this ChangeProcessor.
- * Exclusion is definite, ie it's not okay to falsely
- * mark a commit as excluded */
- EXCLUDE,
- /** mark a commit as included but indicate that this
- * is not a result of prefiltering but that prefiltering
- * was skipped/not applicable for some reason */
- PREFILTERING_SKIPPED
- }
-
/**
* Fill ratio of the revision queue at which commits should be delayed
* (conditional of {@code commitRateLimiter} being non {@code null}).
@@ -112,12 +89,7 @@ class ChangeProcessor implements Filteri
* kicks in.
*/
public static final int MAX_DELAY;
-
- /** The test mode can be used to just verify if prefiltering would have
- * correctly done its job and warn if that's not the case.
- */
- private static final boolean PREFILTERING_TESTMODE;
-
+
// OAK-4533: make DELAY_THRESHOLD and MAX_DELAY adjustable - using
System.properties for now
static {
final String delayThresholdStr =
System.getProperty("oak.commitRateLimiter.delayThreshold");
@@ -142,18 +114,6 @@ class ChangeProcessor implements Filteri
}
DELAY_THRESHOLD = delayThreshold;
MAX_DELAY = maxDelay;
-
- final String prefilteringTestModeStr =
System.getProperty("oak.observation.prefilteringTestMode");
- boolean prefilteringTestModeBool = false; // default is enabled
- try {
- if (prefilteringTestModeStr != null &&
prefilteringTestModeStr.length() != 0) {
- prefilteringTestModeBool =
Boolean.parseBoolean(prefilteringTestModeStr);
- LOG.info("<clinit> using oak.observation.prefilteringTestMode
= " + prefilteringTestModeBool);
- }
- } catch(RuntimeException e) {
- LOG.warn("<clinit> could not parse
oak.observation.prefilteringTestMode, using default (" +
prefilteringTestModeBool + "): " + e, e);
- }
- PREFILTERING_TESTMODE = prefilteringTestModeBool;
}
private static final AtomicInteger COUNTER = new AtomicInteger();
@@ -185,22 +145,8 @@ class ChangeProcessor implements Filteri
*/
private CompositeRegistration registration;
- /**
- * for statistics: tracks how many times prefiltering excluded a commit
- */
- private int prefilterExcludeCount;
-
- /**
- * for statistics: tracks how many times prefiltering included a commit
- */
- private int prefilterIncludeCount;
-
- /**
- * for statistics: tracks how many times prefiltering was ignored (not
evaluated at all),
- * either because it was disabled, queue too small, CommitInfo null or
CommitContext null
- */
- private int prefilterSkipCount;
-
+ private volatile NodeState previousRoot;
+
public ChangeProcessor(
ContentSession contentSession,
NamePathMapper namePathMapper,
@@ -234,29 +180,6 @@ class ChangeProcessor implements Filteri
return filterProvider.get();
}
- @Nonnull
- public ChangeProcessorMBean getMBean() {
- return new ChangeProcessorMBean() {
-
- @Override
- public int getPrefilterExcludeCount() {
- return prefilterExcludeCount;
- }
-
- @Override
- public int getPrefilterIncludeCount() {
- return prefilterIncludeCount;
- }
-
- @Override
- public int getPrefilterSkipCount() {
- return prefilterSkipCount;
- }
-
- };
- }
-
-
/**
* Start this change processor
* @param whiteboard the whiteboard instance to used for scheduling
individual
@@ -267,18 +190,16 @@ class ChangeProcessor implements Filteri
checkState(registration == null, "Change processor started already");
final WhiteboardExecutor executor = new WhiteboardExecutor();
executor.start(whiteboard);
- final FilteringObserver filteringObserver = createObserver(executor);
+ final BackgroundObserver observer = createObserver(executor);
listenerId = COUNTER.incrementAndGet() + "";
Map<String, String> attrs = ImmutableMap.of(LISTENER_ID, listenerId);
String name = tracker.toString();
registration = new CompositeRegistration(
- registerObserver(whiteboard, filteringObserver),
+ registerObserver(whiteboard, observer),
registerMBean(whiteboard, EventListenerMBean.class,
tracker.getListenerMBean(), "EventListener", name, attrs),
registerMBean(whiteboard, BackgroundObserverMBean.class,
- filteringObserver.getBackgroundObserver().getMBean(),
BackgroundObserverMBean.TYPE, name, attrs),
- registerMBean(whiteboard, ChangeProcessorMBean.class,
- getMBean(), ChangeProcessorMBean.TYPE, name, attrs),
+ observer.getMBean(), BackgroundObserverMBean.TYPE, name,
attrs),
//TODO If FilterProvider gets changed later then MBean would need
to be
// re-registered
registerMBean(whiteboard, FilterConfigMBean.class,
@@ -286,7 +207,7 @@ class ChangeProcessor implements Filteri
new Registration() {
@Override
public void unregister() {
- filteringObserver.close();
+ observer.close();
}
},
new Registration() {
@@ -304,9 +225,8 @@ class ChangeProcessor implements Filteri
);
}
- private FilteringObserver createObserver(final WhiteboardExecutor
executor) {
- FilteringDispatcher fd = new FilteringDispatcher(this);
- BackgroundObserver bo = new BackgroundObserver(fd, executor,
queueLength) {
+ private BackgroundObserver createObserver(final WhiteboardExecutor
executor) {
+ return new BackgroundObserver(this, executor, queueLength) {
private volatile long delay;
private volatile boolean blocking;
@@ -367,43 +287,7 @@ class ChangeProcessor implements Filteri
}
}
-
- @Override
- public String toString() {
- return "Prefiltering BackgroundObserver for
"+ChangeProcessor.this;
- }
};
- return new FilteringObserver(bo, new Filter() {
-
- @Override
- public boolean excludes(NodeState root, CommitInfo info) {
- if (PREFILTERING_TESTMODE) {
- // then we don't prefilter but only test later
- prefilterSkipCount++;
- return false;
- }
- final FilterResult filterResult = evalPrefilter(root, info,
getChangeSet(info));
- switch (filterResult) {
- case PREFILTERING_SKIPPED: {
- prefilterSkipCount++;
- return false;
- }
- case EXCLUDE: {
- prefilterExcludeCount++;
- return true;
- }
- case INCLUDE: {
- prefilterIncludeCount++;
- return false;
- }
- default: {
- LOG.info("isExcluded: unknown/unsupported filter result: "
+ filterResult);
- prefilterSkipCount++;
- return false;
- }
- }
- }
- });
}
private final Monitor runningMonitor = new Monitor();
@@ -455,48 +339,16 @@ class ChangeProcessor implements Filteri
}
}
- /**
- * Utility method that extracts the ChangeSet from a CommitInfo if
possible.
- * @param info
- * @return
- */
- public static ChangeSet getChangeSet(CommitInfo info) {
- if (info == null) {
- return null;
- }
- CommitContext context = (CommitContext)
info.getInfo().get(CommitContext.NAME);
- if (context == null) {
- return null;
- }
- return (ChangeSet) context.get(COMMIT_CONTEXT_OBSERVATION_CHANGESET);
- }
-
@Override
- public void contentChanged(NodeState before, NodeState after, CommitInfo
info) {
- FilterResult prefilterTestResult = null;
- if (PREFILTERING_TESTMODE) {
- // OAK-4908 test mode: when the ChangeCollectorProvider is enabled
- // there is the option to have the ChangeProcessors run in
so-called
- // 'test mode'. In this test mode the prefiltering is not applied,
- // but instead verified if it *would have prefiltered correctly*.
- // that test is therefore done at dequeue-time, hence in
- // contentChanged
- // TODO: remove this testing mechanism after a while
- try {
- prefilterTestResult = evalPrefilter(after, info,
getChangeSet(info));
- } catch (Exception e) {
- LOG.warn("contentChanged: exception in wouldBeExcludedCommit:
" + e, e);
- }
- }
- if (before != null) {
+ public void contentChanged(@Nonnull NodeState root, @Nullable CommitInfo
info) {
+ if (previousRoot != null) {
try {
long start = PERF_LOGGER.start();
FilterProvider provider = filterProvider.get();
- boolean onEventInvoked = false;
// FIXME don't rely on toString for session id
if (provider.includeCommit(contentSession.toString(), info)) {
- EventFilter filter = provider.getFilter(before, after);
- EventIterator events = new EventQueue(namePathMapper,
info, before, after,
+ EventFilter filter = provider.getFilter(previousRoot,
root);
+ EventIterator events = new EventQueue(namePathMapper,
info, previousRoot, root,
provider.getSubTrees(), Filters.all(filter,
VISIBLE_FILTER),
provider.getEventAggregator());
@@ -509,7 +361,6 @@ class ChangeProcessor implements Filteri
}
try {
CountingIterator countingEvents = new
CountingIterator(events);
- onEventInvoked = true;
eventListener.onEvent(countingEvents);
countingEvents.updateCounters(eventCount,
eventDuration);
} finally {
@@ -520,33 +371,14 @@ class ChangeProcessor implements Filteri
}
}
}
- if (prefilterTestResult != null) {
- // OAK-4908 test mode
- if (prefilterTestResult == FilterResult.EXCLUDE &&
onEventInvoked) {
- // this is not ok, an event would have gotten
- // excluded-by-prefiltering even though
- // it actually got an event.
- LOG.warn("contentChanged: delivering event which would
have been prefiltered, "
- + "info={}, this={}, listener={}", info, this,
eventListener);
- } else if (prefilterTestResult == FilterResult.INCLUDE &&
!onEventInvoked && info != null
- && info != CommitInfo.EMPTY) {
- // this can occur arbitrarily frequent. as prefiltering
- // is not perfect, it can
- // have false negatives - ie it can include even though
- // no event is then created
- // hence we can only really log at debug here
- LOG.debug(
- "contentChanged: no event to deliver but not
prefiltered, info={}, this={}, listener={}",
- info, this, eventListener);
- }
- }
PERF_LOGGER.end(start, 100,
"Generated events (before: {}, after: {})",
- before, after);
+ previousRoot, root);
} catch (Exception e) {
LOG.warn("Error while dispatching observation events for " +
tracker, e);
}
}
+ previousRoot = root;
}
private static class CountingIterator implements EventIterator {
@@ -658,45 +490,4 @@ class ChangeProcessor implements Filteri
+ ", commitRateLimiter=" + commitRateLimiter
+ ", running=" + running.isSatisfied() + "]";
}
-
- /**
- * Evaluate the prefilter for a given commit.
- * @param changeSet
- *
- * @return a FilterResult indicating either inclusion, exclusion or
- * inclusion-due-to-skipping. The latter is used to reflect
- * prefilter evaluation better in statistics (as it could also have
- * been reported just as include)
- */
- private FilterResult evalPrefilter(NodeState root, CommitInfo info,
ChangeSet changeSet) {
- if (info == null) {
- return FilterResult.PREFILTERING_SKIPPED;
- }
- if (root == null) {
- // likely only occurs at startup
- // we can't do any diffing etc, so just not exclude it
- return FilterResult.PREFILTERING_SKIPPED;
- }
-
- final FilterProvider fp = filterProvider.get();
- // FIXME don't rely on toString for session id
- if (!fp.includeCommit(contentSession.toString(), info)) {
- // 'classic' (and cheap pre-) filtering
- return FilterResult.EXCLUDE;
- }
- if (changeSet == null) {
- // then can't do any prefiltering since it was not
- // able to complete the sets (within the given boundaries)
- // (this corresponds to a large commit, which thus can't
- // go through prefiltering)
- return FilterResult.PREFILTERING_SKIPPED;
- }
-
- final ChangeSetFilter prefilter = fp;
- if (prefilter.excludes(changeSet)) {
- return FilterResult.EXCLUDE;
- } else {
- return FilterResult.INCLUDE;
- }
- }
}
Modified:
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ConsolidatedListenerMBeanImpl.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ConsolidatedListenerMBeanImpl.java?rev=1768635&r1=1768634&r2=1768635&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ConsolidatedListenerMBeanImpl.java
(original)
+++
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ConsolidatedListenerMBeanImpl.java
Tue Nov 8 07:37:43 2016
@@ -84,12 +84,6 @@ import static org.apache.jackrabbit.oak.
referenceInterface = BackgroundObserverMBean.class,
policy = ReferencePolicy.DYNAMIC,
cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE),
- @Reference(name = "changeProcessorMBean",
- bind = "bindChangeProcessorMBean",
- unbind = "unbindChangeProcessorMBean",
- referenceInterface = ChangeProcessorMBean.class,
- policy = ReferencePolicy.DYNAMIC,
- cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE),
@Reference(name = "filterConfigMBean",
bind = "bindFilterConfigMBean",
unbind = "unbindFilterConfigMBean",
@@ -102,7 +96,6 @@ public class ConsolidatedListenerMBeanIm
private final AtomicInteger observerCount = new AtomicInteger();
private final Map<ObjectName, EventListenerMBean> eventListeners =
Maps.newConcurrentMap();
private final Map<ObjectName, BackgroundObserverMBean> bgObservers =
Maps.newConcurrentMap();
- private final Map<ObjectName, ChangeProcessorMBean> changeProcessors =
Maps.newConcurrentMap();
private final Map<ObjectName, FilterConfigMBean> filterConfigs =
Maps.newConcurrentMap();
private Registration mbeanReg;
@@ -208,11 +201,6 @@ public class ConsolidatedListenerMBeanIm
m.observerMBean = ef.getValue();
}
}
- for (Map.Entry<ObjectName, ChangeProcessorMBean> ef :
changeProcessors.entrySet()){
- if (Objects.equal(getListenerId(ef.getKey()), listenerId)){
- m.changeProcessorMBean = ef.getValue();
- }
- }
mbeans.add(m);
}
return mbeans;
@@ -261,16 +249,6 @@ public class ConsolidatedListenerMBeanIm
}
@SuppressWarnings("unused")
- protected void bindChangeProcessorMBean(ChangeProcessorMBean mbean,
Map<String, ?> config){
- changeProcessors.put(getObjectName(config), mbean);
- }
-
- @SuppressWarnings("unused")
- protected void unbindChangeProcessorMBean(ChangeProcessorMBean mbean,
Map<String, ?> config){
- changeProcessors.remove(getObjectName(config));
- }
-
- @SuppressWarnings("unused")
protected void bindListenerMBean(EventListenerMBean mbean, Map<String, ?>
config){
eventListeners.put(getObjectName(config), mbean);
}
@@ -302,7 +280,6 @@ public class ConsolidatedListenerMBeanIm
private static class ListenerMBeans {
EventListenerMBean eventListenerMBean;
BackgroundObserverMBean observerMBean;
- ChangeProcessorMBean changeProcessorMBean;
FilterConfigMBean filterConfigMBean;
}
@@ -324,9 +301,6 @@ public class ConsolidatedListenerMBeanIm
"ratioOfTimeSpentProcessingEvents",
"eventConsumerTimeRatio",
"queueBacklogMillis",
- "prefilterSkips",
- "prefilterExcludes",
- "prefilterIncludes",
"queueSize",
"localEventCount",
"externalEventCount",
@@ -357,9 +331,6 @@ public class ConsolidatedListenerMBeanIm
SimpleType.INTEGER,
SimpleType.INTEGER,
SimpleType.INTEGER,
- SimpleType.INTEGER,
- SimpleType.INTEGER,
- SimpleType.INTEGER,
SimpleType.STRING,
SimpleType.BOOLEAN,
SimpleType.BOOLEAN,
@@ -405,9 +376,6 @@ public class ConsolidatedListenerMBeanIm
mbeans.eventListenerMBean.getRatioOfTimeSpentProcessingEvents(),
mbeans.eventListenerMBean.getEventConsumerTimeRatio(),
mbeans.eventListenerMBean.getQueueBacklogMillis(),
- mbeans.changeProcessorMBean == null ? -1 :
mbeans.changeProcessorMBean.getPrefilterSkipCount(),
- mbeans.changeProcessorMBean == null ? -1 :
mbeans.changeProcessorMBean.getPrefilterExcludeCount(),
- mbeans.changeProcessorMBean == null ? -1 :
mbeans.changeProcessorMBean.getPrefilterIncludeCount(),
mbeans.observerMBean.getQueueSize(),
mbeans.observerMBean.getLocalEventCount(),
mbeans.observerMBean.getExternalEventCount(),
Modified:
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/OakEventFilterImpl.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/OakEventFilterImpl.java?rev=1768635&r1=1768634&r2=1768635&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/OakEventFilterImpl.java
(original)
+++
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/OakEventFilterImpl.java
Tue Nov 8 07:37:43 2016
@@ -512,21 +512,4 @@ public class OakEventFilterImpl extends
return this;
}
- /**
- * A hook called by the ObservationManagerImpl before creating the
ChangeSetFilterImpl
- * which allows this filter to adjust the includePaths according to its
- * enabled flags.
- * <p>
- * This is used to set the includePath to be '/' in case
includeAncestorRemove
- * is set. The reason for this is that we must catch parent removals and
can thus
- * not apply the normally applied prefilter paths.
- * @param includePaths the set to adjust depending on filter flags
- */
- void adjustPrefilterIncludePaths(Set<String> includePaths) {
- if (includeAncestorRemove) {
- includePaths.clear();
- includePaths.add("/");
- }
- }
-
}
Modified:
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java?rev=1768635&r1=1768634&r2=1768635&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java
(original)
+++
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java
Tue Nov 8 07:37:43 2016
@@ -62,7 +62,6 @@ import org.apache.jackrabbit.oak.plugins
import
org.apache.jackrabbit.oak.plugins.observation.filter.UniversalFilter.Selector;
import org.apache.jackrabbit.oak.plugins.observation.filter.FilterProvider;
import
org.apache.jackrabbit.oak.plugins.observation.filter.PermissionProviderFactory;
-import
org.apache.jackrabbit.oak.plugins.observation.filter.ChangeSetFilterImpl;
import org.apache.jackrabbit.oak.plugins.observation.filter.Selectors;
import org.apache.jackrabbit.oak.spi.commit.Observable;
import
org.apache.jackrabbit.oak.spi.security.authorization.AuthorizationConfiguration;
@@ -279,7 +278,6 @@ public class ObservationManagerImpl impl
List<Condition> excludeConditions = createExclusions(filterBuilder,
excludedPaths);
- final String[] validatedNodeTypeNames =
validateNodeTypeNames(nodeTypeName);
Selector nodeTypeSelector = Selectors.PARENT;
boolean deleteSubtree = true;
if (oakEventFilter != null) {
@@ -306,7 +304,7 @@ public class ObservationManagerImpl impl
filterBuilder.moveSubtree(),
filterBuilder.eventType(eventTypes),
filterBuilder.uuid(Selectors.PARENT, uuids),
- filterBuilder.nodeType(nodeTypeSelector,
validatedNodeTypeNames),
+ filterBuilder.nodeType(nodeTypeSelector,
validateNodeTypeNames(nodeTypeName)),
filterBuilder.accessControl(permissionProviderFactory));
if (oakEventFilter != null) {
condition = oakEventFilter.wrapMainCondition(condition,
filterBuilder, permissionProviderFactory);
@@ -321,16 +319,6 @@ public class ObservationManagerImpl impl
ListenerTracker tracker = new WarningListenerTracker(
!noExternal, listener, eventTypes, absPath, isDeep, uuids,
nodeTypeName, noLocal);
- if (oakEventFilter != null) {
- oakEventFilter.adjustPrefilterIncludePaths(includePaths);
- }
-
- // OAK-4908 : prefiltering support. here we have explicit yes/no/maybe
filtering
- // for things like propertyNames/nodeTypes/nodeNames/paths which
cannot be
- // applied on the full-fledged filterBuilder above but requires an
explicit 'prefilter' for that.
- filterBuilder.setChangeSetFilter(new ChangeSetFilterImpl(includePaths,
isDeep, excludedPaths, null,
- validatedNodeTypeNames == null ? null :
newHashSet(validatedNodeTypeNames), null));
-
addEventListener(listener, tracker, filterBuilder.build());
}
Modified:
jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java?rev=1768635&r1=1768634&r2=1768635&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java
(original)
+++
jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java
Tue Nov 8 07:37:43 2016
@@ -436,25 +436,7 @@ public class ObservationTest extends Abs
observationManager.removeEventListener(listener);
}
}
-
- @Test
- public void propertyFilter() throws Exception {
- Node root = getNode("/");
- ExpectationListener listener = new ExpectationListener();
- observationManager.addEventListener(listener, PROPERTY_ADDED, "/a/b",
false, null, null, false);
- Node a = root.addNode("a");
- Node b = a.addNode("b");
- listener.expect("/a/b/jcr:primaryType", PROPERTY_ADDED);
- listener.expectAdd(b.setProperty("propName", 1));
- root.getSession().save();
-
- List<Expectation> missing = listener.getMissing(TIME_OUT,
TimeUnit.SECONDS);
- assertTrue("Missing events: " + missing, missing.isEmpty());
- List<Event> unexpected = listener.getUnexpected();
- assertTrue("Unexpected events: " + unexpected, unexpected.isEmpty());
- }
-
@Test
public void pathFilter() throws Exception {
final String path = "/events/only/here";
@@ -1533,7 +1515,7 @@ public class ObservationTest extends Abs
filter = new JackrabbitEventFilter();
filter.setEventTypes(ALL_EVENTS);
- filter = FilterFactory.wrap(filter).withIncludeGlobPaths(TEST_PATH +
"/a3/**/y");
+ filter = FilterFactory.wrap(filter).withIncludeGlobPaths(TEST_PATH +
"/a3/**/y/*");
oManager.addEventListener(listener, filter);
cp = oManager.getChangeProcessor(listener);
assertNotNull(cp);