Author: tomekr
Date: Tue Nov 29 09:52:19 2016
New Revision: 1771871

URL: http://svn.apache.org/viewvc?rev=1771871&view=rev
Log:
OAK-5147: Backport asynchronous queue improvements to 1.2

Added:
    
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/CacheMetadata.java
    
jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/AsyncQueueTest.java
Removed:
    
jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueueTest.java
Modified:
    
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/MultiGenerationMap.java
    
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java
    
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
    
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java
    
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java
    
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java
    
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java
    
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java
    
jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java

Added: 
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/CacheMetadata.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/CacheMetadata.java?rev=1771871&view=auto
==============================================================================
--- 
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/CacheMetadata.java
 (added)
+++ 
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/CacheMetadata.java
 Tue Nov 29 09:52:19 2016
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.persistentCache;
+
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static com.google.common.collect.Maps.newConcurrentMap;
+
+/**
+ * In order to avoid leaking values from the metadataMap, following order 
should
+ * be maintained for combining the cache and CacheMetadata:
+ *
+ * 1. For remove(), removeAll() and clear():
+ *
+ * - cache.invalidate()
+ * - metadata.remove()
+ *
+ * 2. For put(), putAll() and putFromPersistenceAndIncrement():
+ *
+ * - metadata.put()
+ * - cache.put()
+ *
+ * 3. For increment():
+ *
+ * - metadata.increment()
+ * - cache.get()
+ * - (metadata.remove() if value doesn't exists in cache)
+ *
+ * 4. For incrementAll():
+ *
+ * - metadata.incrementAll()
+ * - cache.getAll()
+ * - (metadata.removeAll() on keys that returned nulls)
+ *
+ * Preserving this order will allow to avoid leaked values in the metadata 
without
+ * an extra synchronization between cache and metadata operations. This 
strategy
+ * is a best-effort option - it may happen that cache values won't have their
+ * metadata entries.
+ */
+public class CacheMetadata<K> {
+
+    private final ConcurrentMap<K, MetadataEntry> metadataMap = 
newConcurrentMap();
+
+    private boolean enabled = true;
+
+    boolean isEnabled() {
+        return enabled;
+    }
+
+    void disable() {
+        this.enabled = false;
+    }
+
+    void put(K key) {
+        if (!enabled) {
+            return;
+        }
+        getOrCreate(key, false);
+    }
+
+    void putFromPersistenceAndIncrement(K key) {
+        if (!enabled) {
+            return;
+        }
+        getOrCreate(key, true).incrementCount();
+    }
+
+    void increment(K key) {
+        if (!enabled) {
+            return;
+        }
+        getOrCreate(key, false).incrementCount();
+    }
+
+    MetadataEntry remove(Object key) {
+        if (!enabled) {
+            return null;
+        }
+        return metadataMap.remove(key);
+    }
+
+    void putAll(Iterable<?> keys) {
+        if (!enabled) {
+            return;
+        }
+        for (Object k : keys) {
+            getOrCreate((K) k, false);
+        }
+    }
+
+    void incrementAll(Iterable<?> keys) {
+        if (!enabled) {
+            return;
+        }
+        for (Object k : keys) {
+            getOrCreate((K) k, false).incrementCount();
+        }
+    }
+
+    void removeAll(Iterable<?> keys) {
+        if (!enabled) {
+            return;
+        }
+        for (Object k : keys) {
+            metadataMap.remove(k);
+        }
+    }
+
+    void clear() {
+        if (!enabled) {
+            return;
+        }
+        metadataMap.clear();
+    }
+
+    private MetadataEntry getOrCreate(K key, boolean readFromPersistentCache) {
+        if (!enabled) {
+            return null;
+        }
+        MetadataEntry metadata = metadataMap.get(key);
+        if (metadata == null) {
+            MetadataEntry newEntry = new 
MetadataEntry(readFromPersistentCache);
+            MetadataEntry oldEntry = metadataMap.putIfAbsent(key, newEntry);
+            metadata = oldEntry == null ? newEntry : oldEntry;
+        }
+        return metadata;
+    }
+
+
+    static class MetadataEntry {
+
+        private final AtomicLong accessCount = new AtomicLong();
+
+        private final boolean readFromPersistentCache;
+
+        private MetadataEntry(boolean readFromPersistentCache) {
+            this.readFromPersistentCache = readFromPersistentCache;
+        }
+
+        void incrementCount() {
+            accessCount.incrementAndGet();
+        }
+
+        long getAccessCount() {
+            return accessCount.get();
+        }
+
+        boolean isReadFromPersistentCache() {
+            return readFromPersistentCache;
+        }
+    }
+
+}

Modified: 
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/MultiGenerationMap.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/MultiGenerationMap.java?rev=1771871&r1=1771870&r2=1771871&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/MultiGenerationMap.java
 (original)
+++ 
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/MultiGenerationMap.java
 Tue Nov 29 09:52:19 2016
