Author: mduerig
Date: Wed Mar 19 13:58:34 2014
New Revision: 1579233

URL: http://svn.apache.org/r1579233
Log:
OAK-1484: additional observation queue jmx attributes
Simplify logic around handling full queues

Modified:
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java
    
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java?rev=1579233&r1=1579232&r2=1579233&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java
 Wed Mar 19 13:58:34 2014
@@ -128,7 +128,6 @@ public class BackgroundObserver implemen
                         observer.contentChanged(change.root, change.info);
                         change = queue.poll();
                     }
-                    queueEmpty();
                 } catch (Throwable t) {
                     exceptionHandler.uncaughtException(Thread.currentThread(), 
t);
                 }
@@ -179,16 +178,10 @@ public class BackgroundObserver implemen
     }
 
     /**
-     * Called whenever the queue is 90% full.
+     * Called when ever an item has been added to the queue
+     * @param queueSize  size of the queue
      */
-    protected void queueNearlyFull() {}
-
-    /**
-     * Called whenever the queue has been emptied.
-     */
-    protected void queueEmpty() {
-        warnOnFullQueue = true;
-    }
+    protected void added(int queueSize) { }
 
     /**
      * Clears the change queue and signals the background thread to stop
@@ -251,14 +244,11 @@ public class BackgroundObserver implemen
             last = change;
         }
 
-        if (10 * queue.remainingCapacity() < queueLength) {
-            queueNearlyFull();
-        }
-
         // Set the completion handler on the currently running task. Multiple 
calls
         // to onComplete are not a problem here since we always pass the same 
value.
         // Thus there is no question as to which of the handlers will 
effectively run.
         currentTask.onComplete(completionHandler);
+        added(queue.size());
     }
 
     //------------------------------------------------------------< internal 
>---

Modified: 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java?rev=1579233&r1=1579232&r2=1579233&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
 Wed Mar 19 13:58:34 2014
@@ -71,8 +71,7 @@ import org.slf4j.LoggerFactory;
  * delivering observation events and stopped to stop doing so.
  */
 class ChangeProcessor implements Observer {
-
-    private static final Logger log = 
LoggerFactory.getLogger(ChangeProcessor.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(ChangeProcessor.class);
 
     private final ContentSession contentSession;
     private final NamePathMapper namePathMapper;
@@ -147,23 +146,25 @@ class ChangeProcessor implements Observe
     }
 
     private BackgroundObserver createObserver(final WhiteboardExecutor 
executor) {
-        if (commitRateLimiter == null) {
-            return new BackgroundObserver(this, executor, queueLength);
-        } else {
-            return new BackgroundObserver(this, executor, queueLength) {
-                @Override
-                protected void queueNearlyFull() {
-                    super.queueNearlyFull();
-                    commitRateLimiter.blockCommits();
-                }
+        return new BackgroundObserver(this, executor, queueLength) {
+            private volatile boolean warnWhenFull = true;
 
-                @Override
-                protected void queueEmpty() {
-                    super.queueEmpty();
-                    commitRateLimiter.unblockCommits();
+            @Override
+            protected void added(int queueSize) {
+                if (warnWhenFull && queueSize == queueLength) {
+                    warnWhenFull = false;
+                    if (commitRateLimiter != null) {
+                        commitRateLimiter.blockCommits();
+                    }
+                    LOG.warn("Revision queue is full. Further revisions will 
be compacted.");
+                } else if (queueSize <= 1) {
+                    warnWhenFull = true;
+                    if (commitRateLimiter != null) {
+                        commitRateLimiter.unblockCommits();
+                    }
                 }
-            };
-        }
+            }
+        };
     }
 
     private final Monitor runningMonitor = new Monitor();
@@ -244,7 +245,7 @@ class ChangeProcessor implements Observe
                     }
                 }
             } catch (Exception e) {
-                log.warn("Error while dispatching observation events", e);
+                LOG.warn("Error while dispatching observation events", e);
             }
         }
         previousRoot = root;


Reply via email to