Author: mduerig
Date: Tue Nov 12 09:31:08 2013
New Revision: 1540981
URL: http://svn.apache.org/r1540981
Log:
OAK-1143: [scala] Repository init throws "illegal cyclic reference involving class ChangeDispatcher"
Implement ChangeDispatcher using CompositeObserver and BackgroundObserver
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java?rev=1540981&r1=1540980&r2=1540981&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java Tue Nov 12 09:31:08 2013
@@ -18,23 +18,14 @@
*/
package org.apache.jackrabbit.oak.spi.commit;
-import static com.google.common.base.Objects.toStringHelper;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
-import static com.google.common.collect.Queues.newLinkedBlockingQueue;
import java.io.Closeable;
-import java.io.IOException;
-import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import com.google.common.collect.Sets;
import org.apache.jackrabbit.oak.spi.state.NodeState;
/**
@@ -61,10 +52,10 @@ import org.apache.jackrabbit.oak.spi.sta
* notifications about all changes reported to this instance.
*/
public class ChangeDispatcher implements Observable {
- private final Set<Listener> listeners = Sets.newHashSet();
+ private final CompositeObserver observers = new CompositeObserver();
@Nonnull
- private volatile NodeState root;
+ private NodeState root;
/**
* Create a new instance for recording changes to a {@code NodeStore}
@@ -85,10 +76,17 @@ public class ChangeDispatcher implements
@Override
@Nonnull
public Closeable addObserver(Observer observer) {
- Listener listener = new Listener(observer, root);
- listener.start();
- register(listener);
- return listener;
+ // FIXME don't hard code queue size
+ final BackgroundObserver backgroundObserver = new BackgroundObserver(observer, 8192);
+ backgroundObserver.contentChanged(root, null);
+ observers.addObserver(backgroundObserver);
+ return new Closeable() {
+ @Override
+ public void close() {
+ backgroundObserver.stop();
+ observers.removeObserver(backgroundObserver);
+ }
+ };
}
private final AtomicLong changeCount = new AtomicLong(0);
@@ -110,8 +108,10 @@ public class ChangeDispatcher implements
*/
public synchronized void beforeCommit(@Nonnull NodeState root) {
checkState(!inLocalCommit());
+ checkNotNull(root);
changeCount.incrementAndGet();
- externalChange(checkNotNull(root));
+ observers.contentChanged(root, null);
+ this.root = root;
}
/**
@@ -134,7 +134,7 @@ public class ChangeDispatcher implements
@Nonnull NodeState root, @Nonnull CommitInfo info) {
checkState(inLocalCommit());
checkNotNull(root);
- add(root, info);
+ observers.contentChanged(root, info);
this.root = root;
}
@@ -151,127 +151,10 @@ public class ChangeDispatcher implements
*/
public synchronized void afterCommit(@Nonnull NodeState root) {
checkState(inLocalCommit());
- externalChange(checkNotNull(root));
+ checkNotNull(root);
+ observers.contentChanged(root, null);
+ this.root = root;
changeCount.incrementAndGet();
}
- private synchronized void externalChange(NodeState root) {
- if (!root.equals(this.root)) {
- add(root, null);
- this.root = root;
- }
- }
-
- private void register(Listener listener) {
- synchronized (listeners) {
- listeners.add(listener);
- }
- }
-
- private void unregister(Listener listener) {
- synchronized (listeners) {
- listeners.remove(listener);
- }
- }
-
- private void add(NodeState root, CommitInfo info) {
- for (Listener l : getListeners()) {
- l.contentChanged(root, info);
- }
- }
-
- private Listener[] getListeners() {
- synchronized (listeners) {
- return listeners.toArray(new Listener[listeners.size()]);
- }
- }
-
- //------------------------------------------------------------< Listener >---
-
- /**
- * Listener thread receiving changes reported into {@code ChangeDispatcher} and
- * asynchronously distributing these to an associated {@link Observer}.
- */
- private class Listener extends Thread implements Closeable, Observer {
- private final LinkedBlockingQueue<Commit> commits = newLinkedBlockingQueue();
- private final Observer observer;
-
- private boolean blocked = false;
- private volatile boolean stopping;
-
- Listener(Observer observer, NodeState root) {
- this.observer = checkNotNull(observer);
- commits.add(new Commit(root, null));
- setDaemon(true);
- setPriority(Thread.MIN_PRIORITY);
- }
-
- @Override
- public void contentChanged(NodeState root, CommitInfo info) {
- Commit commit = new Commit(root, blocked ? null : info);
- blocked = !commits.offer(commit);
- }
-
- @Override
- public void run() {
- try {
- while (!stopping) {
- Commit commit = commits.poll(100, TimeUnit.MILLISECONDS);
- if (commit != null) {
- observer.contentChanged(commit.getRoot(), commit.getCommitInfo());
- }
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-
- @Override
- public void close() throws IOException {
- checkState(!stopping, "Change processor already stopped");
-
- unregister(this);
- stopping = true;
- if (Thread.currentThread() != this) {
- try {
- join();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException
- ("Interruption while waiting for the listener thread to terminate", e);
- }
- }
- }
- }
-
- //------------------------------------------------------------< Commit >---
-
- private static class Commit {
- private final NodeState root;
- private final CommitInfo commitInfo;
-
- Commit(@Nonnull NodeState root, @Nullable CommitInfo commitInfo) {
- this.root = checkNotNull(root);
- this.commitInfo = commitInfo;
- }
-
- @Nonnull
- NodeState getRoot() {
- return root;
- }
-
- @CheckForNull
- CommitInfo getCommitInfo() {
- return commitInfo;
- }
-
- @Override
- public String toString() {
- return toStringHelper(this)
- .add("root", root)
- .add("commit info", commitInfo)
- .toString();
- }
- }
-
}
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java?rev=1540981&r1=1540980&r2=1540981&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java Tue Nov 12 09:31:08 2013
@@ -57,7 +57,7 @@ public class CommitQueueTest {
MongoNodeState after = (MongoNodeState) root;
Revision r = after.getRevision();
// System.out.println("seen: " + r);
- if (r.compareRevisionTime(before) < 1) {
+ if (r.compareRevisionTime(before) < 0) {
exceptions.add(new Exception(
"Inconsistent revision sequence. Before: " +
before + ", after: " + r));