Author: thomasm
Date: Tue Feb 16 10:39:18 2016
New Revision: 1730650

URL: http://svn.apache.org/viewvc?rev=1730650&view=rev
Log:
OAK-2761 Persistent cache: add data in a different thread

Added:
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/EvictionListener.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AsyncCacheTest.java
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueueTest.java
Modified:
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java?rev=1730650&r1=1730649&r2=1730650&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
 Tue Feb 16 10:39:18 2016
@@ -17,11 +17,17 @@
 package org.apache.jackrabbit.oak.plugins.document;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.cache.RemovalCause.COLLECTED;
+import static com.google.common.cache.RemovalCause.EXPIRED;
+import static com.google.common.cache.RemovalCause.SIZE;
 
 import java.io.InputStream;
+import java.lang.ref.Reference;
 import java.net.UnknownHostException;
+import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
@@ -32,12 +38,16 @@ import javax.sql.DataSource;
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
 import com.google.common.cache.Weigher;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.mongodb.DB;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.cache.CacheLIRS;
+import org.apache.jackrabbit.oak.cache.CacheLIRS.EvictionCallback;
 import org.apache.jackrabbit.oak.cache.CacheStats;
 import org.apache.jackrabbit.oak.cache.CacheValue;
 import org.apache.jackrabbit.oak.cache.EmpiricalWeigher;
@@ -59,6 +69,7 @@ import org.apache.jackrabbit.oak.plugins
 import 
org.apache.jackrabbit.oak.plugins.document.mongo.MongoMissingLastRevSeeker;
 import org.apache.jackrabbit.oak.plugins.document.mongo.MongoVersionGCSupport;
 import org.apache.jackrabbit.oak.plugins.document.persistentCache.CacheType;
+import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.EvictionListener;
 import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache;
 import org.apache.jackrabbit.oak.plugins.document.rdb.RDBBlobStore;
 import org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore;
@@ -1008,19 +1019,24 @@ public class DocumentMK {
             return new NodeDocumentCache(nodeDocumentsCache, 
nodeDocumentsCacheStats, prevDocumentsCache, prevDocumentsCacheStats, locks);
         }
 
+        @SuppressWarnings("unchecked")
         private <K extends CacheValue, V extends CacheValue> Cache<K, V> 
buildCache(
                 CacheType cacheType,
                 long maxWeight,
                 DocumentNodeStore docNodeStore,
                 DocumentStore docStore
                 ) {
-            Cache<K, V> cache = buildCache(cacheType.name(), maxWeight);
+            Set<EvictionListener<K, V>> listeners = new 
CopyOnWriteArraySet<EvictionListener<K,V>>();
+            Cache<K, V> cache = buildCache(cacheType.name(), maxWeight, 
listeners);
             PersistentCache p = getPersistentCache();
             if (p != null) {
                 if (docNodeStore != null) {
                     docNodeStore.setPersistentCache(p);
                 }
                 cache = p.wrap(docNodeStore, docStore, cache, cacheType);
+                if (cache instanceof EvictionListener) {
+                    listeners.add((EvictionListener<K, V>) cache);
+                }
             }
             return cache;
         }
