Author: thomasm Date: Tue Feb 16 10:39:18 2016 New Revision: 1730650 URL: http://svn.apache.org/viewvc?rev=1730650&view=rev Log: OAK-2761 Persistent cache: add data in a different thread
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/EvictionListener.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AsyncCacheTest.java jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueueTest.java Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java?rev=1730650&r1=1730649&r2=1730650&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java (original) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java Tue Feb 16 10:39:18 2016 @@ -17,11 +17,17 @@ package org.apache.jackrabbit.oak.plugins.document; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.cache.RemovalCause.COLLECTED; +import static com.google.common.cache.RemovalCause.EXPIRED; +import static com.google.common.cache.RemovalCause.SIZE; import java.io.InputStream; +import java.lang.ref.Reference; import java.net.UnknownHostException; +import java.util.EnumSet; import java.util.Iterator; import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @@ -32,12 +38,16 @@ import javax.sql.DataSource; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalCause; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; import com.google.common.cache.Weigher; import com.google.common.collect.Sets; import com.google.common.util.concurrent.MoreExecutors; import com.mongodb.DB; import org.apache.jackrabbit.oak.api.CommitFailedException; import org.apache.jackrabbit.oak.cache.CacheLIRS; +import org.apache.jackrabbit.oak.cache.CacheLIRS.EvictionCallback; import org.apache.jackrabbit.oak.cache.CacheStats; import org.apache.jackrabbit.oak.cache.CacheValue; import org.apache.jackrabbit.oak.cache.EmpiricalWeigher; @@ -59,6 +69,7 @@ import org.apache.jackrabbit.oak.plugins import org.apache.jackrabbit.oak.plugins.document.mongo.MongoMissingLastRevSeeker; import org.apache.jackrabbit.oak.plugins.document.mongo.MongoVersionGCSupport; import org.apache.jackrabbit.oak.plugins.document.persistentCache.CacheType; +import org.apache.jackrabbit.oak.plugins.document.persistentCache.EvictionListener; import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache; import org.apache.jackrabbit.oak.plugins.document.rdb.RDBBlobStore; import org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore; @@ -1008,19 +1019,24 @@ public class DocumentMK { return new NodeDocumentCache(nodeDocumentsCache, nodeDocumentsCacheStats, prevDocumentsCache, prevDocumentsCacheStats, locks); } + @SuppressWarnings("unchecked") private <K extends CacheValue, V extends CacheValue> Cache<K, V> buildCache( CacheType cacheType, long maxWeight, DocumentNodeStore docNodeStore, DocumentStore docStore ) { - Cache<K, V> cache = buildCache(cacheType.name(), maxWeight); + Set<EvictionListener<K, V>> listeners = new CopyOnWriteArraySet<EvictionListener<K,V>>(); + Cache<K, V> cache = buildCache(cacheType.name(), maxWeight, listeners); PersistentCache p = getPersistentCache(); if (p != null) { if (docNodeStore != null) { docNodeStore.setPersistentCache(p); } cache = p.wrap(docNodeStore, docStore, cache, cacheType); + if (cache instanceof EvictionListener) { + listeners.add((EvictionListener<K, V>) cache); + } } return cache; } @@ -1042,7 +1058,8 @@ public class DocumentMK { private <K extends CacheValue, V extends CacheValue> Cache<K, V> buildCache( String module, - long maxWeight) { + long maxWeight, + final Set<EvictionListener<K, V>> listeners) { // by default, use the LIRS cache when using the persistent cache, // but don't use it otherwise boolean useLirs = persistentCacheURI != null; @@ -1064,6 +1081,14 @@ public class DocumentMK { segmentCount(cacheSegmentCount). stackMoveDistance(cacheStackMoveDistance). recordStats(). + evictionCallback(new EvictionCallback<K, V>() { + @Override + public void evicted(K key, V value, RemovalCause cause) { + for (EvictionListener<K, V> l : listeners) { + l.evicted(key, value, cause); + } + } + }). build(); } return CacheBuilder.newBuilder(). @@ -1071,6 +1096,14 @@ public class DocumentMK { weigher(weigher). maximumWeight(maxWeight). recordStats(). + removalListener(new RemovalListener<K, V>() { + @Override + public void onRemoval(RemovalNotification<K, V> notification) { + for (EvictionListener<K, V> l : listeners) { + l.evicted(notification.getKey(), notification.getValue(), notification.getCause()); + } + } + }). build(); } Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/EvictionListener.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/EvictionListener.java?rev=1730650&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/EvictionListener.java (added) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/EvictionListener.java Tue Feb 16 10:39:18 2016 @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.persistentCache; + +import com.google.common.cache.RemovalCause; + +/** + * A listener that gets notified of entries that were removed from the cache. + * + * @param <K> the key type + * @param <V> the value type + */ +public interface EvictionListener<K, V> { + + void evicted(K key, V value, RemovalCause removalCause); + +} Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java?rev=1730650&r1=1730649&r2=1730650&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java (original) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java Tue Feb 16 10:39:18 2016 @@ -16,8 +16,14 @@ */ package org.apache.jackrabbit.oak.plugins.document.persistentCache; +import static com.google.common.cache.RemovalCause.COLLECTED; +import static com.google.common.cache.RemovalCause.EXPIRED; +import static com.google.common.cache.RemovalCause.SIZE; +import static java.util.Collections.singleton; + import java.nio.ByteBuffer; import java.util.Map; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; @@ -27,6 +33,8 @@ import javax.annotation.Nullable; import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore; import org.apache.jackrabbit.oak.plugins.document.DocumentStore; import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache.GenerationCache; +import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher; +import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheWriteQueue; import org.h2.mvstore.MVMap; import org.h2.mvstore.WriteBuffer; import org.h2.mvstore.type.DataType; @@ -34,22 +42,29 @@ import org.h2.mvstore.type.DataType; import com.google.common.base.Function; import com.google.common.cache.Cache; import com.google.common.cache.CacheStats; +import com.google.common.cache.RemovalCause; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + +class NodeCache<K, V> implements Cache<K, V>, GenerationCache, EvictionListener<K, V> { + + private static final Set<RemovalCause> EVICTION_CAUSES = ImmutableSet.of(COLLECTED, EXPIRED, SIZE); -class NodeCache<K, V> implements Cache<K, V>, GenerationCache { - private final PersistentCache cache; private final Cache<K, V> memCache; private final MultiGenerationMap<K, V> map; private final CacheType type; private final DataType keyType; private final DataType valueType; - + private final CacheWriteQueue<K, V> writerQueue; + NodeCache( PersistentCache cache, Cache<K, V> memCache, DocumentNodeStore docNodeStore, - DocumentStore docStore, CacheType type) { + DocumentStore docStore, + CacheType type, + CacheActionDispatcher dispatcher) { this.cache = cache; this.memCache = memCache; this.type = type; @@ -57,6 +72,7 @@ class NodeCache<K, V> implements Cache<K map = new MultiGenerationMap<K, V>(); keyType = new KeyDataType(type); valueType = new ValueDataType(docNodeStore, docStore, type); + this.writerQueue = new CacheWriteQueue<K, V>(dispatcher, cache, map); } @Override @@ -82,47 +98,31 @@ class NodeCache<K, V> implements Cache<K } private V readIfPresent(K key) { + if (writerQueue.waitsForInvalidation(key)) { + return null; + } cache.switchGenerationIfNeeded(); V v = map.get(key); return v; } - - private void write(final K key, final V value) { - write(key, value, true); - } - private void writeWithoutBroadcast(final K key, final V value) { - write(key, value, false); - } - - private void write(final K key, final V value, boolean broadcast) { - cache.switchGenerationIfNeeded(); - if (broadcast) { - cache.broadcast(type, new Function<WriteBuffer, Void>() { - @Override - @Nullable - public Void apply(@Nullable WriteBuffer buffer) { - keyType.write(buffer, key); - if (value == null) { - buffer.put((byte) 0); - } else { - buffer.put((byte) 1); - valueType.write(buffer, value); - } - return null; + private void broadcast(final K key, final V value) { + cache.broadcast(type, new Function<WriteBuffer, Void>() { + @Override + @Nullable + public Void apply(@Nullable WriteBuffer buffer) { + keyType.write(buffer, key); + if (value == null) { + buffer.put((byte) 0); + } else { + buffer.put((byte) 1); + valueType.write(buffer, value); } - }); - } - MultiGenerationMap<K, V> m = map; - if (m != null) { - if (value == null) { - m.remove(key); - } else { - m.put(key, value); + return null; } - } + }); } - + @SuppressWarnings("unchecked") @Override @Nullable @@ -147,7 +147,7 @@ class NodeCache<K, V> implements Cache<K return value; } value = memCache.get(key, valueLoader); - write(key, value); + broadcast(key, value); return value; } @@ -160,14 +160,15 @@ class NodeCache<K, V> implements Cache<K @Override public void put(K key, V value) { memCache.put(key, value); - write(key, value); + broadcast(key, value); } @SuppressWarnings("unchecked") @Override public void invalidate(Object key) { memCache.invalidate(key); - write((K) key, (V) null); + writerQueue.addInvalidate(singleton((K) key)); + broadcast((K) key, null); } @Override @@ -218,7 +219,17 @@ class NodeCache<K, V> implements Cache<K value = (V) valueType.read(buff); memCache.put(key, value); } - writeWithoutBroadcast(key, value); + } + + /** + * Invoked on the eviction from the {@link #memCache} + */ + @Override + public void evicted(K key, V value, RemovalCause cause) { + if (EVICTION_CAUSES.contains(cause) && value != null) { + // invalidations are handled separately + writerQueue.addPut(key, value); + } } } \ No newline at end of file Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java?rev=1730650&r1=1730649&r2=1730650&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java (original) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java Tue Feb 16 10:39:18 2016 @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.Atomi import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore; import org.apache.jackrabbit.oak.plugins.document.DocumentStore; import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.DynamicBroadcastConfig; +import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher; import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.Broadcaster; import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.InMemoryBroadcaster; import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.TCPBroadcaster; @@ -82,6 +83,8 @@ public class PersistentCache implements private ThreadLocal<WriteBuffer> writeBuffer = new ThreadLocal<WriteBuffer>(); private final byte[] broadcastId; private DynamicBroadcastConfig broadcastConfig; + private CacheActionDispatcher writeDispatcher; + private Thread writeDispatcherThread; { ByteBuffer bb = ByteBuffer.wrap(new byte[16]); @@ -192,6 +195,11 @@ public class PersistentCache implements } writeStore = createMapFactory(writeGeneration, false); initBroadcast(broadcast); + + writeDispatcher = new CacheActionDispatcher(); + writeDispatcherThread = new Thread(writeDispatcher, "Oak CacheWriteQueue"); + writeDispatcherThread.setDaemon(true); + writeDispatcherThread.start(); } private void initBroadcast(String broadcast) { @@ -338,6 +346,13 @@ public class PersistentCache implements } public void close() { + writeDispatcher.stop(); + try { + writeDispatcherThread.join(); + } catch (InterruptedException e) { + LOG.error("Can't join the {}", writeDispatcherThread.getName(), e); + } + if (writeStore != null) { writeStore.closeStore(); } @@ -395,7 +410,7 @@ public class PersistentCache implements } if (wrap) { NodeCache<K, V> c = new NodeCache<K, V>(this, - base, docNodeStore, docStore, type); + base, docNodeStore, docStore, type, writeDispatcher); initGenerationCache(c); return c; } Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java?rev=1730650&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java (added) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java Tue Feb 16 10:39:18 2016 @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.persistentCache.async; + +/** + * Object represents an action on the cache (eg. put or invalidate). + * + * @param <K> key type + * @param <V> value type + */ +interface CacheAction<K, V> { + + /** + * Execute the action + */ + void execute(); + + /** + * Cancel the action without executing it + */ + void cancel(); + + /** + * Return the keys affected by this action + * + * @return keys affected by this action + */ + Iterable<K> getAffectedKeys(); + + /** + * Return the owner of this action + * + * @return {@link CacheWriteQueue} executing this action + */ + CacheWriteQueue<K, V> getOwner(); + +} Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java?rev=1730650&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java (added) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java Tue Feb 16 10:39:18 2016 @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.persistentCache.async; + +import static com.google.common.collect.Multimaps.index; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; + +/** + * An asynchronous buffer of the CacheAction objects. The buffer removes + * {@link #ACTIONS_TO_REMOVE} oldest entries if the queue length is larger than + * {@link #MAX_SIZE}. + */ +public class CacheActionDispatcher implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(CacheActionDispatcher.class); + + /** + * What's the length of the queue. + */ + static final int MAX_SIZE = 1024; + + /** + * How many actions remove once the queue is longer than {@link #MAX_SIZE}. + */ + static final int ACTIONS_TO_REMOVE = 256; + + final BlockingQueue<CacheAction<?, ?>> queue = new ArrayBlockingQueue<CacheAction<?, ?>>(MAX_SIZE * 2); + + private volatile boolean isRunning = true; + + @Override + public void run() { + while (isRunning) { + try { + CacheAction<?, ?> action = queue.poll(10, TimeUnit.MILLISECONDS); + if (action != null && isRunning) { + action.execute(); + } + } catch (InterruptedException e) { + LOG.debug("Interrupted the queue.poll()", e); + } + } + applyInvalidateActions(); + } + + /** + * Stop the processing. + */ + public void stop() { + isRunning = false; + } + + /** + * Adds the new action and cleans the queue if necessary. + * + * @param action to be added + */ + synchronized void add(CacheAction<?, ?> action) { + if (queue.size() >= MAX_SIZE) { + cleanTheQueue(); + } + queue.offer(action); + } + + /** + * Clean the queue and add a single invalidate action for all the removed entries. + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + private void cleanTheQueue() { + List<CacheAction> removed = removeOldest(); + for (Entry<CacheWriteQueue, Collection<CacheAction>> e : groupByOwner(removed).entrySet()) { + CacheWriteQueue owner = e.getKey(); + Collection<CacheAction> actions = e.getValue(); + List<Object> affectedKeys = cancelAll(actions); + owner.addInvalidate(affectedKeys); + } + } + + /** + * Remove {@link #ACTIONS_TO_REMOVE} oldest actions. + * + * @return A list of removed items. + */ + @SuppressWarnings("rawtypes") + private List<CacheAction> removeOldest() { + List<CacheAction> removed = new ArrayList<CacheAction>(); + while (queue.size() > MAX_SIZE - ACTIONS_TO_REMOVE) { + CacheAction toBeCanceled = queue.poll(); + if (toBeCanceled == null) { + break; + } else { + removed.add(toBeCanceled); + } + } + return removed; + } + + /** + * Group passed actions by their owners. + * + * @param actions to be grouped + * @return map in which owner is the key and assigned action list is the value + */ + @SuppressWarnings("rawtypes") + private static Map<CacheWriteQueue, Collection<CacheAction>> groupByOwner(List<CacheAction> actions) { + return index(actions, new Function<CacheAction, CacheWriteQueue>() { + @Override + public CacheWriteQueue apply(CacheAction input) { + return input.getOwner(); + } + }).asMap(); + } + + /** + * Cancel all passed actions. + * + * @param actions to cancel + * @return list of affected keys + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + private static List<Object> cancelAll(Collection<CacheAction> actions) { + List<Object> cancelledKeys = new ArrayList<Object>(); + for (CacheAction action : actions) { + action.cancel(); + Iterables.addAll(cancelledKeys, action.getAffectedKeys()); + } + return cancelledKeys; + } + + @SuppressWarnings("rawtypes") + private void applyInvalidateActions() { + CacheAction action; + do { + action = queue.poll(); + if (action instanceof InvalidateCacheAction) { + action.execute(); + } + } while (action != null); + } + +} \ No newline at end of file Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java?rev=1730650&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java (added) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java Tue Feb 16 10:39:18 2016 @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.persistentCache.async; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache; + +import com.google.common.collect.HashMultiset; +import com.google.common.collect.Multiset; + +/** + * A fronted for the {@link CacheActionDispatcher} creating actions and maintaining their state. + * + * @param <K> key type + * @param <V> value type + */ +public class CacheWriteQueue<K, V> { + + private final CacheActionDispatcher dispatcher; + + private final PersistentCache cache; + + private final Map<K, V> map; + + final Multiset<K> queuedKeys = HashMultiset.create(); + + final Set<K> waitsForInvalidation = new HashSet<K>(); + + public CacheWriteQueue(CacheActionDispatcher dispatcher, PersistentCache cache, Map<K, V> map) { + this.dispatcher = dispatcher; + this.cache = cache; + this.map = map; + } + + /** + * Add new invalidate action. + * + * @param keys to be invalidated + */ + public void addInvalidate(Iterable<K> keys) { + synchronized(this) { + for (K key : keys) { + queuedKeys.add(key); + waitsForInvalidation.add(key); + } + } + dispatcher.add(new InvalidateCacheAction<K, V>(this, keys)); + } + + /** + * Add new put action + * + * @param key to be put to cache + * @param value to be put to cache + */ + public void addPut(K key, V value) { + synchronized(this) { + queuedKeys.add(key); + waitsForInvalidation.remove(key); + } + dispatcher.add(new PutToCacheAction<K, V>(this, key, value)); + } + + /** + * Check if the last action added for this key was invalidate + * + * @param key to check + * @return {@code true} if the last added action was invalidate + */ + public synchronized boolean waitsForInvalidation(K key) { + return waitsForInvalidation.contains(key); + } + + /** + * Remove the action state when it's finished or cancelled. + * + * @param key to be removed + */ + synchronized void remove(K key) { + queuedKeys.remove(key); + if (!queuedKeys.contains(key)) { + waitsForInvalidation.remove(key); + } + } + + PersistentCache getCache() { + return cache; + } + + Map<K, V> getMap() { + return map; + } +} \ No newline at end of file Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java?rev=1730650&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java (added) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java Tue Feb 16 10:39:18 2016 @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.persistentCache.async; + +import java.util.Map; + +import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache; + +/** + * An invalidate cache action. + * + * @param <K> key type + * @param <V> value type + */ +class InvalidateCacheAction<K, V> implements CacheAction<K, V> { + + private final PersistentCache cache; + + private final Map<K, V> map; + + private final CacheWriteQueue<K, V> owner; + + private final Iterable<K> keys; + + InvalidateCacheAction(CacheWriteQueue<K, V> cacheWriteQueue, Iterable<K> keys) { + this.owner = cacheWriteQueue; + this.keys = keys; + this.cache = cacheWriteQueue.getCache(); + this.map = cacheWriteQueue.getMap(); + } + + @Override + public void execute() { + try { + if (map != null) { + for (K key : keys) { + cache.switchGenerationIfNeeded(); + map.remove(key); + } + } + } finally { + decrement(); + } + } + + @Override + public void cancel() { + decrement(); + } + + @Override + public CacheWriteQueue<K, V> getOwner() { + return owner; + } + + @Override + public Iterable<K> getAffectedKeys() { + return keys; + } + + private void decrement() { + for (K key : keys) { + owner.remove(key); + } + } +} \ No newline at end of file Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java?rev=1730650&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java (added) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java Tue Feb 16 10:39:18 2016 @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.persistentCache.async; + +import static java.util.Collections.singleton; + +import java.util.Map; + +import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache; + +/** + * Put to cache action + * + * @param <K> key type + * @param <V> value type + */ +class PutToCacheAction<K, V> implements CacheAction<K, V> { + + private final PersistentCache cache; + + private final Map<K, V> map; + + private final CacheWriteQueue<K, V> owner; + + private final K key; + + private final V value; + + PutToCacheAction(CacheWriteQueue<K, V> cacheWriteQueue, K key, V value) { + this.owner = cacheWriteQueue; + this.key = key; + this.value = value; + this.cache = cacheWriteQueue.getCache(); + this.map = cacheWriteQueue.getMap(); + } + + @Override + public void execute() { + try { + if (map != null) { + cache.switchGenerationIfNeeded(); + map.put(key, value); + } + } finally { + decrement(); + } + } + + @Override + public void cancel() { + decrement(); + } + + @Override + public CacheWriteQueue<K, V> getOwner() { + return owner; + } + + @Override + public Iterable<K> getAffectedKeys() { + return singleton(key); + } + + private void decrement() { + owner.remove(key); + } +} \ No newline at end of file Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AsyncCacheTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AsyncCacheTest.java?rev=1730650&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AsyncCacheTest.java (added) +++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AsyncCacheTest.java Tue Feb 16 10:39:18 2016 @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document; + +import java.io.File; + +import com.google.common.cache.Cache; + +import org.apache.commons.io.FileUtils; +import org.junit.Test; + +import static org.junit.Assert.assertNull; + +public class AsyncCacheTest { + + @Test + public void invalidateWhileInQueue() throws Exception { + FileUtils.deleteDirectory(new File("target/cacheTest")); + DocumentMK.Builder builder = new DocumentMK.Builder(); + builder.setPersistentCache("target/cacheTest"); + Cache<PathRev, DocumentNodeState.Children> cache = builder.buildChildrenCache(); + DocumentNodeState.Children c = new DocumentNodeState.Children(); + for (int i = 0; i < 100; i++) { + c.children.add("node-" + i); + } + PathRev key = null; + for (int i = 0; i < 1000; i++) { + key = new PathRev("/foo/bar", new RevisionVector(new Revision(i, 0, 1))); + cache.put(key, c); + } + cache.invalidate(key); + // give the write queue some time to write back entries + Thread.sleep(200); + assertNull(cache.getIfPresent(key)); + builder.getPersistentCache().close(); + } +} Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java?rev=1730650&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java (added) +++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java Tue Feb 16 10:39:18 2016 @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.persistentCache.async; + +import static com.google.common.collect.ImmutableSet.of; +import static com.google.common.collect.Iterables.size; +import static java.lang.String.valueOf; +import static java.lang.System.currentTimeMillis; +import static java.lang.Thread.sleep; +import static org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher.ACTIONS_TO_REMOVE; +import static org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher.MAX_SIZE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache; +import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheAction; +import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher; +import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheWriteQueue; +import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.InvalidateCacheAction; +import org.junit.Test; +import org.mockito.Mockito; + +public class CacheActionDispatcherTest { + + @Test + public void testMaxQueueSize() { + CacheActionDispatcher dispatcher = new CacheActionDispatcher(); + @SuppressWarnings({ "unchecked", "rawtypes" }) + CacheWriteQueue<String, Object> queue = new CacheWriteQueue(dispatcher, mock(PersistentCache.class), null); + + for (int i = 0; i < MAX_SIZE + 10; i++) { + dispatcher.add(createWriteAction(valueOf(i), queue)); + } + assertEquals(MAX_SIZE - ACTIONS_TO_REMOVE + 10 + 1, dispatcher.queue.size()); + assertEquals(valueOf(ACTIONS_TO_REMOVE), dispatcher.queue.peek().toString()); + + InvalidateCacheAction<?, ?> invalidateAction = null; + for (CacheAction<?, ?> action : dispatcher.queue) { + if (action instanceof InvalidateCacheAction) { + invalidateAction = (InvalidateCacheAction<?, ?>) action; + } + } + assertNotNull(invalidateAction); + assertEquals(ACTIONS_TO_REMOVE, size(invalidateAction.getAffectedKeys())); + } + + @Test + public void testQueue() throws InterruptedException { + int threads = 5; + int actionsPerThread = 100; + + @SuppressWarnings("unchecked") + CacheWriteQueue<String, Object> queue = Mockito.mock(CacheWriteQueue.class); + final CacheActionDispatcher dispatcher = new CacheActionDispatcher(); + Thread queueThread = new Thread(dispatcher); + queueThread.start(); + + List<DummyCacheWriteAction> allActions = new ArrayList<DummyCacheWriteAction>(); + List<Thread> producerThreads = new ArrayList<Thread>(); + for (int i = 0; i < threads; i++) { + final List<DummyCacheWriteAction> threadActions = new ArrayList<DummyCacheWriteAction>(); + for (int j = 0; j < actionsPerThread; j++) { + DummyCacheWriteAction action = new DummyCacheWriteAction(String.format("%d_%d", i, j), queue); + threadActions.add(action); + allActions.add(action); + } + Thread t = new Thread(new Runnable() { + @Override + public void run() { + for (DummyCacheWriteAction a : threadActions) { + dispatcher.add(a); + } + } + }); + producerThreads.add(t); + } + + for (Thread t : producerThreads) { + t.start(); + } + for (Thread t : producerThreads) { + t.join(); + } + + long start = currentTimeMillis(); + while (!allActions.isEmpty()) { + Iterator<DummyCacheWriteAction> it = allActions.iterator(); + while (it.hasNext()) { + if (it.next().finished) { + it.remove(); + } + } + if (currentTimeMillis() - start > 10000) { + fail("Following actions hasn't been executed: " + allActions); + } + } + + dispatcher.stop(); + queueThread.join(); + assertFalse(queueThread.isAlive()); + } + + @Test + public void testExecuteInvalidatesOnShutdown() throws InterruptedException { + Map<String, Object> cacheMap = new HashMap<String, Object>(); + CacheActionDispatcher dispatcher = new CacheActionDispatcher(); + CacheWriteQueue<String, Object> queue = new CacheWriteQueue<String, Object>(dispatcher, + Mockito.mock(PersistentCache.class), cacheMap); + Thread queueThread = new Thread(dispatcher); + queueThread.start(); + + cacheMap.put("2", new Object()); + cacheMap.put("3", new Object()); + cacheMap.put("4", new Object()); + dispatcher.add(new DummyCacheWriteAction("1", queue, 100)); + dispatcher.add(new InvalidateCacheAction<String, Object>(queue, Collections.singleton("2"))); + dispatcher.add(new InvalidateCacheAction<String, Object>(queue, Collections.singleton("3"))); + dispatcher.add(new InvalidateCacheAction<String, Object>(queue, Collections.singleton("4"))); + Thread.sleep(10); // make sure the first action started + + dispatcher.stop(); + assertEquals(of("2", "3", "4"), cacheMap.keySet()); + + queueThread.join(); + assertTrue(cacheMap.isEmpty()); + } + + private DummyCacheWriteAction createWriteAction(String id, CacheWriteQueue<String, Object> queue) { + return new DummyCacheWriteAction(id, queue); + } + + private class DummyCacheWriteAction implements CacheAction<String, Object> { + + private final CacheWriteQueue<String, Object> queue; + + private final String id; + + private final long delay; + + private volatile boolean finished; + + private DummyCacheWriteAction(String id, CacheWriteQueue<String, Object> queue) { + this(id, queue, new Random().nextInt(10)); + } + + private DummyCacheWriteAction(String id, CacheWriteQueue<String, Object> queue, long delay) { + this.queue = queue; + this.id = id; + this.delay = delay; + } + + @Override + public void execute() { + try { + sleep(delay); + } catch (InterruptedException e) { + fail("Interrupted"); + } + finished = true; + } + + @Override + public void cancel() { + } + + @Override + public String toString() { + return id; + } + + @Override + public Iterable<String> getAffectedKeys() { + return Collections.singleton(id); + } + + @Override + public CacheWriteQueue<String, Object> getOwner() { + return queue; + } + } +} Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueueTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueueTest.java?rev=1730650&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueueTest.java (added) +++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueueTest.java Tue Feb 16 10:39:18 2016 @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.persistentCache.async; + +import static java.util.Collections.singleton; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache; +import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheAction; +import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher; +import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheWriteQueue; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +public class CacheWriteQueueTest { + + private CacheWriteQueue<String, Object> queue; + + @SuppressWarnings("rawtypes") + private List<CacheAction> actions = Collections.synchronizedList(new ArrayList<CacheAction>()); + + @Before + public void initQueue() { + actions.clear(); + + CacheActionDispatcher dispatcher = new CacheActionDispatcher() { + public void add(CacheAction<?, ?> action) { + actions.add(action); + } + }; + + PersistentCache cache = Mockito.mock(PersistentCache.class); + queue = new CacheWriteQueue<String, Object>(dispatcher, cache, null); + } + + @Test + public void testCounters() throws InterruptedException { + final int threadCount = 10; + final int actionsPerThread = 50; + + final Map<String, AtomicInteger> counters = new HashMap<String, AtomicInteger>(); + for (int i = 0; i < 10; i++) { + String key = "key_" + i; + counters.put(key, new AtomicInteger()); + } + + final Random random = new Random(); + List<Thread> threads = new ArrayList<Thread>(); + for (int i = 0; i < threadCount; i++) { + Thread t = new Thread(new Runnable() { + @Override + public void run() { + for (int j = 0; j < actionsPerThread; j++) { + for (String key : counters.keySet()) { + if (random.nextBoolean()) { + queue.addPut(key, null); + } else { + queue.addPut(key, new Object()); + } + counters.get(key).incrementAndGet(); + } + } + } + }); + threads.add(t); + } + + for (Thread t : threads) { + t.start(); + } + for (Thread t : threads) { + t.join(); + } + for (String key : counters.keySet()) { + assertEquals(queue.queuedKeys.count(key), counters.get(key).get()); + } + + for (CacheAction<?, ?> action : actions) { + if (random.nextBoolean()) { + action.execute(); + } else { + action.cancel(); + } + } + + assertTrue(queue.queuedKeys.isEmpty()); + assertTrue(queue.waitsForInvalidation.isEmpty()); + } + + @Test + public void testWaitsForInvalidation() { + assertFalse(queue.waitsForInvalidation("key")); + + queue.addInvalidate(singleton("key")); + assertTrue(queue.waitsForInvalidation("key")); + + queue.addPut("key", new Object()); + assertFalse(queue.waitsForInvalidation("key")); + + queue.addInvalidate(singleton("key")); + assertTrue(queue.waitsForInvalidation("key")); + + int i; + for (i = 0; i < actions.size() - 1; i++) { + actions.get(i).execute(); + assertTrue(queue.waitsForInvalidation("key")); + } + + actions.get(i).execute(); + assertFalse(queue.waitsForInvalidation("key")); + } + +} \ No newline at end of file