Author: jukka
Date: Fri Jun 14 08:11:09 2013
New Revision: 1492989
URL: http://svn.apache.org/r1492989
Log:
OAK-867: Oak whiteboard
Avoid the RepositoryTest.observationDispose() failure
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ObservationManagerImpl.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java?rev=1492989&r1=1492988&r2=1492989&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java
Fri Jun 14 08:11:09 2013
@@ -64,7 +64,9 @@ class ChangeProcessor implements Runnabl
private final Exception initStacktrace;
- private volatile boolean stopping;
+ private volatile boolean running = false;
+ private volatile boolean stopping = false;
+
private Registration registration;
private Listener changeListener;
@@ -119,8 +121,17 @@ class ChangeProcessor implements Runnabl
}
@Override
- public synchronized void run() {
- try{
+ public void run() {
+ // guarantee that only one thread processes changes at a time; a call made while another run is in progress returns immediately
+ synchronized (this) {
+ if (running) {
+ return;
+ } else {
+ running = true;
+ }
+ }
+
+ try {
ChangeSet changes = changeListener.getChanges();
if (changes != null &&
!(filterRef.get().excludeLocal() &&
changes.isLocal(observationManager.getContentSession()))) {
@@ -132,6 +143,8 @@ class ChangeProcessor implements Runnabl
}
} catch (Exception e) {
log.error("Unable to generate or send events", e);
+ } finally {
+ running = false;
}
}
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ObservationManagerImpl.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ObservationManagerImpl.java?rev=1492989&r1=1492988&r2=1492989&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ObservationManagerImpl.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ObservationManagerImpl.java
Fri Jun 14 08:11:09 2013
@@ -18,7 +18,10 @@
*/
package org.apache.jackrabbit.oak.plugins.observation;
+import static com.google.common.collect.Lists.newArrayList;
+
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -30,6 +33,8 @@ import javax.jcr.observation.EventListen
import javax.jcr.observation.ObservationManager;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
import org.apache.jackrabbit.commons.iterator.EventListenerIteratorAdapter;
import org.apache.jackrabbit.oak.api.ContentSession;
import org.apache.jackrabbit.oak.namepath.NamePathMapper;
@@ -63,11 +68,17 @@ public class ObservationManagerImpl impl
this.whiteboard = whiteboard;
}
- public synchronized void dispose() {
- for (ChangeProcessor processor : processors.values()) {
+ public void dispose() {
+ List<ChangeProcessor> toBeStopped;
+
+ synchronized (this) {
+ toBeStopped = newArrayList(processors.values());
+ processors.clear();
+ }
+
+ for (ChangeProcessor processor : toBeStopped) {
processor.stop();
}
- processors.clear();
}
/**