Author: catholicon
Date: Sun Feb 19 11:02:57 2017
New Revision: 1783624

URL: http://svn.apache.org/viewvc?rev=1783624&view=rev
Log:
OAK-5668: Test failure: 
observation.ObservationQueueFullWarnTest.warnOnRepeatedQueueFull (backport 
r1783619 from trunk)

The test added in OAK-5626 failed intermittently. The reason was that the
test's assumption that it could empty the observation queue whenever it wished
wasn't accurate. Refactored the test.


Modified:
    jackrabbit/oak/branches/1.4/   (props changed)
    
jackrabbit/oak/branches/1.4/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java

Propchange: jackrabbit/oak/branches/1.4/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Feb 19 11:02:57 2017
@@ -1,3 +1,3 @@
 /jackrabbit/oak/branches/1.0:1665962
-/jackrabbit/oak/trunk:1733615,1733875,1733913,1733929,1734230,1734254,1734279,1734941,1735052,1735081,1735109,1735141,1735267,1735405,1735484,1735549,1735564,1735588,1735622,1735638,1735919,1735983,1736176,1737309-1737310,1737334,1737349,1737998,1738004,1738136,1738138,1738207,1738234,1738252,1738775,1738795,1738833,1738950,1738957,1738963,1739712,1739760,1739867,1739894,1739959-1739960,1740114,1740116,1740250,1740333,1740349,1740360,1740625-1740626,1740774,1740837,1740879,1740971,1741016,1741032,1741339,1741343,1742077,1742117,1742125,1742363,1742520,1742888,1742916,1743097,1743172,1743343,1743674,1744265,1744292,1744589,1744670,1744672,1744959,1745038,1745127,1745197,1745336,1745368,1746086,1746117,1746342,1746345,1746408,1746696,1746981,1747198,1747200,1747341-1747342,1747380,1747387,1747406,1747492,1747512,1747654,1748505,1748553,1748722,1748870,1749275,1749350,1749424,1749443,1749464,1749475,1749645,1749662,1749815,1749872,1749875,1749899,1750052,1750076-1750077,1750287,1750457
 
,1750462,1750465,1750495,1750626,1750809,1750886-1750887,1751396,1751410,1751419,1751445-1751446,1751478,1751748,1751753,1751755,1751871,1752198,1752202,1752259,1752273-1752274,1752283,1752292,1752438,1752447-1752448,1752508,1752596,1752616,1752659,1752672,1753262,1753331-1753332,1753335-1753336,1753355,1753444,1754117,1754239,1755157,1755191,1756520,1756580,1757119,1757166,1758213,1758713,1759433,1759795,1759826,1760326,1760340,1760373,1760387,1760486,1760492,1760494,1760661-1760662,1761412,1761444,1761571,1761762,1761787,1761876,1762453,1762612,1762632,1762635,1763347,1763355-1763356,1763378,1763465,1763735,1764678,1764705,1764814,1764898,1765817,1765983,1766071,1766390,1766423,1766496,1766519,1766554,1766644,1767025,1767265,1767704,1768446,1768637,1769078,1770694,1770982,1771022,1771093,1771098,1771739,1771852,1771870,1771902,1772155,1772162,1772228,1772593,1772768,1773190,1774497,1775474,1775622,1775628,1775757,1778423,1778968,1779478,1780388,1780538,1780543,1781068,1781075,1781
 386,1781846,1781907,1783066,1783089,1783104-1783105
+/jackrabbit/oak/trunk:1733615,1733875,1733913,1733929,1734230,1734254,1734279,1734941,1735052,1735081,1735109,1735141,1735267,1735405,1735484,1735549,1735564,1735588,1735622,1735638,1735919,1735983,1736176,1737309-1737310,1737334,1737349,1737998,1738004,1738136,1738138,1738207,1738234,1738252,1738775,1738795,1738833,1738950,1738957,1738963,1739712,1739760,1739867,1739894,1739959-1739960,1740114,1740116,1740250,1740333,1740349,1740360,1740625-1740626,1740774,1740837,1740879,1740971,1741016,1741032,1741339,1741343,1742077,1742117,1742125,1742363,1742520,1742888,1742916,1743097,1743172,1743343,1743674,1744265,1744292,1744589,1744670,1744672,1744959,1745038,1745127,1745197,1745336,1745368,1746086,1746117,1746342,1746345,1746408,1746696,1746981,1747198,1747200,1747341-1747342,1747380,1747387,1747406,1747492,1747512,1747654,1748505,1748553,1748722,1748870,1749275,1749350,1749424,1749443,1749464,1749475,1749645,1749662,1749815,1749872,1749875,1749899,1750052,1750076-1750077,1750287,1750457
 