@@ -1042,7 +1058,8 @@ public class DocumentMK {
         
         private <K extends CacheValue, V extends CacheValue> Cache<K, V> 
buildCache(
                 String module,
-                long maxWeight) {
+                long maxWeight,
+                final Set<EvictionListener<K, V>> listeners) {
             // by default, use the LIRS cache when using the persistent cache,
             // but don't use it otherwise
             boolean useLirs = persistentCacheURI != null;
@@ -1064,6 +1081,14 @@ public class DocumentMK {
                         segmentCount(cacheSegmentCount).
                         stackMoveDistance(cacheStackMoveDistance).
                         recordStats().
+                        evictionCallback(new EvictionCallback<K, V>() {
+                            @Override
+                            public void evicted(K key, V value, RemovalCause 
cause) {
+                                for (EvictionListener<K, V> l : listeners) {
+                                    l.evicted(key, value, cause);
+                                }
+                            }
+                        }).
                         build();
             }
             return CacheBuilder.newBuilder().
@@ -1071,6 +1096,14 @@ public class DocumentMK {
                     weigher(weigher).
                     maximumWeight(maxWeight).
                     recordStats().
+                    removalListener(new RemovalListener<K, V>() {
+                        @Override
+                        public void onRemoval(RemovalNotification<K, V> 
notification) {
+                            for (EvictionListener<K, V> l : listeners) {
+                                l.evicted(notification.getKey(), 
notification.getValue(), notification.getCause());
+                            }
+                        }
+                    }).
                     build();
         }
 

Added: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/EvictionListener.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/EvictionListener.java?rev=1730650&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/EvictionListener.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/EvictionListener.java
 Tue Feb 16 10:39:18 2016
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.persistentCache;
+
+import com.google.common.cache.RemovalCause;
+
+/**
+ * A listener that gets notified of entries that were removed from the cache.
+ * <p>
+ * {@link #evicted(Object, Object, RemovalCause)} receives the key and the value
+ * of the removed entry together with the {@link RemovalCause} of the removal,
+ * so an implementation can decide which kinds of removal it reacts to.
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public interface EvictionListener<K, V> {
+
+    void evicted(K key, V value, RemovalCause removalCause);
+
+}

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java?rev=1730650&r1=1730649&r2=1730650&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java
 Tue Feb 16 10:39:18 2016
@@ -16,8 +16,14 @@
  */
 package org.apache.jackrabbit.oak.plugins.document.persistentCache;
 
+import static com.google.common.cache.RemovalCause.COLLECTED;
+import static com.google.common.cache.RemovalCause.EXPIRED;
+import static com.google.common.cache.RemovalCause.SIZE;
+import static java.util.Collections.singleton;
+
 import java.nio.ByteBuffer;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
@@ -27,6 +33,8 @@ import javax.annotation.Nullable;
 import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
 import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
 import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache.GenerationCache;
+import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher;
+import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheWriteQueue;
 import org.h2.mvstore.MVMap;
 import org.h2.mvstore.WriteBuffer;
 import org.h2.mvstore.type.DataType;
@@ -34,22 +42,29 @@ import org.h2.mvstore.type.DataType;
 import com.google.common.base.Function;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheStats;
+import com.google.common.cache.RemovalCause;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+class NodeCache<K, V> implements Cache<K, V>, GenerationCache, 
EvictionListener<K, V> {
+
+    private static final Set<RemovalCause> EVICTION_CAUSES = 
ImmutableSet.of(COLLECTED, EXPIRED, SIZE);
 
-class NodeCache<K, V> implements Cache<K, V>, GenerationCache {
-    
     private final PersistentCache cache;
     private final Cache<K, V> memCache;
     private final MultiGenerationMap<K, V> map;
     private final CacheType type;
     private final DataType keyType;
     private final DataType valueType;
-    
+    private final CacheWriteQueue<K, V> writerQueue;
+
     NodeCache(
             PersistentCache cache,
             Cache<K, V> memCache,
             DocumentNodeStore docNodeStore, 
-            DocumentStore docStore, CacheType type) {
+            DocumentStore docStore,
+            CacheType type,
+            CacheActionDispatcher dispatcher) {
         this.cache = cache;
         this.memCache = memCache;
         this.type = type;
@@ -57,6 +72,7 @@ class NodeCache<K, V> implements Cache<K
         map = new MultiGenerationMap<K, V>();
         keyType = new KeyDataType(type);
         valueType = new ValueDataType(docNodeStore, docStore, type);
+        this.writerQueue = new CacheWriteQueue<K, V>(dispatcher, cache, map);
     }
     
     @Override
@@ -82,47 +98,31 @@ class NodeCache<K, V> implements Cache<K
     }
     
     private V readIfPresent(K key) {
+        if (writerQueue.waitsForInvalidation(key)) {
+            return null;
+        }
         cache.switchGenerationIfNeeded();
         V v = map.get(key);
         return v;
     }
-    
-    private void write(final K key, final V value) {
-        write(key, value, true);
-    }
 
-    private void writeWithoutBroadcast(final K key, final V value) {
-        write(key, value, false);
-    }
-
-    private void write(final K key, final V value, boolean broadcast) {
-        cache.switchGenerationIfNeeded();
-        if (broadcast) {
-            cache.broadcast(type, new Function<WriteBuffer, Void>() {
-                @Override
-                @Nullable
-                public Void apply(@Nullable WriteBuffer buffer) {
-                    keyType.write(buffer, key);
-                    if (value == null) {
-                        buffer.put((byte) 0);
-                    } else {
-                        buffer.put((byte) 1);
-                        valueType.write(buffer, value);
-                    }
-                    return null;
+    private void broadcast(final K key, final V value) {
+        cache.broadcast(type, new Function<WriteBuffer, Void>() {
+            @Override
+            @Nullable
+            public Void apply(@Nullable WriteBuffer buffer) {
+                keyType.write(buffer, key);
+                if (value == null) {
+                    buffer.put((byte) 0);
+                } else {
+                    buffer.put((byte) 1);
+                    valueType.write(buffer, value);
                 }
-            });
-        }
-        MultiGenerationMap<K, V> m = map;
-        if (m != null) {
-            if (value == null) {
-                m.remove(key);
-            } else {
-                m.put(key, value);
+                return null;
             }
-        }
+        });
     }
-    
+
     @SuppressWarnings("unchecked")
     @Override
     @Nullable
@@ -147,7 +147,7 @@ class NodeCache<K, V> implements Cache<K
             return value;
         }
         value = memCache.get(key, valueLoader);
-        write(key, value);
+        broadcast(key, value);
         return value;
     }
 
@@ -160,14 +160,15 @@ class NodeCache<K, V> implements Cache<K
     @Override
     public void put(K key, V value) {
         memCache.put(key, value);
-        write(key, value);
+        broadcast(key, value);
     }
 
     @SuppressWarnings("unchecked")
     @Override
     public void invalidate(Object key) {
         memCache.invalidate(key);
-        write((K) key, (V) null);
+        writerQueue.addInvalidate(singleton((K) key));
+        broadcast((K) key, null);
     }
 
     @Override
@@ -218,7 +219,17 @@ class NodeCache<K, V> implements Cache<K
             value = (V) valueType.read(buff);
             memCache.put(key, value);
         }
-        writeWithoutBroadcast(key, value);
+    }
+
+    /**
+     * Invoked when an entry is removed from the {@link #memCache}.
+     */
+    @Override
+    public void evicted(K key, V value, RemovalCause cause) {
+        if (EVICTION_CAUSES.contains(cause) && value != null) { 
+            // invalidations are handled separately
+            writerQueue.addPut(key, value);
+        }
     }
 
 }
\ No newline at end of file

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java?rev=1730650&r1=1730649&r2=1730650&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
 Tue Feb 16 10:39:18 2016
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
 import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
 import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.DynamicBroadcastConfig;
+import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher;
 import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.Broadcaster;
 import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.InMemoryBroadcaster;
 import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.TCPBroadcaster;
@@ -82,6 +83,8 @@ public class PersistentCache implements
     private ThreadLocal<WriteBuffer> writeBuffer = new 
ThreadLocal<WriteBuffer>();
     private final byte[] broadcastId;
     private DynamicBroadcastConfig broadcastConfig;
+    private CacheActionDispatcher writeDispatcher;
+    private Thread writeDispatcherThread;
     
     {
         ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
@@ -192,6 +195,11 @@ public class PersistentCache implements
         }
         writeStore = createMapFactory(writeGeneration, false);
         initBroadcast(broadcast);
+
+        writeDispatcher = new CacheActionDispatcher();
+        writeDispatcherThread = new Thread(writeDispatcher, "Oak 
CacheWriteQueue");
+        writeDispatcherThread.setDaemon(true);
+        writeDispatcherThread.start();
     }
     
     private void initBroadcast(String broadcast) {
@@ -338,6 +346,13 @@ public class PersistentCache implements
     }
     
     public void close() {
+        writeDispatcher.stop();
+        try {
+            writeDispatcherThread.join();
+        } catch (InterruptedException e) {
+            LOG.error("Can't join the {}", writeDispatcherThread.getName(), e);
+        }
+
         if (writeStore != null) {
             writeStore.closeStore();
         }
@@ -395,7 +410,7 @@ public class PersistentCache implements
         }
         if (wrap) {
             NodeCache<K, V> c = new NodeCache<K, V>(this, 
-                    base, docNodeStore, docStore, type);
+                    base, docNodeStore, docStore, type, writeDispatcher);
             initGenerationCache(c);
             return c;
         }

