Author: reschke
Date: Fri Nov 13 10:11:03 2015
New Revision: 1714176

URL: http://svn.apache.org/viewvc?rev=1714176&view=rev
Log:
OAK-3566: avoid stale documents in RDBDocumentStore cache by keeping track of 
which documents got invalidated while queries were running (ported to 1.2)

Modified:
    jackrabbit/oak/branches/1.2/   (props changed)
    
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentSerializer.java
    
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java

Propchange: jackrabbit/oak/branches/1.2/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Nov 13 10:11:03 2015
@@ -1,3 +1,3 @@
 /jackrabbit/oak/branches/1.0:1665962
-/jackrabbit/oak/trunk:1672350,1672468,1672537,1672603,1672642,1672644,1672834-1672835,1673351,1673410,1673414-1673415,1673436,1673644,1673662-1673664,1673669,1673695,1673713,1673738,1673787,1673791,1674046,1674065,1674075,1674107,1674228,1674780,1674880,1675054-1675055,1675319,1675332,1675354,1675357,1675382,1675555,1675566,1675593,1676198,1676237,1676407,1676458,1676539,1676670,1676693,1676703,1676725,1677579,1677581,1677609,1677611,1677774,1677788,1677797,1677804,1677806,1677939,1677991,1678023,1678095-1678096,1678124,1678171,1678173,1678211,1678323,1678758,1678938,1678954,1679144,1679165,1679191,1679232,1679235,1679503,1679958,1679961,1680170,1680172,1680182,1680222,1680232,1680236,1680461,1680633,1680643,1680747,1680805-1680806,1680903,1681282,1681767,1681918,1681955,1682042,1682218,1682235,1682437,1682494,1682555,1682855,1682904,1683059,1683089,1683213,1683249,1683259,1683278,1683323,1683687,1683700,1684174-1684175,1684186,1684376,1684442,1684561,1684570,1684601,1684618,1684820
 
,1684868,1685023,1685075,1685370,1685552,1685589-1685590,1685840,1685964,1685977,1685989,1685999,1686023,1686032,1686097,1686162,1686229,1686234,1686253,1686414,1686780,1686854,1686857,1686971,1687053-1687055,1687175,1687196,1687198,1687220,1687239-1687240,1687301,1687441,1687553,1688089-1688090,1688172,1688179,1688349,1688421,1688436,1688453,1688616,1688622,1688634,1688636,1688817,1689003-1689004,1689008,1689577,1689581,1689623,1689810,1689828,1689831,1689833,1689903,1690017,1690043,1690047,1690057,1690247,1690249,1690634-1690637,1690650,1690669,1690674,1690885,1690941,1691139,1691151,1691159,1691167,1691183,1691188,1691210,1691280,1691307,1691331-1691333,1691345,1691384-1691385,1691401,1691509,1692133-1692134,1692156,1692250,1692274,1692363,1692382,1692478,1692955,1693002,1693030,1693209,1693421,1693525-1693526,1694007,1694393-1694394,1694651,1694653-1694654,1695032,1695050,1695122,1695280,1695299,1695420,1695457,1695482,1695492,1695507,1695521,1695540,1695905,1696190,1696194,1696
 
242,1696285,1696375,1696522,1696578,1696759,1696916,1697363,1697373,1697410,1697582,1697589,1697616,1697672,1700191,1700231,1700397,1700403,1700506,1700571,1700727,1700749,1700769,1700775,1701065,1701619,1701733,1701743,1701750,1701768,1701806,1701810,1701814,1701948,1701955,1701959,1701965,1701986,1702014,1702022,1702045,1702051,1702241,1702272,1702387,1702405,1702423,1702860,1702942,1702960,1703212,1703382,1703395,1703411,1703428,1703430,1703568,1703592,1703758,1703858,1703878,1704256,1704282,1704285,1704457,1704479,1704490,1704614,1704629,1704636,1704655,1704670,1704886,1705005,1705027,1705043,1705055,1705250,1705268,1705273,1705323,1705677,1705701,1705871,1705992,1705998,1706009,1706037,1706059,1706212,1706218,1706270,1706764,1706772,1707049,1707191,1707435,1708105,1708315,1708546,1708592,1708766,1709012,1709852,1709978,1710013,1710031,1710049,1710205,1710242,1710559,1710575,1710590,1710614,1710637,1710789,1710811,1710816,1710972,1711248,1711296,1711498,1712042,1712490,1712531,1
 712730,1712785,1713008,1713439,1713461,1713580,1713586,1713626
