Author: tomekr
Date: Tue Nov 29 09:52:19 2016
New Revision: 1771871
URL: http://svn.apache.org/viewvc?rev=1771871&view=rev
Log:
OAK-5147: Backport asynchronous queue improvements to 1.2
Added:
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/CacheMetadata.java
jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/AsyncQueueTest.java
Removed:
jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueueTest.java
Modified:
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/MultiGenerationMap.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java
jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java
Added:
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/CacheMetadata.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/CacheMetadata.java?rev=1771871&view=auto
==============================================================================
---
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/CacheMetadata.java
(added)
+++
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/CacheMetadata.java
Tue Nov 29 09:52:19 2016
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.persistentCache;
+
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static com.google.common.collect.Maps.newConcurrentMap;
+
+/**
+ * In order to avoid leaking values from the metadataMap, following order
should
+ * be maintained for combining the cache and CacheMetadata:
+ *
+ * 1. For remove(), removeAll() and clear():
+ *
+ * - cache.invalidate()
+ * - metadata.remove()
+ *
+ * 2. For put(), putAll() and putFromPersistenceAndIncrement():
+ *
+ * - metadata.put()
+ * - cache.put()
+ *
+ * 3. For increment():
+ *
+ * - metadata.increment()
+ * - cache.get()
+ * - (metadata.remove() if value doesn't exists in cache)
+ *
+ * 4. For incrementAll():
+ *
+ * - metadata.incrementAll()
+ * - cache.getAll()
+ * - (metadata.removeAll() on keys that returned nulls)
+ *
+ * Preserving this order will allow to avoid leaked values in the metadata
without
+ * an extra synchronization between cache and metadata operations. This
strategy
+ * is a best-effort option - it may happen that cache values won't have their
+ * metadata entries.
+ */
+public class CacheMetadata<K> {
+
+ private final ConcurrentMap<K, MetadataEntry> metadataMap =
newConcurrentMap();
+
+ private boolean enabled = true;
+
+ boolean isEnabled() {
+ return enabled;
+ }
+
+ void disable() {
+ this.enabled = false;
+ }
+
+ void put(K key) {
+ if (!enabled) {
+ return;
+ }
+ getOrCreate(key, false);
+ }
+
+ void putFromPersistenceAndIncrement(K key) {
+ if (!enabled) {
+ return;
+ }
+ getOrCreate(key, true).incrementCount();
+ }
+
+ void increment(K key) {
+ if (!enabled) {
+ return;
+ }
+ getOrCreate(key, false).incrementCount();
+ }
+
+ MetadataEntry remove(Object key) {
+ if (!enabled) {
+ return null;
+ }
+ return metadataMap.remove(key);
+ }
+
+ void putAll(Iterable<?> keys) {
+ if (!enabled) {
+ return;
+ }
+ for (Object k : keys) {
+ getOrCreate((K) k, false);
+ }
+ }
+
+ void incrementAll(Iterable<?> keys) {
+ if (!enabled) {
+ return;
+ }
+ for (Object k : keys) {
+ getOrCreate((K) k, false).incrementCount();
+ }
+ }
+
+ void removeAll(Iterable<?> keys) {
+ if (!enabled) {
+ return;
+ }
+ for (Object k : keys) {
+ metadataMap.remove(k);
+ }
+ }
+
+ void clear() {
+ if (!enabled) {
+ return;
+ }
+ metadataMap.clear();
+ }
+
+ private MetadataEntry getOrCreate(K key, boolean readFromPersistentCache) {
+ if (!enabled) {
+ return null;
+ }
+ MetadataEntry metadata = metadataMap.get(key);
+ if (metadata == null) {
+ MetadataEntry newEntry = new
MetadataEntry(readFromPersistentCache);
+ MetadataEntry oldEntry = metadataMap.putIfAbsent(key, newEntry);
+ metadata = oldEntry == null ? newEntry : oldEntry;
+ }
+ return metadata;
+ }
+
+
+ static class MetadataEntry {
+
+ private final AtomicLong accessCount = new AtomicLong();
+
+ private final boolean readFromPersistentCache;
+
+ private MetadataEntry(boolean readFromPersistentCache) {
+ this.readFromPersistentCache = readFromPersistentCache;
+ }
+
+ void incrementCount() {
+ accessCount.incrementAndGet();
+ }
+
+ long getAccessCount() {
+ return accessCount.get();
+ }
+
+ boolean isReadFromPersistentCache() {
+ return readFromPersistentCache;
+ }
+ }
+
+}
Modified:
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/MultiGenerationMap.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/MultiGenerationMap.java?rev=1771871&r1=1771870&r2=1771871&view=diff
==============================================================================
---
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/MultiGenerationMap.java
(original)
+++
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/MultiGenerationMap.java
Tue Nov 29 09:52:19 2016
@@ -50,15 +50,22 @@ public class MultiGenerationMap<K, V> im
@SuppressWarnings("unchecked")
@Override
public V get(Object key) {
+ ValueWithGenerationInfo<V> value = readValue(key);
+ if (value == null) {
+ return null;
+ } else if (!value.isCurrentGeneration()) {
+ put((K) key, value.value);
+ }
+ return value.getValue();
+ }
+
+ ValueWithGenerationInfo<V> readValue(Object key) {
for (int generation : read.descendingKeySet()) {
CacheMap<K, V> m = read.get(generation);
if (m != null) {
V value = m.get(key);
if (value != null) {
- if (m != write) {
- put((K) key, value);
- }
- return value;
+ return new ValueWithGenerationInfo<V>(value, m == write);
}
}
}
@@ -123,4 +130,23 @@ public class MultiGenerationMap<K, V> im
throw new UnsupportedOperationException();
}
+ static class ValueWithGenerationInfo<V> {
+
+ private final V value;
+
+ private final boolean isCurrentGeneration;
+
+ private ValueWithGenerationInfo(V value, boolean isCurrentGeneration) {
+ this.value = value;
+ this.isCurrentGeneration = isCurrentGeneration;
+ }
+
+ V getValue() {
+ return value;
+ }
+
+ boolean isCurrentGeneration() {
+ return isCurrentGeneration;
+ }
+ }
}
Modified:
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java?rev=1771871&r1=1771870&r2=1771871&view=diff
==============================================================================
---
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java
(original)
+++
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java
Tue Nov 29 09:52:19 2016
@@ -16,9 +16,12 @@
*/
package org.apache.jackrabbit.oak.plugins.document.persistentCache;
+import static com.google.common.base.Predicates.in;
+import static com.google.common.base.Predicates.not;
import static com.google.common.cache.RemovalCause.COLLECTED;
import static com.google.common.cache.RemovalCause.EXPIRED;
import static com.google.common.cache.RemovalCause.SIZE;
+import static com.google.common.collect.Iterables.filter;
import static java.util.Collections.singleton;
import java.util.Map;
@@ -42,51 +45,54 @@ import com.google.common.cache.CacheStat
import com.google.common.cache.RemovalCause;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
class NodeCache<K, V> implements Cache<K, V>, GenerationCache,
EvictionListener<K, V> {
- private static final Set<RemovalCause> EVICTION_CAUSES =
ImmutableSet.of(COLLECTED, EXPIRED, SIZE);
+ static final Logger LOG = LoggerFactory.getLogger(NodeCache.class);
- /**
- * Whether to use the queue to put items into cache. Default: false (cache
- * will be updated synchronously).
- */
- private static final boolean ASYNC_CACHE =
Boolean.getBoolean("oak.cache.asynchronous");
+ private static final Set<RemovalCause> EVICTION_CAUSES =
ImmutableSet.of(COLLECTED, EXPIRED, SIZE);
private final PersistentCache cache;
private final Cache<K, V> memCache;
private final MultiGenerationMap<K, V> map;
private final CacheType type;
- private final DocumentNodeStore docNodeStore;
- private final DocumentStore docStore;
- private final CacheWriteQueue<K, V> writerQueue;
+ private final DataType keyType;
+ private final DataType valueType;
+ private final CacheMetadata<K> memCacheMetadata;
+ private final boolean async;
+ CacheWriteQueue<K, V> writeQueue;
NodeCache(
PersistentCache cache,
Cache<K, V> memCache,
- DocumentNodeStore docNodeStore,
+ DocumentNodeStore docNodeStore,
DocumentStore docStore,
CacheType type,
- CacheActionDispatcher dispatcher) {
+ CacheActionDispatcher dispatcher,
+ boolean async) {
this.cache = cache;
this.memCache = memCache;
this.type = type;
- this.docNodeStore = docNodeStore;
- this.docStore = docStore;
+ this.async = async;
PersistentCache.LOG.info("wrapping map " + this.type);
map = new MultiGenerationMap<K, V>();
-
- if (ASYNC_CACHE) {
- this.writerQueue = new CacheWriteQueue<K, V>(dispatcher, cache,
map);
+ keyType = new KeyDataType(type);
+ valueType = new ValueDataType(docNodeStore, docStore, type);
+ this.memCacheMetadata = new CacheMetadata<K>();
+ if (async) {
+ this.writeQueue = new CacheWriteQueue<K, V>(dispatcher, cache,
map);
+ LOG.info("The persistent cache {} writes will be asynchronous",
type);
} else {
- this.writerQueue = null;
+ this.writeQueue = null;
+ this.memCacheMetadata.disable();
+ LOG.info("The persistent cache {} writes will be synchronous",
type);
}
}
-
+
@Override
public void addGeneration(int generation, boolean readOnly) {
- DataType keyType = new KeyDataType(type);
- DataType valueType = new ValueDataType(docNodeStore, docStore, type);
MVMap.Builder<K, V> b = new MVMap.Builder<K, V>().
keyType(keyType).valueType(valueType);
String mapName = type.name();
@@ -96,21 +102,40 @@ class NodeCache<K, V> implements Cache<K
map.setWriteMap(m);
}
}
-
+
@Override
public void removeGeneration(int generation) {
map.removeReadMap(generation);
}
-
+
private V readIfPresent(K key) {
- if (ASYNC_CACHE && writerQueue.waitsForInvalidation(key)) {
- return null;
- }
+ return async ? asyncReadIfPresent(key) : syncReadIfPresent(key);
+ }
+
+ private V syncReadIfPresent(K key) {
cache.switchGenerationIfNeeded();
V v = map.get(key);
+ if (v != null) {
+ memCacheMetadata.putFromPersistenceAndIncrement(key);
+ }
return v;
}
+ private V asyncReadIfPresent(K key) {
+ MultiGenerationMap.ValueWithGenerationInfo<V> v =
map.readValue(key);
+ if (v == null) {
+ return null;
+ }
+ if (v.isCurrentGeneration() && !cache.needSwitch()) {
+ // don't persist again on eviction
+ memCacheMetadata.putFromPersistenceAndIncrement(key);
+ } else {
+ // persist again during eviction
+ memCacheMetadata.increment(key);
+ }
+ return v.getValue();
+ }
+
private void write(final K key, final V value) {
cache.switchGenerationIfNeeded();
if (value == null) {
@@ -124,10 +149,15 @@ class NodeCache<K, V> implements Cache<K
@Override
@Nullable
public V getIfPresent(Object key) {
+ memCacheMetadata.increment((K) key);
V value = memCache.getIfPresent(key);
- if (value != null) {
+ if (value == null) {
+ memCacheMetadata.remove(key);
+ } else {
return value;
}
+
+ // it takes care of updating memCacheMetadata
value = readIfPresent((K) key);
if (value != null) {
memCache.put((K) key, value);
@@ -137,15 +167,17 @@ class NodeCache<K, V> implements Cache<K
@Override
public V get(K key,
- Callable<? extends V> valueLoader)
+ Callable<? extends V> valueLoader)
throws ExecutionException {
V value = getIfPresent(key);
if (value != null) {
return value;
}
+
+ memCacheMetadata.increment(key);
value = memCache.get(key, valueLoader);
- if (!ASYNC_CACHE) {
- write(key, value);
+ if (!async) {
+ write((K) key, value);
}
return value;
}
@@ -153,14 +185,19 @@ class NodeCache<K, V> implements Cache<K
@Override
public ImmutableMap<K, V> getAllPresent(
Iterable<?> keys) {
- return memCache.getAllPresent(keys);
+ Iterable<K> typedKeys = (Iterable<K>) keys;
+ memCacheMetadata.incrementAll(keys);
+ ImmutableMap<K, V> result = memCache.getAllPresent(keys);
+ memCacheMetadata.removeAll(filter(typedKeys,
not(in(result.keySet()))));
+ return result;
}
@Override
public void put(K key, V value) {
+ memCacheMetadata.put(key);
memCache.put(key, value);
- if (!ASYNC_CACHE) {
- write(key, value);
+ if (!async) {
+ write((K) key, value);
}
}
@@ -168,8 +205,9 @@ class NodeCache<K, V> implements Cache<K
@Override
public void invalidate(Object key) {
memCache.invalidate(key);
- if (ASYNC_CACHE) {
- writerQueue.addInvalidate(singleton((K) key));
+ memCacheMetadata.remove(key);
+ if (async) {
+ writeQueue.addInvalidate(singleton((K) key));
} else {
write((K) key, null);
}
@@ -177,17 +215,20 @@ class NodeCache<K, V> implements Cache<K
@Override
public void putAll(Map<? extends K, ? extends V> m) {
+ memCacheMetadata.putAll(m.keySet());
memCache.putAll(m);
}
@Override
public void invalidateAll(Iterable<?> keys) {
memCache.invalidateAll(keys);
+ memCacheMetadata.removeAll(keys);
}
@Override
public void invalidateAll() {
memCache.invalidateAll();
+ memCacheMetadata.clear();
map.clear();
}
@@ -209,6 +250,7 @@ class NodeCache<K, V> implements Cache<K
@Override
public void cleanUp() {
memCache.cleanUp();
+ memCacheMetadata.clear();
}
/**
@@ -216,9 +258,18 @@ class NodeCache<K, V> implements Cache<K
*/
@Override
public void evicted(K key, V value, RemovalCause cause) {
- if (ASYNC_CACHE && EVICTION_CAUSES.contains(cause) && value != null) {
- // invalidations are handled separately
- writerQueue.addPut(key, value);
+ if (async && EVICTION_CAUSES.contains(cause) && value != null) {
+ CacheMetadata.MetadataEntry metadata =
memCacheMetadata.remove(key);
+ boolean qualifiesToPersist = true;
+ if (metadata != null && metadata.isReadFromPersistentCache()) {
+ qualifiesToPersist = false;
+ } else if (metadata != null && metadata.getAccessCount() < 1) {
+ qualifiesToPersist = false;
+ }
+
+ if (qualifiesToPersist) {
+ writeQueue.addPut(key, value);
+ }
}
}
-}
\ No newline at end of file
+}
Modified:
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java?rev=1771871&r1=1771870&r2=1771871&view=diff
==============================================================================
---
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
(original)
+++
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
Tue Nov 29 09:52:19 2016
@@ -42,7 +42,13 @@ import com.google.common.cache.Cache;
public class PersistentCache {
static final Logger LOG = LoggerFactory.getLogger(PersistentCache.class);
-
+
+ /**
+ * Whether to use the queue to put items into cache. Default: false (cache
+ * will be updated synchronously).
+ */
+ private static final boolean ASYNC_CACHE =
Boolean.parseBoolean(System.getProperty("oak.cache.asynchronous", "false"));
+
private static final String FILE_PREFIX = "cache-";
private static final String FILE_SUFFIX = ".data";
private static final AtomicInteger COUNTER = new AtomicInteger();
@@ -55,6 +61,8 @@ public class PersistentCache {
private boolean cacheDocChildren;
private boolean compactOnClose;
private boolean compress = true;
+ private boolean asyncCache = ASYNC_CACHE;
+ private boolean asyncDiffCache = false;
private ArrayList<GenerationCache> caches =
new ArrayList<GenerationCache>();
@@ -112,6 +120,12 @@ public class PersistentCache {
appendOnly = true;
} else if (p.equals("manualCommit")) {
manualCommit = true;
+ } else if (p.equals("+async")) {
+ asyncCache = true;
+ } else if (p.equals("-async")) {
+ asyncCache = false;
+ } else if (p.equals("+asyncDiff")) {
+ asyncDiffCache = true;
}
}
this.directory = dir;
@@ -325,6 +339,7 @@ public class PersistentCache {
DocumentStore docStore,
Cache<K, V> base, CacheType type) {
boolean wrap;
+ boolean async = asyncCache;
switch (type) {
case NODE:
wrap = cacheNodes;
@@ -334,9 +349,11 @@ public class PersistentCache {
break;
case DIFF:
wrap = cacheDiff;
+ async = asyncDiffCache;
break;
case LOCAL_DIFF:
wrap = cacheLocalDiff;
+ async = asyncDiffCache;
break;
case DOC_CHILDREN:
wrap = cacheDocChildren;
@@ -349,7 +366,7 @@ public class PersistentCache {
break;
}
if (wrap) {
- NodeCache<K, V> c = new NodeCache<K, V>(this, base, docNodeStore,
docStore, type, writeDispatcher);
+ NodeCache<K, V> c = new NodeCache<K, V>(this, base, docNodeStore,
docStore, type, writeDispatcher, async);
initGenerationCache(c);
return c;
}
@@ -408,7 +425,7 @@ public class PersistentCache {
}
}
- private boolean needSwitch() {
+ boolean needSwitch() {
long size = writeStore.getFileSize();
if (size / 1024 / 1024 <= maxSizeMB) {
return false;
Modified:
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java?rev=1771871&r1=1771870&r2=1771871&view=diff
==============================================================================
---
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java
(original)
+++
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java
Tue Nov 29 09:52:19 2016
@@ -29,23 +29,4 @@ interface CacheAction<K, V> {
*/
void execute();
- /**
- * Cancel the action without executing it
- */
- void cancel();
-
- /**
- * Return the keys affected by this action
- *
- * @return keys affected by this action
- */
- Iterable<K> getAffectedKeys();
-
- /**
- * Return the owner of this action
- *
- * @return {@link CacheWriteQueue} executing this action
- */
- CacheWriteQueue<K, V> getOwner();
-
}
Modified:
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java?rev=1771871&r1=1771870&r2=1771871&view=diff
==============================================================================
---
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java
(original)
+++
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java
Tue Nov 29 09:52:19 2016
@@ -16,13 +16,6 @@
*/
package org.apache.jackrabbit.oak.plugins.document.persistentCache.async;
-import static com.google.common.collect.Multimaps.index;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -30,13 +23,10 @@ import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-
/**
- * An asynchronous buffer of the CacheAction objects. The buffer removes
- * {@link #ACTIONS_TO_REMOVE} oldest entries if the queue length is larger than
- * {@link #MAX_SIZE}.
+ * An asynchronous buffer of the CacheAction objects. The buffer only accepts
+ * {@link #MAX_SIZE} number of elements. If the queue is already full, the new
+ * elements are dropped.
*/
public class CacheActionDispatcher implements Runnable {
@@ -45,14 +35,9 @@ public class CacheActionDispatcher imple
/**
* What's the length of the queue.
*/
- static final int MAX_SIZE = 1024;
-
- /**
- * How many actions remove once the queue is longer than {@link #MAX_SIZE}.
- */
- static final int ACTIONS_TO_REMOVE = 256;
+ static final int MAX_SIZE = 16 * 1024;
- final BlockingQueue<CacheAction<?, ?>> queue = new
ArrayBlockingQueue<CacheAction<?, ?>>(MAX_SIZE * 2);
+ final BlockingQueue<CacheAction<?, ?>> queue = new
ArrayBlockingQueue<CacheAction<?, ?>>(MAX_SIZE);
private volatile boolean isRunning = true;
@@ -68,7 +53,6 @@ public class CacheActionDispatcher imple
LOG.debug("Interrupted the queue.poll()", e);
}
}
- applyInvalidateActions();
}
/**
@@ -79,91 +63,11 @@ public class CacheActionDispatcher imple
}
/**
- * Adds the new action and cleans the queue if necessary.
+ * Tries to add new action.
*
* @param action to be added
*/
- synchronized void add(CacheAction<?, ?> action) {
- if (queue.size() >= MAX_SIZE) {
- cleanTheQueue();
- }
- queue.offer(action);
- }
-
- /**
- * Clean the queue and add a single invalidate action for all the removed
entries.
- */
- @SuppressWarnings({ "rawtypes", "unchecked" })
- private void cleanTheQueue() {
- List<CacheAction> removed = removeOldest();
- for (Entry<CacheWriteQueue, Collection<CacheAction>> e :
groupByOwner(removed).entrySet()) {
- CacheWriteQueue owner = e.getKey();
- Collection<CacheAction> actions = e.getValue();
- List<Object> affectedKeys = cancelAll(actions);
- owner.addInvalidate(affectedKeys);
- }
- }
-
- /**
- * Remove {@link #ACTIONS_TO_REMOVE} oldest actions.
- *
- * @return A list of removed items.
- */
- @SuppressWarnings("rawtypes")
- private List<CacheAction> removeOldest() {
- List<CacheAction> removed = new ArrayList<CacheAction>();
- while (queue.size() > MAX_SIZE - ACTIONS_TO_REMOVE) {
- CacheAction toBeCanceled = queue.poll();
- if (toBeCanceled == null) {
- break;
- } else {
- removed.add(toBeCanceled);
- }
- }
- return removed;
- }
-
- /**
- * Group passed actions by their owners.
- *
- * @param actions to be grouped
- * @return map in which owner is the key and assigned action list is the
value
- */
- @SuppressWarnings("rawtypes")
- private static Map<CacheWriteQueue, Collection<CacheAction>>
groupByOwner(List<CacheAction> actions) {
- return index(actions, new Function<CacheAction, CacheWriteQueue>() {
- @Override
- public CacheWriteQueue apply(CacheAction input) {
- return input.getOwner();
- }
- }).asMap();
+ boolean add(CacheAction<?, ?> action) {
+ return queue.offer(action);
}
-
- /**
- * Cancel all passed actions.
- *
- * @param actions to cancel
- * @return list of affected keys
- */
- @SuppressWarnings({ "unchecked", "rawtypes" })
- private static List<Object> cancelAll(Collection<CacheAction> actions) {
- List<Object> cancelledKeys = new ArrayList<Object>();
- for (CacheAction action : actions) {
- action.cancel();
- Iterables.addAll(cancelledKeys, action.getAffectedKeys());
- }
- return cancelledKeys;
- }
-
- @SuppressWarnings("rawtypes")
- private void applyInvalidateActions() {
- CacheAction action;
- do {
- action = queue.poll();
- if (action instanceof InvalidateCacheAction) {
- action.execute();
- }
- } while (action != null);
- }
-
-}
\ No newline at end of file
+}
Modified:
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java?rev=1771871&r1=1771870&r2=1771871&view=diff
==============================================================================
---
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java
(original)
+++
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java
Tue Nov 29 09:52:19 2016
@@ -16,21 +16,10 @@
*/
package org.apache.jackrabbit.oak.plugins.document.persistentCache.async;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
import
org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache;
-import com.google.common.collect.HashMultiset;
-import com.google.common.collect.Multiset;
+import java.util.Map;
-/**
- * A fronted for the {@link CacheActionDispatcher} creating actions and
maintaining their state.
- *
- * @param <K> key type
- * @param <V> value type
- */
public class CacheWriteQueue<K, V> {
private final CacheActionDispatcher dispatcher;
@@ -39,65 +28,18 @@ public class CacheWriteQueue<K, V> {
private final Map<K, V> map;
- final Multiset<K> queuedKeys = HashMultiset.create();
-
- final Set<K> waitsForInvalidation = new HashSet<K>();
-
public CacheWriteQueue(CacheActionDispatcher dispatcher, PersistentCache
cache, Map<K, V> map) {
this.dispatcher = dispatcher;
this.cache = cache;
this.map = map;
}
- /**
- * Add new invalidate action.
- *
- * @param keys to be invalidated
- */
- public void addInvalidate(Iterable<K> keys) {
- synchronized(this) {
- for (K key : keys) {
- queuedKeys.add(key);
- waitsForInvalidation.add(key);
- }
- }
- dispatcher.add(new InvalidateCacheAction<K, V>(this, keys));
- }
-
- /**
- * Add new put action
- *
- * @param key to be put to cache
- * @param value to be put to cache
- */
- public void addPut(K key, V value) {
- synchronized(this) {
- queuedKeys.add(key);
- waitsForInvalidation.remove(key);
- }
- dispatcher.add(new PutToCacheAction<K, V>(this, key, value));
- }
-
- /**
- * Check if the last action added for this key was invalidate
- *
- * @param key to check
- * @return {@code true} if the last added action was invalidate
- */
- public synchronized boolean waitsForInvalidation(K key) {
- return waitsForInvalidation.contains(key);
+ public boolean addPut(K key, V value) {
+ return dispatcher.add(new PutToCacheAction<K, V>(key, value, this));
}
- /**
- * Remove the action state when it's finished or cancelled.
- *
- * @param key to be removed
- */
- synchronized void remove(K key) {
- queuedKeys.remove(key);
- if (!queuedKeys.contains(key)) {
- waitsForInvalidation.remove(key);
- }
+ public boolean addInvalidate(Iterable<K> keys) {
+ return dispatcher.add(new InvalidateCacheAction<K, V>(keys, this));
}
PersistentCache getCache() {
@@ -107,4 +49,4 @@ public class CacheWriteQueue<K, V> {
Map<K, V> getMap() {
return map;
}
-}
\ No newline at end of file
+}
Modified:
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java?rev=1771871&r1=1771870&r2=1771871&view=diff
==============================================================================
---
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java
(original)
+++
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java
Tue Nov 29 09:52:19 2016
@@ -18,6 +18,7 @@ package org.apache.jackrabbit.oak.plugin
import java.util.Map;
+import com.google.common.collect.Iterables;
import
org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache;
/**
@@ -32,49 +33,26 @@ class InvalidateCacheAction<K, V> implem
private final Map<K, V> map;
- private final CacheWriteQueue<K, V> owner;
-
private final Iterable<K> keys;
- InvalidateCacheAction(CacheWriteQueue<K, V> cacheWriteQueue, Iterable<K>
keys) {
- this.owner = cacheWriteQueue;
+ InvalidateCacheAction(Iterable<K> keys, CacheWriteQueue<K, V> queue) {
this.keys = keys;
- this.cache = cacheWriteQueue.getCache();
- this.map = cacheWriteQueue.getMap();
+ this.cache = queue.getCache();
+ this.map = queue.getMap();
}
@Override
public void execute() {
- try {
- if (map != null) {
- for (K key : keys) {
- cache.switchGenerationIfNeeded();
- map.remove(key);
- }
+ if (map != null) {
+ for (K key : keys) {
+ cache.switchGenerationIfNeeded();
+ map.remove(key);
}
- } finally {
- decrement();
}
}
@Override
- public void cancel() {
- decrement();
- }
-
- @Override
- public CacheWriteQueue<K, V> getOwner() {
- return owner;
- }
-
- @Override
- public Iterable<K> getAffectedKeys() {
- return keys;
- }
-
- private void decrement() {
- for (K key : keys) {
- owner.remove(key);
- }
+ public String toString() {
+ return new
StringBuilder("InvalidateCacheAction").append(Iterables.toString(keys)).toString();
}
-}
\ No newline at end of file
+}
Modified:
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java?rev=1771871&r1=1771870&r2=1771871&view=diff
==============================================================================
---
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java
(original)
+++
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java
Tue Nov 29 09:52:19 2016
@@ -20,6 +20,7 @@ import static java.util.Collections.sing
import java.util.Map;
+import com.google.common.collect.Iterables;
import
org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache;
/**
@@ -34,48 +35,27 @@ class PutToCacheAction<K, V> implements
private final Map<K, V> map;
- private final CacheWriteQueue<K, V> owner;
-
private final K key;
private final V value;
- PutToCacheAction(CacheWriteQueue<K, V> cacheWriteQueue, K key, V value) {
- this.owner = cacheWriteQueue;
+ PutToCacheAction(K key, V value, CacheWriteQueue<K, V> queue) {
this.key = key;
this.value = value;
- this.cache = cacheWriteQueue.getCache();
- this.map = cacheWriteQueue.getMap();
+ this.cache = queue.getCache();
+ this.map = queue.getMap();
}
@Override
public void execute() {
- try {
- if (map != null) {
- cache.switchGenerationIfNeeded();
- map.put(key, value);
- }
- } finally {
- decrement();
+ if (map != null) {
+ cache.switchGenerationIfNeeded();
+ map.put(key, value);
}
}
@Override
- public void cancel() {
- decrement();
- }
-
- @Override
- public CacheWriteQueue<K, V> getOwner() {
- return owner;
- }
-
- @Override
- public Iterable<K> getAffectedKeys() {
- return singleton(key);
- }
-
- private void decrement() {
- owner.remove(key);
+ public String toString() {
+ return new
StringBuilder("PutToCacheAction[").append(key).append(']').toString();
}
-}
\ No newline at end of file
+}
Added:
jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/AsyncQueueTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/AsyncQueueTest.java?rev=1771871&view=auto
==============================================================================
---
jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/AsyncQueueTest.java
(added)
+++
jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/AsyncQueueTest.java
Tue Nov 29 09:52:19 2016
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.persistentCache;
+
+import com.google.common.cache.RemovalCause;
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.cache.CacheLIRS;
+import org.apache.jackrabbit.oak.plugins.document.DocumentMKBuilderProvider;
+import org.apache.jackrabbit.oak.plugins.document.PathRev;
+import org.apache.jackrabbit.oak.plugins.document.Revision;
+import
org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheWriteQueue;
+import org.apache.jackrabbit.oak.plugins.document.util.StringValue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.google.common.collect.Lists.newArrayList;
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
+import static org.junit.Assert.assertEquals;
+
+public class AsyncQueueTest {
+
+ @Rule
+ public DocumentMKBuilderProvider builderProvider = new
DocumentMKBuilderProvider();
+
+ private static final StringValue VAL = new StringValue("xyz");
+
+ private PersistentCache pCache;
+
+ private List<PathRev> putActions;
+
+ private List<PathRev> invalidateActions;
+
+ private NodeCache<PathRev, StringValue> nodeCache;
+
+ private int id;
+
+ @Before
+ public void setup() throws IOException {
+ FileUtils.deleteDirectory(new File("target/cacheTest"));
+ pCache = new PersistentCache("target/cacheTest,+async");
+ final AtomicReference<NodeCache<PathRev, StringValue>> nodeCacheRef =
new AtomicReference<NodeCache<PathRev, StringValue>>();
+ CacheLIRS<PathRev, StringValue> cache = new CacheLIRS.Builder<PathRev,
StringValue>().maximumSize(1).evictionCallback(new
CacheLIRS.EvictionCallback<PathRev, StringValue>() {
+ @Override
+ public void evicted(@Nonnull PathRev key, @Nullable StringValue
value, @Nonnull RemovalCause cause) {
+ if (nodeCacheRef.get() != null) {
+ nodeCacheRef.get().evicted(key, value, cause);
+ }
+ }
+ }).build();
+ nodeCache = (NodeCache<PathRev, StringValue>)
pCache.wrap(builderProvider.newBuilder().getNodeStore(),
+ null, cache, CacheType.NODE);
+ nodeCacheRef.set(nodeCache);
+
+ CacheWriteQueueWrapper writeQueue = new
CacheWriteQueueWrapper(nodeCache.writeQueue);
+ nodeCache.writeQueue = writeQueue;
+
+ this.putActions = writeQueue.putActions;
+ this.invalidateActions = writeQueue.invalidateActions;
+ this.id = 0;
+ }
+
+ @After
+ public void teardown() {
+ if (pCache != null) {
+ pCache.close();
+ }
+ }
+
+ @Test
+ public void unusedItemsShouldntBePersisted() {
+ PathRev k = generatePathRev();
+ nodeCache.put(k, VAL);
+ flush();
+ assertEquals(emptyList(), putActions);
+ }
+
+ @Test
+ public void readItemsShouldntBePersistedAgain() {
+ PathRev k = generatePathRev();
+ nodeCache.put(k, VAL);
+ nodeCache.getIfPresent(k);
+ flush();
+ assertEquals(asList(k), putActions);
+
+ putActions.clear();
+ nodeCache.getIfPresent(k); // k should be loaded from persisted cache
+ flush();
+ assertEquals(emptyList(), putActions); // k is not persisted again
+ }
+
+ @Test
+ public void usedItemsShouldBePersisted() {
+ PathRev k = generatePathRev();
+ nodeCache.put(k, VAL);
+ nodeCache.getIfPresent(k);
+ flush();
+ assertEquals(asList(k), putActions);
+ }
+
+ private PathRev generatePathRev() {
+ return new PathRev("/" + id++, new Revision(0, 0, 0));
+ }
+
+ private void flush() {
+ for (int i = 0; i < 1024; i++) {
+ nodeCache.put(generatePathRev(), VAL); // cause eviction of k
+ }
+ }
+
+ private static class CacheWriteQueueWrapper extends
CacheWriteQueue<PathRev, StringValue> {
+
+ private final CacheWriteQueue<PathRev, StringValue> wrapped;
+
+ private final List<PathRev> putActions = newArrayList();
+
+ private final List<PathRev> invalidateActions = newArrayList();
+
+ public CacheWriteQueueWrapper(CacheWriteQueue<PathRev, StringValue>
wrapped) {
+ super(null, null, null);
+ this.wrapped = wrapped;
+ }
+
+ @Override
+ public boolean addPut(PathRev key, StringValue value) {
+ putActions.add(key);
+ return wrapped.addPut(key, value);
+ }
+
+ public boolean addInvalidate(Iterable<PathRev> keys) {
+ invalidateActions.addAll(newArrayList(keys));
+ return wrapped.addInvalidate(keys);
+ }
+ }
+
+}
Modified:
jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java?rev=1771871&r1=1771870&r2=1771871&view=diff
==============================================================================
---
jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java
(original)
+++
jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java
Tue Nov 29 09:52:19 2016
@@ -18,33 +18,21 @@
*/
package org.apache.jackrabbit.oak.plugins.document.persistentCache.async;
-import static com.google.common.collect.ImmutableSet.of;
-import static com.google.common.collect.Iterables.size;
import static java.lang.String.valueOf;
import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.sleep;
-import static
org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher.ACTIONS_TO_REMOVE;
import static
org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher.MAX_SIZE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Random;
import
org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache;
-import
org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheAction;
-import
org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher;
-import
org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheWriteQueue;
-import
org.apache.jackrabbit.oak.plugins.document.persistentCache.async.InvalidateCacheAction;
import org.junit.Test;
import org.mockito.Mockito;
@@ -59,17 +47,8 @@ public class CacheActionDispatcherTest {
for (int i = 0; i < MAX_SIZE + 10; i++) {
dispatcher.add(createWriteAction(valueOf(i), queue));
}
- assertEquals(MAX_SIZE - ACTIONS_TO_REMOVE + 10 + 1,
dispatcher.queue.size());
- assertEquals(valueOf(ACTIONS_TO_REMOVE),
dispatcher.queue.peek().toString());
-
- InvalidateCacheAction<?, ?> invalidateAction = null;
- for (CacheAction<?, ?> action : dispatcher.queue) {
- if (action instanceof InvalidateCacheAction) {
- invalidateAction = (InvalidateCacheAction<?, ?>) action;
- }
- }
- assertNotNull(invalidateAction);
- assertEquals(ACTIONS_TO_REMOVE,
size(invalidateAction.getAffectedKeys()));
+ assertEquals(MAX_SIZE, dispatcher.queue.size());
+ assertEquals("0", dispatcher.queue.peek().toString());
}
@Test
@@ -128,31 +107,6 @@ public class CacheActionDispatcherTest {
assertFalse(queueThread.isAlive());
}
- @Test
- public void testExecuteInvalidatesOnShutdown() throws InterruptedException
{
- Map<String, Object> cacheMap = new HashMap<String, Object>();
- CacheActionDispatcher dispatcher = new CacheActionDispatcher();
- CacheWriteQueue<String, Object> queue = new CacheWriteQueue<String,
Object>(dispatcher,
- Mockito.mock(PersistentCache.class), cacheMap);
- Thread queueThread = new Thread(dispatcher);
- queueThread.start();
-
- cacheMap.put("2", new Object());
- cacheMap.put("3", new Object());
- cacheMap.put("4", new Object());
- dispatcher.add(new DummyCacheWriteAction("1", queue, 100));
- dispatcher.add(new InvalidateCacheAction<String, Object>(queue,
Collections.singleton("2")));
- dispatcher.add(new InvalidateCacheAction<String, Object>(queue,
Collections.singleton("3")));
- dispatcher.add(new InvalidateCacheAction<String, Object>(queue,
Collections.singleton("4")));
- Thread.sleep(10); // make sure the first action started
-
- dispatcher.stop();
- assertEquals(of("2", "3", "4"), cacheMap.keySet());
-
- queueThread.join();
- assertTrue(cacheMap.isEmpty());
- }
-
private DummyCacheWriteAction createWriteAction(String id,
CacheWriteQueue<String, Object> queue) {
return new DummyCacheWriteAction(id, queue);
}
@@ -188,22 +142,9 @@ public class CacheActionDispatcherTest {
}
@Override
- public void cancel() {
- }
-
- @Override
public String toString() {
return id;
}
- @Override
- public Iterable<String> getAffectedKeys() {
- return Collections.singleton(id);
- }
-
- @Override
- public CacheWriteQueue<String, Object> getOwner() {
- return queue;
- }
}
}