,1750462,1750465,1750495,1750626,1750809,1750886-1750887,1751396,1751410,1751419,1751445-1751446,1751478,1751748,1751753,1751755,1751871,1752198,1752202,1752259,1752273-1752274,1752283,1752292,1752438,1752447-1752448,1752508,1752596,1752616,1752659,1752672,1753262,1753331-1753332,1753335-1753336,1753355,1753444,1754117,1754239,1755157,1755191,1756520,1756580,1757119,1757166,1758213,1758713,1759433,1759795,1759826,1760326,1760340,1760373,1760387,1760486,1760492,1760494,1760661-1760662,1761412,1761444,1761571,1761762,1761787,1761876,1762453,1762612,1762632,1762635,1763347,1763355-1763356,1763378,1763465,1763735,1764678,1764705,1764814,1764898,1765817,1765983,1766071,1766390,1766423,1766496,1766519,1766554,1766644,1767025,1767265,1767704,1768446,1768637,1769078,1770694,1770982,1771022,1771093,1771098,1771739,1771852,1771870,1771902,1772155,1772162,1772228,1772593,1772768,1773190,1774497,1775474,1775622,1775628,1775757,1778423,1778968,1779478,1780388,1780538,1780543,1781068,1781075,1781
 386,1781846,1781907,1783066,1783089,1783104-1783105,1783619
 /jackrabbit/trunk:1345480

Modified: 
jackrabbit/oak/branches/1.4/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.4/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java?rev=1783624&r1=1783623&r2=1783624&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.4/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java
 (original)
+++ 
jackrabbit/oak/branches/1.4/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java
 Sun Feb 19 11:02:57 2017
@@ -31,11 +31,14 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.jcr.Node;
 import javax.jcr.RepositoryException;
 import javax.jcr.Session;
 import javax.jcr.SimpleCredentials;
+import javax.jcr.observation.Event;
 import javax.jcr.observation.EventIterator;
 import javax.jcr.observation.EventListener;
 import javax.jcr.observation.ObservationManager;
@@ -44,26 +47,38 @@ import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static javax.jcr.observation.Event.NODE_ADDED;
 import static org.junit.Assert.assertTrue;
 
 @RunWith(Parameterized.class)
 public class ObservationQueueFullWarnTest extends AbstractRepositoryTest {
-    static final int OBS_QUEUE_LENGTH = 5;
-    static final String OBS_QUEUE_FULL_WARN = "Revision queue is full. Further 
revisions will be compacted.";
+    private static final int OBS_QUEUE_LENGTH = 5;
+    private static final String OBS_QUEUE_FULL_WARN = "Revision queue is full. 
Further revisions will be compacted.";
 
     private static final String TEST_NODE = "test_node";
     private static final String TEST_NODE_TYPE = "oak:Unstructured";
     private static final String TEST_PATH = '/' + TEST_NODE;
 
-    private static final long CONDITION_TIMEOUT = 10*1000;
+    private static final long OBS_TIMEOUT_PER_ITEM = 1000;
+    private static final long CONDITION_TIMEOUT = OBS_QUEUE_LENGTH * 
OBS_TIMEOUT_PER_ITEM;
 
     private Session observingSession;
     private ObservationManager observationManager;
 
+    private final BlockableListener listener = new BlockableListener();
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ObservationQueueFullWarnTest.class);
+
+    private final Semaphore blockObservation = new Semaphore(1);
+
+    private final AtomicInteger numAddedNodes = new AtomicInteger(0);
+    private final AtomicInteger numObservedNodes = new AtomicInteger(0);
+
     public ObservationQueueFullWarnTest(NodeStoreFixture fixture) {
         super(fixture);
+        LOG.info("fixture: {}", fixture);
     }
 
     @Override
