Author: stefanegli
Date: Mon Oct 31 17:24:02 2016
New Revision: 1767337

URL: http://svn.apache.org/viewvc?rev=1767337&view=rev
Log:
OAK-5021 : introducing support for file aggregation, or more generally for node 
type aggregation to OakEventFilter: using the new withNodeTypeAggregate allows 
to register a number of node types combined with a number of relative subpaths 
containing globs, which will create an 'aggregate node type filter' that 
listens for changes at a particular node type and reports them 'aggregated', ie 
reports them relative to the node where it happened

Modified:
    
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/OakEventFilterImpl.java
    
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java
    
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/filter/OakEventFilter.java
    
jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java

Modified: 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/OakEventFilterImpl.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/OakEventFilterImpl.java?rev=1767337&r1=1767336&r2=1767337&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/OakEventFilterImpl.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/OakEventFilterImpl.java
 Mon Oct 31 17:24:02 2016
@@ -19,23 +19,33 @@
 package org.apache.jackrabbit.oak.jcr.observation;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Lists.newArrayList;
+import static javax.jcr.observation.Event.NODE_REMOVED;
 
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
-
-import static javax.jcr.observation.Event.NODE_REMOVED;
+import java.util.regex.Pattern;
 
 import javax.annotation.Nonnull;
 
 import org.apache.jackrabbit.api.observation.JackrabbitEventFilter;
+import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.commons.PathUtils;
 import org.apache.jackrabbit.oak.jcr.observation.filter.OakEventFilter;
+import org.apache.jackrabbit.oak.plugins.nodetype.TypePredicate;
+import org.apache.jackrabbit.oak.plugins.observation.filter.ConstantFilter;
+import org.apache.jackrabbit.oak.plugins.observation.filter.EventAggregator;
+import org.apache.jackrabbit.oak.plugins.observation.filter.EventFilter;
 import org.apache.jackrabbit.oak.plugins.observation.filter.FilterBuilder;
-import 
org.apache.jackrabbit.oak.plugins.observation.filter.PermissionProviderFactory;
 import 
