Author: thomasm
Date: Tue Feb 16 10:39:18 2016
New Revision: 1730650
URL: http://svn.apache.org/viewvc?rev=1730650&view=rev
Log:
OAK-2761 Persistent cache: add data in a different thread
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/EvictionListener.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AsyncCacheTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueueTest.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java?rev=1730650&r1=1730649&r2=1730650&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
Tue Feb 16 10:39:18 2016
@@ -17,11 +17,17 @@
package org.apache.jackrabbit.oak.plugins.document;
import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.cache.RemovalCause.COLLECTED;
+import static com.google.common.cache.RemovalCause.EXPIRED;
+import static com.google.common.cache.RemovalCause.SIZE;
import java.io.InputStream;
+import java.lang.ref.Reference;
import java.net.UnknownHostException;
+import java.util.EnumSet;
import java.util.Iterator;
import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@@ -32,12 +38,16 @@ import javax.sql.DataSource;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
import com.google.common.cache.Weigher;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import com.mongodb.DB;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.cache.CacheLIRS;
+import org.apache.jackrabbit.oak.cache.CacheLIRS.EvictionCallback;
import org.apache.jackrabbit.oak.cache.CacheStats;
import org.apache.jackrabbit.oak.cache.CacheValue;
import org.apache.jackrabbit.oak.cache.EmpiricalWeigher;
@@ -59,6 +69,7 @@ import org.apache.jackrabbit.oak.plugins
import
org.apache.jackrabbit.oak.plugins.document.mongo.MongoMissingLastRevSeeker;
import org.apache.jackrabbit.oak.plugins.document.mongo.MongoVersionGCSupport;
import org.apache.jackrabbit.oak.plugins.document.persistentCache.CacheType;
+import
org.apache.jackrabbit.oak.plugins.document.persistentCache.EvictionListener;
import
org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache;
import org.apache.jackrabbit.oak.plugins.document.rdb.RDBBlobStore;
import org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore;
@@ -1008,19 +1019,24 @@ public class DocumentMK {
return new NodeDocumentCache(nodeDocumentsCache,
nodeDocumentsCacheStats, prevDocumentsCache, prevDocumentsCacheStats, locks);
}
+ @SuppressWarnings("unchecked")
private <K extends CacheValue, V extends CacheValue> Cache<K, V>
buildCache(
CacheType cacheType,
long maxWeight,
DocumentNodeStore docNodeStore,
DocumentStore docStore
) {
- Cache<K, V> cache = buildCache(cacheType.name(), maxWeight);
+ Set<EvictionListener<K, V>> listeners = new
CopyOnWriteArraySet<EvictionListener<K,V>>();
+ Cache<K, V> cache = buildCache(cacheType.name(), maxWeight,
listeners);
PersistentCache p = getPersistentCache();
if (p != null) {
if (docNodeStore != null) {
docNodeStore.setPersistentCache(p);
}
cache = p.wrap(docNodeStore, docStore, cache, cacheType);
+ if (cache instanceof EvictionListener) {
+ listeners.add((EvictionListener<K, V>) cache);
+ }
}
return cache;
}
@@ -1042,7 +1058,8 @@ public class DocumentMK {
private <K extends CacheValue, V extends CacheValue> Cache<K, V>
buildCache(
String module,
- long maxWeight) {
+ long maxWeight,
+ final Set<EvictionListener<K, V>> listeners) {
// by default, use the LIRS cache when using the persistent cache,
// but don't use it otherwise
boolean useLirs = persistentCacheURI != null;
@@ -1064,6 +1081,14 @@ public class DocumentMK {
segmentCount(cacheSegmentCount).
stackMoveDistance(cacheStackMoveDistance).
recordStats().
+ evictionCallback(new EvictionCallback<K, V>() {
+ @Override
+ public void evicted(K key, V value, RemovalCause
cause) {
+ for (EvictionListener<K, V> l : listeners) {
+ l.evicted(key, value, cause);
+ }
+ }
+ }).
build();
}
return CacheBuilder.newBuilder().
@@ -1071,6 +1096,14 @@ public class DocumentMK {
weigher(weigher).
maximumWeight(maxWeight).
recordStats().
+ removalListener(new RemovalListener<K, V>() {
+ @Override
+ public void onRemoval(RemovalNotification<K, V>
notification) {
+ for (EvictionListener<K, V> l : listeners) {
+ l.evicted(notification.getKey(),
notification.getValue(), notification.getCause());
+ }
+ }
+ }).
build();
}
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/EvictionListener.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/EvictionListener.java?rev=1730650&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/EvictionListener.java
(added)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/EvictionListener.java
Tue Feb 16 10:39:18 2016
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.persistentCache;
+
+import com.google.common.cache.RemovalCause;
+
+/**
+ * A listener that gets notified of entries that were removed from the cache.
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public interface EvictionListener<K, V> {
+
+ void evicted(K key, V value, RemovalCause removalCause);
+
+}
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java?rev=1730650&r1=1730649&r2=1730650&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java
Tue Feb 16 10:39:18 2016
@@ -16,8 +16,14 @@
*/
package org.apache.jackrabbit.oak.plugins.document.persistentCache;
+import static com.google.common.cache.RemovalCause.COLLECTED;
+import static com.google.common.cache.RemovalCause.EXPIRED;
+import static com.google.common.cache.RemovalCause.SIZE;
+import static java.util.Collections.singleton;
+
import java.nio.ByteBuffer;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
@@ -27,6 +33,8 @@ import javax.annotation.Nullable;
import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
import
org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache.GenerationCache;
+import
org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher;
+import
org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheWriteQueue;
import org.h2.mvstore.MVMap;
import org.h2.mvstore.WriteBuffer;
import org.h2.mvstore.type.DataType;
@@ -34,22 +42,29 @@ import org.h2.mvstore.type.DataType;
import com.google.common.base.Function;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheStats;
+import com.google.common.cache.RemovalCause;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+class NodeCache<K, V> implements Cache<K, V>, GenerationCache,
EvictionListener<K, V> {
+
+ private static final Set<RemovalCause> EVICTION_CAUSES =
ImmutableSet.of(COLLECTED, EXPIRED, SIZE);
-class NodeCache<K, V> implements Cache<K, V>, GenerationCache {
-
private final PersistentCache cache;
private final Cache<K, V> memCache;
private final MultiGenerationMap<K, V> map;
private final CacheType type;
private final DataType keyType;
private final DataType valueType;
-
+ private final CacheWriteQueue<K, V> writerQueue;
+
NodeCache(
PersistentCache cache,
Cache<K, V> memCache,
DocumentNodeStore docNodeStore,
- DocumentStore docStore, CacheType type) {
+ DocumentStore docStore,
+ CacheType type,
+ CacheActionDispatcher dispatcher) {
this.cache = cache;
this.memCache = memCache;
this.type = type;
@@ -57,6 +72,7 @@ class NodeCache<K, V> implements Cache<K
map = new MultiGenerationMap<K, V>();
keyType = new KeyDataType(type);
valueType = new ValueDataType(docNodeStore, docStore, type);
+ this.writerQueue = new CacheWriteQueue<K, V>(dispatcher, cache, map);
}
@Override
@@ -82,47 +98,31 @@ class NodeCache<K, V> implements Cache<K
}
private V readIfPresent(K key) {
+ if (writerQueue.waitsForInvalidation(key)) {
+ return null;
+ }
cache.switchGenerationIfNeeded();
V v = map.get(key);
return v;
}
-
- private void write(final K key, final V value) {
- write(key, value, true);
- }
- private void writeWithoutBroadcast(final K key, final V value) {
- write(key, value, false);
- }
-
- private void write(final K key, final V value, boolean broadcast) {
- cache.switchGenerationIfNeeded();
- if (broadcast) {
- cache.broadcast(type, new Function<WriteBuffer, Void>() {
- @Override
- @Nullable
- public Void apply(@Nullable WriteBuffer buffer) {
- keyType.write(buffer, key);
- if (value == null) {
- buffer.put((byte) 0);
- } else {
- buffer.put((byte) 1);
- valueType.write(buffer, value);
- }
- return null;
+ private void broadcast(final K key, final V value) {
+ cache.broadcast(type, new Function<WriteBuffer, Void>() {
+ @Override
+ @Nullable
+ public Void apply(@Nullable WriteBuffer buffer) {
+ keyType.write(buffer, key);
+ if (value == null) {
+ buffer.put((byte) 0);
+ } else {
+ buffer.put((byte) 1);
+ valueType.write(buffer, value);
}
- });
- }
- MultiGenerationMap<K, V> m = map;
- if (m != null) {
- if (value == null) {
- m.remove(key);
- } else {
- m.put(key, value);
+ return null;
}
- }
+ });
}
-
+
@SuppressWarnings("unchecked")
@Override
@Nullable
@@ -147,7 +147,7 @@ class NodeCache<K, V> implements Cache<K
return value;
}
value = memCache.get(key, valueLoader);
- write(key, value);
+ broadcast(key, value);
return value;
}
@@ -160,14 +160,15 @@ class NodeCache<K, V> implements Cache<K
@Override
public void put(K key, V value) {
memCache.put(key, value);
- write(key, value);
+ broadcast(key, value);
}
@SuppressWarnings("unchecked")
@Override
public void invalidate(Object key) {
memCache.invalidate(key);
- write((K) key, (V) null);
+ writerQueue.addInvalidate(singleton((K) key));
+ broadcast((K) key, null);
}
@Override
@@ -218,7 +219,17 @@ class NodeCache<K, V> implements Cache<K
value = (V) valueType.read(buff);
memCache.put(key, value);
}
- writeWithoutBroadcast(key, value);
+ }
+
+ /**
+ * Invoked on the eviction from the {@link #memCache}
+ */
+ @Override
+ public void evicted(K key, V value, RemovalCause cause) {
+ if (EVICTION_CAUSES.contains(cause) && value != null) {
+ // invalidations are handled separately
+ writerQueue.addPut(key, value);
+ }
}
}
\ No newline at end of file
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java?rev=1730650&r1=1730649&r2=1730650&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
Tue Feb 16 10:39:18 2016
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.Atomi
import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
import
org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.DynamicBroadcastConfig;
+import
org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher;
import
org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.Broadcaster;
import
org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.InMemoryBroadcaster;
import
org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.TCPBroadcaster;
@@ -82,6 +83,8 @@ public class PersistentCache implements
private ThreadLocal<WriteBuffer> writeBuffer = new
ThreadLocal<WriteBuffer>();
private final byte[] broadcastId;
private DynamicBroadcastConfig broadcastConfig;
+ private CacheActionDispatcher writeDispatcher;
+ private Thread writeDispatcherThread;
{
ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
@@ -192,6 +195,11 @@ public class PersistentCache implements
}
writeStore = createMapFactory(writeGeneration, false);
initBroadcast(broadcast);
+
+ writeDispatcher = new CacheActionDispatcher();
+ writeDispatcherThread = new Thread(writeDispatcher, "Oak
CacheWriteQueue");
+ writeDispatcherThread.setDaemon(true);
+ writeDispatcherThread.start();
}
private void initBroadcast(String broadcast) {
@@ -338,6 +346,13 @@ public class PersistentCache implements
}
public void close() {
+ writeDispatcher.stop();
+ try {
+ writeDispatcherThread.join();
+ } catch (InterruptedException e) {
+ LOG.error("Can't join the {}", writeDispatcherThread.getName(), e);
+ }
+
if (writeStore != null) {
writeStore.closeStore();
}
@@ -395,7 +410,7 @@ public class PersistentCache implements
}
if (wrap) {
NodeCache<K, V> c = new NodeCache<K, V>(this,
- base, docNodeStore, docStore, type);
+ base, docNodeStore, docStore, type, writeDispatcher);
initGenerationCache(c);
return c;
}
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java?rev=1730650&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java
(added)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java
Tue Feb 16 10:39:18 2016
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.persistentCache.async;
+
+/**
+ * Object represents an action on the cache (eg. put or invalidate).
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+interface CacheAction<K, V> {
+
+ /**
+ * Execute the action
+ */
+ void execute();
+
+ /**
+ * Cancel the action without executing it
+ */
+ void cancel();
+
+ /**
+ * Return the keys affected by this action
+ *
+ * @return keys affected by this action
+ */
+ Iterable<K> getAffectedKeys();
+
+ /**
+ * Return the owner of this action
+ *
+ * @return {@link CacheWriteQueue} executing this action
+ */
+ CacheWriteQueue<K, V> getOwner();
+
+}
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java?rev=1730650&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java
(added)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java
Tue Feb 16 10:39:18 2016
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.persistentCache.async;
+
+import static com.google.common.collect.Multimaps.index;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+
+/**
+ * An asynchronous buffer of the CacheAction objects. The buffer removes
+ * {@link #ACTIONS_TO_REMOVE} oldest entries if the queue length is larger than
+ * {@link #MAX_SIZE}.
+ */
+public class CacheActionDispatcher implements Runnable {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(CacheActionDispatcher.class);
+
+ /**
+ * What's the length of the queue.
+ */
+ static final int MAX_SIZE = 1024;
+
+ /**
+ * How many actions remove once the queue is longer than {@link #MAX_SIZE}.
+ */
+ static final int ACTIONS_TO_REMOVE = 256;
+
+ final BlockingQueue<CacheAction<?, ?>> queue = new
ArrayBlockingQueue<CacheAction<?, ?>>(MAX_SIZE * 2);
+
+ private volatile boolean isRunning = true;
+
+ @Override
+ public void run() {
+ while (isRunning) {
+ try {
+ CacheAction<?, ?> action = queue.poll(10,
TimeUnit.MILLISECONDS);
+ if (action != null && isRunning) {
+ action.execute();
+ }
+ } catch (InterruptedException e) {
+ LOG.debug("Interrupted the queue.poll()", e);
+ }
+ }
+ applyInvalidateActions();
+ }
+
+ /**
+ * Stop the processing.
+ */
+ public void stop() {
+ isRunning = false;
+ }
+
+ /**
+ * Adds the new action and cleans the queue if necessary.
+ *
+ * @param action to be added
+ */
+ synchronized void add(CacheAction<?, ?> action) {
+ if (queue.size() >= MAX_SIZE) {
+ cleanTheQueue();
+ }
+ queue.offer(action);
+ }
+
+ /**
+ * Clean the queue and add a single invalidate action for all the removed
entries.
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ private void cleanTheQueue() {
+ List<CacheAction> removed = removeOldest();
+ for (Entry<CacheWriteQueue, Collection<CacheAction>> e :
groupByOwner(removed).entrySet()) {
+ CacheWriteQueue owner = e.getKey();
+ Collection<CacheAction> actions = e.getValue();
+ List<Object> affectedKeys = cancelAll(actions);
+ owner.addInvalidate(affectedKeys);
+ }
+ }
+
+ /**
+ * Remove {@link #ACTIONS_TO_REMOVE} oldest actions.
+ *
+ * @return A list of removed items.
+ */
+ @SuppressWarnings("rawtypes")
+ private List<CacheAction> removeOldest() {
+ List<CacheAction> removed = new ArrayList<CacheAction>();
+ while (queue.size() > MAX_SIZE - ACTIONS_TO_REMOVE) {
+ CacheAction toBeCanceled = queue.poll();
+ if (toBeCanceled == null) {
+ break;
+ } else {
+ removed.add(toBeCanceled);
+ }
+ }
+ return removed;
+ }
+
+ /**
+ * Group passed actions by their owners.
+ *
+ * @param actions to be grouped
+ * @return map in which owner is the key and assigned action list is the
value
+ */
+ @SuppressWarnings("rawtypes")
+ private static Map<CacheWriteQueue, Collection<CacheAction>>
groupByOwner(List<CacheAction> actions) {
+ return index(actions, new Function<CacheAction, CacheWriteQueue>() {
+ @Override
+ public CacheWriteQueue apply(CacheAction input) {
+ return input.getOwner();
+ }
+ }).asMap();
+ }
+
+ /**
+ * Cancel all passed actions.
+ *
+ * @param actions to cancel
+ * @return list of affected keys
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ private static List<Object> cancelAll(Collection<CacheAction> actions) {
+ List<Object> cancelledKeys = new ArrayList<Object>();
+ for (CacheAction action : actions) {
+ action.cancel();
+ Iterables.addAll(cancelledKeys, action.getAffectedKeys());
+ }
+ return cancelledKeys;
+ }
+
+ @SuppressWarnings("rawtypes")
+ private void applyInvalidateActions() {
+ CacheAction action;
+ do {
+ action = queue.poll();
+ if (action instanceof InvalidateCacheAction) {
+ action.execute();
+ }
+ } while (action != null);
+ }
+
+}
\ No newline at end of file
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java?rev=1730650&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java
(added)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java
Tue Feb 16 10:39:18 2016
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.persistentCache.async;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import
org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache;
+
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
+
+/**
+ * A fronted for the {@link CacheActionDispatcher} creating actions and
maintaining their state.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class CacheWriteQueue<K, V> {
+
+ private final CacheActionDispatcher dispatcher;
+
+ private final PersistentCache cache;
+
+ private final Map<K, V> map;
+
+ final Multiset<K> queuedKeys = HashMultiset.create();
+
+ final Set<K> waitsForInvalidation = new HashSet<K>();
+
+ public CacheWriteQueue(CacheActionDispatcher dispatcher, PersistentCache
cache, Map<K, V> map) {
+ this.dispatcher = dispatcher;
+ this.cache = cache;
+ this.map = map;
+ }
+
+ /**
+ * Add new invalidate action.
+ *
+ * @param keys to be invalidated
+ */
+ public void addInvalidate(Iterable<K> keys) {
+ synchronized(this) {
+ for (K key : keys) {
+ queuedKeys.add(key);
+ waitsForInvalidation.add(key);
+ }
+ }
+ dispatcher.add(new InvalidateCacheAction<K, V>(this, keys));
+ }
+
+ /**
+ * Add new put action
+ *
+ * @param key to be put to cache
+ * @param value to be put to cache
+ */
+ public void addPut(K key, V value) {
+ synchronized(this) {
+ queuedKeys.add(key);
+ waitsForInvalidation.remove(key);
+ }
+ dispatcher.add(new PutToCacheAction<K, V>(this, key, value));
+ }
+
+ /**
+ * Check if the last action added for this key was invalidate
+ *
+ * @param key to check
+ * @return {@code true} if the last added action was invalidate
+ */
+ public synchronized boolean waitsForInvalidation(K key) {
+ return waitsForInvalidation.contains(key);
+ }
+
+ /**
+ * Remove the action state when it's finished or cancelled.
+ *
+ * @param key to be removed
+ */
+ synchronized void remove(K key) {
+ queuedKeys.remove(key);
+ if (!queuedKeys.contains(key)) {
+ waitsForInvalidation.remove(key);
+ }
+ }
+
+ PersistentCache getCache() {
+ return cache;
+ }
+
+ Map<K, V> getMap() {
+ return map;
+ }
+}
\ No newline at end of file
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java?rev=1730650&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java
(added)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java
Tue Feb 16 10:39:18 2016
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.persistentCache.async;
+
+import java.util.Map;
+
+import
org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache;
+
+/**
+ * An invalidate cache action.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+class InvalidateCacheAction<K, V> implements CacheAction<K, V> {
+
+ private final PersistentCache cache;
+
+ private final Map<K, V> map;
+
+ private final CacheWriteQueue<K, V> owner;
+
+ private final Iterable<K> keys;
+
+ InvalidateCacheAction(CacheWriteQueue<K, V> cacheWriteQueue, Iterable<K>
keys) {
+ this.owner = cacheWriteQueue;
+ this.keys = keys;
+ this.cache = cacheWriteQueue.getCache();
+ this.map = cacheWriteQueue.getMap();
+ }
+
+ @Override
+ public void execute() {
+ try {
+ if (map != null) {
+ for (K key : keys) {
+ cache.switchGenerationIfNeeded();
+ map.remove(key);
+ }
+ }
+ } finally {
+ decrement();
+ }
+ }
+
+ @Override
+ public void cancel() {
+ decrement();
+ }
+
+ @Override
+ public CacheWriteQueue<K, V> getOwner() {
+ return owner;
+ }
+
+ @Override
+ public Iterable<K> getAffectedKeys() {
+ return keys;
+ }
+
+ private void decrement() {
+ for (K key : keys) {
+ owner.remove(key);
+ }
+ }
+}
\ No newline at end of file
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java?rev=1730650&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java
(added)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java
Tue Feb 16 10:39:18 2016
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.persistentCache.async;
+
+import static java.util.Collections.singleton;
+
+import java.util.Map;
+
+import
org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache;
+
+/**
+ * Put to cache action
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+class PutToCacheAction<K, V> implements CacheAction<K, V> {
+
+ private final PersistentCache cache;
+
+ private final Map<K, V> map;
+
+ private final CacheWriteQueue<K, V> owner;
+
+ private final K key;
+
+ private final V value;
+
+ PutToCacheAction(CacheWriteQueue<K, V> cacheWriteQueue, K key, V value) {
+ this.owner = cacheWriteQueue;
+ this.key = key;
+ this.value = value;
+ this.cache = cacheWriteQueue.getCache();
+ this.map = cacheWriteQueue.getMap();
+ }
+
+ @Override
+ public void execute() {
+ try {
+ if (map != null) {
+ cache.switchGenerationIfNeeded();
+ map.put(key, value);
+ }
+ } finally {
+ decrement();
+ }
+ }
+
+ @Override
+ public void cancel() {
+ decrement();
+ }
+
+ @Override
+ public CacheWriteQueue<K, V> getOwner() {
+ return owner;
+ }
+
+ @Override
+ public Iterable<K> getAffectedKeys() {
+ return singleton(key);
+ }
+
+ private void decrement() {
+ owner.remove(key);
+ }
+}
\ No newline at end of file
Added:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AsyncCacheTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AsyncCacheTest.java?rev=1730650&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AsyncCacheTest.java
(added)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AsyncCacheTest.java
Tue Feb 16 10:39:18 2016
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.io.File;
+
+import com.google.common.cache.Cache;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.Test;
+
+import static org.junit.Assert.assertNull;
+
+public class AsyncCacheTest {
+
+ @Test
+ public void invalidateWhileInQueue() throws Exception {
+ FileUtils.deleteDirectory(new File("target/cacheTest"));
+ DocumentMK.Builder builder = new DocumentMK.Builder();
+ builder.setPersistentCache("target/cacheTest");
+ Cache<PathRev, DocumentNodeState.Children> cache =
builder.buildChildrenCache();
+ DocumentNodeState.Children c = new DocumentNodeState.Children();
+ for (int i = 0; i < 100; i++) {
+ c.children.add("node-" + i);
+ }
+ PathRev key = null;
+ for (int i = 0; i < 1000; i++) {
+ key = new PathRev("/foo/bar", new RevisionVector(new Revision(i,
0, 1)));
+ cache.put(key, c);
+ }
+ cache.invalidate(key);
+ // give the write queue some time to write back entries
+ Thread.sleep(200);
+ assertNull(cache.getIfPresent(key));
+ builder.getPersistentCache().close();
+ }
+}
Added:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java?rev=1730650&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java
(added)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java
Tue Feb 16 10:39:18 2016
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.persistentCache.async;
+
+import static com.google.common.collect.ImmutableSet.of;
+import static com.google.common.collect.Iterables.size;
+import static java.lang.String.valueOf;
+import static java.lang.System.currentTimeMillis;
+import static java.lang.Thread.sleep;
+import static
org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher.ACTIONS_TO_REMOVE;
+import static
org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher.MAX_SIZE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import
org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache;
+import
org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheAction;
+import
org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher;
+import
org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheWriteQueue;
+import
org.apache.jackrabbit.oak.plugins.document.persistentCache.async.InvalidateCacheAction;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class CacheActionDispatcherTest {
+
+ @Test
+ public void testMaxQueueSize() {
+ CacheActionDispatcher dispatcher = new CacheActionDispatcher();
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ CacheWriteQueue<String, Object> queue = new
CacheWriteQueue(dispatcher, mock(PersistentCache.class), null);
+
+ for (int i = 0; i < MAX_SIZE + 10; i++) {
+ dispatcher.add(createWriteAction(valueOf(i), queue));
+ }
+ assertEquals(MAX_SIZE - ACTIONS_TO_REMOVE + 10 + 1,
dispatcher.queue.size());
+ assertEquals(valueOf(ACTIONS_TO_REMOVE),
dispatcher.queue.peek().toString());
+
+ InvalidateCacheAction<?, ?> invalidateAction = null;
+ for (CacheAction<?, ?> action : dispatcher.queue) {
+ if (action instanceof InvalidateCacheAction) {
+ invalidateAction = (InvalidateCacheAction<?, ?>) action;
+ }
+ }
+ assertNotNull(invalidateAction);
+ assertEquals(ACTIONS_TO_REMOVE,
size(invalidateAction.getAffectedKeys()));
+ }
+
+ @Test
+ public void testQueue() throws InterruptedException {
+ int threads = 5;
+ int actionsPerThread = 100;
+
+ @SuppressWarnings("unchecked")
+ CacheWriteQueue<String, Object> queue =
Mockito.mock(CacheWriteQueue.class);
+ final CacheActionDispatcher dispatcher = new CacheActionDispatcher();
+ Thread queueThread = new Thread(dispatcher);
+ queueThread.start();
+
+ List<DummyCacheWriteAction> allActions = new
ArrayList<DummyCacheWriteAction>();
+ List<Thread> producerThreads = new ArrayList<Thread>();
+ for (int i = 0; i < threads; i++) {
+ final List<DummyCacheWriteAction> threadActions = new
ArrayList<DummyCacheWriteAction>();
+ for (int j = 0; j < actionsPerThread; j++) {
+ DummyCacheWriteAction action = new
DummyCacheWriteAction(String.format("%d_%d", i, j), queue);
+ threadActions.add(action);
+ allActions.add(action);
+ }
+ Thread t = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ for (DummyCacheWriteAction a : threadActions) {
+ dispatcher.add(a);
+ }
+ }
+ });
+ producerThreads.add(t);
+ }
+
+ for (Thread t : producerThreads) {
+ t.start();
+ }
+ for (Thread t : producerThreads) {
+ t.join();
+ }
+
+ long start = currentTimeMillis();
+ while (!allActions.isEmpty()) {
+ Iterator<DummyCacheWriteAction> it = allActions.iterator();
+ while (it.hasNext()) {
+ if (it.next().finished) {
+ it.remove();
+ }
+ }
+ if (currentTimeMillis() - start > 10000) {
+ fail("Following actions hasn't been executed: " + allActions);
+ }
+ }
+
+ dispatcher.stop();
+ queueThread.join();
+ assertFalse(queueThread.isAlive());
+ }
+
+ @Test
+ public void testExecuteInvalidatesOnShutdown() throws InterruptedException
{
+ Map<String, Object> cacheMap = new HashMap<String, Object>();
+ CacheActionDispatcher dispatcher = new CacheActionDispatcher();
+ CacheWriteQueue<String, Object> queue = new CacheWriteQueue<String,
Object>(dispatcher,
+ Mockito.mock(PersistentCache.class), cacheMap);
+ Thread queueThread = new Thread(dispatcher);
+ queueThread.start();
+
+ cacheMap.put("2", new Object());
+ cacheMap.put("3", new Object());
+ cacheMap.put("4", new Object());
+ dispatcher.add(new DummyCacheWriteAction("1", queue, 100));
+ dispatcher.add(new InvalidateCacheAction<String, Object>(queue,
Collections.singleton("2")));
+ dispatcher.add(new InvalidateCacheAction<String, Object>(queue,
Collections.singleton("3")));
+ dispatcher.add(new InvalidateCacheAction<String, Object>(queue,
Collections.singleton("4")));
+ Thread.sleep(10); // make sure the first action started
+
+ dispatcher.stop();
+ assertEquals(of("2", "3", "4"), cacheMap.keySet());
+
+ queueThread.join();
+ assertTrue(cacheMap.isEmpty());
+ }
+
+ private DummyCacheWriteAction createWriteAction(String id,
CacheWriteQueue<String, Object> queue) {
+ return new DummyCacheWriteAction(id, queue);
+ }
+
+ private class DummyCacheWriteAction implements CacheAction<String, Object>
{
+
+ private final CacheWriteQueue<String, Object> queue;
+
+ private final String id;
+
+ private final long delay;
+
+ private volatile boolean finished;
+
+ private DummyCacheWriteAction(String id, CacheWriteQueue<String,
Object> queue) {
+ this(id, queue, new Random().nextInt(10));
+ }
+
+ private DummyCacheWriteAction(String id, CacheWriteQueue<String,
Object> queue, long delay) {
+ this.queue = queue;
+ this.id = id;
+ this.delay = delay;
+ }
+
+ @Override
+ public void execute() {
+ try {
+ sleep(delay);
+ } catch (InterruptedException e) {
+ fail("Interrupted");
+ }
+ finished = true;
+ }
+
+ @Override
+ public void cancel() {
+ }
+
+ @Override
+ public String toString() {
+ return id;
+ }
+
+ @Override
+ public Iterable<String> getAffectedKeys() {
+ return Collections.singleton(id);
+ }
+
+ @Override
+ public CacheWriteQueue<String, Object> getOwner() {
+ return queue;
+ }
+ }
+}
Added:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueueTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueueTest.java?rev=1730650&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueueTest.java
(added)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueueTest.java
Tue Feb 16 10:39:18 2016
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.persistentCache.async;
+
+import static java.util.Collections.singleton;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import
org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache;
+import
org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheAction;
+import
org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher;
+import
org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheWriteQueue;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class CacheWriteQueueTest {
+
+ private CacheWriteQueue<String, Object> queue;
+
+ @SuppressWarnings("rawtypes")
+ private List<CacheAction> actions = Collections.synchronizedList(new
ArrayList<CacheAction>());
+
+ @Before
+ public void initQueue() {
+ actions.clear();
+
+ CacheActionDispatcher dispatcher = new CacheActionDispatcher() {
+ public void add(CacheAction<?, ?> action) {
+ actions.add(action);
+ }
+ };
+
+ PersistentCache cache = Mockito.mock(PersistentCache.class);
+ queue = new CacheWriteQueue<String, Object>(dispatcher, cache, null);
+ }
+
+ @Test
+ public void testCounters() throws InterruptedException {
+ final int threadCount = 10;
+ final int actionsPerThread = 50;
+
+ final Map<String, AtomicInteger> counters = new HashMap<String,
AtomicInteger>();
+ for (int i = 0; i < 10; i++) {
+ String key = "key_" + i;
+ counters.put(key, new AtomicInteger());
+ }
+
+ final Random random = new Random();
+ List<Thread> threads = new ArrayList<Thread>();
+ for (int i = 0; i < threadCount; i++) {
+ Thread t = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ for (int j = 0; j < actionsPerThread; j++) {
+ for (String key : counters.keySet()) {
+ if (random.nextBoolean()) {
+ queue.addPut(key, null);
+ } else {
+ queue.addPut(key, new Object());
+ }
+ counters.get(key).incrementAndGet();
+ }
+ }
+ }
+ });
+ threads.add(t);
+ }
+
+ for (Thread t : threads) {
+ t.start();
+ }
+ for (Thread t : threads) {
+ t.join();
+ }
+ for (String key : counters.keySet()) {
+ assertEquals(queue.queuedKeys.count(key), counters.get(key).get());
+ }
+
+ for (CacheAction<?, ?> action : actions) {
+ if (random.nextBoolean()) {
+ action.execute();
+ } else {
+ action.cancel();
+ }
+ }
+
+ assertTrue(queue.queuedKeys.isEmpty());
+ assertTrue(queue.waitsForInvalidation.isEmpty());
+ }
+
+ @Test
+ public void testWaitsForInvalidation() {
+ assertFalse(queue.waitsForInvalidation("key"));
+
+ queue.addInvalidate(singleton("key"));
+ assertTrue(queue.waitsForInvalidation("key"));
+
+ queue.addPut("key", new Object());
+ assertFalse(queue.waitsForInvalidation("key"));
+
+ queue.addInvalidate(singleton("key"));
+ assertTrue(queue.waitsForInvalidation("key"));
+
+ int i;
+ for (i = 0; i < actions.size() - 1; i++) {
+ actions.get(i).execute();
+ assertTrue(queue.waitsForInvalidation("key"));
+ }
+
+ actions.get(i).execute();
+ assertFalse(queue.waitsForInvalidation("key"));
+ }
+
+}
\ No newline at end of file