@@ -78,7 +93,7 @@ public class ObservationQueueFullWarnTes
         session.getRootNode().addNode(TEST_NODE, TEST_NODE_TYPE);
         session.save();
 
-        Map<String,Object> attrs = new HashMap<>();
+        Map<String, Object> attrs = new HashMap<>();
         attrs.put(RepositoryImpl.REFRESH_INTERVAL, 0);
         observingSession = ((JackrabbitRepository) getRepository()).login(new 
SimpleCredentials("admin", "admin".toCharArray()), null, attrs);
         observationManager = 
observingSession.getWorkspace().getObservationManager();
@@ -96,17 +111,13 @@ public class ObservationQueueFullWarnTes
                 .contains(OBS_QUEUE_FULL_WARN)
                 .create();
 
-        final LoggingListener listener = new LoggingListener();
         observationManager.addEventListener(listener, NODE_ADDED, TEST_PATH, 
true, null, null, false);
         try {
-            Node n = getAdminSession().getNode(TEST_PATH);
-
             customLogs.starting();
-            addNodeToFillObsQueue(n, 0, listener);
-            assertTrue("Observation queue full warning must gets logged", 
customLogs.getLogs().size() > 0);
+            addNodeToFillObsQueue();
+            assertTrue("Observation queue full warning must get logged", 
customLogs.getLogs().size() > 0);
             customLogs.finished();
-        }
-        finally {
+        } finally {
             observationManager.removeEventListener(listener);
         }
     }
@@ -135,40 +146,35 @@ public class ObservationQueueFullWarnTes
         ChangeProcessor.clock = virtualClock;
         virtualClock.waitUntil(System.currentTimeMillis());
 
-        final LoggingListener listener = new LoggingListener();
         observationManager.addEventListener(listener, NODE_ADDED, TEST_PATH, 
true, null, null, false);
         try {
-            Node n = getAdminSession().getNode(TEST_PATH);
-            int nodeNameCounter = 0;
-
             //Create first level WARN message
-            nodeNameCounter = addNodeToFillObsQueue(n, nodeNameCounter, 
listener);
-            emptyObsQueueABit(listener);
+            addNodeToFillObsQueue();
+            emptyObsQueue();
 
             //Don't wait, fill up the queue again
             warnLogs.starting();
             debugLogs.starting();
-            nodeNameCounter = addNodeToFillObsQueue(n, nodeNameCounter, 
listener);
+            addNodeToFillObsQueue();
             assertTrue("Observation queue full warning must not logged until 
some time has past since last log",
                     warnLogs.getLogs().size() == 0);
             assertTrue("Observation queue full warning should get logged on 
debug though in the mean time",
                     debugLogs.getLogs().size() > 0);
             warnLogs.finished();
             debugLogs.finished();
-            emptyObsQueueABit(listener);
+            emptyObsQueue();
 
             //Wait some time so reach WARN level again
             virtualClock.waitUntil(virtualClock.getTime() + 
ChangeProcessor.QUEUE_FULL_WARN_INTERVAL);
 
             warnLogs.starting();
             debugLogs.starting();
-            addNodeToFillObsQueue(n, nodeNameCounter, listener);
+            addNodeToFillObsQueue();
             assertTrue("Observation queue full warning must get logged after 
some time has past since last log",
                     warnLogs.getLogs().size() > 0);
             warnLogs.finished();
             debugLogs.finished();
-        }
-        finally {
+        } finally {
             observationManager.removeEventListener(listener);
             ChangeProcessor.clock = oldClockInstance;
             ChangeProcessor.QUEUE_FULL_WARN_INTERVAL = oldWarnLogInterval;
@@ -177,66 +183,79 @@ public class ObservationQueueFullWarnTes
         }
     }
 