org.apache.jackrabbit.oak.plugins.observation.filter.FilterBuilder.Condition;
+import org.apache.jackrabbit.oak.plugins.observation.filter.Filters;
+import org.apache.jackrabbit.oak.plugins.observation.filter.GlobbingPathFilter;
+import 
org.apache.jackrabbit.oak.plugins.observation.filter.PermissionProviderFactory;
+import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
 
 /**
  * Implements OakEventFilter which is an extension to the JackrabbitEventFilter
@@ -43,6 +53,175 @@ import org.apache.jackrabbit.oak.plugins
  */
 public class OakEventFilterImpl extends OakEventFilter {
 
+    static class NodeTypeAggregationFilter implements EventFilter, Condition {
+
+        private final String[] nodeTypes;
+        private final String[] relativeGlobPaths;
+        private final boolean includeThis;
+        private TypePredicate predicate;
+
+        NodeTypeAggregationFilter(String[] nodeTypes, String[] 
relativeGlobPaths) {
+            this.nodeTypes = nodeTypes;
+            this.relativeGlobPaths = relativeGlobPaths;
+            boolean includeThis = false;
+            for (String aRelativeGlobPath : relativeGlobPaths) {
+                if (aRelativeGlobPath.equals("") || 
aRelativeGlobPath.equals(".") || aRelativeGlobPath.equals("*")
+                        || aRelativeGlobPath.equals("**")) {
+                    includeThis = true;
+                }
+            }
+            this.includeThis = includeThis;
+        }
+
+        @Override
+        public boolean includeAdd(PropertyState after) {
+            // the AggregateFilter 'waits' for the first hit based on nodeTypes
+            // at which point it switches to a GlobbingPathFilter - so property
+            // changes will be handled in the GlobbingPathFilter, never here.
+            return false;
+        }
+
+        @Override
+        public boolean includeChange(PropertyState before, PropertyState 
after) {
+            // the AggregateFilter 'waits' for the first hit based on nodeTypes
+            // at which point it switches to a GlobbingPathFilter - so property
+            // changes will be handled in the GlobbingPathFilter, never here.
+            return false;
+        }
+
+        @Override
+        public boolean includeDelete(PropertyState before) {
+            // the AggregateFilter 'waits' for the first hit based on nodeTypes
+            // at which point it switches to a GlobbingPathFilter - so property
+            // changes will be handled in the GlobbingPathFilter, never here.
+            return false;
+        }
+
+        @Override
+        public boolean includeAdd(String name, NodeState after) {
+            return includeThis && predicate.apply(after);
+        }
+
+        @Override
+        public boolean includeDelete(String name, NodeState before) {
+            return includeThis && predicate.apply(before);
+        }
+
+        @Override
+        public boolean includeMove(String sourcePath, String name, NodeState 
moved) {
+            return includeThis && predicate.apply(moved);
+        }
+
+        @Override
+        public boolean includeReorder(String destName, String name, NodeState 
reordered) {
+            return includeThis && predicate.apply(reordered);
+        }
+
+        @Override
+        public EventFilter create(String name, NodeState before, NodeState 
after) {
+            boolean predicateMatches = false;
+            if (after.exists()) {
+                predicateMatches = predicate.apply(after);
+            } else {
+                predicateMatches = predicate.apply(before);
+            }
+            if (predicateMatches) {
+                // greedy match - we switch to the globbing path filters
+                List<EventFilter> filters = newArrayList();
+                for (String relativeGlobPath : relativeGlobPaths) {
+                    if (relativeGlobPath.endsWith("*")) {
+                        filters.add(new GlobbingPathFilter(relativeGlobPath));
+                    } else {
+                        filters.add(new GlobbingPathFilter(relativeGlobPath + 
"/*"));
+                    }
+                }
+                return filters.isEmpty() ? ConstantFilter.EXCLUDE_ALL : 
Filters.any(filters);
+            } else {
+                // non-match - we stay with this filter
+                return this;
+            }
+        }
+
+        @Override
+        public EventFilter createFilter(NodeState before, NodeState after) {
+            if (after.exists()) {
+                predicate = new TypePredicate(after, nodeTypes);
+            } else {
+                predicate = new TypePredicate(before, nodeTypes);
+            }
+            return this;
+        }
+
+    }
+
+    private static final class NodeTypeAggregator implements EventAggregator {
+        private final String[] nodeTypes;
+        private final Pattern[] relativePathPatterns;
+
+        private NodeTypeAggregator(String[] nodeTypes, Pattern[] 
relativePathPatterns) {
+            this.nodeTypes = nodeTypes;
+            this.relativePathPatterns = relativePathPatterns;
+        }
+
+        @Override
+        public int aggregate(NodeState root, List<ChildNodeEntry> parents, 
ChildNodeEntry childNodeState) {
+            final TypePredicate nodeTypePredicate = new TypePredicate(root, 
nodeTypes);
+            final int depth = parents.size();
+            for (int i = 0; i < depth; i++) {
+                ChildNodeEntry child = parents.get(i);
+                NodeState nodeState = child.getNodeState();
+                if (!nodeTypePredicate.apply(nodeState)) {
+                    continue;
+                }
+                if (i + 1 <= depth) {
+                    String childPath = asPath(parents.subList(i + 1, depth));
+                    for (Pattern pattern : relativePathPatterns) {
+                        if (pattern.matcher(childPath).matches()) {
+                            return depth - i;
+                        }
+                    }
+                }
+            }
+            return 0;
+        }
+
+        @Override
+        public int aggregate(NodeState root, List<ChildNodeEntry> parents, 
PropertyState propertyState) {
+            final TypePredicate nodeTypePredicate = new TypePredicate(root, 
nodeTypes);
+            final int depth = parents.size();
+            for (int i = 0; i < depth; i++) {
+                ChildNodeEntry child = parents.get(i);
+                NodeState nodeState = child.getNodeState();
+                if (!nodeTypePredicate.apply(nodeState)) {
+                    continue;
+                }
+                if (i + 1 <= depth) {
+                    String childPath = asPath(parents.subList(i + 1, depth));
+                    for (Pattern pattern : relativePathPatterns) {
+                        if (pattern.matcher(childPath).matches()) {
+                            return depth - (i + 1);
+                        }
+                    }
+                }
+            }
+            return 0;
+        }
+
+        private String asPath(List<ChildNodeEntry> children) {
+            if (children.isEmpty()) {
+                return "";
+            }
+            StringBuilder sb = new StringBuilder();
+            for (ChildNodeEntry child : children) {
+                if (sb.length() != 0) {
+                    sb.append("/");
+                }
+                sb.append(child.getName());
+            }
+            return sb.toString();
+        }
+    }
+
     private final JackrabbitEventFilter delegate;
     
     /** whether or not applyNodeTypeOnSelf feature is enabled */
@@ -56,6 +235,12 @@ public class OakEventFilterImpl extends
 
     private String[] globPaths;
 
+    private FilterBuilder builder;
+
+    private Condition all;
+
+    private EventAggregator aggregator;
+
     public OakEventFilterImpl(@Nonnull JackrabbitEventFilter delegate) {
         checkNotNull(delegate);
         this.delegate = delegate;
@@ -280,4 +465,66 @@ public class OakEventFilterImpl extends
         return globPaths;
     }
 
+    public FilterBuilder builder() {
+        if (builder == null) {
+            builder = new FilterBuilder();
+        }
+        return builder;
+    }
+
+    public OakEventFilterImpl and(Condition... condition) {
+        checkNotNull(condition);
+        if (all == null) {
+            all = builder().all(condition);
+        } else {
+            all = builder().all(all, builder.all(condition));
+        }
+        return this;
+    }
+
+    public OakEventFilterImpl aggregator(EventAggregator aggregator) {
+        checkNotNull(aggregator);
+        this.aggregator = aggregator;
+        return this;
+    }
+
+    public Condition getAnds() {
+        return all;
+    }
+
+    public EventAggregator getAggregator() {
+        return aggregator;
+    }
+
+    @Override
+    public OakEventFilter withNodeTypeAggregate(String[] nodeTypes, String[] 
relativeGlobPaths) {
+        final Pattern[] relativePathPatterns = new 
Pattern[relativeGlobPaths.length];
+        for (int i = 0; i < relativePathPatterns.length; i++) {
+            relativePathPatterns[i] = 
Pattern.compile(globAsRegex(relativeGlobPaths[i]));
+        }
+        aggregator(new NodeTypeAggregator(nodeTypes, relativePathPatterns));
+
+        and(new NodeTypeAggregationFilter(nodeTypes, relativeGlobPaths));
+
+        return this;
+    }
+
+    private static String globAsRegex(String patternWithGlobs) {
+        if (patternWithGlobs == null) {
+            return null;
+        }
+        String[] starStarParts = patternWithGlobs.split("\\*\\*", -1);
+        StringBuffer sb = new StringBuffer();
+        sb.append("\\Q");
+        for (int i = 0; i < starStarParts.length; i++) {
+            if (i > 0) {
+                // the '**' regexp equivalent
+                sb.append("\\E.*\\Q");
+            }
+            sb.append(starStarParts[i].replace("*", "\\E[^/]*\\Q"));
+        }
+        sb.append("\\E");
+        return sb.toString();
+    }
+
 }