@@ -50,15 +50,22 @@ public class MultiGenerationMap<K, V> im
     @SuppressWarnings("unchecked")
     @Override
     public V get(Object key) {
+        ValueWithGenerationInfo<V> value = readValue(key);
+        if (value == null) {
+            return null;
+        } else if (!value.isCurrentGeneration()) {
+            put((K) key, value.value);
+        }
+        return value.getValue();
+    }
+
+    ValueWithGenerationInfo<V> readValue(Object key) {
         for (int generation : read.descendingKeySet()) {
             CacheMap<K, V> m = read.get(generation);
             if (m != null) {
                 V value = m.get(key);
                 if (value != null) {
-                    if (m != write) {
-                        put((K) key, value);
-                    }
-                    return value;
+                    return new ValueWithGenerationInfo<V>(value, m == write);
                 }
             }
         }
@@ -123,4 +130,23 @@ public class MultiGenerationMap<K, V> im
         throw new UnsupportedOperationException();
     }
 
+    static class ValueWithGenerationInfo<V> {
+
+        private final V value;
+
+        private final boolean isCurrentGeneration;
+
+        private ValueWithGenerationInfo(V value, boolean isCurrentGeneration) {
+            this.value = value;
+            this.isCurrentGeneration = isCurrentGeneration;
+        }
+
+        V getValue() {
+            return value;
+        }
+
+        boolean isCurrentGeneration() {
+            return isCurrentGeneration;
+        }
+    }
 }

Modified: 
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java?rev=1771871&r1=1771870&r2=1771871&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java
 (original)
+++ 
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java
 Tue Nov 29 09:52:19 2016
@@ -16,9 +16,12 @@
  */
 package org.apache.jackrabbit.oak.plugins.document.persistentCache;
 
+import static com.google.common.base.Predicates.in;
+import static com.google.common.base.Predicates.not;
 import static com.google.common.cache.RemovalCause.COLLECTED;
 import static com.google.common.cache.RemovalCause.EXPIRED;
 import static com.google.common.cache.RemovalCause.SIZE;
+import static com.google.common.collect.Iterables.filter;
 import static java.util.Collections.singleton;
 
 import java.util.Map;
@@ -42,51 +45,54 @@ import com.google.common.cache.CacheStat
 import com.google.common.cache.RemovalCause;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class NodeCache<K, V> implements Cache<K, V>, GenerationCache, 