Added: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java?rev=1730650&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java
 Tue Feb 16 10:39:18 2016
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.persistentCache.async;
+
+/**
+ * Represents a single action on the cache (e.g. put or invalidate) that is
+ * queued in a {@link CacheActionDispatcher}.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+interface CacheAction<K, V> {
+
+    /**
+     * Executes the action.
+     */
+    void execute();
+
+    /**
+     * Cancels the action without executing it. The dispatcher calls this for
+     * actions it drops from an over-full queue.
+     */
+    void cancel();
+
+    /**
+     * Returns the keys affected by this action. The dispatcher uses them to
+     * queue an invalidation for the keys of actions it has cancelled.
+     *
+     * @return keys affected by this action
+     */
+    Iterable<K> getAffectedKeys();
+
+    /**
+     * Returns the owner of this action.
+     *
+     * @return the {@link CacheWriteQueue} this action belongs to
+     */
+    CacheWriteQueue<K, V> getOwner();
+
+}

Added: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java?rev=1730650&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java
 Tue Feb 16 10:39:18 2016
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.persistentCache.async;
+
+import static com.google.common.collect.Multimaps.index;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+
+/**
+ * An asynchronous buffer of {@link CacheAction} objects, drained by the thread
+ * that executes {@link #run()}.
+ * <p>
+ * While the dispatcher is running, queued actions are polled (with a 10 ms
+ * timeout) and executed one at a time. When {@link #add(CacheAction)} finds
+ * {@link #MAX_SIZE} or more actions queued, the oldest actions are cancelled
+ * until at most {@code MAX_SIZE - ACTIONS_TO_REMOVE} remain, and each affected
+ * {@link CacheWriteQueue} receives one invalidate action covering the keys of
+ * its cancelled actions. After {@link #stop()}, the run loop ends, the
+ * remaining {@link InvalidateCacheAction}s are executed and all other queued
+ * actions are discarded.
+ * <p>
+ * NOTE(review): {@code run()} only logs an {@code InterruptedException} and
+ * does not restore the interrupt flag; {@code add()} ignores the result of
+ * {@code queue.offer()}; actions discarded after {@code stop()} are not
+ * cancelled. Confirm that all three are intended.
+ */
+public class CacheActionDispatcher implements Runnable {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(CacheActionDispatcher.class);
+
+    /**
+     * Queue length at which {@link #add(CacheAction)} starts discarding the
+     * oldest actions. The backing queue has a capacity of twice this value.
+     */
+    static final int MAX_SIZE = 1024;
+
+    /**
+     * How many actions (at least) to remove once the queue length has reached
+     * {@link #MAX_SIZE}.
+     */
+    static final int ACTIONS_TO_REMOVE = 256;
+
+    final BlockingQueue<CacheAction<?, ?>> queue = new 
ArrayBlockingQueue<CacheAction<?, ?>>(MAX_SIZE * 2);
+
+    private volatile boolean isRunning = true;
+
+    @Override
+    public void run() {
+        while (isRunning) {
+            try {
+                CacheAction<?, ?> action = queue.poll(10, 
TimeUnit.MILLISECONDS);
+                if (action != null && isRunning) {
+                    action.execute();
+                }
+            } catch (InterruptedException e) {
+                LOG.debug("Interrupted the queue.poll()", e);
+            }
+        }
+        applyInvalidateActions();
+    }
+
+    /**
+     * Stops the processing: clears the running flag that the loop in
+     * {@link #run()} checks on every iteration.
+     */
+    public void stop() {
+        isRunning = false;
+    }
+
+    /**
+     * Adds the new action, first cleaning the queue if it already holds
+     * {@link #MAX_SIZE} or more actions.
+     *
+     * @param action to be added
+     */
+    synchronized void add(CacheAction<?, ?> action) {
+        if (queue.size() >= MAX_SIZE) {
+            cleanTheQueue();
+        }
+        queue.offer(action);
+    }
+
+    /**
+     * Cleans the queue: removes and cancels the oldest actions and adds, for
+     * each owner, a single invalidate action for all the removed entries.
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    private void cleanTheQueue() {
+        List<CacheAction> removed = removeOldest();
+        for (Entry<CacheWriteQueue, Collection<CacheAction>> e : 
groupByOwner(removed).entrySet()) {
+            CacheWriteQueue owner = e.getKey();
+            Collection<CacheAction> actions = e.getValue();
+            List<Object> affectedKeys = cancelAll(actions);
+            owner.addInvalidate(affectedKeys);
+        }
+    }
+
+    /**
+     * Removes the oldest actions until at most
+     * {@code MAX_SIZE - ACTIONS_TO_REMOVE} actions remain in the queue.
+     *
+     * @return A list of removed items.
+     */
+    @SuppressWarnings("rawtypes")
+    private List<CacheAction> removeOldest() {
+        List<CacheAction> removed = new ArrayList<CacheAction>();
+        while (queue.size() > MAX_SIZE - ACTIONS_TO_REMOVE) {
+            CacheAction toBeCanceled = queue.poll();
+            if (toBeCanceled == null) {
+                break;
+            } else {
+                removed.add(toBeCanceled);
+            }
+        }
+        return removed;
+    }
+
+    /**
+     * Groups the passed actions by their owners.
+     *
+     * @param actions to be grouped
+     * @return map in which the owner is the key and the collection of its
+     *         actions is the value
+     */
+    @SuppressWarnings("rawtypes")
+    private static Map<CacheWriteQueue, Collection<CacheAction>> 
groupByOwner(List<CacheAction> actions) {
+        return index(actions, new Function<CacheAction, CacheWriteQueue>() {
+            @Override
+            public CacheWriteQueue apply(CacheAction input) {
+                return input.getOwner();
+            }
+        }).asMap();
+    }
+
+    /**
+     * Cancels all passed actions and collects the keys they affect.
+     *
+     * @param actions to cancel
+     * @return list of affected keys
+     */
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    private static List<Object> cancelAll(Collection<CacheAction> actions) {
+        List<Object> cancelledKeys = new ArrayList<Object>();
+        for (CacheAction action : actions) {
+            action.cancel();
+            Iterables.addAll(cancelledKeys, action.getAffectedKeys());
+        }
+        return cancelledKeys;
+    }
+
+    @SuppressWarnings("rawtypes")
+    private void applyInvalidateActions() {
+        CacheAction action;
+        do {
+            action = queue.poll();
+            if (action instanceof InvalidateCacheAction) {
+                action.execute();
+            }
+        } while (action != null);
+    }
+
+}
\ No newline at end of file