+/jackrabbit/oak/trunk:1672350,1672468,1672537,1672603,1672642,1672644,1672834-1672835,1673351,1673410,1673414-1673415,1673436,1673644,1673662-1673664,1673669,1673695,1673713,1673738,1673787,1673791,1674046,1674065,1674075,1674107,1674228,1674780,1674880,1675054-1675055,1675319,1675332,1675354,1675357,1675382,1675555,1675566,1675593,1676198,1676237,1676407,1676458,1676539,1676670,1676693,1676703,1676725,1677579,1677581,1677609,1677611,1677774,1677788,1677797,1677804,1677806,1677939,1677991,1678023,1678095-1678096,1678124,1678171,1678173,1678211,1678323,1678758,1678938,1678954,1679144,1679165,1679191,1679232,1679235,1679503,1679958,1679961,1680170,1680172,1680182,1680222,1680232,1680236,1680461,1680633,1680643,1680747,1680805-1680806,1680903,1681282,1681767,1681918,1681955,1682042,1682218,1682235,1682437,1682494,1682555,1682855,1682904,1683059,1683089,1683213,1683249,1683259,1683278,1683323,1683687,1683700,1684174-1684175,1684186,1684376,1684442,1684561,1684570,1684601,1684618,1684820
 
,1684868,1685023,1685075,1685370,1685552,1685589-1685590,1685840,1685964,1685977,1685989,1685999,1686023,1686032,1686097,1686162,1686229,1686234,1686253,1686414,1686780,1686854,1686857,1686971,1687053-1687055,1687175,1687196,1687198,1687220,1687239-1687240,1687301,1687441,1687553,1688089-1688090,1688172,1688179,1688349,1688421,1688436,1688453,1688616,1688622,1688634,1688636,1688817,1689003-1689004,1689008,1689577,1689581,1689623,1689810,1689828,1689831,1689833,1689903,1690017,1690043,1690047,1690057,1690247,1690249,1690634-1690637,1690650,1690669,1690674,1690885,1690941,1691139,1691151,1691159,1691167,1691183,1691188,1691210,1691280,1691307,1691331-1691333,1691345,1691384-1691385,1691401,1691509,1692133-1692134,1692156,1692250,1692274,1692363,1692382,1692478,1692955,1693002,1693030,1693209,1693421,1693525-1693526,1694007,1694393-1694394,1694651,1694653-1694654,1695032,1695050,1695122,1695280,1695299,1695420,1695457,1695482,1695492,1695507,1695521,1695540,1695905,1696190,1696194,1696
 
242,1696285,1696375,1696522,1696578,1696759,1696916,1697363,1697373,1697410,1697582,1697589,1697616,1697672,1700191,1700231,1700397,1700403,1700506,1700571,1700727,1700749,1700769,1700775,1701065,1701619,1701733,1701743,1701750,1701768,1701806,1701810,1701814,1701948,1701955,1701959,1701965,1701986,1702014,1702022,1702045,1702051,1702241,1702272,1702387,1702405,1702423,1702860,1702942,1702960,1703212,1703382,1703395,1703411,1703428,1703430,1703568,1703592,1703758,1703858,1703878,1704256,1704282,1704285,1704457,1704479,1704490,1704614,1704629,1704636,1704655,1704670,1704886,1705005,1705027,1705043,1705055,1705250,1705268,1705273,1705323,1705677,1705701,1705871,1705992,1705998,1706009,1706037,1706059,1706212,1706218,1706270,1706764,1706772,1707049,1707191,1707435,1708105,1708315,1708546,1708592,1708766,1709012,1709852,1709978,1710013,1710031,1710049,1710205,1710242,1710559,1710575,1710590,1710614,1710637,1710789,1710811,1710816,1710972,1711248,1711282,1711296,1711498,1712042,1712319,1
 
712490,1712531,1712730,1712785,1712963,1713008,1713439,1713461,1713580,1713586,1713600,1713626,1713803
 /jackrabbit/trunk:1345480

