Author: jukka
Date: Wed Jan 29 01:46:43 2014
New Revision: 1562341

URL: http://svn.apache.org/r1562341
Log:
OAK-1332: Large number of changes to the same node can fill observation queue

Implement rate-limiting to of ChangeListener callbacks, as mentioned in the 
issue

Modified:
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/EventGenerator.java
    
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/EventQueue.java

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/EventGenerator.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/EventGenerator.java?rev=1562341&r1=1562340&r2=1562341&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/EventGenerator.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/EventGenerator.java
 Wed Jan 29 01:46:43 2014
@@ -18,8 +18,8 @@
  */
 package org.apache.jackrabbit.oak.plugins.observation;
 
-import static com.google.common.collect.Lists.newArrayList;
 import static com.google.common.collect.Lists.newLinkedList;
+import static com.google.common.collect.Sets.newLinkedHashSet;
 import static org.apache.jackrabbit.oak.api.Type.NAMES;
 import static org.apache.jackrabbit.oak.api.Type.STRING;
 import static org.apache.jackrabbit.oak.core.AbstractTree.OAK_CHILD_ORDER;
@@ -27,9 +27,10 @@ import static org.apache.jackrabbit.oak.
 import static org.apache.jackrabbit.oak.spi.state.MoveDetector.SOURCE_PATH;
 
 import java.util.LinkedList;
-import java.util.List;
+import java.util.Set;
 
 import javax.annotation.Nonnull;