Added: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java?rev=1730650&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java
 Tue Feb 16 10:39:18 2016
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.persistentCache.async;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache;
+
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
+
+/**
+ * A front end for the {@link CacheActionDispatcher} that creates the put and
+ * invalidate actions and maintains their state.
+ * <p>
+ * {@code queuedKeys} counts, per key, the actions added and not yet removed
+ * via {@link #remove(Object)}; {@code waitsForInvalidation} holds the keys
+ * whose most recently added action is an invalidation that is still counted
+ * in {@code queuedKeys}. Both are updated while holding this object's monitor;
+ * the action itself is handed to the dispatcher after the monitor is released.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class CacheWriteQueue<K, V> {
+
+    private final CacheActionDispatcher dispatcher;
+
+    private final PersistentCache cache;
+
+    private final Map<K, V> map;
+
+    final Multiset<K> queuedKeys = HashMultiset.create();
+
+    final Set<K> waitsForInvalidation = new HashSet<K>();
+
+    public CacheWriteQueue(CacheActionDispatcher dispatcher, PersistentCache 
cache, Map<K, V> map) {
+        this.dispatcher = dispatcher;
+        this.cache = cache;
+        this.map = map;
+    }
+
+    /**
+     * Adds a new invalidate action: every key is counted as queued and marked
+     * as waiting for invalidation, then one action for all keys is dispatched.
+     * <p>
+     * NOTE(review): {@code keys} is iterated here and again by the created
+     * action, so it has to support repeated iteration.
+     *
+     * @param keys to be invalidated
+     */
+    public void addInvalidate(Iterable<K> keys) {
+        synchronized(this) {
+            for (K key : keys) {
+                queuedKeys.add(key);
+                waitsForInvalidation.add(key);
+            }
+        }
+        dispatcher.add(new InvalidateCacheAction<K, V>(this, keys));
+    }
+
+    /**
+     * Adds a new put action: the key is counted as queued and is no longer
+     * marked as waiting for invalidation.
+     *
+     * @param key to be put to cache
+     * @param value to be put to cache
+     */
+    public void addPut(K key, V value) {
+        synchronized(this) {
+            queuedKeys.add(key);
+            waitsForInvalidation.remove(key);
+        }
+        dispatcher.add(new PutToCacheAction<K, V>(this, key, value));
+    }
+
+    /**
+     * Checks whether the key is currently marked as waiting for invalidation,
+     * i.e. the last action added for it was an invalidate that is still queued.
+     *
+     * @param key to check
+     * @return {@code true} if the key is marked as waiting for invalidation
+     */
+    public synchronized boolean waitsForInvalidation(K key) {
+        return waitsForInvalidation.contains(key);
+    }
+
+    /**
+     * Removes the action state when it's finished or cancelled: decrements the
+     * queued count of the key and, once no action for the key is counted any
+     * more, clears its waiting-for-invalidation mark.
+     *
+     * @param key to be removed
+     */
+    synchronized void remove(K key) {
+        queuedKeys.remove(key);
+        if (!queuedKeys.contains(key)) {
+            waitsForInvalidation.remove(key);
+        }
+    }
+
+    PersistentCache getCache() {
+        return cache;
+    }
+
+    Map<K, V> getMap() {
+        return map;
+    }
+}
\ No newline at end of file

