Hi,
While working on a test failure (OAK-5668), I noticed that if incoming
commits *stop* once observation queue is full, then the listeners
won't get last changes that were added after the queue got full.
What happens (at least afaiu):
* Obs q fills up as [C1, C2, ..., Cn]
* Another commit Cn+1 comes - this marks Cn as "external" and itself
gets dropped
* The listener can consume all events till Cn, but Cn+1 and future
ones don't get to listener
* Add another Cn+w after listener has consumed some events
* The listener would now consume event by diff(Cn, Cn+w)
(patch which updates an existing test to show this at [0]. That patch
would make ObservationQueueFullWarnTest#warnOnQueueFull fail.)
Is this expected? I was under the assumption that on arrival of Cn+1,
Cn gets dropped and Cn+1 is put in. Cn+1 is also marked external.
Thanks,
Vikas
[0]:
diff --git
a/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java
b/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java
index dda8eab..3fddac5 100644
---
a/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java
+++
b/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java
@@ -115,6 +115,7 @@ public class ObservationQueueFullWarnTest extends
AbstractRepositoryTest {
try {
customLogs.starting();
addNodeToFillObsQueue();
+ emptyObsQueue();
assertTrue("Observation queue full warning must get
logged", customLogs.getLogs().size() > 0);
customLogs.finished();
} finally {
@@ -196,7 +197,7 @@ public class ObservationQueueFullWarnTest extends
AbstractRepositoryTest {
throws RepositoryException {
blockObservation.acquireUninterruptibly();
try {
- for (int i = 0; i <= OBS_QUEUE_LENGTH; i++) {
+ for (int i = 0; i <= OBS_QUEUE_LENGTH+1; i++) {
addANode("n");
}
} finally {
@@ -222,6 +223,7 @@ public class ObservationQueueFullWarnTest extends
AbstractRepositoryTest {
//up in case last few event were dropped due to full
observation queue
//(which is ok as the next event that comes in gets
diff-ed with last
//processed revision)
+ /*
if (numAddedNodes.get() < numObservedNodes.get() +
OBS_QUEUE_LENGTH) {
try {
addANode("addedWhileWaiting");
@@ -229,6 +231,7 @@ public class ObservationQueueFullWarnTest extends
AbstractRepositoryTest {
LOG.warn("exception while adding during wait: {}", e);
}
}
+ */
Thread.sleep(OBS_TIMEOUT_PER_ITEM/10);//The constant is exaggerated
remaining = end - System.currentTimeMillis();
}