-    private void emptyObsQueueABit(final LoggingListener listener) throws 
InterruptedException {
-        //Let queue empty up a bit.
-        boolean notTimedOut = listener.waitFor(CONDITION_TIMEOUT, new 
Condition() {
-            @Override
-            public boolean evaluate() {
-                return listener.numAdded >= 2;
-            }
-        });
-        listener.numAdded = 0;
-        assertTrue("Listener didn't process events within time-out", 
notTimedOut);
-    }
-
-    private interface Condition {
-        boolean evaluate();
+    private void addANode(String prefix) throws RepositoryException {
+        Session session = getAdminSession();
+        Node parent = session.getNode(TEST_PATH);
+        String nodeName = prefix + numAddedNodes.get();
+        parent.addNode(nodeName);
+        session.save();
+        numAddedNodes.incrementAndGet();
     }
 
-    private static int addNodeToFillObsQueue(Node parent, int nodeNameCounter, 
LoggingListener listener)
+    private void addNodeToFillObsQueue()
             throws RepositoryException {
-        listener.blockObservation.acquireUninterruptibly();
+        blockObservation.acquireUninterruptibly();
         try {
-            for (int i = 0; i <= OBS_QUEUE_LENGTH; i++, nodeNameCounter++) {
-                parent.addNode("n" + nodeNameCounter);
-                parent.getSession().save();
+            for (int i = 0; i <= OBS_QUEUE_LENGTH; i++) {
+                addANode("n");
             }
-            return nodeNameCounter;
         } finally {
-            listener.blockObservation.release();
+            blockObservation.release();
         }
     }
 
-    private class LoggingListener implements EventListener {
+    private interface Condition {
+        boolean evaluate();
+    }
 
-        private volatile int numAdded = 0;
+    private boolean waitFor(long timeout, Condition c)
+            throws InterruptedException {
+        long end = System.currentTimeMillis() + timeout;
+        long remaining = end - System.currentTimeMillis();
+        while (remaining > 0) {
+            if (c.evaluate()) {
+                return true;
+            }
 
-        Semaphore blockObservation = new Semaphore(1);
+            //Add another node only when num_pending_to_be_observed nodes is
+            //less than observation queue. This is done to let all observation 
finish
+            //up in case last few events were dropped due to full observation 
queue
+            //(which is ok as the next event that comes in gets diff-ed with 
last
+            //processed revision)
+            if (numAddedNodes.get() < numObservedNodes.get() + 
OBS_QUEUE_LENGTH) {
+                try {
+                    addANode("addedWhileWaiting");
+                } catch (RepositoryException e) {
+                    LOG.warn("exception while adding during wait: {}", e);
+                }
+            }
+            Thread.sleep(OBS_TIMEOUT_PER_ITEM/10);//The constant is exaggerated
+            remaining = end - System.currentTimeMillis();
+        }
+        return c.evaluate();
+    }
 
+    private void emptyObsQueue() throws InterruptedException {
+        boolean notTimedOut = waitFor(CONDITION_TIMEOUT, new Condition() {
+            @Override
+            public boolean evaluate() {
+                return numObservedNodes.get()==numAddedNodes.get();
+            }
+        });
+        assertTrue("Listener didn't process events within time-out", 
notTimedOut);
+    }
+
+    private class BlockableListener implements EventListener {
         @Override
-        public synchronized void onEvent(EventIterator events) {
+        public void onEvent(EventIterator events) {
             blockObservation.acquireUninterruptibly();
             while (events.hasNext()) {
-                events.nextEvent();
-                numAdded++;
-            }
-            blockObservation.release();
-
-            notifyAll();
-        }
-
-        synchronized boolean waitFor(long timeout, Condition c)
-                throws InterruptedException {
-            long end = System.currentTimeMillis() + timeout;
-            long remaining = end - System.currentTimeMillis();
-            while (remaining > 0) {
-                if (c.evaluate()) {
-                    return true;
+                Event event = events.nextEvent();
+                if (event.getType() == Event.NODE_ADDED) {
+                    numObservedNodes.incrementAndGet();
                 }
-                wait(remaining);
-                remaining = end - System.currentTimeMillis();
             }
-            return false;
+            blockObservation.release();
         }
     }
 }


Reply via email to