Added: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java?rev=1730650&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java
 Tue Feb 16 10:39:18 2016
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.persistentCache.async;
+
+import java.util.Map;
+
+import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache;
+
+/**
+ * Cache action that removes a group of keys from the cache map.
+ * <p>
+ * Each call to {@link #execute()} or {@link #cancel()} reports every affected
+ * key back to the owning {@link CacheWriteQueue} so that the queue can drop
+ * its bookkeeping for the key.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+class InvalidateCacheAction<K, V> implements CacheAction<K, V> {
+
+    private final CacheWriteQueue<K, V> owner;
+
+    private final Iterable<K> keys;
+
+    private final PersistentCache cache;
+
+    private final Map<K, V> map;
+
+    /**
+     * @param cacheWriteQueue the queue that created this action; its cache and
+     *            map are captured at construction time
+     * @param keys the keys to invalidate
+     */
+    InvalidateCacheAction(CacheWriteQueue<K, V> cacheWriteQueue, Iterable<K> keys) {
+        this.owner = cacheWriteQueue;
+        this.keys = keys;
+        this.cache = cacheWriteQueue.getCache();
+        this.map = cacheWriteQueue.getMap();
+    }
+
+    /**
+     * Remove all affected keys from the map, if there is one, and then
+     * release the keys in the owning queue - also when a removal fails.
+     */
+    @Override
+    public void execute() {
+        try {
+            if (map == null) {
+                // no map to remove from
+                return;
+            }
+            for (K affected : keys) {
+                cache.switchGenerationIfNeeded();
+                map.remove(affected);
+            }
+        } finally {
+            releaseKeys();
+        }
+    }
+
+    /**
+     * Skip the removal and only release the keys in the owning queue.
+     */
+    @Override
+    public void cancel() {
+        releaseKeys();
+    }
+
+    @Override
+    public CacheWriteQueue<K, V> getOwner() {
+        return owner;
+    }
+
+    @Override
+    public Iterable<K> getAffectedKeys() {
+        return keys;
+    }
+
+    /**
+     * Tell the owning queue that this action is done with each of its keys.
+     */
+    private void releaseKeys() {
+        for (K affected : keys) {
+            owner.remove(affected);
+        }
+    }
+}
\ No newline at end of file