Modified: 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java?rev=1767337&r1=1767336&r2=1767337&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java
 Mon Oct 31 17:24:02 2016
@@ -276,6 +276,11 @@ public class ObservationManagerImpl impl
         Selector nodeTypeSelector = Selectors.PARENT;
         boolean deleteSubtree = true;
         if (oakEventFilter != null) {
+            Condition ands = oakEventFilter.getAnds();
+            if (ands != null) {
+                excludeConditions.add(ands);
+            }
+            filterBuilder.aggregator(oakEventFilter.getAggregator());
             if (oakEventFilter.getApplyNodeTypeOnSelf()) {
                 nodeTypeSelector = Selectors.THIS;
             }

Modified: 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/filter/OakEventFilter.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/filter/OakEventFilter.java?rev=1767337&r1=1767336&r2=1767337&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/filter/OakEventFilter.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/filter/OakEventFilter.java
 Mon Oct 31 17:24:02 2016
@@ -94,4 +94,23 @@ public abstract class OakEventFilter ext
      */
     public abstract OakEventFilter withIncludeGlobPaths(String... globPaths);
 
+    /**
+     * Greedy aggregating filter which upon first (hence greedy) hit of 
provided
+     * nodeTypes checks if the child subtree leading to the actual change
+     * matches any of the provided relativeGlobPaths.
+     * <p>
+     * Note that unlike 'normal' include and exclude paths, this variant
+     * doesn't apply Oak's NamePathMapper.
+     * <p>
+     * This filter property is added in 'and' mode.
+     * 
+     * @param nodeTypes
+     *            note that these nodeTypes are not mapped to oak nor validated
+     * @param relativeGlobPaths
+     *            glob paths that are added to the set of include paths.
+     *            Note that Oak's NamePathMapper is not applied to these 
relativeGlobPaths.
+     * @return this filter with the filter change applied
+     */
+    public abstract OakEventFilter withNodeTypeAggregate(final String[] 
nodeTypes, final String[] relativeGlobPaths);
+
 }