EvictionListener<K, V> {
 
-    private static final Set<RemovalCause> EVICTION_CAUSES = 
ImmutableSet.of(COLLECTED, EXPIRED, SIZE);
+    static final Logger LOG = LoggerFactory.getLogger(NodeCache.class);
 
-    /**
-     * Whether to use the queue to put items into cache. Default: false (cache
-     * will be updated synchronously).
-     */
-    private static final boolean ASYNC_CACHE = 
Boolean.getBoolean("oak.cache.asynchronous");
+    private static final Set<RemovalCause> EVICTION_CAUSES = 
ImmutableSet.of(COLLECTED, EXPIRED, SIZE);
 
     private final PersistentCache cache;
     private final Cache<K, V> memCache;
     private final MultiGenerationMap<K, V> map;
     private final CacheType type;
-    private final DocumentNodeStore docNodeStore;
-    private final DocumentStore docStore;
-    private final CacheWriteQueue<K, V> writerQueue;
+    private final DataType keyType;
+    private final DataType valueType;
+    private final CacheMetadata<K> memCacheMetadata;
+    private final boolean async;
+    CacheWriteQueue<K, V> writeQueue;
 
     NodeCache(
             PersistentCache cache,
             Cache<K, V> memCache,
-            DocumentNodeStore docNodeStore, 
+            DocumentNodeStore docNodeStore,
             DocumentStore docStore,
             CacheType type,
-            CacheActionDispatcher dispatcher) {
+            CacheActionDispatcher dispatcher,
+            boolean async) {
         this.cache = cache;
         this.memCache = memCache;
         this.type = type;
-        this.docNodeStore = docNodeStore;
-        this.docStore = docStore;
+        this.async = async;
         PersistentCache.LOG.info("wrapping map " + this.type);
         map = new MultiGenerationMap<K, V>();
-
-        if (ASYNC_CACHE) {
-            this.writerQueue = new CacheWriteQueue<K, V>(dispatcher, cache, 
map);
+        keyType = new KeyDataType(type);
+        valueType = new ValueDataType(docNodeStore, docStore, type);
+        this.memCacheMetadata = new CacheMetadata<K>();
+        if (async) {
+            this.writeQueue = new CacheWriteQueue<K, V>(dispatcher, cache, 
map);
+            LOG.info("The persistent cache {} writes will be asynchronous", 
type);
         } else {
-            this.writerQueue = null;
+            this.writeQueue = null;
+            this.memCacheMetadata.disable();
+            LOG.info("The persistent cache {} writes will be synchronous", 
type);
         }
     }
-    
+
     @Override
     public void addGeneration(int generation, boolean readOnly) {
-        DataType keyType = new KeyDataType(type);
-        DataType valueType = new ValueDataType(docNodeStore, docStore, type);
         MVMap.Builder<K, V> b = new MVMap.Builder<K, V>().
                 keyType(keyType).valueType(valueType);
         String mapName = type.name();
@@ -96,21 +102,40 @@ class NodeCache<K, V> implements Cache<K
             map.setWriteMap(m);
         }
     }
-    
+
     @Override
     public void removeGeneration(int generation) {
         map.removeReadMap(generation);
     }
-    
+
     private V readIfPresent(K key) {
-        if (ASYNC_CACHE && writerQueue.waitsForInvalidation(key)) {
-            return null;
-        }
+        return async ? asyncReadIfPresent(key) : syncReadIfPresent(key);
+    }
+
+    private V syncReadIfPresent(K key) {
         cache.switchGenerationIfNeeded();
         V v = map.get(key);
+        if (v != null) {
+            memCacheMetadata.putFromPersistenceAndIncrement(key);
+        }
         return v;
     }
 
+    private V asyncReadIfPresent(K key) {
+            MultiGenerationMap.ValueWithGenerationInfo<V> v = 
map.readValue(key);
+            if (v == null) {
+                return null;
+            }
+            if (v.isCurrentGeneration() && !cache.needSwitch()) {
+                // don't persist again on eviction
+                memCacheMetadata.putFromPersistenceAndIncrement(key);
+            } else {
+                // persist again during eviction
+                memCacheMetadata.increment(key);
+            }
+            return v.getValue();
+    }
+
     private void write(final K key, final V value) {
         cache.switchGenerationIfNeeded();
         if (value == null) {
@@ -124,10 +149,15 @@ class NodeCache<K, V> implements Cache<K
     @Override
     @Nullable
     public V getIfPresent(Object key) {
+        memCacheMetadata.increment((K) key);
         V value = memCache.getIfPresent(key);
-        if (value != null) {
+        if (value == null) {
+            memCacheMetadata.remove(key);
+        } else {
             return value;
         }
+
+        // it takes care of updating memCacheMetadata
         value = readIfPresent((K) key);
         if (value != null) {
             memCache.put((K) key, value);
@@ -137,15 +167,17 @@ class NodeCache<K, V> implements Cache<K
 
     @Override
     public V get(K key,
-            Callable<? extends V> valueLoader)
+                 Callable<? extends V> valueLoader)
             throws ExecutionException {
         V value = getIfPresent(key);
         if (value != null) {
             return value;
         }
+
+        memCacheMetadata.increment(key);
         value = memCache.get(key, valueLoader);
-        if (!ASYNC_CACHE) {
-            write(key, value);
+        if (!async) {
+            write((K) key, value);
         }
         return value;
     }
@@ -153,14 +185,19 @@ class NodeCache<K, V> implements Cache<K
     @Override
     public ImmutableMap<K, V> getAllPresent(
             Iterable<?> keys) {
-        return memCache.getAllPresent(keys);
+        Iterable<K> typedKeys = (Iterable<K>) keys;
+        memCacheMetadata.incrementAll(keys);
+        ImmutableMap<K, V> result = memCache.getAllPresent(keys);
+        memCacheMetadata.removeAll(filter(typedKeys, 
not(in(result.keySet()))));
+        return result;
     }
 
     @Override
     public void put(K key, V value) {
+        memCacheMetadata.put(key);
         memCache.put(key, value);
-        if (!ASYNC_CACHE) {
-            write(key, value);
+        if (!async) {
+            write((K) key, value);
         }
     }
 
@@ -168,8 +205,9 @@ class NodeCache<K, V> implements Cache<K
     @Override
     public void invalidate(Object key) {
         memCache.invalidate(key);
-        if (ASYNC_CACHE) {
-            writerQueue.addInvalidate(singleton((K) key));
+        memCacheMetadata.remove(key);
+        if (async) {
+            writeQueue.addInvalidate(singleton((K) key));
         } else {
             write((K) key, null);
         }
@@ -177,17 +215,20 @@ class NodeCache<K, V> implements Cache<K
 
     @Override
     public void putAll(Map<? extends K, ? extends V> m) {
+        memCacheMetadata.putAll(m.keySet());
         memCache.putAll(m);
     }
 
     @Override
     public void invalidateAll(Iterable<?> keys) {
         memCache.invalidateAll(keys);
+        memCacheMetadata.removeAll(keys);
     }
 
     @Override
     public void invalidateAll() {
         memCache.invalidateAll();
+        memCacheMetadata.clear();
         map.clear();
     }
 
@@ -209,6 +250,7 @@ class NodeCache<K, V> implements Cache<K
     @Override
     public void cleanUp() {
         memCache.cleanUp();
+        memCacheMetadata.clear();
     }
 
     /**
@@ -216,9 +258,18 @@ class NodeCache<K, V> implements Cache<K
      */
     @Override
     public void evicted(K key, V value, RemovalCause cause) {
-        if (ASYNC_CACHE && EVICTION_CAUSES.contains(cause) && value != null) { 
-            // invalidations are handled separately
-            writerQueue.addPut(key, value);
+        if (async && EVICTION_CAUSES.contains(cause) && value != null) {
+            CacheMetadata.MetadataEntry metadata = 
memCacheMetadata.remove(key);
+            boolean qualifiesToPersist = true;
+            if (metadata != null && metadata.isReadFromPersistentCache()) {
+                qualifiesToPersist = false;
+            } else if (metadata != null && metadata.getAccessCount() < 1) {
+                qualifiesToPersist = false;
+            }
+
+            if (qualifiesToPersist) {
+                writeQueue.addPut(key, value);
+            }
         }
     }
-}
\ No newline at end of file
+}

Modified: 
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java?rev=1771871&r1=1771870&r2=1771871&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
 (original)
+++ 
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
 Tue Nov 29 09:52:19 2016
@@ -42,7 +42,13 @@ import com.google.common.cache.Cache;
 public class PersistentCache {
     
     static final Logger LOG = LoggerFactory.getLogger(PersistentCache.class);
-   
+
+    /**
+     * Whether to use the queue to put items into cache. Default: false (cache
+     * will be updated synchronously).
+     */
+    private static final boolean ASYNC_CACHE = 
Boolean.parseBoolean(System.getProperty("oak.cache.asynchronous", "false"));
+
     private static final String FILE_PREFIX = "cache-";
     private static final String FILE_SUFFIX = ".data";
     private static final AtomicInteger COUNTER = new AtomicInteger();
@@ -55,6 +61,8 @@ public class PersistentCache {
     private boolean cacheDocChildren;
     private boolean compactOnClose;
     private boolean compress = true;
+    private boolean asyncCache = ASYNC_CACHE;
+    private boolean asyncDiffCache = false;
     private ArrayList<GenerationCache> caches = 
             new ArrayList<GenerationCache>();
     
@@ -112,6 +120,12 @@ public class PersistentCache {
                 appendOnly = true;
             } else if (p.equals("manualCommit")) {
                 manualCommit = true;
+            } else if (p.equals("+async")) {
+                asyncCache = true;
+            } else if (p.equals("-async")) {
+                asyncCache = false;
+            } else if (p.equals("+asyncDiff")) {
+                asyncDiffCache = true;
             }
         }
         this.directory = dir;
@@ -325,6 +339,7 @@ public class PersistentCache {
             DocumentStore docStore,
             Cache<K, V> base, CacheType type) {
         boolean wrap;
+        boolean async = asyncCache;
         switch (type) {
         case NODE:
             wrap = cacheNodes;
@@ -334,9 +349,11 @@ public class PersistentCache {
             break;
         case DIFF:
             wrap = cacheDiff;
+            async = asyncDiffCache;
             break;
         case LOCAL_DIFF:
             wrap = cacheLocalDiff;
+            async = asyncDiffCache;
             break;
         case DOC_CHILDREN:
             wrap = cacheDocChildren;
@@ -349,7 +366,7 @@ public class PersistentCache {
             break;
         }
         if (wrap) {
-            NodeCache<K, V> c = new NodeCache<K, V>(this, base, docNodeStore, 
docStore, type, writeDispatcher);
+            NodeCache<K, V> c = new NodeCache<K, V>(this, base, docNodeStore, 
docStore, type, writeDispatcher, async);
             initGenerationCache(c);
             return c;
         }
@@ -408,7 +425,7 @@ public class PersistentCache {
         }
     }
     
-    private boolean needSwitch() {
+    boolean needSwitch() {
         long size = writeStore.getFileSize();
         if (size / 1024 / 1024 <= maxSizeMB) {
             return false;

Modified: 
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java?rev=1771871&r1=1771870&r2=1771871&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java
 (original)
+++ 
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java
 Tue Nov 29 09:52:19 2016
@@ -29,23 +29,4 @@ interface CacheAction<K, V> {
      */
     void execute();
 
-    /**
-     * Cancel the action without executing it
-     */
-    void cancel();
-
-    /**
-     * Return the keys affected by this action
-     *
-     * @return keys affected by this action
-     */
-    Iterable<K> getAffectedKeys();
-
-    /**
-     * Return the owner of this action
-     *
-     * @return {@link CacheWriteQueue} executing this action
-     */
-    CacheWriteQueue<K, V> getOwner();
-
 }

Modified: 
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java?rev=1771871&r1=1771870&r2=1771871&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java
 (original)
+++ 
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java
 Tue Nov 29 09:52:19 2016
@@ -16,13 +16,6 @@
  */
 package org.apache.jackrabbit.oak.plugins.document.persistentCache.async;
 
-import static com.google.common.collect.Multimaps.index;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -30,13 +23,10 @@ import java.util.concurrent.TimeUnit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-
 /**
- * An asynchronous buffer of the CacheAction objects. The buffer removes
- * {@link #ACTIONS_TO_REMOVE} oldest entries if the queue length is larger than
- * {@link #MAX_SIZE}.
+ * An asynchronous buffer of the CacheAction objects. The buffer only accepts
+ * {@link #MAX_SIZE} number of elements. If the queue is already full, the new
+ * elements are dropped.
  */
 public class CacheActionDispatcher implements Runnable {
 
@@ -45,14 +35,9 @@ public class CacheActionDispatcher imple
     /**
      * What's the length of the queue.
      */
-    static final int MAX_SIZE = 1024;
-
-    /**
-     * How many actions remove once the queue is longer than {@link #MAX_SIZE}.
-     */
-    static final int ACTIONS_TO_REMOVE = 256;
+    static final int MAX_SIZE = 16 * 1024;
 
-    final BlockingQueue<CacheAction<?, ?>> queue = new 
ArrayBlockingQueue<CacheAction<?, ?>>(MAX_SIZE * 2);
+    final BlockingQueue<CacheAction<?, ?>> queue = new 
ArrayBlockingQueue<CacheAction<?, ?>>(MAX_SIZE);
 
     private volatile boolean isRunning = true;
 
@@ -68,7 +53,6 @@ public class CacheActionDispatcher imple
                 LOG.debug("Interrupted the queue.poll()", e);
             }
         }
-        applyInvalidateActions();
     }
 
     /**
@@ -79,91 +63,11 @@ public class CacheActionDispatcher imple
     }
 
     /**
-     * Adds the new action and cleans the queue if necessary.
+     * Tries to add new action.
      *
      * @param action to be added
      */
-    synchronized void add(CacheAction<?, ?> action) {
-        if (queue.size() >= MAX_SIZE) {
-            cleanTheQueue();
-        }
-        queue.offer(action);
-    }
-
-    /**
-     * Clean the queue and add a single invalidate action for all the removed 
entries. 
-     */
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    private void cleanTheQueue() {
-        List<CacheAction> removed = removeOldest();
-        for (Entry<CacheWriteQueue, Collection<CacheAction>> e : 
groupByOwner(removed).entrySet()) {
-            CacheWriteQueue owner = e.getKey();
-            Collection<CacheAction> actions = e.getValue();
-            List<Object> affectedKeys = cancelAll(actions);
-            owner.addInvalidate(affectedKeys);
-        }
-    }
-
-    /**
-     * Remove {@link #ACTIONS_TO_REMOVE} oldest actions.
-     *
-     * @return A list of removed items.
-     */
-    @SuppressWarnings("rawtypes")
-    private List<CacheAction> removeOldest() {
-        List<CacheAction> removed = new ArrayList<CacheAction>();
-        while (queue.size() > MAX_SIZE - ACTIONS_TO_REMOVE) {
-            CacheAction toBeCanceled = queue.poll();
-            if (toBeCanceled == null) {
-                break;
-            } else {
-                removed.add(toBeCanceled);
-            }
-        }
-        return removed;
-    }
-
-    /**
-     * Group passed actions by their owners.
-     *
-     * @param actions to be grouped
-     * @return map in which owner is the key and assigned action list is the 
value
-     */
-    @SuppressWarnings("rawtypes")
-    private static Map<CacheWriteQueue, Collection<CacheAction>> 
groupByOwner(List<CacheAction> actions) {
-        return index(actions, new Function<CacheAction, CacheWriteQueue>() {
-            @Override
-            public CacheWriteQueue apply(CacheAction input) {
-                return input.getOwner();
-            }
-        }).asMap();
+    boolean add(CacheAction<?, ?> action) {
+        return queue.offer(action);
     }
-
-    /**
-     * Cancel all passed actions.
-     *
-     * @param actions to cancel
-     * @return list of affected keys
-     */
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    private static List<Object> cancelAll(Collection<CacheAction> actions) {
-        List<Object> cancelledKeys = new ArrayList<Object>();
-        for (CacheAction action : actions) {
-            action.cancel();
-            Iterables.addAll(cancelledKeys, action.getAffectedKeys());
-        }
-        return cancelledKeys;
-    }
-
-    @SuppressWarnings("rawtypes")
-    private void applyInvalidateActions() {
-        CacheAction action;
-        do {
-            action = queue.poll();
-            if (action instanceof InvalidateCacheAction) {
-                action.execute();
-            }
-        } while (action != null);
-    }
-
-}
\ No newline at end of file
+}

Modified: 
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java?rev=1771871&r1=1771870&r2=1771871&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java
 (original)
+++ 
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java
 Tue Nov 29 09:52:19 2016
@@ -16,21 +16,10 @@
  */
 package org.apache.jackrabbit.oak.plugins.document.persistentCache.async;
 
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
 import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache;
 
-import com.google.common.collect.HashMultiset;
-import com.google.common.collect.Multiset;
+import java.util.Map;
 
-/**
- * A fronted for the {@link CacheActionDispatcher} creating actions and 
maintaining their state.
- *
- * @param <K> key type
- * @param <V> value type
- */
 public class CacheWriteQueue<K, V> {
 
     private final CacheActionDispatcher dispatcher;
@@ -39,65 +28,18 @@ public class CacheWriteQueue<K, V> {
 
     private final Map<K, V> map;
 
-    final Multiset<K> queuedKeys = HashMultiset.create();
-
-    final Set<K> waitsForInvalidation = new HashSet<K>();
-
     public CacheWriteQueue(CacheActionDispatcher dispatcher, PersistentCache 
cache, Map<K, V> map) {
         this.dispatcher = dispatcher;
         this.cache = cache;
         this.map = map;
     }
 
-    /**
-     * Add new invalidate action.
-     *
-     * @param keys to be invalidated
-     */
-    public void addInvalidate(Iterable<K> keys) {
-        synchronized(this) {
-            for (K key : keys) {
-                queuedKeys.add(key);
-                waitsForInvalidation.add(key);
-            }
-        }
-        dispatcher.add(new InvalidateCacheAction<K, V>(this, keys));
-    }
-
-    /**
-     * Add new put action
-     *
-     * @param key to be put to cache
-     * @param value to be put to cache
-     */
-    public void addPut(K key, V value) {
-        synchronized(this) {
-            queuedKeys.add(key);
-            waitsForInvalidation.remove(key);
-        }
-        dispatcher.add(new PutToCacheAction<K, V>(this, key, value));
-    }
-
-    /**
-     * Check if the last action added for this key was invalidate
-     *
-     * @param key to check 
-     * @return {@code true} if the last added action was invalidate
-     */
-    public synchronized boolean waitsForInvalidation(K key) {
-        return waitsForInvalidation.contains(key);
+    public boolean addPut(K key, V value) {
+        return dispatcher.add(new PutToCacheAction<K, V>(key, value, this));
     }
 
-    /**
-     * Remove the action state when it's finished or cancelled.
-     *
-     * @param key to be removed
-     */
-    synchronized void remove(K key) {
-        queuedKeys.remove(key);
-        if (!queuedKeys.contains(key)) {
-            waitsForInvalidation.remove(key);
-        }
+    public boolean addInvalidate(Iterable<K> keys) {
+        return dispatcher.add(new InvalidateCacheAction<K, V>(keys, this));
     }
 
     PersistentCache getCache() {
@@ -107,4 +49,4 @@ public class CacheWriteQueue<K, V> {
     Map<K, V> getMap() {
         return map;
     }
-}
\ No newline at end of file
+}

Modified: 
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java?rev=1771871&r1=1771870&r2=1771871&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java
 (original)
+++ 
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java
 Tue Nov 29 09:52:19 2016
@@ -18,6 +18,7 @@ package org.apache.jackrabbit.oak.plugin
 
 import java.util.Map;
 
+import com.google.common.collect.Iterables;
 import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache;
 
 /**
@@ -32,49 +33,26 @@ class InvalidateCacheAction<K, V> implem
 
     private final Map<K, V> map;
 
-    private final CacheWriteQueue<K, V> owner;
-
     private final Iterable<K> keys;
 
-    InvalidateCacheAction(CacheWriteQueue<K, V> cacheWriteQueue, Iterable<K> 
keys) {
-        this.owner = cacheWriteQueue;
+    InvalidateCacheAction(Iterable<K> keys, CacheWriteQueue<K, V> queue) {
         this.keys = keys;
-        this.cache = cacheWriteQueue.getCache();
-        this.map = cacheWriteQueue.getMap();
+        this.cache = queue.getCache();
+        this.map = queue.getMap();
     }
 
     @Override
     public void execute() {
-        try {
-            if (map != null) {
-                for (K key : keys) {
-                    cache.switchGenerationIfNeeded();
-                    map.remove(key);
-                }
+        if (map != null) {
+            for (K key : keys) {
+                cache.switchGenerationIfNeeded();
+                map.remove(key);
             }
-        } finally {
-            decrement();
         }
     }
 
     @Override
-    public void cancel() {
-        decrement();
-    }
-
-    @Override
-    public CacheWriteQueue<K, V> getOwner() {
-        return owner;
-    }
-
-    @Override
-    public Iterable<K> getAffectedKeys() {
-        return keys;
-    }
-
-    private void decrement() {
-        for (K key : keys) {
-            owner.remove(key);
-        }
+    public String toString() {
+        return new 
StringBuilder("InvalidateCacheAction").append(Iterables.toString(keys)).toString();
     }
-}
\ No newline at end of file
+}

Modified: 
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java?rev=1771871&r1=1771870&r2=1771871&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java
 (original)
+++ 
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java
 Tue Nov 29 09:52:19 2016
@@ -20,6 +20,7 @@ import static java.util.Collections.sing
 
 import java.util.Map;
 
+import com.google.common.collect.Iterables;
 import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache;
 
 /**
@@ -34,48 +35,27 @@ class PutToCacheAction<K, V> implements
 
     private final Map<K, V> map;
 
-    private final CacheWriteQueue<K, V> owner;
-
     private final K key;
 
     private final V value;
 
-    PutToCacheAction(CacheWriteQueue<K, V> cacheWriteQueue, K key, V value) {
-        this.owner = cacheWriteQueue;
+    PutToCacheAction(K key, V value, CacheWriteQueue<K, V> queue) {
         this.key = key;
         this.value = value;
-        this.cache = cacheWriteQueue.getCache();
-        this.map = cacheWriteQueue.getMap();
+        this.cache = queue.getCache();
+        this.map = queue.getMap();
     }
 
     @Override
     public void execute() {
-        try {
-            if (map != null) {
-                cache.switchGenerationIfNeeded();
-                map.put(key, value);
-            }
-        } finally {
-            decrement();
+        if (map != null) {
+            cache.switchGenerationIfNeeded();
+            map.put(key, value);
         }
     }
 
     @Override
-    public void cancel() {
-        decrement();
-    }
-
-    @Override
-    public CacheWriteQueue<K, V> getOwner() {
-        return owner;
-    }
-
-    @Override
-    public Iterable<K> getAffectedKeys() {
-        return singleton(key);
-    }
-
-    private void decrement() {
-        owner.remove(key);
+    public String toString() {
+        return new 
StringBuilder("PutToCacheAction[").append(key).append(']').toString();
     }
-}
\ No newline at end of file
+}

Added: 
jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/AsyncQueueTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/AsyncQueueTest.java?rev=1771871&view=auto
==============================================================================
--- 
jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/AsyncQueueTest.java
 (added)
+++ 
jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/AsyncQueueTest.java
 Tue Nov 29 09:52:19 2016
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.persistentCache;
+
+import com.google.common.cache.RemovalCause;
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.cache.CacheLIRS;
+import org.apache.jackrabbit.oak.plugins.document.DocumentMKBuilderProvider;
+import org.apache.jackrabbit.oak.plugins.document.PathRev;
+import org.apache.jackrabbit.oak.plugins.document.Revision;
+import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheWriteQueue;
+import org.apache.jackrabbit.oak.plugins.document.util.StringValue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.google.common.collect.Lists.newArrayList;
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
+import static org.junit.Assert.assertEquals;
+
+public class AsyncQueueTest {
+
+    @Rule
+    public DocumentMKBuilderProvider builderProvider = new 
DocumentMKBuilderProvider();
+
+    private static final StringValue VAL = new StringValue("xyz");
+
+    private PersistentCache pCache;
+
+    private List<PathRev> putActions;
+
+    private List<PathRev> invalidateActions;
+
+    private NodeCache<PathRev, StringValue> nodeCache;
+
+    private int id;
+
+    @Before
+    public void setup() throws IOException {
+        FileUtils.deleteDirectory(new File("target/cacheTest"));
+        pCache = new PersistentCache("target/cacheTest,+async");
+        final AtomicReference<NodeCache<PathRev, StringValue>> nodeCacheRef = 
new AtomicReference<NodeCache<PathRev, StringValue>>();
+        CacheLIRS<PathRev, StringValue> cache = new CacheLIRS.Builder<PathRev, 
StringValue>().maximumSize(1).evictionCallback(new 
CacheLIRS.EvictionCallback<PathRev, StringValue>() {
+            @Override
+            public void evicted(@Nonnull PathRev key, @Nullable StringValue 
value, @Nonnull RemovalCause cause) {
+                if (nodeCacheRef.get() != null) {
+                    nodeCacheRef.get().evicted(key, value, cause);
+                }
+            }
+        }).build();
+        nodeCache = (NodeCache<PathRev, StringValue>) 
pCache.wrap(builderProvider.newBuilder().getNodeStore(),
+                null, cache,  CacheType.NODE);
+        nodeCacheRef.set(nodeCache);
+
+        CacheWriteQueueWrapper writeQueue = new 
CacheWriteQueueWrapper(nodeCache.writeQueue);
+        nodeCache.writeQueue = writeQueue;
+
+        this.putActions = writeQueue.putActions;
+        this.invalidateActions = writeQueue.invalidateActions;
+        this.id = 0;
+    }
+
+    @After
+    public void teardown() {
+        if (pCache != null) {
+            pCache.close();
+        }
+    }
+
+    @Test
+    public void unusedItemsShouldntBePersisted() {
+        PathRev k = generatePathRev();
+        nodeCache.put(k, VAL);
+        flush();
+        assertEquals(emptyList(), putActions);
+    }
+
+    @Test
+    public void readItemsShouldntBePersistedAgain() {
+        PathRev k = generatePathRev();
+        nodeCache.put(k, VAL);
+        nodeCache.getIfPresent(k);
+        flush();
+        assertEquals(asList(k), putActions);
+
+        putActions.clear();
+        nodeCache.getIfPresent(k); // k should be loaded from persisted cache
+        flush();
+        assertEquals(emptyList(), putActions); // k is not persisted again
+    }
+
+    @Test
+    public void usedItemsShouldBePersisted() {
+        PathRev k = generatePathRev();
+        nodeCache.put(k, VAL);
+        nodeCache.getIfPresent(k);
+        flush();
+        assertEquals(asList(k), putActions);
+    }
+
+    private PathRev generatePathRev() {
+        return new PathRev("/" + id++, new Revision(0, 0, 0));
+    }
+
+    private void flush() {
+        for (int i = 0; i < 1024; i++) {
+            nodeCache.put(generatePathRev(), VAL); // cause eviction of k
+        }
+    }
+
+    private static class CacheWriteQueueWrapper extends 
CacheWriteQueue<PathRev, StringValue> {
+
+        private final CacheWriteQueue<PathRev, StringValue>  wrapped;
+
+        private final List<PathRev> putActions = newArrayList();
+
+        private final List<PathRev> invalidateActions = newArrayList();
+
+        public CacheWriteQueueWrapper(CacheWriteQueue<PathRev, StringValue>  
wrapped) {
+            super(null, null, null);
+            this.wrapped = wrapped;
+        }
+
+        @Override
+        public boolean addPut(PathRev key, StringValue value) {
+            putActions.add(key);
+            return wrapped.addPut(key, value);
+        }
+
+        public boolean addInvalidate(Iterable<PathRev> keys) {
+            invalidateActions.addAll(newArrayList(keys));
+            return wrapped.addInvalidate(keys);
+        }
+    }
+
+}

Modified: 
jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java?rev=1771871&r1=1771870&r2=1771871&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java
 (original)
+++ 
jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java
 Tue Nov 29 09:52:19 2016
@@ -18,33 +18,21 @@
  */
 package org.apache.jackrabbit.oak.plugins.document.persistentCache.async;
 
-import static com.google.common.collect.ImmutableSet.of;
-import static com.google.common.collect.Iterables.size;
 import static java.lang.String.valueOf;
 import static java.lang.System.currentTimeMillis;
 import static java.lang.Thread.sleep;
-import static 
org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher.ACTIONS_TO_REMOVE;
 import static 
org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher.MAX_SIZE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Random;
 
 import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache;
-import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheAction;
-import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher;
-import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheWriteQueue;
-import 
org.apache.jackrabbit.oak.plugins.document.persistentCache.async.InvalidateCacheAction;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -59,17 +47,8 @@ public class CacheActionDispatcherTest {
         for (int i = 0; i < MAX_SIZE + 10; i++) {
             dispatcher.add(createWriteAction(valueOf(i), queue));
         }
-        assertEquals(MAX_SIZE - ACTIONS_TO_REMOVE + 10 + 1, 
dispatcher.queue.size());
-        assertEquals(valueOf(ACTIONS_TO_REMOVE), 
dispatcher.queue.peek().toString());
-
-        InvalidateCacheAction<?, ?> invalidateAction = null;
-        for (CacheAction<?, ?> action : dispatcher.queue) {
-            if (action instanceof InvalidateCacheAction) {
-                invalidateAction = (InvalidateCacheAction<?, ?>) action;
-            }
-        }
-        assertNotNull(invalidateAction);
-        assertEquals(ACTIONS_TO_REMOVE, 
size(invalidateAction.getAffectedKeys()));
+        assertEquals(MAX_SIZE, dispatcher.queue.size());
+        assertEquals("0", dispatcher.queue.peek().toString());
     }
 
     @Test
@@ -128,31 +107,6 @@ public class CacheActionDispatcherTest {
         assertFalse(queueThread.isAlive());
     }
 
-    @Test
-    public void testExecuteInvalidatesOnShutdown() throws InterruptedException 
{
-        Map<String, Object> cacheMap = new HashMap<String, Object>();
-        CacheActionDispatcher dispatcher = new CacheActionDispatcher();
-        CacheWriteQueue<String, Object> queue = new CacheWriteQueue<String, 
Object>(dispatcher,
-                Mockito.mock(PersistentCache.class), cacheMap);
-        Thread queueThread = new Thread(dispatcher);
-        queueThread.start();
-
-        cacheMap.put("2", new Object());
-        cacheMap.put("3", new Object());
-        cacheMap.put("4", new Object());
-        dispatcher.add(new DummyCacheWriteAction("1", queue, 100));
-        dispatcher.add(new InvalidateCacheAction<String, Object>(queue, 
Collections.singleton("2")));
-        dispatcher.add(new InvalidateCacheAction<String, Object>(queue, 
Collections.singleton("3")));
-        dispatcher.add(new InvalidateCacheAction<String, Object>(queue, 
Collections.singleton("4")));
-        Thread.sleep(10); // make sure the first action started
-
-        dispatcher.stop();
-        assertEquals(of("2", "3", "4"), cacheMap.keySet());
-
-        queueThread.join();
-        assertTrue(cacheMap.isEmpty());
-    }
-
     private DummyCacheWriteAction createWriteAction(String id, 
CacheWriteQueue<String, Object> queue) {
         return new DummyCacheWriteAction(id, queue);
     }
@@ -188,22 +142,9 @@ public class CacheActionDispatcherTest {
         }
 
         @Override
-        public void cancel() {
-        }
-
-        @Override
         public String toString() {
             return id;
         }
 
-        @Override
-        public Iterable<String> getAffectedKeys() {
-            return Collections.singleton(id);
-        }
-
-        @Override
-        public CacheWriteQueue<String, Object> getOwner() {
-            return queue;
-        }
     }
 }


Reply via email to