Author: mduerig
Date: Tue Nov 12 09:33:33 2013
New Revision: 1540983
URL: http://svn.apache.org/r1540983
Log:
OAK-1143: [scala] Repository init throws "illegal cyclic reference involving
class ChangeDispatcher"
Make ChangeDispatcher implement Observer and replace the beforeCommit, localCommit
and afterCommit methods with Observer.contentChanged
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueue.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeStoreBranch.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueue.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueue.java?rev=1540983&r1=1540982&r2=1540983&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueue.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueue.java Tue Nov 12 09:33:33 2013
@@ -109,11 +109,11 @@ class CommitQueue {
store.setHeadRevision(c.getRevision());
NodeState root = store.getRoot();
// TODO: correct?
- dispatcher.beforeCommit(store.getRoot(before));
+ dispatcher.contentChanged(store.getRoot(before), null);
try {
- dispatcher.localCommit(root, info);
+ dispatcher.contentChanged(root, info);
} finally {
- dispatcher.afterCommit(root);
+ dispatcher.contentChanged(root, null);
}
} finally {
// notify next if there is any
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java?rev=1540983&r1=1540982&r2=1540983&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java Tue Nov 12 09:33:33 2013
@@ -78,16 +78,16 @@ public class SegmentNodeStore implements
}
boolean setHead(SegmentNodeState base, SegmentNodeState head, CommitInfo info) {
- changeDispatcher.beforeCommit(base.getChildNode(ROOT));
+ changeDispatcher.contentChanged(base.getChildNode(ROOT), null);
try {
if (journal.setHead(base.getRecordId(), head.getRecordId())) {
- changeDispatcher.localCommit(head.getChildNode(ROOT), info);
+ changeDispatcher.contentChanged(head.getChildNode(ROOT), info);
return true;
} else {
return false;
}
} finally {
- changeDispatcher.afterCommit(getRoot());
+ changeDispatcher.contentChanged(getRoot(), null);
}
}
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java?rev=1540983&r1=1540982&r2=1540983&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java Tue Nov 12 09:33:33 2013
@@ -19,47 +19,35 @@
package org.apache.jackrabbit.oak.spi.commit;
import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
import java.io.Closeable;
-import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import org.apache.jackrabbit.oak.spi.state.NodeState;
/**
- * A {@code ChangeDispatcher} instance records changes to a
- * {@link org.apache.jackrabbit.oak.spi.state.NodeStore}
- * and dispatches them to interested parties.
+ * A {@code ChangeDispatcher} instance dispatches content changes
+ * to registered {@link Observer}s.
* <p>
- * Actual changes are reported by calling {@link #beforeCommit(NodeState)},
- * {@link #localCommit(NodeState, CommitInfo)} and {@link #afterCommit(NodeState)} in that order:
- * <pre>
- NodeState root = store.getRoot();
- branch.rebase();
- changeDispatcher.beforeCommit(root);
- try {
- NodeState head = branch.getHead();
- branch.merge();
- changeDispatcher.localCommit(head);
- } finally {
- changeDispatcher.afterCommit(store.getRoot());
- }
- * </pre>
+ * Changes are reported by calling {@link #contentChanged(NodeState, CommitInfo)}.
* <p>
* The {@link #addObserver(Observer)} method registers an {@link Observer} for receiving
- * notifications about all changes reported to this instance.
+ * notifications for all changes reported to this instance.
*/
-public class ChangeDispatcher implements Observable {
+public class ChangeDispatcher implements Observable, Observer {
+ // TODO make the queue size configurable
+ private static final int QUEUE_SIZE = 8192;
+
private final CompositeObserver observers = new CompositeObserver();
@Nonnull
private NodeState root;
/**
- * Create a new instance for recording changes to a {@code NodeStore}
- * @param root current root node state of the node store
+ * Create a new instance for dispatching content changes
+ * @param root current root node state
*/
public ChangeDispatcher(@Nonnull NodeState root) {
this.root = checkNotNull(root);
@@ -76,8 +64,7 @@ public class ChangeDispatcher implements
@Override
@Nonnull
public Closeable addObserver(Observer observer) {
- // FIXME don't hard code queue size
- final BackgroundObserver backgroundObserver = new BackgroundObserver(observer, 8192);
+ final BackgroundObserver backgroundObserver = new BackgroundObserver(observer, QUEUE_SIZE);
backgroundObserver.contentChanged(root, null);
observers.addObserver(backgroundObserver);
return new Closeable() {
@@ -89,72 +76,10 @@ public class ChangeDispatcher implements
};
}
- private final AtomicLong changeCount = new AtomicLong(0);
-
- private boolean inLocalCommit() {
- return changeCount.get() % 2 == 1;
- }
-
- /**
- * Call with the latest persisted root node state right before persisting further changes.
- * Calling this method marks this instance to be inside a local commit.
- * <p>
- * The differences from the root node state passed to the last call to
- * {@link #afterCommit(NodeState)} to {@code root} are reported as cluster external
- * changes to any listener.
- *
- * @param root latest persisted root node state.
- * @throws IllegalStateException if inside a local commit
- */
- public synchronized void beforeCommit(@Nonnull NodeState root) {
- checkState(!inLocalCommit());
- checkNotNull(root);
- changeCount.incrementAndGet();
- observers.contentChanged(root, null);
- this.root = root;
- }
-
- /**
- * Call right after changes have been successfully persisted passing
- * the new root node state resulting from the persist operation.
- * <p>
- * The differences from the root node state passed to the last call to
- * {@link #beforeCommit(NodeState)} to {@code root} are reported as
- * cluster local changes to any listener in case non-{@code null}
- * commit information is provided. If no local commit information is
- * given, then no local changes are reported and the committed changes
- * will only show up as an aggregate with any concurrent external changes
- * reported during the {@link #afterCommit(NodeState)} call.
- *
- * @param root root node state just persisted
- * @param info commit information
- * @throws IllegalStateException if not inside a local commit
- */
- public synchronized void localCommit(
- @Nonnull NodeState root, @Nonnull CommitInfo info) {
- checkState(inLocalCommit());
+ @Override
+ public synchronized void contentChanged(@Nonnull NodeState root, @Nullable CommitInfo info) {
checkNotNull(root);
observers.contentChanged(root, info);
this.root = root;
}
-
- /**
- * Call to mark the end of a persist operation passing the latest persisted root node state.
- * Calling this method marks this instance to not be inside a local commit.
- * <p>
- * The difference from the root node state passed to the las call to
- * {@link #localCommit(NodeState, CommitInfo)} to {@code root} are reported as cluster external
- * changes to any listener.
-
- * @param root latest persisted root node state.
- * @throws IllegalStateException if not inside a local commit
- */
- public synchronized void afterCommit(@Nonnull NodeState root) {
- checkState(inLocalCommit());
- checkNotNull(root);
- observers.contentChanged(root, null);
- this.root = root;
- changeCount.incrementAndGet();
- }
-
}
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeStoreBranch.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeStoreBranch.java?rev=1540983&r1=1540982&r2=1540983&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeStoreBranch.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeStoreBranch.java Tue Nov 12 09:33:33 2013
@@ -389,14 +389,14 @@ public abstract class AbstractNodeStoreB
throws CommitFailedException {
try {
rebase();
- dispatcher.beforeCommit(base);
+ dispatcher.contentChanged(base, null);
NodeState toCommit = checkNotNull(hook).processCommit(base, head);
NodeState newHead = AbstractNodeStoreBranch.this.persist(toCommit, base, info);
- dispatcher.localCommit(newHead, info);
+ dispatcher.contentChanged(newHead, info);
branchState = new Merged(base);
return newHead;
} finally {
- dispatcher.afterCommit(getRoot());
+ dispatcher.contentChanged(getRoot(), null);
}
}
}
@@ -469,7 +469,7 @@ public abstract class AbstractNodeStoreB
NodeState merge(@Nonnull CommitHook hook, CommitInfo info) throws CommitFailedException {
try {
rebase();
- dispatcher.beforeCommit(base);
+ dispatcher.contentChanged(base, null);
NodeState toCommit = checkNotNull(hook).processCommit(base, head);
if (toCommit.equals(base)) {
branchState = new Merged(base);
@@ -477,12 +477,12 @@ public abstract class AbstractNodeStoreB
} else {
head = AbstractNodeStoreBranch.this.persist(toCommit, head, info);
NodeState newRoot = AbstractNodeStoreBranch.this.merge(head, info);
- dispatcher.localCommit(newRoot, info);
+ dispatcher.contentChanged(newRoot, info);
branchState = new Merged(base);
return newRoot;
}
} finally {
- dispatcher.afterCommit(getRoot());
+ dispatcher.contentChanged(getRoot(), null);
}
}