Modified: 
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentSerializer.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentSerializer.java?rev=1714176&r1=1714175&r2=1714176&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentSerializer.java
 (original)
+++ 
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentSerializer.java
 Fri Nov 13 10:11:03 2015
@@ -176,6 +176,7 @@ public class RDBDocumentSerializer {
     /**
      * Reconstructs a {@link Document) based on the persisted {@link RDBRow}.
      */
+    @Nonnull
     public <T extends Document> T fromRow(@Nonnull Collection<T> collection, 
@Nonnull RDBRow row) throws DocumentStoreException {
         T doc = collection.newDocument(store);
         doc.put(ID, row.getId());

Modified: 
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java?rev=1714176&r1=1714175&r2=1714176&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
 (original)
+++ 
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
 Fri Nov 13 10:11:03 2015
@@ -44,6 +44,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.locks.Lock;
 import java.util.zip.Deflater;
@@ -81,6 +82,9 @@ import com.google.common.cache.Cache;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.hash.BloomFilter;
+import com.google.common.hash.Funnel;
+import com.google.common.hash.PrimitiveSink;
 import com.google.common.util.concurrent.Striped;
 
 /**
@@ -1029,7 +1033,14 @@ public class RDBDocumentStore implements
 
             for (List<String> chunkedIds : Lists.partition(ids, CHUNKSIZE)) {
                 // remember what we already have in the cache
+                Set<QueryContext> seenQueryContext = new 
HashSet<QueryContext>();
                 Map<String, NodeDocument> cachedDocs = Collections.emptyMap();
+                // keep concurrently running queries from updating
+                // the cache entries for these keys
+                for (QueryContext qc : qmap.values()) {
+                    qc.addKeys(chunkedIds);
+                    seenQueryContext.add(qc);
+                }
                 if (collection == Collection.NODES) {
                     cachedDocs = new HashMap<String, NodeDocument>();
                     for (String key : chunkedIds) {
@@ -1052,12 +1063,20 @@ public class RDBDocumentStore implements
                     this.ch.closeConnection(connection);
                 }
                 if (success) {
+                    // keep concurrently running queries from updating
+                    // the cache entries for these keys
+                    for (QueryContext qc : qmap.values()) {
+                        if (!seenQueryContext.contains(qc)) {
+                            qc.addKeys(chunkedIds);
+                        }
+                    }
                     for (Entry<String, NodeDocument> entry : 
cachedDocs.entrySet()) {
                         T oldDoc = castAsT(entry.getValue());
                         if (oldDoc == null) {
+                            String id = entry.getKey();
                             // make sure concurrently loaded document is
                             // invalidated
-                            nodesCache.invalidate(new 
StringValue(entry.getKey()));
+                            nodesCache.invalidate(new StringValue(id));
                         } else {
                             T newDoc = applyChanges(collection, oldDoc, 
update, true);
                             if (newDoc != null) {
@@ -1082,6 +1101,72 @@ public class RDBDocumentStore implements
         }
     }
 
+    /**
+     * Class used to track which documents may have been updated since the 
start
+     * of the query and thus may not be put into the cache.
+     */
+    private class QueryContext {
+
+        private static final double FPP = 0.01d;
+        private static final int ENTRIES_SCOPED = 1000;
+        private static final int ENTRIES_OPEN = 10000;
+
+        private final String fromKey, toKey;
+        private volatile BloomFilter<String> filter = null;
+
+        private BloomFilter<String> getFilter() {
+            if (filter == null) {
+                synchronized (this) {
+                    if (filter == null) {
+                        filter = BloomFilter.create(new Funnel<String>() {
+                            private static final long serialVersionUID = 
-7114267990225941161L;
+
+                            @Override
+                            public void funnel(String from, PrimitiveSink 
into) {
+                                into.putUnencodedChars(from);
+                            }
+                        }, toKey.equals(NodeDocument.MAX_ID_VALUE) ? 
ENTRIES_OPEN : ENTRIES_SCOPED, FPP);
+                    }
+                }
+            }
+            return filter;
+        }
+
+        public QueryContext(String fromKey, String toKey) {
+            this.fromKey = fromKey;
+            this.toKey = toKey;
+        }
+
+        public void addKey(String key) {
+            if (fromKey.compareTo(key) < 0 && toKey.compareTo(key) > 0) {
+                getFilter().put(key);
+            }
+        }
+
+        public void addKeys(List<String> keys) {
+            for (String key: keys) {
+                addKey(key);
+            }
+        }
+
+        public boolean mayUpdate(String key) {
+            return filter == null ? true : !getFilter().mightContain(key);
+        }
+
+        synchronized public void dispose() {
+            if (LOG.isDebugEnabled()) {
+                if (filter != null) {
+                    LOG.debug("Disposing QueryContext for range " + fromKey + 
"..." + toKey + " - filter fpp was: "
+                            + filter.expectedFpp());
+                } else {
+                    LOG.debug("Disposing QueryContext for range " + fromKey + 
"..." + toKey + " - no filter was needed");
+                }
+            }
+        }
+    }
+
+    private Map<Thread, QueryContext> qmap = new ConcurrentHashMap<Thread, 
QueryContext>();
+
     private <T extends Document> List<T> internalQuery(Collection<T> 
collection, String fromKey, String toKey,
             String indexedProperty, long startValue, int limit) {
         Connection connection = null;
@@ -1094,6 +1179,8 @@ public class RDBDocumentStore implements
         }
         try {
             long now = System.currentTimeMillis();
+            QueryContext qp = new QueryContext(fromKey, toKey);
+            qmap.put(Thread.currentThread(), qp);
             connection = this.ch.getROConnection();
             List<RDBRow> dbresult = dbQuery(connection, tableName, fromKey, 
toKey, indexedProperty, startValue, limit);
             connection.commit();
@@ -1102,14 +1189,16 @@ public class RDBDocumentStore implements
             List<T> result = new ArrayList<T>(size);
             for (int i = 0; i < size; i++) {
                 RDBRow row = dbresult.set(i, null); // free RDBRow ASAP
-                T doc = runThroughCache(collection, row, now);
+                T doc = runThroughCache(collection, row, now, qp);
                 result.add(doc);
             }
+            qp.dispose();
             return result;
         } catch (Exception ex) {
             LOG.error("SQL exception on query", ex);
             throw new DocumentStoreException(ex);
         } finally {
+            qmap.remove(Thread.currentThread());
             this.ch.closeConnection(connection);
         }
     }
@@ -1148,7 +1237,7 @@ public class RDBDocumentStore implements
                     cachedDoc.markUpToDate(System.currentTimeMillis());
                     return castAsT(cachedDoc);
                 } else {
-                    return SR.fromRow(collection, row);
+                    return convertFromDBObject(collection, row);
                 }
             }
         } catch (Exception ex) {
@@ -1972,11 +2061,17 @@ public class RDBDocumentStore implements
         }
     }
 