Added: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java?rev=1730650&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java
 Tue Feb 16 10:39:18 2016
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.persistentCache.async;
+
+import static java.util.Collections.singleton;
+
+import java.util.Map;
+
+import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache;
+
+/**
+ * Cache action that stores a single key/value pair in the cache map.
+ * <p>
+ * Each call to {@link #execute()} or {@link #cancel()} reports the key back to
+ * the owning {@link CacheWriteQueue} so that the queue can drop its
+ * bookkeeping for the key.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+class PutToCacheAction<K, V> implements CacheAction<K, V> {
+
+    private final CacheWriteQueue<K, V> owner;
+
+    private final K key;
+
+    private final V value;
+
+    private final PersistentCache cache;
+
+    private final Map<K, V> map;
+
+    /**
+     * @param cacheWriteQueue the queue that created this action; its cache and
+     *            map are captured at construction time
+     * @param key the key to store
+     * @param value the value to store
+     */
+    PutToCacheAction(CacheWriteQueue<K, V> cacheWriteQueue, K key, V value) {
+        this.owner = cacheWriteQueue;
+        this.key = key;
+        this.value = value;
+        this.cache = cacheWriteQueue.getCache();
+        this.map = cacheWriteQueue.getMap();
+    }
+
+    /**
+     * Store the entry in the map, if there is one, and then release the key
+     * in the owning queue - also when the put fails.
+     */
+    @Override
+    public void execute() {
+        try {
+            if (map == null) {
+                // no map to write to
+                return;
+            }
+            cache.switchGenerationIfNeeded();
+            map.put(key, value);
+        } finally {
+            releaseKey();
+        }
+    }
+
+    /**
+     * Skip the put and only release the key in the owning queue.
+     */
+    @Override
+    public void cancel() {
+        releaseKey();
+    }
+
+    @Override
+    public CacheWriteQueue<K, V> getOwner() {
+        return owner;
+    }
+
+    @Override
+    public Iterable<K> getAffectedKeys() {
+        return singleton(key);
+    }
+
+    /**
+     * Tell the owning queue that this action is done with its key.
+     */
+    private void releaseKey() {
+        owner.remove(key);
+    }
+}
\ No newline at end of file

Added: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AsyncCacheTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AsyncCacheTest.java?rev=1730650&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AsyncCacheTest.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AsyncCacheTest.java
 Tue Feb 16 10:39:18 2016
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.io.File;
+
+import com.google.common.cache.Cache;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.Test;
+
+import static org.junit.Assert.assertNull;
+
+public class AsyncCacheTest {
+
+    /**
+     * Puts 1000 entries into a children cache that is backed by a persistent
+     * cache, invalidates the key that was put last, waits a moment for the
+     * write queue and then checks that the invalidated key is not served.
+     * <p>
+     * The persistent cache is closed in a {@code finally} block so that a
+     * failing assertion does not leave it open.
+     */
+    @Test
+    public void invalidateWhileInQueue() throws Exception {
+        FileUtils.deleteDirectory(new File("target/cacheTest"));
+        DocumentMK.Builder builder = new DocumentMK.Builder();
+        builder.setPersistentCache("target/cacheTest");
+        Cache<PathRev, DocumentNodeState.Children> cache = builder.buildChildrenCache();
+        try {
+            DocumentNodeState.Children c = new DocumentNodeState.Children();
+            for (int i = 0; i < 100; i++) {
+                c.children.add("node-" + i);
+            }
+            PathRev key = null;
+            for (int i = 0; i < 1000; i++) {
+                key = new PathRev("/foo/bar", new RevisionVector(new Revision(i, 0, 1)));
+                cache.put(key, c);
+            }
+            cache.invalidate(key);
+            // give the write queue some time to write back entries
+            Thread.sleep(200);
+            assertNull(cache.getIfPresent(key));
+        } finally {
+            // release the persistent cache even if the assertion above fails
+            builder.getPersistentCache().close();
+        }
+    }
+}

