Author: mreutegg
Date: Thu Jun 21 08:54:38 2018
New Revision: 1833987
URL: http://svn.apache.org/viewvc?rev=1833987&view=rev
Log:
OAK-7559: CacheActionDispatcher not memory bound
Modified:
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCacheTest.java
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java
Modified:
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java?rev=1833987&r1=1833986&r2=1833987&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java
(original)
+++
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java
Thu Jun 21 08:54:38 2018
@@ -34,6 +34,7 @@ import java.util.concurrent.ExecutionExc
import javax.annotation.Nullable;
+import org.apache.jackrabbit.oak.cache.CacheValue;
import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache.GenerationCache;
@@ -54,7 +55,8 @@ import com.google.common.collect.Immutab
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-class NodeCache<K, V> implements Cache<K, V>, GenerationCache, EvictionListener<K, V> {
+class NodeCache<K extends CacheValue, V extends CacheValue>
+ implements Cache<K, V>, GenerationCache, EvictionListener<K, V> {
static final Logger LOG = LoggerFactory.getLogger(NodeCache.class);
Modified:
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java?rev=1833987&r1=1833986&r2=1833987&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
(original)
+++
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
Thu Jun 21 08:54:38 2018
@@ -25,6 +25,7 @@ import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.jackrabbit.oak.cache.CacheValue;
import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.DynamicBroadcastConfig;
@@ -380,14 +381,14 @@ public class PersistentCache implements
writeBuffer.remove();
}
- public synchronized <K, V> Cache<K, V> wrap(
+ public synchronized <K extends CacheValue, V extends CacheValue> Cache<K, V> wrap(
DocumentNodeStore docNodeStore,
DocumentStore docStore,
Cache<K, V> base, CacheType type) {
return wrap(docNodeStore, docStore, base, type,
StatisticsProvider.NOOP);
}
- public synchronized <K, V> Cache<K, V> wrap(
+ public synchronized <K extends CacheValue, V extends CacheValue> Cache<K, V> wrap(
DocumentNodeStore docNodeStore,
DocumentStore docStore,
Cache<K, V> base, CacheType type,
Modified:
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java?rev=1833987&r1=1833986&r2=1833987&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java
(original)
+++
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java
Thu Jun 21 08:54:38 2018
@@ -18,15 +18,16 @@ package org.apache.jackrabbit.oak.plugin
/**
* Object represents an action on the cache (eg. put or invalidate).
- *
- * @param <K> key type
- * @param <V> value type
*/
-interface CacheAction<K, V> {
+interface CacheAction {
/**
* Execute the action
*/
void execute();
+ /**
+ * @return the size of the memory in bytes this cache action occupies.
+ */
+ int getMemory();
}
Modified:
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java?rev=1833987&r1=1833986&r2=1833987&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java
(original)
+++
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java
Thu Jun 21 08:54:38 2018
@@ -33,20 +33,51 @@ public class CacheActionDispatcher imple
private static final Logger LOG =
LoggerFactory.getLogger(CacheActionDispatcher.class);
/**
- * What's the length of the queue.
+ * Default maximum memory for the queue: 32 MB.
+ */
+ private static final long DEFAULT_MAX_MEMORY = 32 * 1024 * 1024;
+
+ /**
+ * The maximum length of the queue.
*/
static final int MAX_SIZE = 16 * 1024;
- final BlockingQueue<CacheAction<?, ?>> queue = new ArrayBlockingQueue<CacheAction<?, ?>>(MAX_SIZE);
+ final BlockingQueue<CacheAction> queue = new ArrayBlockingQueue<>(MAX_SIZE);
+
+ /**
+ * The maximum memory for all cache actions currently in the queue.
+ */
+ private final long maxMemory;
+
+ /**
+ * The current memory usage of the cache actions in the queue.
+ */
+ private long memory = 0;
+
+ /**
+ * Monitor object for synchronization.
+ */
+ private final Object monitor = new Object();
private volatile boolean isRunning = true;
+ public CacheActionDispatcher() {
+ this(DEFAULT_MAX_MEMORY);
+ }
+
+ CacheActionDispatcher(long maxMemory) {
+ this.maxMemory = maxMemory;
+ }
+
@Override
public void run() {
while (isRunning) {
try {
- CacheAction<?, ?> action = queue.poll(10, TimeUnit.MILLISECONDS);
+ CacheAction action = queue.poll(10, TimeUnit.MILLISECONDS);
if (action != null && isRunning) {
+ synchronized (monitor) {
+ memory -= action.getMemory();
+ }
action.execute();
}
} catch (InterruptedException e) {
@@ -67,7 +98,24 @@ public class CacheActionDispatcher imple
*
* @param action to be added
*/
- boolean add(CacheAction<?, ?> action) {
- return queue.offer(action);
+ boolean add(CacheAction action) {
+ int m = action.getMemory();
+ synchronized (monitor) {
+ // check if the queue reached memory limit and accepts action
+ if (memory + m <= maxMemory && queue.offer(action)) {
+ memory += m;
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Exposed for tests only.
+ *
+ * @return the current memory usage of the pending cache actions.
+ */
+ long getMemory() {
+ return memory;
}
}
\ No newline at end of file
Modified:
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java?rev=1833987&r1=1833986&r2=1833987&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java
(original)
+++
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java
Thu Jun 21 08:54:38 2018
@@ -16,11 +16,12 @@
*/
package org.apache.jackrabbit.oak.plugins.document.persistentCache.async;
+import org.apache.jackrabbit.oak.cache.CacheValue;
import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache;
import java.util.Map;
-public class CacheWriteQueue<K, V> {
+public class CacheWriteQueue<K extends CacheValue, V extends CacheValue> {
private final CacheActionDispatcher dispatcher;
Modified:
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java?rev=1833987&r1=1833986&r2=1833987&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java
(original)
+++
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java
Thu Jun 21 08:54:38 2018
@@ -19,6 +19,8 @@ package org.apache.jackrabbit.oak.plugin
import java.util.Map;
import com.google.common.collect.Iterables;
+
+import org.apache.jackrabbit.oak.cache.CacheValue;
import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache;
/**
@@ -27,7 +29,8 @@ import org.apache.jackrabbit.oak.plugins
* @param <K> key type
* @param <V> value type
*/
-class InvalidateCacheAction<K, V> implements CacheAction<K, V> {
+class InvalidateCacheAction<K extends CacheValue, V extends CacheValue>
+ implements CacheAction {
private final PersistentCache cache;
@@ -35,6 +38,8 @@ class InvalidateCacheAction<K, V> implem
private final Iterable<K> keys;
+ private int memory = 0;
+
InvalidateCacheAction(Iterable<K> keys, CacheWriteQueue<K, V> queue) {
this.keys = keys;
this.cache = queue.getCache();
@@ -52,6 +57,19 @@ class InvalidateCacheAction<K, V> implem
}
@Override
+ public int getMemory() {
+ long m = memory;
+ if (m == 0) {
+ for (K key : keys) {
+ m += key.getMemory();
+ }
+ m = Math.min(Integer.MAX_VALUE, m);
+ memory = (int) m;
+ }
+ return (int) m;
+ }
+
+ @Override
public String toString() {
return new StringBuilder("InvalidateCacheAction").append(Iterables.toString(keys)).toString();
}
Modified:
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java?rev=1833987&r1=1833986&r2=1833987&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java
(original)
+++
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java
Thu Jun 21 08:54:38 2018
@@ -16,11 +16,9 @@
*/
package org.apache.jackrabbit.oak.plugins.document.persistentCache.async;
-import static java.util.Collections.singleton;
-
import java.util.Map;
-import com.google.common.collect.Iterables;
+import org.apache.jackrabbit.oak.cache.CacheValue;
import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache;
/**
@@ -29,7 +27,8 @@ import org.apache.jackrabbit.oak.plugins
* @param <K> key type
* @param <V> value type
*/
-class PutToCacheAction<K, V> implements CacheAction<K, V> {
+class PutToCacheAction<K extends CacheValue, V extends CacheValue>
+ implements CacheAction {
private final PersistentCache cache;
@@ -55,6 +54,13 @@ class PutToCacheAction<K, V> implements
}
@Override
+ public int getMemory() {
+ long mem = key.getMemory();
+ mem += value.getMemory();
+ return (int) Math.min(Integer.MAX_VALUE, mem);
+ }
+
+ @Override
public String toString() {
return new StringBuilder("PutToCacheAction[").append(key).append(']').toString();
}
Modified:
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCacheTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCacheTest.java?rev=1833987&r1=1833986&r2=1833987&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCacheTest.java
(original)
+++
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCacheTest.java
Thu Jun 21 08:54:38 2018
@@ -28,6 +28,8 @@ import java.util.function.Consumer;
import com.google.common.base.Predicate;
import com.google.common.cache.RemovalCause;
import com.google.common.collect.Lists;
+
+import org.apache.jackrabbit.oak.cache.CacheValue;
import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
import org.apache.jackrabbit.oak.json.JsopDiff;
import org.apache.jackrabbit.oak.plugins.document.AbstractDocumentNodeState;
@@ -236,15 +238,15 @@ public class NodeCacheTest {
}
- private static <V> void assertContains(NodeCache<PathRev, V> cache, String path) {
+ private static <V extends CacheValue> void assertContains(NodeCache<PathRev, V> cache, String path) {
assertPathRevs(cache, path, true);
}
- private static <V> void assertNotContains(NodeCache<PathRev, V> cache, String path) {
+ private static <V extends CacheValue> void assertNotContains(NodeCache<PathRev, V> cache, String path) {
assertPathRevs(cache, path, false);
}
- private static <V> void assertPathRevs(NodeCache<PathRev, V> cache, String path, boolean contains) {
+ private static <V extends CacheValue> void assertPathRevs(NodeCache<PathRev, V> cache, String path, boolean contains) {
List<PathRev> revs = getPathRevs(cache, path);
List<PathRev> matchingRevs = Lists.newArrayList();
for (PathRev pr : revs) {
@@ -262,7 +264,7 @@ public class NodeCacheTest {
}
}
- private static <V> List<PathRev> getPathRevs(NodeCache<PathRev, V> cache, String path) {
+ private static <V extends CacheValue> List<PathRev> getPathRevs(NodeCache<PathRev, V> cache, String path) {
List<PathRev> revs = Lists.newArrayList();
for (PathRev pr : cache.asMap().keySet()) {
if (pr.getPath().equals(path)) {
Modified:
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java?rev=1833987&r1=1833986&r2=1833987&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java
(original)
+++
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java
Thu Jun 21 08:54:38 2018
@@ -24,28 +24,25 @@ import static java.lang.Thread.sleep;
import static org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher.MAX_SIZE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
-import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache;
+import org.apache.jackrabbit.oak.commons.StringUtils;
import org.junit.Test;
-import org.mockito.Mockito;
public class CacheActionDispatcherTest {
@Test
public void testMaxQueueSize() {
CacheActionDispatcher dispatcher = new CacheActionDispatcher();
- @SuppressWarnings({ "unchecked", "rawtypes" })
- CacheWriteQueue<String, Object> queue = new CacheWriteQueue(dispatcher, mock(PersistentCache.class), null);
for (int i = 0; i < MAX_SIZE + 10; i++) {
- dispatcher.add(createWriteAction(valueOf(i), queue));
+ dispatcher.add(createWriteAction(valueOf(i)));
}
assertEquals(MAX_SIZE, dispatcher.queue.size());
assertEquals("0", dispatcher.queue.peek().toString());
@@ -56,8 +53,6 @@ public class CacheActionDispatcherTest {
int threads = 5;
int actionsPerThread = 100;
- @SuppressWarnings("unchecked")
- CacheWriteQueue<String, Object> queue = Mockito.mock(CacheWriteQueue.class);
final CacheActionDispatcher dispatcher = new CacheActionDispatcher();
Thread queueThread = new Thread(dispatcher);
queueThread.start();
@@ -67,7 +62,7 @@ public class CacheActionDispatcherTest {
for (int i = 0; i < threads; i++) {
final List<DummyCacheWriteAction> threadActions = new ArrayList<DummyCacheWriteAction>();
for (int j = 0; j < actionsPerThread; j++) {
- DummyCacheWriteAction action = new DummyCacheWriteAction(String.format("%d_%d", i, j), queue);
+ DummyCacheWriteAction action = new DummyCacheWriteAction(String.format("%d_%d", i, j));
threadActions.add(action);
allActions.add(action);
}
@@ -107,13 +102,58 @@ public class CacheActionDispatcherTest {
assertFalse(queueThread.isAlive());
}
- private DummyCacheWriteAction createWriteAction(String id, CacheWriteQueue<String, Object> queue) {
- return new DummyCacheWriteAction(id, queue);
+ @Test
+ public void maxMemory() throws Exception {
+ // calculate memory for a few actions and use as memory maximum
+ long maxMemory = 0;
+ List<CacheAction> actions = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ CacheAction a = new DummyCacheWriteAction("id-" + i, 0);
+ actions.add(a);
+ maxMemory += a.getMemory();
+ }
+ CacheActionDispatcher dispatcher = new CacheActionDispatcher(maxMemory);
+
+ // adding actions to the queue must all succeed
+ for (CacheAction a : actions) {
+ assertTrue(dispatcher.add(a));
+ }
+
+ // adding more must be rejected
+ assertFalse(dispatcher.add(new DummyCacheWriteAction("foo", 0)));
+
+ // drain the queue
+ Thread t = new Thread(dispatcher);
+ t.start();
+
+ for (int i = 0; i < 100; i++) {
+ if (dispatcher.getMemory() == 0) {
+ break;
+ }
+ Thread.sleep(20);
+ }
+ assertEquals(0, dispatcher.getMemory());
+ dispatcher.stop();
+ t.join();
+
+ // must be able to add again
+ assertTrue(dispatcher.add(actions.get(0)));
+
+ // but not if it exceeds the maximum memory
+ String id = "abcdef";
+ CacheAction big;
+ do {
+ big = new DummyCacheWriteAction(id, 0);
+ id = id + id;
+ } while (big.getMemory() < maxMemory);
+ assertFalse(dispatcher.add(big));
}
- private class DummyCacheWriteAction implements CacheAction<String, Object> {
+ private DummyCacheWriteAction createWriteAction(String id) {
+ return new DummyCacheWriteAction(id);
+ }
- private final CacheWriteQueue<String, Object> queue;
+ private class DummyCacheWriteAction implements CacheAction {
private final String id;
@@ -121,12 +161,11 @@ public class CacheActionDispatcherTest {
private volatile boolean finished;
- private DummyCacheWriteAction(String id, CacheWriteQueue<String, Object> queue) {
- this(id, queue, new Random().nextInt(10));
+ private DummyCacheWriteAction(String id) {
+ this(id, new Random().nextInt(10));
}
- private DummyCacheWriteAction(String id, CacheWriteQueue<String, Object> queue, long delay) {
- this.queue = queue;
+ private DummyCacheWriteAction(String id, long delay) {
this.id = id;
this.delay = delay;
}
@@ -142,6 +181,11 @@ public class CacheActionDispatcherTest {
}
@Override
+ public int getMemory() {
+ return StringUtils.estimateMemoryUsage(id);
+ }
+
+ @Override
public String toString() {
return id;
}