-    private <T extends Document> T runThroughCache(Collection<T> collection, 
RDBRow row, long now) {
+    @Nonnull
+    protected <T extends Document> T convertFromDBObject(@Nonnull 
Collection<T> collection, @Nonnull RDBRow row) {
+        // this method is present here in order to facilitate unit testing for 
OAK-3566
+        return SR.fromRow(collection, row);
+    }
+
+    private <T extends Document> T runThroughCache(Collection<T> collection, 
RDBRow row, long now, QueryContext qp) {
 
         if (collection != Collection.NODES) {
             // not in the cache anyway
-            return SR.fromRow(collection, row);
+            return convertFromDBObject(collection, row);
         }
 
         String id = row.getId();
@@ -1999,9 +2094,13 @@ public class RDBDocumentStore implements
             }
         }
 
-        NodeDocument fresh = (NodeDocument) SR.fromRow(collection, row);
+        NodeDocument fresh = (NodeDocument) convertFromDBObject(collection, 
row);
         fresh.seal();
 
+        if (!qp.mayUpdate(id)) {
+            return castAsT(fresh);
+        }
+
         Lock lock = getAndLock(id);
         try {
             inCache = nodesCache.getIfPresent(cacheKey);
@@ -2039,5 +2138,8 @@ public class RDBDocumentStore implements
         }
         return false;
     }
-    
+
+    protected Cache<CacheValue, NodeDocument> getNodeDocumentCache() {
+        return nodesCache;
+    }
 }


Reply via email to