Author: jukka
Date: Wed Jan 29 01:46:43 2014
New Revision: 1562341
URL: http://svn.apache.org/r1562341
Log:
OAK-1332: Large number of changes to the same node can fill observation queue
Implement rate-limiting of ChangeListener callbacks, as mentioned in the
issue
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/EventGenerator.java
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/EventQueue.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/EventGenerator.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/EventGenerator.java?rev=1562341&r1=1562340&r2=1562341&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/EventGenerator.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/EventGenerator.java
Wed Jan 29 01:46:43 2014
@@ -18,8 +18,8 @@
*/
package org.apache.jackrabbit.oak.plugins.observation;
-import static com.google.common.collect.Lists.newArrayList;
import static com.google.common.collect.Lists.newLinkedList;
+import static com.google.common.collect.Sets.newLinkedHashSet;
import static org.apache.jackrabbit.oak.api.Type.NAMES;
import static org.apache.jackrabbit.oak.api.Type.STRING;
import static org.apache.jackrabbit.oak.core.AbstractTree.OAK_CHILD_ORDER;
@@ -27,9 +27,10 @@ import static org.apache.jackrabbit.oak.
import static org.apache.jackrabbit.oak.spi.state.MoveDetector.SOURCE_PATH;
import java.util.LinkedList;
-import java.util.List;
+import java.util.Set;
import javax.annotation.Nonnull;
+import javax.swing.event.ChangeListener;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.plugins.observation.handler.ChangeHandler;
@@ -37,192 +38,250 @@ import org.apache.jackrabbit.oak.spi.sta
import org.apache.jackrabbit.oak.spi.state.NodeStateDiff;
/**
- * Generator of a traversable view of events.
+ * Continuation-based content diff implementation that generates
+ * {@link ChangeHandler} callbacks by recursing down a content diff
+ * in a way that guarantees that only a finite number of callbacks
+ * will be made during a {@link #generate()} method call, regardless
+ * of how large or complex the content diff is.
+ * <p>
+ * A simple usage pattern would look like this:
+ * <pre>
+ * EventGenerator generator = new EventGenerator(before, after, handler);
+ * while (!generator.isDone()) {
+ * generator.generate();
+ * }
+ * </pre>
*/
public class EventGenerator {
- private final LinkedList<Runnable> continuations = newLinkedList();
+ /**
+ * Maximum number of content changes to process during the
+ * execution of a single diff continuation.
+ */
+ private static final int MAX_CHANGES_PER_CONTINUATION = 10000;
/**
- * Create a new instance of a {@code EventGenerator} reporting events to
the
- * passed {@code listener} after filtering with the passed {@code filter}.
+ * Maximum number of continuations queued for future processing.
+ * Once this limit has been reached, we'll start pushing for the
+ * processing of property-only diffs, which will automatically
+ * help reduce the backlog.
+ */
+ private static final int MAX_QUEUED_CONTINUATIONS = 1000;
+
+ private static final String[] STRING_ARRAY = new String[0];
+
+ private final LinkedList<Continuation> continuations = newLinkedList();
+
+ /**
+ * Creates a new generator instance for processing the given changes.
*/
public EventGenerator(
@Nonnull NodeState before, @Nonnull NodeState after,
@Nonnull ChangeHandler handler) {
- continuations.add(new DiffContinuation(handler, before, after));
+ continuations.add(new Continuation(handler, before, after, 0));
}
- public boolean isComplete() {
+ /**
+ * Checks whether there are no more content changes to be processed.
+ */
+ public boolean isDone() {
return continuations.isEmpty();
}
+ /**
+ * Generates a finite number of {@link ChangeHandler} callbacks based
+ * on the content changes that have yet to be processed. Further processing
+ * (even if no callbacks were made) may be postponed to a future
+ * {@link #generate()} call, until the {@link #isDone()} method finally
+ * returns {@code true}.
+ */
public void generate() {
if (!continuations.isEmpty()) {
- continuations.removeFirst().run();
+ Continuation c = continuations.removeFirst();
+ c.after.compareAgainstBaseState(c.before, c);
}
}
- private class DiffContinuation implements NodeStateDiff, Runnable {
+ private class Continuation implements NodeStateDiff {
/**
- * The diff handler of the parent node, or {@code null} for the root.
+ * Filtered handler of detected content changes.
*/
- private final DiffContinuation parent;
+ private final ChangeHandler handler;
/**
- * The name of this node, or the empty string for the root.
+ * Before state, possibly non-existent.
*/
- private final String name;
+ private final NodeState before;
/**
- * Before state, or {@code MISSING_NODE} if this node was added.
+ * After state, possibly non-existent.
*/
- private final NodeState before;
+ private final NodeState after;
/**
- * After state, or {@code MISSING_NODE} if this node was removed.
+ * Number of initial changes to skip.
*/
- private final NodeState after;
+ private final int skip;
/**
- * Filtered handler of detected content changes.
+ * Number of changes seen so far.
*/
- private final ChangeHandler handler;
+ private int counter = 0;
- DiffContinuation(ChangeHandler handler, NodeState before, NodeState
after) {
- this.parent = null;
- this.name = null;
- this.before = before;
- this.after = after;
+ private Continuation(
+ ChangeHandler handler, NodeState before, NodeState after,
+ int skip) {
this.handler = handler;
- }
-
- private DiffContinuation(
- DiffContinuation parent, ChangeHandler handler,
- String name, NodeState before, NodeState after) {
- this.parent = parent;
- this.name = name;
this.before = before;
this.after = after;
- this.handler = handler;
- }
-
- //------------------------------------------------------< Runnable >--
-
- @Override
- public void run() {
- if (parent != null) {
- if (before == MISSING_NODE) {
- // postponed handling of added nodes
- parent.handleAddedNode(name, after);
- } else if (after == MISSING_NODE) {
- // postponed handling of removed nodes
- parent.handleDeletedNode(name, before);
- }
- }
-
- // process changes below this node
- after.compareAgainstBaseState(before, this);
+ this.skip = skip;
}
//-------------------------------------------------< NodeStateDiff >--
@Override
public boolean propertyAdded(PropertyState after) {
- handler.propertyAdded(after);
- return true;
+ if (beforeEvent()) {
+ handler.propertyAdded(after);
+ return afterEvent();
+ } else {
+ return true;
+ }
}
@Override
public boolean propertyChanged(
PropertyState before, PropertyState after) {
- // check for reordering of child nodes
- if (OAK_CHILD_ORDER.equals(before.getName())) {
- handleReorderedNodes(
- before.getValue(NAMES), after.getValue(NAMES));
+ if (beforeEvent()) {
+ // check for reordering of child nodes
+ if (OAK_CHILD_ORDER.equals(before.getName())) {
+ Set<String> beforeSet =
+ newLinkedHashSet(before.getValue(NAMES));
+ Set<String> afterSet =
+ newLinkedHashSet(after.getValue(NAMES));
+ afterSet.retainAll(beforeSet);
+ beforeSet.retainAll(afterSet);
+ String[] beforeNames = beforeSet.toArray(STRING_ARRAY);
+ String[] afterNames = afterSet.toArray(STRING_ARRAY);
+
+ // Selection sort beforeNames into afterNames,
+ // recording the swaps as we go
+ for (int a = 0; a < afterNames.length; a++) {
+ String name = afterNames[a];
+ for (int b = a + 1; b < beforeNames.length; b++) {
+ if (name.equals(beforeNames[b])) {
+ beforeNames[b] = beforeNames[a];
+ beforeNames[a] = name;
+ handler.nodeReordered(
+ beforeNames[a + 1], name,
+ this.after.getChildNode(name));
+ }
+ }
+ }
+ }
+
+ handler.propertyChanged(before, after);
+ return afterEvent();
+ } else {
+ return true;
}
- handler.propertyChanged(before, after);
- return true;
}
@Override
public boolean propertyDeleted(PropertyState before) {
- handler.propertyDeleted(before);
- return true;
+ if (beforeEvent()) {
+ handler.propertyDeleted(before);
+ return afterEvent();
+ } else {
+ return true;
+ }
}
@Override
public boolean childNodeAdded(String name, NodeState after) {
- if (!addChildEventGenerator(name, MISSING_NODE, after)) {
- handleAddedNode(name, after); // not postponed
+ if (beforeEvent()) {
+ PropertyState sourceProperty = after.getProperty(SOURCE_PATH);
+ if (sourceProperty != null) {
+ String sourcePath = sourceProperty.getValue(STRING);
+ handler.nodeMoved(sourcePath, name, after);
+ }
+
+ handler.nodeAdded(name, after);
+ return addChildDiff(name, MISSING_NODE, after);
+ } else {
+ return true;
}
- return true;
}
@Override
public boolean childNodeChanged(
String name, NodeState before, NodeState after) {
- addChildEventGenerator(name, before, after);
- return true;
+ if (beforeEvent()) {
+ return addChildDiff(name, before, after);
+ } else {
+ return true;
+ }
}
@Override
public boolean childNodeDeleted(String name, NodeState before) {
- if (!addChildEventGenerator(name, before, MISSING_NODE)) {
- handleDeletedNode(name, before); // not postponed
+ if (beforeEvent()) {
+ handler.nodeDeleted(name, before);
+ return addChildDiff(name, before, MISSING_NODE);
+ } else {
+ return true;
}
- return true;
}
- //------------------------------------------------------------<
private >---
+ //-------------------------------------------------------< private >--
- private boolean addChildEventGenerator(
+ /**
+ * Schedules a continuation for processing changes within the given
+ * child node, if changes within that subtree should be processed.
+ */
+ private boolean addChildDiff(
String name, NodeState before, NodeState after) {
ChangeHandler h = handler.getChildHandler(name, before, after);
if (h != null) {
- continuations.add(new DiffContinuation(this, h, name, before,
after));
- return true;
- } else {
+ continuations.add(new Continuation(h, before, after, 0));
+ }
+
+ if (continuations.size() > MAX_QUEUED_CONTINUATIONS) {
+ // Postpone further processing of the current continuation.
+ // Even though this increases the size of the queue beyond
+ // the limit, doing so ultimately forces property-only
+ // diffs to the beginning of the queue, and thus helps
+ // automatically clean up the backlog.
+ continuations.add(new Continuation(
+ handler, this.before, this.after, counter));
return false;
+ } else {
+ return afterEvent();
}
}
- private void handleAddedNode(String name, NodeState after) {
- PropertyState sourceProperty = after.getProperty(SOURCE_PATH);
- if (sourceProperty != null) {
- String sourcePath = sourceProperty.getValue(STRING);
- handler.nodeMoved(sourcePath, name, after);
- }
-
- handler.nodeAdded(name, after);
- }
-
- protected void handleDeletedNode(String name, NodeState before) {
- handler.nodeDeleted(name, before);
- }
-
- private void handleReorderedNodes(
- Iterable<String> before, Iterable<String> after) {
- List<String> afterNames = newArrayList(after);
- List<String> beforeNames = newArrayList(before);
-
- afterNames.retainAll(beforeNames);
- beforeNames.retainAll(afterNames);
-
- // Selection sort beforeNames into afterNames recording the swaps
as we go
- for (int a = 0; a < afterNames.size(); a++) {
- String afterName = afterNames.get(a);
- for (int b = a; b < beforeNames.size(); b++) {
- String beforeName = beforeNames.get(b);
- if (a != b && beforeName.equals(afterName)) {
- beforeNames.set(b, beforeNames.get(a));
- beforeNames.set(a, beforeName);
- String destName = beforeNames.get(a + 1);
- NodeState afterChild =
this.after.getChildNode(afterName);
- handler.nodeReordered(destName, afterName, afterChild);
- }
- }
+ /**
+ * Increases the event counter and checks whether the event should
+ * be processed, i.e. whether the initial skip count has been reached.
+ */
+ private boolean beforeEvent() {
+ return ++counter > skip;
+ }
+
+ /**
+ * Checks whether enough events have already been processed in this
+ * continuation. If that is the case, we postpone further processing
+ * to a new continuation that will first skip all the initial events
+ * we've already seen. Otherwise we let the current diff continue.
+ */
+ private boolean afterEvent() {
+ if (counter >= skip + MAX_CHANGES_PER_CONTINUATION) {
+ continuations.add(
+ new Continuation(handler, before, after, counter));
+ return false;
+ } else {
+ return true;
}
}
Modified:
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/EventQueue.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/EventQueue.java?rev=1562341&r1=1562340&r2=1562341&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/EventQueue.java
(original)
+++
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/EventQueue.java
Wed Jan 29 01:46:43 2014
@@ -71,7 +71,7 @@ class EventQueue implements EventIterato
@Override
public long getSize() {
- if (generator.isComplete()) {
+ if (generator.isDone()) {
// no more new events will be generated, so count just those
// that have already been iterated and those left in the queue
return position + queue.size();
@@ -90,7 +90,7 @@ class EventQueue implements EventIterato
@Override
public boolean hasNext() {
while (queue.isEmpty()) {
- if (generator.isComplete()) {
+ if (generator.isDone()) {
return false;
} else {
generator.generate();
@@ -109,7 +109,7 @@ class EventQueue implements EventIterato
queue.clear();
// generate more events if possible, otherwise fail
- if (!generator.isComplete()) {
+ if (!generator.isDone()) {
generator.generate();
} else {
throw new NoSuchElementException("Not enough events to skip");