Author: mduerig
Date: Wed Mar 19 13:58:34 2014
New Revision: 1579233
URL: http://svn.apache.org/r1579233
Log:
OAK-1484: additional observation queue jmx attributes
Simplify logic around handling full queues
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java?rev=1579233&r1=1579232&r2=1579233&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java
Wed Mar 19 13:58:34 2014
@@ -128,7 +128,6 @@ public class BackgroundObserver implemen
observer.contentChanged(change.root, change.info);
change = queue.poll();
}
- queueEmpty();
} catch (Throwable t) {
exceptionHandler.uncaughtException(Thread.currentThread(),
t);
}
@@ -179,16 +178,10 @@ public class BackgroundObserver implemen
}
/**
- * Called whenever the queue is 90% full.
+ * Called whenever an item has been added to the queue
+ * @param queueSize size of the queue
*/
- protected void queueNearlyFull() {}
-
- /**
- * Called whenever the queue has been emptied.
- */
- protected void queueEmpty() {
- warnOnFullQueue = true;
- }
+ protected void added(int queueSize) { }
/**
* Clears the change queue and signals the background thread to stop
@@ -251,14 +244,11 @@ public class BackgroundObserver implemen
last = change;
}
- if (10 * queue.remainingCapacity() < queueLength) {
- queueNearlyFull();
- }
-
// Set the completion handler on the currently running task. Multiple calls
// to onComplete are not a problem here since we always pass the same value.
// Thus there is no question as to which of the handlers will effectively run.
currentTask.onComplete(completionHandler);
+ added(queue.size());
}
//------------------------------------------------------------< internal >---
Modified:
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java?rev=1579233&r1=1579232&r2=1579233&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
(original)
+++
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
Wed Mar 19 13:58:34 2014
@@ -71,8 +71,7 @@ import org.slf4j.LoggerFactory;
* delivering observation events and stopped to stop doing so.
*/
class ChangeProcessor implements Observer {
-
- private static final Logger log = LoggerFactory.getLogger(ChangeProcessor.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ChangeProcessor.class);
private final ContentSession contentSession;
private final NamePathMapper namePathMapper;
@@ -147,23 +146,25 @@ class ChangeProcessor implements Observe
}
private BackgroundObserver createObserver(final WhiteboardExecutor executor) {
- if (commitRateLimiter == null) {
- return new BackgroundObserver(this, executor, queueLength);
- } else {
- return new BackgroundObserver(this, executor, queueLength) {
- @Override
- protected void queueNearlyFull() {
- super.queueNearlyFull();
- commitRateLimiter.blockCommits();
- }
+ return new BackgroundObserver(this, executor, queueLength) {
+ private volatile boolean warnWhenFull = true;
- @Override
- protected void queueEmpty() {
- super.queueEmpty();
- commitRateLimiter.unblockCommits();
+ @Override
+ protected void added(int queueSize) {
+ if (warnWhenFull && queueSize == queueLength) {
+ warnWhenFull = false;
+ if (commitRateLimiter != null) {
+ commitRateLimiter.blockCommits();
+ }
+ LOG.warn("Revision queue is full. Further revisions will be compacted.");
+ } else if (queueSize <= 1) {
+ warnWhenFull = true;
+ if (commitRateLimiter != null) {
+ commitRateLimiter.unblockCommits();
+ }
}
- };
- }
+ }
+ };
}
private final Monitor runningMonitor = new Monitor();
@@ -244,7 +245,7 @@ class ChangeProcessor implements Observe
}
}
} catch (Exception e) {
- log.warn("Error while dispatching observation events", e);
+ LOG.warn("Error while dispatching observation events", e);
}
}
previousRoot = root;