Modified: 
jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java?rev=1767337&r1=1767336&r2=1767337&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java
 Mon Oct 31 17:24:02 2016
@@ -1125,6 +1125,15 @@ public class ObservationTest extends Abs
             });
         }
 
+        public Future<Event> expect(final String path, final String 
identifier, final int type) {
+            return expect(new Expectation("path = " + path + ", identifier = " 
+ identifier + ", type = " + type) {
+                @Override
+                public boolean onEvent(Event event) throws RepositoryException 
{
+                    return type == event.getType() && equal(path, 
event.getPath()) && equal(identifier, event.getIdentifier());
+                }
+            });
+        }
+
         public Node expectAdd(Node node) throws RepositoryException {
             expect(node.getPath(), NODE_ADDED);
             expect(node.getPath() + "/jcr:primaryType", PROPERTY_ADDED);
@@ -1235,6 +1244,8 @@ public class ObservationTest extends Abs
         }
     }
     
+    //------------------------------------------------------------< 
OakEventFilter tests >---
+
     @Test
     public void applyNodeTypeOnSelf() throws Exception {
         assumeTrue(observationManager instanceof ObservationManagerImpl);
@@ -1423,4 +1434,92 @@ public class ObservationTest extends Abs
         assertTrue("Unexpected events: " + unexpected, unexpected.isEmpty());
     }
 