Added: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java?rev=1730650&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java
 Tue Feb 16 10:39:18 2016
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.persistentCache.async;
+
+import static com.google.common.collect.ImmutableSet.of;
+import static com.google.common.collect.Iterables.size;
+import static java.lang.String.valueOf;
+import static java.lang.System.currentTimeMillis;
+import static java.lang.Thread.sleep;
+import static 
org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher.ACTIONS_TO_REMOVE;
+import static 
org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher.MAX_SIZE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache;
+import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheAction;
+import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher;
+import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheWriteQueue;
+import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.async.InvalidateCacheAction;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class CacheActionDispatcherTest {
+
+    /**
+     * Adds more than {@code MAX_SIZE} actions to a dispatcher that is not
+     * running and checks the expected outcome: the oldest
+     * {@code ACTIONS_TO_REMOVE} actions are gone from the head of the queue and
+     * a single invalidate action covering that many keys has been queued.
+     */
+    @Test
+    public void testMaxQueueSize() {
+        CacheActionDispatcher dispatcher = new CacheActionDispatcher();
+        CacheWriteQueue<String, Object> queue = new CacheWriteQueue<String, Object>(dispatcher,
+                mock(PersistentCache.class), null);
+
+        for (int i = 0; i < MAX_SIZE + 10; i++) {
+            dispatcher.add(createWriteAction(valueOf(i), queue));
+        }
+        assertEquals(MAX_SIZE - ACTIONS_TO_REMOVE + 10 + 1, dispatcher.queue.size());
+        assertEquals(valueOf(ACTIONS_TO_REMOVE), dispatcher.queue.peek().toString());
+
+        InvalidateCacheAction<?, ?> invalidateAction = null;
+        for (CacheAction<?, ?> action : dispatcher.queue) {
+            if (action instanceof InvalidateCacheAction) {
+                invalidateAction = (InvalidateCacheAction<?, ?>) action;
+            }
+        }
+        assertNotNull(invalidateAction);
+        assertEquals(ACTIONS_TO_REMOVE, size(invalidateAction.getAffectedKeys()));
+    }
+
+    /**
+     * Feeds a running dispatcher from several producer threads and waits, for
+     * at most ten seconds, until every action reports that it has finished.
+     * The dispatcher is stopped in a {@code finally} block so that its thread
+     * is not left running when the test fails.
+     */
+    @Test
+    public void testQueue() throws InterruptedException {
+        int threads = 5;
+        int actionsPerThread = 100;
+
+        @SuppressWarnings("unchecked")
+        CacheWriteQueue<String, Object> queue = Mockito.mock(CacheWriteQueue.class);
+        final CacheActionDispatcher dispatcher = new CacheActionDispatcher();
+        Thread queueThread = new Thread(dispatcher);
+        queueThread.start();
+
+        try {
+            List<DummyCacheWriteAction> allActions = new ArrayList<DummyCacheWriteAction>();
+            List<Thread> producerThreads = new ArrayList<Thread>();
+            for (int i = 0; i < threads; i++) {
+                final List<DummyCacheWriteAction> threadActions = new ArrayList<DummyCacheWriteAction>();
+                for (int j = 0; j < actionsPerThread; j++) {
+                    DummyCacheWriteAction action = new DummyCacheWriteAction(String.format("%d_%d", i, j), queue);
+                    threadActions.add(action);
+                    allActions.add(action);
+                }
+                Thread t = new Thread(new Runnable() {
+                    @Override
+                    public void run() {
+                        for (DummyCacheWriteAction a : threadActions) {
+                            dispatcher.add(a);
+                        }
+                    }
+                });
+                producerThreads.add(t);
+            }
+
+            for (Thread t : producerThreads) {
+                t.start();
+            }
+            for (Thread t : producerThreads) {
+                t.join();
+            }
+
+            long start = currentTimeMillis();
+            while (!allActions.isEmpty()) {
+                Iterator<DummyCacheWriteAction> it = allActions.iterator();
+                while (it.hasNext()) {
+                    if (it.next().finished) {
+                        it.remove();
+                    }
+                }
+                if (currentTimeMillis() - start > 10000) {
+                    fail("Following actions hasn't been executed: " + allActions);
+                }
+                if (!allActions.isEmpty()) {
+                    // poll again shortly instead of spinning at full speed
+                    sleep(1);
+                }
+            }
+        } finally {
+            dispatcher.stop();
+        }
+
+        queueThread.join();
+        assertFalse(queueThread.isAlive());
+    }
+
+    /**
+     * Queues a slow action followed by three invalidations, stops the
+     * dispatcher while the slow action is expected to be still running and
+     * checks that the invalidations have been applied to the map by the time
+     * the dispatcher thread has terminated.
+     */
+    @Test
+    public void testExecuteInvalidatesOnShutdown() throws InterruptedException {
+        Map<String, Object> cacheMap = new HashMap<String, Object>();
+        CacheActionDispatcher dispatcher = new CacheActionDispatcher();
+        CacheWriteQueue<String, Object> queue = new CacheWriteQueue<String, Object>(dispatcher,
+                Mockito.mock(PersistentCache.class), cacheMap);
+        Thread queueThread = new Thread(dispatcher);
+        queueThread.start();
+
+        cacheMap.put("2", new Object());
+        cacheMap.put("3", new Object());
+        cacheMap.put("4", new Object());
+        dispatcher.add(new DummyCacheWriteAction("1", queue, 100));
+        dispatcher.add(new InvalidateCacheAction<String, Object>(queue, Collections.singleton("2")));
+        dispatcher.add(new InvalidateCacheAction<String, Object>(queue, Collections.singleton("3")));
+        dispatcher.add(new InvalidateCacheAction<String, Object>(queue, Collections.singleton("4")));
+        Thread.sleep(10); // make sure the first action started
+
+        dispatcher.stop();
+        // the first action sleeps for 100 ms, so nothing should be invalidated yet
+        assertEquals(of("2", "3", "4"), cacheMap.keySet());
+
+        queueThread.join();
+        assertTrue(cacheMap.isEmpty());
+    }
+
+    private DummyCacheWriteAction createWriteAction(String id, CacheWriteQueue<String, Object> queue) {
+        return new DummyCacheWriteAction(id, queue);
+    }
+
+    /**
+     * Test action that sleeps for a given time and then marks itself as
+     * finished. It does not use the enclosing test instance and is therefore
+     * a static nested class.
+     */
+    private static class DummyCacheWriteAction implements CacheAction<String, Object> {
+
+        private final CacheWriteQueue<String, Object> queue;
+
+        private final String id;
+
+        private final long delay;
+
+        // written by the dispatcher thread, read by the test thread
+        private volatile boolean finished;
+
+        /**
+         * Create an action with a random delay below 10 ms.
+         */
+        private DummyCacheWriteAction(String id, CacheWriteQueue<String, Object> queue) {
+            this(id, queue, new Random().nextInt(10));
+        }
+
+        private DummyCacheWriteAction(String id, CacheWriteQueue<String, Object> queue, long delay) {
+            this.queue = queue;
+            this.id = id;
+            this.delay = delay;
+        }
+
+        @Override
+        public void execute() {
+            try {
+                sleep(delay);
+            } catch (InterruptedException e) {
+                // keep the interrupt status visible to the executing thread
+                Thread.currentThread().interrupt();
+                fail("Interrupted");
+            }
+            finished = true;
+        }
+
+        @Override
+        public void cancel() {
+        }
+
+        @Override
+        public String toString() {
+            return id;
+        }
+
+        @Override
+        public Iterable<String> getAffectedKeys() {
+            return Collections.singleton(id);
+        }
+
+        @Override
+        public CacheWriteQueue<String, Object> getOwner() {
+            return queue;
+        }
+    }
+}

