Author: stefanegli
Date: Mon Oct 31 16:09:39 2016
New Revision: 1767322
URL: http://svn.apache.org/viewvc?rev=1767322&view=rev
Log:
OAK-5011 : adding EventAggregator to the FilterBuilder which allows to report
events higher up the path chain - hence the term aggregation - than where it
happend. The height of where the event is aggregated to is controlled by this
EventAggregator. In order to decide if the latter should aggregate, it gets the
NodeStates of the parents. That allows it to inspect for eg node type etc,
something not possible without this change
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/EventAggregator.java
(with props)
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterBuilder.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterProvider.java
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/EventQueue.java
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/QueueingHandler.java
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/EventAggregator.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/EventAggregator.java?rev=1767322&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/EventAggregator.java
(added)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/EventAggregator.java
Mon Oct 31 16:09:39 2016
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.observation.filter;
+
+import java.util.List;
+
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+
+/**
+ * An EventAggregator can be provided via a FilterProvider
+ * and is then used to 'aggregate' an event at creation time
+ * (ie after filtering).
+ * <p>
+ * Aggregation in this context means to have the event identifier
+ * not be the path (as usual) but one of its parents.
+ * This allows to have client code use an aggregating filter
+ * and ignore the event paths but only inspect the event
+ * identifier which is then the aggregation parent node.
+ */
+public interface EventAggregator {
+
+ /**
+ * Aggregates a property change
+ * @return 0 or negative for no aggregation, positive indicating
+ * how many levels to aggregate upwards the tree.
+ */
+ int aggregate(NodeState root, List<ChildNodeEntry> parents, PropertyState
propertyState);
+
+ /**
+ * Aggregates a node change
+ * @return 0 or negative for no aggregation, positive indicating
+ * how many levels to aggregate upwards the tree.
+ */
+ int aggregate(NodeState root, List<ChildNodeEntry> parents, ChildNodeEntry
childNodeState);
+
+}
Propchange:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/EventAggregator.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterBuilder.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterBuilder.java?rev=1767322&r1=1767321&r2=1767322&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterBuilder.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterBuilder.java
Mon Oct 31 16:09:39 2016
@@ -60,6 +60,8 @@ public final class FilterBuilder {
private final List<String> subTrees = newArrayList();
private Condition condition = includeAll();
+ private EventAggregator aggregator;
+
public interface Condition {
@Nonnull
EventFilter createFilter(@Nonnull NodeState before, @Nonnull NodeState
after);
@@ -99,6 +101,11 @@ public final class FilterBuilder {
private Iterable<String> getSubTrees() {
return subTrees.isEmpty() ? ImmutableList.of("/") : subTrees;
}
+
+ public FilterBuilder aggregator(EventAggregator aggregator) {
+ this.aggregator = aggregator;
+ return this;
+ }
/**
* Whether to include session local changes. Defaults to {@code false}.
@@ -370,6 +377,7 @@ public final class FilterBuilder {
final boolean includeSessionLocal =
FilterBuilder.this.includeSessionLocal;
final boolean includeClusterExternal =
FilterBuilder.this.includeClusterExternal;
final boolean includeClusterLocal =
FilterBuilder.this.includeClusterLocal;
+ final EventAggregator aggregator = FilterBuilder.this.aggregator;
final Iterable<String> subTrees = FilterBuilder.this.getSubTrees();
final Condition condition = FilterBuilder.this.condition;
@@ -404,6 +412,11 @@ public final class FilterBuilder {
private boolean isExternal(CommitInfo info) {
return info == null;
}
+
+ @Override
+ public EventAggregator getEventAggregator() {
+ return aggregator;
+ }
};
}
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterProvider.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterProvider.java?rev=1767322&r1=1767321&r2=1767322&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterProvider.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterProvider.java
Mon Oct 31 16:09:39 2016
@@ -64,4 +64,11 @@ public interface FilterProvider {
Iterable<String> getSubTrees();
FilterConfigMBean getConfigMBean();
+
+ /**
+ * Allows providers to supply an EventAggregator that
+ * is used to adjust (aggregate) the event identifier before event
+ * creation (ie after event filtering).
+ */
+ EventAggregator getEventAggregator();
}
Modified:
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java?rev=1767322&r1=1767321&r2=1767322&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
(original)
+++
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
Mon Oct 31 16:09:39 2016
@@ -344,7 +344,8 @@ class ChangeProcessor implements Observe
if (provider.includeCommit(contentSession.toString(), info)) {
EventFilter filter = provider.getFilter(previousRoot,
root);
EventIterator events = new EventQueue(namePathMapper,
info, previousRoot, root,
- provider.getSubTrees(), Filters.all(filter,
VISIBLE_FILTER));
+ provider.getSubTrees(), Filters.all(filter,
VISIBLE_FILTER),
+ provider.getEventAggregator());
long time = System.nanoTime();
boolean hasEvents = events.hasNext();
Modified:
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/EventQueue.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/EventQueue.java?rev=1767322&r1=1767321&r2=1767322&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/EventQueue.java
(original)
+++
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/EventQueue.java
Mon Oct 31 16:09:39 2016
@@ -32,6 +32,7 @@ import org.apache.jackrabbit.oak.namepat
import org.apache.jackrabbit.oak.plugins.observation.EventGenerator;
import org.apache.jackrabbit.oak.plugins.observation.EventHandler;
import org.apache.jackrabbit.oak.plugins.observation.FilteredHandler;
+import org.apache.jackrabbit.oak.plugins.observation.filter.EventAggregator;
import org.apache.jackrabbit.oak.plugins.observation.filter.EventFilter;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.state.NodeState;
@@ -50,11 +51,12 @@ class EventQueue implements EventIterato
public EventQueue(
@Nonnull NamePathMapper mapper, CommitInfo info,
@Nonnull NodeState before, @Nonnull NodeState after,
- @Nonnull Iterable<String> basePaths, @Nonnull EventFilter filter) {
+ @Nonnull Iterable<String> basePaths, @Nonnull EventFilter filter,
+ @Nonnull EventAggregator aggregator) {
this.generator = new EventGenerator();
EventFactory factory = new EventFactory(mapper, info);
EventHandler handler = new FilteredHandler(
- filter, new QueueingHandler(this, factory, before, after));
+ filter, new QueueingHandler(this, factory, aggregator, before,
after));
for (String path : basePaths) {
addHandler(before, after, path, handler, generator);
}
Modified:
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/QueueingHandler.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/QueueingHandler.java?rev=1767322&r1=1767321&r2=1767322&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/QueueingHandler.java
(original)
+++
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/QueueingHandler.java
Mon Oct 31 16:09:39 2016
@@ -24,11 +24,17 @@ import static org.apache.jackrabbit.JcrC
import static org.apache.jackrabbit.oak.api.Type.NAME;
import static org.apache.jackrabbit.oak.api.Type.NAMES;
+import java.util.LinkedList;
+import java.util.List;
+
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.namepath.PathTracker;
import org.apache.jackrabbit.oak.plugins.identifier.IdentifierTracker;
+import org.apache.jackrabbit.oak.plugins.memory.MemoryChildNodeEntry;
import org.apache.jackrabbit.oak.plugins.observation.DefaultEventHandler;
import org.apache.jackrabbit.oak.plugins.observation.EventHandler;
+import org.apache.jackrabbit.oak.plugins.observation.filter.EventAggregator;
+import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
import org.apache.jackrabbit.oak.spi.state.NodeState;
/**
@@ -37,6 +43,26 @@ import org.apache.jackrabbit.oak.spi.sta
* JCR events that are then placed in the given {@link EventQueue}.
*/
class QueueingHandler extends DefaultEventHandler {
+
+ class AggregationResult {
+
+ private final String name;
+ private final IdentifierTracker identifierTracker;
+ private final String primaryType;
+ private final Iterable<String> mixinTypes;
+ private final PathTracker pathTracker;
+
+ AggregationResult(String name, IdentifierTracker identifierTracker,
+ final String primaryType, final Iterable<String> mixinTypes,
PathTracker pathTracker) {
+ this.name = name;
+ this.identifierTracker = identifierTracker;
+ this.primaryType = primaryType;
+ this.mixinTypes = mixinTypes;
+ this.pathTracker = pathTracker;
+ }
+ }
+
+ private final QueueingHandler parent;
private final EventQueue queue;
@@ -54,41 +80,62 @@ class QueueingHandler extends DefaultEve
private final IdentifierTracker identifierTracker;
+ private final EventAggregator aggregator;
+
+ private final String name;
+
+ private final NodeState root;
+
+ private final List<ChildNodeEntry> parents;
+
QueueingHandler(
EventQueue queue, EventFactory factory,
- NodeState before, NodeState after) {
+ EventAggregator aggregator, NodeState before, NodeState after) {
+ this.parent = null;
this.queue = queue;
this.factory = factory;
+ this.name = null;
+ this.aggregator = aggregator;
this.pathTracker = new PathTracker();
this.beforeIdentifierTracker = new IdentifierTracker(before);
+ this.parents = new LinkedList<ChildNodeEntry>();
if (after.exists()) {
this.identifierTracker = new IdentifierTracker(after);
this.parentType = getPrimaryType(after);
this.parentMixins = getMixinTypes(after);
+ this.root = after;
} else {
this.identifierTracker = beforeIdentifierTracker;
this.parentType = getPrimaryType(before);
this.parentMixins = getMixinTypes(before);
+ this.root = before;
}
}
private QueueingHandler(
QueueingHandler parent,
String name, NodeState before, NodeState after) {
+ this.parent = parent;
this.queue = parent.queue;
this.factory = parent.factory;
+ this.root = parent.root;
+ this.name = name;
+ this.aggregator = parent.aggregator;
this.pathTracker = parent.pathTracker.getChildTracker(name);
this.beforeIdentifierTracker =
parent.beforeIdentifierTracker.getChildTracker(name, before);
+ this.parents = new LinkedList<ChildNodeEntry>(parent.parents);
if (after.exists()) {
this.identifierTracker =
parent.identifierTracker.getChildTracker(name, after);
this.parentType = getPrimaryType(after);
this.parentMixins = getMixinTypes(after);
+ this.parents.add(new MemoryChildNodeEntry(name, after));
} else {
this.identifierTracker = beforeIdentifierTracker;
this.parentType = getPrimaryType(before);
this.parentMixins = getMixinTypes(before);
+ this.parents.add(new MemoryChildNodeEntry(name, before));
}
}
@@ -100,50 +147,103 @@ class QueueingHandler extends DefaultEve
return new QueueingHandler(this, name, before, after);
}
+ private AggregationResult aggregate(PropertyState after) {
+ int aggregationLevel = 0;
+ if (aggregator != null) {
+ aggregationLevel = aggregator.aggregate(root, parents, after);
+ }
+ if (aggregationLevel <= 0) {
+ // no aggregation
+ return new AggregationResult(after.getName(),
this.identifierTracker, parentType, parentMixins, pathTracker);
+ } else {
+ QueueingHandler handler = this;
+ String name = after.getName();
+ for(int i=0; i<aggregationLevel; i++) {
+ name = handler.name + "/" + name;
+ handler = handler.parent;
+ }
+ return new AggregationResult(name, handler.identifierTracker,
handler.parentType, handler.parentMixins, handler.pathTracker);
+ }
+ }
+
@Override
public void propertyAdded(PropertyState after) {
+ AggregationResult aggregated = aggregate(after);
queue.addEvent(factory.propertyAdded(
after,
- parentType, parentMixins,
- pathTracker.getPath(), after.getName(),
- identifierTracker.getIdentifier()));
+ aggregated.primaryType, aggregated.mixinTypes,
+ aggregated.pathTracker.getPath(), aggregated.name,
+ aggregated.identifierTracker.getIdentifier()));
}
@Override
public void propertyChanged(PropertyState before, PropertyState after) {
+ AggregationResult aggregated = aggregate(after);
queue.addEvent(factory.propertyChanged(
before, after,
- parentType, parentMixins,
- pathTracker.getPath(), after.getName(),
- identifierTracker.getIdentifier()));
+ aggregated.primaryType, aggregated.mixinTypes,
+ aggregated.pathTracker.getPath(), aggregated.name,
+ aggregated.identifierTracker.getIdentifier()));
}
@Override
public void propertyDeleted(PropertyState before) {
+ AggregationResult aggregated = aggregate(before);
queue.addEvent(factory.propertyDeleted(
before,
- parentType, parentMixins,
- pathTracker.getPath(), before.getName(),
- identifierTracker.getIdentifier()));
+ aggregated.primaryType, aggregated.mixinTypes,
+ aggregated.pathTracker.getPath(), aggregated.name,
+ aggregated.identifierTracker.getIdentifier()));
+ }
+
+ private AggregationResult aggregate(String name, NodeState node,
IdentifierTracker childTracker) {
+ int aggregationLevel = 0;
+ if (aggregator != null) {
+ aggregationLevel = aggregator.aggregate(root, parents, new
MemoryChildNodeEntry(name, node));
+ }
+ if (aggregationLevel <= 0) {
+ // no aggregation
+ return new AggregationResult(name, childTracker,
getPrimaryType(node), getMixinTypes(node), pathTracker);
+ } else {
+ QueueingHandler handler = this;
+ IdentifierTracker tracker = childTracker;
+ String primaryType = null;
+ Iterable<String> mixinTypes = null;
+ PathTracker pathTracker = null;
+ String childName = null;
+ for(int i=0; i<aggregationLevel; i++) {
+ if (i > 0) {
+ name = childName + "/" + name;
+ }
+ tracker = handler.identifierTracker;
+ primaryType = handler.parentType;
+ mixinTypes = handler.parentMixins;
+ pathTracker = handler.pathTracker;
+ childName = handler.name;
+ handler = handler.parent;
+ }
+ return new AggregationResult(name, tracker, primaryType,
mixinTypes, pathTracker);
+ }
}
@Override
public void nodeAdded(String name, NodeState after) {
IdentifierTracker tracker =
identifierTracker.getChildTracker(name, after);
+ AggregationResult aggregated = aggregate(name, after, tracker);
queue.addEvent(factory.nodeAdded(
- getPrimaryType(after), getMixinTypes(after),
- pathTracker.getPath(), name, tracker.getIdentifier()));
+ aggregated.primaryType, aggregated.mixinTypes,
+ aggregated.pathTracker.getPath(), aggregated.name,
aggregated.identifierTracker.getIdentifier()));
}
@Override
public void nodeDeleted(String name, NodeState before) {
IdentifierTracker tracker =
beforeIdentifierTracker.getChildTracker(name, before);
-
+ AggregationResult aggregated = aggregate(name, before, tracker);
queue.addEvent(factory.nodeDeleted(
- getPrimaryType(before), getMixinTypes(before),
- pathTracker.getPath(), name, tracker.getIdentifier()));
+ aggregated.primaryType, aggregated.mixinTypes,
+ aggregated.pathTracker.getPath(), aggregated.name,
aggregated.identifierTracker.getIdentifier()));
}
@Override
@@ -151,9 +251,10 @@ class QueueingHandler extends DefaultEve
final String sourcePath, String name, NodeState moved) {
IdentifierTracker tracker =
identifierTracker.getChildTracker(name, moved);
+ AggregationResult aggregated = aggregate(name, moved, tracker);
queue.addEvent(factory.nodeMoved(
- getPrimaryType(moved), getMixinTypes(moved),
- pathTracker.getPath(), name, tracker.getIdentifier(),
+ aggregated.primaryType, aggregated.mixinTypes,
+ aggregated.pathTracker.getPath(), aggregated.name,
aggregated.identifierTracker.getIdentifier(),
sourcePath));
}
@@ -162,9 +263,10 @@ class QueueingHandler extends DefaultEve
final String destName, final String name, NodeState reordered) {
IdentifierTracker tracker =
identifierTracker.getChildTracker(name, reordered);
+ AggregationResult aggregated = aggregate(name, reordered, tracker);
queue.addEvent(factory.nodeReordered(
- getPrimaryType(reordered), getMixinTypes(reordered),
- pathTracker.getPath(), name, tracker.getIdentifier(),
+ aggregated.primaryType, aggregated.mixinTypes,
+ aggregated.pathTracker.getPath(), aggregated.name,
aggregated.identifierTracker.getIdentifier(),
destName));
}