Author: jukka
Date: Fri Jun 14 08:11:09 2013
New Revision: 1492989

URL: http://svn.apache.org/r1492989
Log:
OAK-867: Oak whiteboard

Avoid the RepositoryTest.observationDispose() failure

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ObservationManagerImpl.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java?rev=1492989&r1=1492988&r2=1492989&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java Fri Jun 14 08:11:09 2013
@@ -64,7 +64,9 @@ class ChangeProcessor implements Runnabl
 
     private final Exception initStacktrace;
 
-    private volatile boolean stopping;
+    private volatile boolean running = false;
+    private volatile boolean stopping = false;
+
     private Registration registration;
     private Listener changeListener;
 
@@ -119,8 +121,17 @@ class ChangeProcessor implements Runnabl
     }
 
     @Override
-    public synchronized void run() {
-        try{
+    public void run() {
+        // guarantee that only one thread is processing changes at a time
+        synchronized (this) {
+            if (running) {
+                return;
+            } else {
+                running = true;
+            }
+        }
+
+        try {
             ChangeSet changes = changeListener.getChanges();
             if (changes != null &&
                     !(filterRef.get().excludeLocal() && changes.isLocal(observationManager.getContentSession()))) {
@@ -132,6 +143,8 @@ class ChangeProcessor implements Runnabl
             }
         } catch (Exception e) {
             log.error("Unable to generate or send events", e);
+        } finally {
+            running = false;
         }
     }
 

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ObservationManagerImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ObservationManagerImpl.java?rev=1492989&r1=1492988&r2=1492989&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ObservationManagerImpl.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ObservationManagerImpl.java Fri Jun 14 08:11:09 2013
@@ -18,7 +18,10 @@
  */
 package org.apache.jackrabbit.oak.plugins.observation;
 
+import static com.google.common.collect.Lists.newArrayList;
+
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -30,6 +33,8 @@ import javax.jcr.observation.EventListen
 import javax.jcr.observation.ObservationManager;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
 import org.apache.jackrabbit.commons.iterator.EventListenerIteratorAdapter;
 import org.apache.jackrabbit.oak.api.ContentSession;
 import org.apache.jackrabbit.oak.namepath.NamePathMapper;
@@ -63,11 +68,17 @@ public class ObservationManagerImpl impl
         this.whiteboard = whiteboard;
     }
 
-    public synchronized void dispose() {
-        for (ChangeProcessor processor : processors.values()) {
+    public void dispose() {
+        List<ChangeProcessor> toBeStopped;
+
+        synchronized (this) {
+            toBeStopped = newArrayList(processors.values());
+            processors.clear();
+        }
+
+        for (ChangeProcessor processor : toBeStopped) {
             processor.stop();
         }
-        processors.clear();
     }
 
     /**


Reply via email to