+import javax.swing.event.ChangeListener;
 
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.plugins.observation.handler.ChangeHandler;
@@ -37,192 +38,250 @@ import org.apache.jackrabbit.oak.spi.sta
 import org.apache.jackrabbit.oak.spi.state.NodeStateDiff;
 
 /**
- * Generator of a traversable view of events.
+ * Continuation-based content diff implementation that generates
+ * {@link ChangeHandler} callbacks by recursing down a content diff
+ * in a way that guarantees that only a finite number of callbacks
+ * will be made during a {@link #generate()} method call, regardless
+ * of how large or complex the content diff is.
+ * <p>
+ * A simple usage pattern would look like this:
+ * <pre>
+ * EventGenerator generator = new EventGenerator(before, after, handler);
+ * while (generator.isDone()) {
+ *     generator.generate();
+ * }
+ * </pre>
  */
 public class EventGenerator {
 
-    private final LinkedList<Runnable> continuations = newLinkedList();
+    /**
+     * Maximum number of content changes to process during the
+     * execution of a single diff continuation.
+     */
+    private static final int MAX_CHANGES_PER_CONTINUATION = 10000;
 
     /**
-     * Create a new instance of a {@code EventGenerator} reporting events to 
the
-     * passed {@code listener} after filtering with the passed {@code filter}.
+     * Maximum number of continuations queued for future processing.
+     * Once this limit has been reached, we'll start pushing for the
+     * processing of property-only diffs, which will automatically
+     * help reduce the backlog.
+     */
+    private static final int MAX_QUEUED_CONTINUATIONS = 1000;
+
+    private static final String[] STRING_ARRAY = new String[0];
+
+    private final LinkedList<Continuation> continuations = newLinkedList();
+
+    /**
+     * Creates a new generator instance for processing the given changes.
      */
     public EventGenerator(
             @Nonnull NodeState before, @Nonnull NodeState after,
             @Nonnull ChangeHandler handler) {
-        continuations.add(new DiffContinuation(handler, before, after));
+        continuations.add(new Continuation(handler, before, after, 0));
     }
 
-    public boolean isComplete() {
+    /**
+     * Checks whether there are no more content changes to be processed.
+     */
+    public boolean isDone() {
         return continuations.isEmpty();
     }
 
+    /**
+     * Generates a finite number of {@link ChangeListener} callbacks based
+     * on the content changes that have yet to be processed. Further processing
+     * (even if no callbacks were made) may be postponed to a future
+     * {@link #generate()} call, until the {@link #isDone()} method finally
+     * return {@code true}.
+     */
     public void generate() {
         if (!continuations.isEmpty()) {
-            continuations.removeFirst().run();
+            Continuation c = continuations.removeFirst();
+            c.after.compareAgainstBaseState(c.before, c);
         }
     }
 
-    private class DiffContinuation implements NodeStateDiff, Runnable {
+    private class Continuation implements NodeStateDiff {
 
         /**
-         * The diff handler of the parent node, or {@code null} for the root.
+         * Filtered handler of detected content changes.
          */
-        private final DiffContinuation parent;
+        private final ChangeHandler handler;
 
         /**
-         * The name of this node, or the empty string for the root.
+         * Before state, possibly non-existent.
          */
-        private final String name;
+        private final NodeState before;
 
         /**
-         * Before state, or {@code MISSING_NODE} if this node was added.
+         * After state, possibly non-existent.
          */
-        private final NodeState before;
+        private final NodeState after;
 
         /**
-         * After state, or {@code MISSING_NODE} if this node was removed.
+         * Number of initial changes to skip.
          */
-        private final NodeState after;
+        private final int skip;
 
         /**
-         * Filtered handler of detected content changes.
+         * Number of changes seen so far.
          */
-        private final ChangeHandler handler;
+        private int counter = 0;
 
-        DiffContinuation(ChangeHandler handler, NodeState before, NodeState 
after) {
-            this.parent = null;
-            this.name = null;
-            this.before = before;
-            this.after = after;
+        private Continuation(
+                ChangeHandler handler, NodeState before, NodeState after,
+                int skip) {
             this.handler = handler;
-        }
-
-        private DiffContinuation(
-                DiffContinuation parent, ChangeHandler handler,
-                String name, NodeState before, NodeState after) {
-            this.parent = parent;
-            this.name = name;
             this.before = before;
             this.after = after;
-            this.handler = handler;
-        }
-
-        //------------------------------------------------------< Runnable >--
-
-        @Override
-        public void run() {
-            if (parent != null) {
-                if (before == MISSING_NODE) {
-                    // postponed handling of added nodes
-                    parent.handleAddedNode(name, after);
-                } else if (after == MISSING_NODE) {
-                    // postponed handling of removed nodes
-                    parent.handleDeletedNode(name, before);
-                }
-            }
-
-            // process changes below this node
-            after.compareAgainstBaseState(before, this);
+            this.skip = skip;
         }
 
         //-------------------------------------------------< NodeStateDiff >--
 
         @Override
         public boolean propertyAdded(PropertyState after) {
-            handler.propertyAdded(after);
-            return true;
+            if (beforeEvent()) {
+                handler.propertyAdded(after);
+                return afterEvent();
+            } else {
+                return true;
+            }
         }
 
         @Override
         public boolean propertyChanged(
                 PropertyState before, PropertyState after) {
-            // check for reordering of child nodes
-            if (OAK_CHILD_ORDER.equals(before.getName())) {
-                handleReorderedNodes(
-                        before.getValue(NAMES), after.getValue(NAMES));
+            if (beforeEvent()) {
+                // check for reordering of child nodes
+                if (OAK_CHILD_ORDER.equals(before.getName())) {
+                    Set<String> beforeSet =
+                            newLinkedHashSet(after.getValue(NAMES));
+                    Set<String> afterSet =
+                            newLinkedHashSet(before.getValue(NAMES));
+                    afterSet.retainAll(beforeSet);
+                    beforeSet.retainAll(afterSet);
+                    String[] beforeNames = beforeSet.toArray(STRING_ARRAY);
+                    String[] afterNames = afterSet.toArray(STRING_ARRAY);
+
+                    // Selection sort beforeNames into afterNames,
+                    // recording the swaps as we go
+                    for (int a = 0; a < afterNames.length; a++) {
+                        String name = afterNames[a];
+                        for (int b = a + 1; b < beforeNames.length; b++) {
+                            if (name.equals(beforeNames[b])) {
+                                beforeNames[b] = beforeNames[a];
+                                beforeNames[a] = name;
+                                handler.nodeReordered(
+                                        beforeNames[a + 1], name,
+                                        this.after.getChildNode(name));
+                            }
+                        }
+                    }
+                }
+
+                handler.propertyChanged(before, after);
+                return afterEvent();
+            } else {
+                return true;
             }
-            handler.propertyChanged(before, after);
-            return true;
         }
 
         @Override
         public boolean propertyDeleted(PropertyState before) {
-            handler.propertyDeleted(before);
-            return true;
+            if (beforeEvent()) {
+                handler.propertyDeleted(before);
+                return afterEvent();
+            } else {
+                return true;
+            }
         }
 
         @Override
         public boolean childNodeAdded(String name, NodeState after) {
-            if (!addChildEventGenerator(name, MISSING_NODE, after)) {
-                handleAddedNode(name, after); // not postponed
+            if (beforeEvent()) {
+                PropertyState sourceProperty = after.getProperty(SOURCE_PATH);
+                if (sourceProperty != null) {
+                    String sourcePath = sourceProperty.getValue(STRING);
+                    handler.nodeMoved(sourcePath, name, after);
+                }
+
+                handler.nodeAdded(name, after);
+                return addChildDiff(name, MISSING_NODE, after);
+            } else {
+                return true;
             }
-            return true;
         }
 
         @Override
         public boolean childNodeChanged(
                 String name, NodeState before, NodeState after) {
-            addChildEventGenerator(name, before, after);
-            return true;
+            if (beforeEvent()) {
+                return addChildDiff(name, before, after);
+            } else {
+                return true;
+            }
         }
 
         @Override
         public boolean childNodeDeleted(String name, NodeState before) {
-            if (!addChildEventGenerator(name, before, MISSING_NODE)) {
-                handleDeletedNode(name, before); // not postponed
+            if (beforeEvent()) {
+                handler.nodeDeleted(name, before);
+                return addChildDiff(name, before, MISSING_NODE);
+            } else {
+                return true;
             }
-            return true;
         }
 
-        //------------------------------------------------------------< 
private >---
+        //-------------------------------------------------------< private >--
 
-        private boolean addChildEventGenerator(
+        /**
+         * Schedules a continuation for processing changes within the given
+         * child node, if changes within that subtree should be processed.
+         */
+        private boolean addChildDiff(
                 String name, NodeState before, NodeState after) {
             ChangeHandler h = handler.getChildHandler(name, before, after);
             if (h != null) {
-                continuations.add(new DiffContinuation(this, h, name, before, 
after));
-                return true;
-            } else {
+                continuations.add(new Continuation(h, before, after, 0));
+            }
+
+            if (continuations.size() > MAX_QUEUED_CONTINUATIONS) {
+                // Postpone further processing of the current continuation.
+                // Even though this increases the size of the queue beyond
+                // the limit, doing so ultimately forces property-only
+                // diffs to the beginning of the queue, and thus helps
+                // automatically clean up the backlog.
+                continuations.add(new Continuation(
+                        handler, this.before, this.after, counter));
                 return false;
+            } else {
+                return afterEvent();
             }
         }
 
-        private void handleAddedNode(String name, NodeState after) {
-            PropertyState sourceProperty = after.getProperty(SOURCE_PATH);
-            if (sourceProperty != null) {
-                String sourcePath = sourceProperty.getValue(STRING);
-                handler.nodeMoved(sourcePath, name, after);
-            }
-
-            handler.nodeAdded(name, after);
-        }
-
-        protected void handleDeletedNode(String name, NodeState before) {
-            handler.nodeDeleted(name, before);
-        }
-
-        private void handleReorderedNodes(
-                Iterable<String> before, Iterable<String> after) {
-            List<String> afterNames = newArrayList(after);
-            List<String> beforeNames = newArrayList(before);
-
-            afterNames.retainAll(beforeNames);
-            beforeNames.retainAll(afterNames);
-
-            // Selection sort beforeNames into afterNames recording the swaps 
as we go
-            for (int a = 0; a < afterNames.size(); a++) {
-                String afterName = afterNames.get(a);
-                for (int b = a; b < beforeNames.size(); b++) {
-                    String beforeName = beforeNames.get(b);
-                    if (a != b && beforeName.equals(afterName)) {
-                        beforeNames.set(b, beforeNames.get(a));
-                        beforeNames.set(a, beforeName);
-                        String destName = beforeNames.get(a + 1);
-                        NodeState afterChild = 
this.after.getChildNode(afterName);
-                        handler.nodeReordered(destName, afterName, afterChild);
-                    }
-                }
+        /**
+         * Increases the event counter and checks whether the event should
+         * be processed, i.e. whether the initial skip count has been reached.
+         */
+        private boolean beforeEvent() {
+            return ++counter > skip;
+        }
+
+        /**
+         * Checks whether enough events have already been processed in this
+         * continuation. If that is the case, we postpone further processing
+         * to a new continuation that will first skip all the initial events
+         * we've already seen. Otherwise we let the current diff continue.
+         */
+        private boolean afterEvent() {
+            if (counter >= skip + MAX_CHANGES_PER_CONTINUATION) {
+                continuations.add(
+                        new Continuation(handler, before, after, counter));
+                return false;
+            } else {
+                return true;
             }
         }
 

Modified: 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/EventQueue.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/EventQueue.java?rev=1562341&r1=1562340&r2=1562341&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/EventQueue.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/EventQueue.java
 Wed Jan 29 01:46:43 2014
@@ -71,7 +71,7 @@ class EventQueue implements EventIterato
 
     @Override
     public long getSize() {
-        if (generator.isComplete()) {
+        if (generator.isDone()) {
             // no more new events will be generated, so count just those
             // that have already been iterated and those left in the queue
             return position + queue.size();
@@ -90,7 +90,7 @@ class EventQueue implements EventIterato
     @Override
     public boolean hasNext() {
         while (queue.isEmpty()) {
-            if (generator.isComplete()) {
+            if (generator.isDone()) {
                 return false;
             } else {
                 generator.generate();
@@ -109,7 +109,7 @@ class EventQueue implements EventIterato
             queue.clear();
 
             // generate more events if possible, otherwise fail
-            if (!generator.isComplete()) {
+            if (!generator.isDone()) {
                 generator.generate();
             } else {
                 throw new NoSuchElementException("Not enough events to skip");


Reply via email to