+    @Test
+    public void testAggregate1() throws Exception {
+        assumeTrue(observationManager instanceof ObservationManagerImpl);
+        ObservationManagerImpl oManager = (ObservationManagerImpl) 
observationManager;
+        ExpectationListener listener = new ExpectationListener();
+        JackrabbitEventFilter filter = new JackrabbitEventFilter();
+        filter.setAbsPath("/parent");
+        filter.setIsDeep(true);
+        filter.setEventTypes(ALL_EVENTS);
+        filter = FilterFactory.wrap(filter).withNodeTypeAggregate(new String[] 
{ "oak:Unstructured" },
+                new String[] { "", "jcr:content", "jcr:content/**" });
+        oManager.addEventListener(listener, filter);
+        Node parent = getAdminSession().getRootNode().addNode("parent", 
"nt:unstructured");
+        Node child = parent.addNode("child", "nt:unstructured");
+        Node file = child.addNode("file", "oak:Unstructured");
+        listener.expectAdd(file);
+        Node jcrContent = file.addNode("jcr:content", "nt:unstructured");
+        listener.expect(jcrContent.getPath(), "/parent/child/file", 
NODE_ADDED);
+        listener.expect(jcrContent.getPath() + "/jcr:primaryType", 
"/parent/child/file", PROPERTY_ADDED);
+        Property jcrDataProperty = jcrContent.setProperty("jcr:data", "foo");
+        listener.expect(jcrDataProperty.getPath(), "/parent/child/file", 
PROPERTY_ADDED);
+        parent.getSession().save();
+
+        List<Expectation> missing = listener.getMissing(TIME_OUT, 
TimeUnit.SECONDS);
+        List<Event> unexpected = listener.getUnexpected();
+        assertTrue("Unexpected events: " + unexpected, unexpected.isEmpty());
+        assertTrue("Missing events: " + missing, missing.isEmpty());
+
+        file = 
getAdminSession().getRootNode().getNode("parent").getNode("child").getNode("file");
+        jcrContent = file.getNode("jcr:content");
+        Property newProperty = jcrContent.setProperty("newProperty", "foo");
+        listener.expect(newProperty.getPath(), "/parent/child/file", 
PROPERTY_ADDED);
+        Property lastModifiedBy = jcrContent.setProperty("jcr:lastModifiedBy", 
"bar");
+        listener.expect(lastModifiedBy.getPath(), "/parent/child/file", 
PROPERTY_ADDED);
+        jcrContent.getSession().save();
+
+        Thread.sleep(2000);
+        missing = listener.getMissing(TIME_OUT, TimeUnit.SECONDS);
+        unexpected = listener.getUnexpected();
+        assertTrue("Unexpected events: " + unexpected, unexpected.isEmpty());
+        assertTrue("Missing events: " + missing, missing.isEmpty());
+    }
+
+    @Test
+    public void testAggregate2() throws Exception {
+        assumeTrue(observationManager instanceof ObservationManagerImpl);
+        ObservationManagerImpl oManager = (ObservationManagerImpl) 
observationManager;
+        ExpectationListener listener = new ExpectationListener();
+        JackrabbitEventFilter filter = new JackrabbitEventFilter();
+        filter.setAbsPath("/parent");
+        filter.setIsDeep(true);
+        filter.setEventTypes(ALL_EVENTS);
+        filter = FilterFactory.wrap(filter).withNodeTypeAggregate(new String[] 
{ "oak:Unstructured" },
+                new String[] { "", "**" });// "file", "file/jcr:content",
+                                           // "file/jcr:content/**");
+        oManager.addEventListener(listener, filter);
+        Node parent = getAdminSession().getRootNode().addNode("parent", 
"nt:unstructured");
+        Node child = parent.addNode("child", "oak:Unstructured");
+        listener.expectAdd(child);
+        Node file = child.addNode("file", "nt:unstructured");
+        listener.expectAdd(file);
+        Node jcrContent = file.addNode("jcr:content", "nt:unstructured");
+        listener.expect(jcrContent.getPath(), "/parent/child", NODE_ADDED);
+        listener.expect(jcrContent.getPath() + "/jcr:primaryType", 
"/parent/child", PROPERTY_ADDED);
+        Property jcrDataProperty = jcrContent.setProperty("jcr:data", "foo");
+        listener.expect(jcrDataProperty.getPath(), "/parent/child", 
PROPERTY_ADDED);
+        parent.getSession().save();
+
+        List<Expectation> missing = listener.getMissing(TIME_OUT, 
TimeUnit.SECONDS);
+        List<Event> unexpected = listener.getUnexpected();
+        assertTrue("Unexpected events: " + unexpected, unexpected.isEmpty());
+        assertTrue("Missing events: " + missing, missing.isEmpty());
+
+        file = 
getAdminSession().getRootNode().getNode("parent").getNode("child").getNode("file");
+        jcrContent = file.getNode("jcr:content");
+        Property newProperty = jcrContent.setProperty("newProperty", "foo");
+        listener.expect(newProperty.getPath(), "/parent/child", 
PROPERTY_ADDED);
+        Property lastModifiedBy = jcrContent.setProperty("jcr:lastModifiedBy", 
"bar");
+        listener.expect(lastModifiedBy.getPath(), "/parent/child", 
PROPERTY_ADDED);
+        jcrContent.getSession().save();
+
+        Thread.sleep(2000);
+        missing = listener.getMissing(TIME_OUT, TimeUnit.SECONDS);
+        unexpected = listener.getUnexpected();
+        assertTrue("Unexpected events: " + unexpected, unexpected.isEmpty());
+        assertTrue("Missing events: " + missing, missing.isEmpty());
+    }
+
 }


Reply via email to