Added: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueueTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueueTest.java?rev=1730650&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueueTest.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueueTest.java
 Tue Feb 16 10:39:18 2016
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.persistentCache.async;
+
+import static java.util.Collections.singleton;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache;
+import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheAction;
+import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher;
+import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheWriteQueue;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class CacheWriteQueueTest {
+
+    private CacheWriteQueue<String, Object> queue;
+
+    // actions captured from the queue under test instead of being dispatched
+    private final List<CacheAction<?, ?>> actions = Collections
+            .synchronizedList(new ArrayList<CacheAction<?, ?>>());
+
+    /**
+     * Creates a queue whose dispatcher only records the added actions, so
+     * that the tests decide when and how each action is completed.
+     */
+    @Before
+    public void initQueue() {
+        actions.clear();
+
+        CacheActionDispatcher dispatcher = new CacheActionDispatcher() {
+            @Override
+            public void add(CacheAction<?, ?> action) {
+                actions.add(action);
+            }
+        };
+
+        PersistentCache cache = Mockito.mock(PersistentCache.class);
+        queue = new CacheWriteQueue<String, Object>(dispatcher, cache, null);
+    }
+
+    /**
+     * Adds puts for ten keys from several threads, checks that the queue
+     * counts one queued entry per added action and that all bookkeeping is
+     * gone once every action has been executed or cancelled.
+     */
+    @Test
+    public void testCounters() throws InterruptedException {
+        final int threadCount = 10;
+        final int actionsPerThread = 50;
+
+        final Map<String, AtomicInteger> counters = new HashMap<String, AtomicInteger>();
+        for (int i = 0; i < 10; i++) {
+            String key = "key_" + i;
+            counters.put(key, new AtomicInteger());
+        }
+
+        final Random random = new Random();
+        List<Thread> threads = new ArrayList<Thread>();
+        for (int i = 0; i < threadCount; i++) {
+            Thread t = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    for (int j = 0; j < actionsPerThread; j++) {
+                        for (String key : counters.keySet()) {
+                            if (random.nextBoolean()) {
+                                queue.addPut(key, null);
+                            } else {
+                                queue.addPut(key, new Object());
+                            }
+                            counters.get(key).incrementAndGet();
+                        }
+                    }
+                }
+            });
+            threads.add(t);
+        }
+
+        for (Thread t : threads) {
+            t.start();
+        }
+        for (Thread t : threads) {
+            t.join();
+        }
+        for (String key : counters.keySet()) {
+            // expected: number of actions added, actual: what the queue tracked
+            assertEquals(counters.get(key).get(), queue.queuedKeys.count(key));
+        }
+
+        for (CacheAction<?, ?> action : actions) {
+            if (random.nextBoolean()) {
+                action.execute();
+            } else {
+                action.cancel();
+            }
+        }
+
+        assertTrue(queue.queuedKeys.isEmpty());
+        assertTrue(queue.waitsForInvalidation.isEmpty());
+    }
+
+    /**
+     * Checks that {@code waitsForInvalidation} follows the most recently
+     * added action for a key and is only cleared once the last queued action
+     * for that key has been executed.
+     */
+    @Test
+    public void testWaitsForInvalidation() {
+        assertFalse(queue.waitsForInvalidation("key"));
+
+        queue.addInvalidate(singleton("key"));
+        assertTrue(queue.waitsForInvalidation("key"));
+
+        queue.addPut("key", new Object());
+        assertFalse(queue.waitsForInvalidation("key"));
+
+        queue.addInvalidate(singleton("key"));
+        assertTrue(queue.waitsForInvalidation("key"));
+
+        // execute everything but the last action: the flag has to stay set
+        int i;
+        for (i = 0; i < actions.size() - 1; i++) {
+            actions.get(i).execute();
+            assertTrue(queue.waitsForInvalidation("key"));
+        }
+
+        actions.get(i).execute();
+        assertFalse(queue.waitsForInvalidation("key"));
+    }
+
+}
\ No newline at end of file


Reply via email to