Author: thomasm
Date: Thu Mar 13 14:58:26 2014
New Revision: 1577184
URL: http://svn.apache.org/r1577184
Log:
OAK-1509 AbstractBlobStore: use a concurrent cache
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CachingBlobStore.java
Modified:
jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/AbstractBlobStore.java
jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/FileBlobStore.java
jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/MemoryBlobStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/cloud/CloudBlobStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/db/DbBlobStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBBlobStore.java
Modified:
jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/AbstractBlobStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/AbstractBlobStore.java?rev=1577184&r1=1577183&r2=1577184&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/AbstractBlobStore.java
(original)
+++
jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/AbstractBlobStore.java
Thu Mar 13 14:58:26 2014
@@ -92,11 +92,6 @@ public abstract class AbstractBlobStore
private int blockSize = 2 * 1024 * 1024;
/**
- * The cache (16 MB).
- */
- private Cache<AbstractBlobStore.BlockId, Data> cache =
Cache.newInstance(this, 16 * 1024 * 1024);
-
- /**
* The byte array is re-used if possible, to avoid having to create a new,
* large byte array each time a (potentially very small) binary is stored.
*/
@@ -173,11 +168,6 @@ public abstract class AbstractBlobStore
inUse.clear();
}
- @Override
- public void clearCache() {
- cache.clear();
- }
-
private void convertBlobToId(InputStream in, ByteArrayOutputStream
idStream, int level, long totalLength) throws IOException {
int count = 0;
// try to re-use the block (but not concurrently)
@@ -323,7 +313,7 @@ public abstract class AbstractBlobStore
byte[] readBlock(byte[] digest, long pos) {
BlockId id = new BlockId(digest, pos);
- return cache.get(id).data;
+ return load(id).data;
}
@Override
@@ -410,13 +400,12 @@ public abstract class AbstractBlobStore
}
byte[] digest = new byte[IOUtils.readVarInt(idStream)];
IOUtils.readFully(idStream, digest, 0, digest.length);
+ BlockId id = new BlockId(digest, 0);
+ mark(id);
if (level > 0) {
byte[] block = readBlock(digest, 0);
idStream = new ByteArrayInputStream(block);
mark(idStream);
- } else {
- BlockId id = new BlockId(digest, 0);
- mark(id);
}
} else {
throw new IOException("Unknown blobs id type " + type);
Modified:
jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/FileBlobStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/FileBlobStore.java?rev=1577184&r1=1577183&r2=1577184&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/FileBlobStore.java
(original)
+++
jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/FileBlobStore.java
Thu Mar 13 14:58:26 2014
@@ -262,4 +262,9 @@ public class FileBlobStore extends Abstr
}
};
}
+
+ @Override
+ public void clearCache() {
+ // no cache
+ }
}
\ No newline at end of file
Modified:
jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/MemoryBlobStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/MemoryBlobStore.java?rev=1577184&r1=1577183&r2=1577184&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/MemoryBlobStore.java
(original)
+++
jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/MemoryBlobStore.java
Thu Mar 13 14:58:26 2014
@@ -117,5 +117,10 @@ public class MemoryBlobStore extends Abs
}
};
}
+
+ @Override
+ public void clearCache() {
+ // no cache
+ }
}
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CachingBlobStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CachingBlobStore.java?rev=1577184&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CachingBlobStore.java
(added)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CachingBlobStore.java
Thu Mar 13 14:58:26 2014
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import org.apache.jackrabbit.oak.cache.CacheLIRS;
+import org.apache.jackrabbit.oak.spi.blob.AbstractBlobStore;
+
+import com.google.common.cache.Weigher;
+
+/**
+ * A blob store with a cache.
+ */
+public abstract class CachingBlobStore extends AbstractBlobStore {
+
+ protected final CacheLIRS<String, byte[]> cache =
+ CacheLIRS.newBuilder().
+ maximumWeight(16 * 1024 * 1024).
+ averageWeight(getBlockSize() / 2).
+ weigher(new Weigher<String, byte[]>() {
+ @Override
+ public int weigh(String key, byte[] value) {
+ return value.length;
+ }
+ }).build();
+
+ @Override
+ public void clearCache() {
+ cache.invalidateAll();
+ }
+
+}
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/cloud/CloudBlobStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/cloud/CloudBlobStore.java?rev=1577184&r1=1577183&r2=1577184&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/cloud/CloudBlobStore.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/cloud/CloudBlobStore.java
Thu Mar 13 14:58:26 2014
@@ -26,7 +26,7 @@ import com.google.common.base.Preconditi
import com.google.common.collect.Maps;
import com.google.common.io.ByteStreams;
import org.apache.jackrabbit.oak.commons.StringUtils;
-import org.apache.jackrabbit.oak.spi.blob.AbstractBlobStore;
+import org.apache.jackrabbit.oak.plugins.blob.CachingBlobStore;
import org.jclouds.ContextBuilder;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.domain.Blob;
@@ -44,7 +44,7 @@ import static org.jclouds.blobstore.opti
* <p>
* Extends {@link AbstractBlobStore} and breaks the the binary to chunks for
easier management.
*/
-public class CloudBlobStore extends AbstractBlobStore {
+public class CloudBlobStore extends CachingBlobStore {
/**
* Logger instance.
*/
@@ -130,7 +130,8 @@ public class CloudBlobStore extends Abst
Preconditions.checkNotNull(context);
String id = StringUtils.convertBytesToHex(digest);
-
+ cache.put(id, data);
+
org.jclouds.blobstore.BlobStore blobStore = context.getBlobStore();
if (!blobStore.blobExists(cloudContainer, id)) {
@@ -156,32 +157,33 @@ public class CloudBlobStore extends Abst
Preconditions.checkNotNull(context);
String id = StringUtils.convertBytesToHex(blockId.getDigest());
-
- Blob cloudBlob = context.getBlobStore().getBlob(cloudContainer, id);
- if (cloudBlob == null) {
- String message = "Did not find block " + id;
- LOG.error(message);
- throw new IOException(message);
- }
-
- Payload payload = cloudBlob.getPayload();
- try {
- byte[] data = ByteStreams.toByteArray(payload.getInput());
-
- if (blockId.getPos() == 0) {
- return data;
+ byte[] data = cache.get(id);
+ if (data == null) {
+ Blob cloudBlob = context.getBlobStore().getBlob(cloudContainer,
id);
+ if (cloudBlob == null) {
+ String message = "Did not find block " + id;
+ LOG.error(message);
+ throw new IOException(message);
}
-
- int len = (int) (data.length - blockId.getPos());
- if (len < 0) {
- return new byte[0];
+
+ Payload payload = cloudBlob.getPayload();
+ try {
+ data = ByteStreams.toByteArray(payload.getInput());
+ cache.put(id, data);
+ } finally {
+ payload.close();
}
- byte[] d2 = new byte[len];
- System.arraycopy(data, (int) blockId.getPos(), d2, 0, len);
- return d2;
- } finally {
- payload.close();
}
+ if (blockId.getPos() == 0) {
+ return data;
+ }
+ int len = (int) (data.length - blockId.getPos());
+ if (len < 0) {
+ return new byte[0];
+ }
+ byte[] d2 = new byte[len];
+ System.arraycopy(data, (int) blockId.getPos(), d2, 0, len);
+ return d2;
}
/**
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/db/DbBlobStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/db/DbBlobStore.java?rev=1577184&r1=1577183&r2=1577184&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/db/DbBlobStore.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/db/DbBlobStore.java
Thu Mar 13 14:58:26 2014
@@ -25,16 +25,16 @@ import java.sql.Statement;
import java.util.ArrayList;
import java.util.Iterator;
-import com.google.common.collect.AbstractIterator;
-
import org.apache.jackrabbit.oak.commons.StringUtils;
-import org.apache.jackrabbit.oak.spi.blob.AbstractBlobStore;
+import org.apache.jackrabbit.oak.plugins.blob.CachingBlobStore;
import org.h2.jdbcx.JdbcConnectionPool;
+import com.google.common.collect.AbstractIterator;
+
/**
* A database blob store.
*/
-public class DbBlobStore extends AbstractBlobStore {
+public class DbBlobStore extends CachingBlobStore {
private JdbcConnectionPool cp;
private long minLastModified;
@@ -61,9 +61,10 @@ public class DbBlobStore extends Abstrac
}
private void storeBlockToDatabase(byte[] digest, int level, byte[] data)
throws SQLException {
+ String id = StringUtils.convertBytesToHex(digest);
+ cache.put(id, data);
Connection conn = cp.getConnection();
try {
- String id = StringUtils.convertBytesToHex(digest);
long now = System.currentTimeMillis();
PreparedStatement prep = conn.prepareStatement(
"update datastore_meta set lastMod = ? where id = ?");
@@ -111,35 +112,39 @@ public class DbBlobStore extends Abstrac
@Override
protected byte[] readBlockFromBackend(BlockId blockId) throws Exception {
- Connection conn = cp.getConnection();
- try {
- PreparedStatement prep = conn.prepareStatement(
- "select data from datastore_data where id = ?");
+ String id = StringUtils.convertBytesToHex(blockId.getDigest());
+ byte[] data = cache.get(id);
+ if (data == null) {
+ Connection conn = cp.getConnection();
try {
- String id = StringUtils.convertBytesToHex(blockId.getDigest());
- prep.setString(1, id);
- ResultSet rs = prep.executeQuery();
- if (!rs.next()) {
- throw new IOException("Datastore block " + id + " not
found");
- }
- byte[] data = rs.getBytes(1);
- // System.out.println(" read block " + id + " blockLen: " +
data.length + " [0]: " + data[0]);
- if (blockId.getPos() == 0) {
- return data;
- }
- int len = (int) (data.length - blockId.getPos());
- if (len < 0) {
- return new byte[0];
+ PreparedStatement prep = conn.prepareStatement(
+ "select data from datastore_data where id = ?");
+ try {
+ prep.setString(1, id);
+ ResultSet rs = prep.executeQuery();
+ if (!rs.next()) {
+ throw new IOException("Datastore block " + id + " not
found");
+ }
+ data = rs.getBytes(1);
+ } finally {
+ prep.close();
}
- byte[] d2 = new byte[len];
- System.arraycopy(data, (int) blockId.getPos(), d2, 0, len);
- return d2;
+ cache.put(id, data);
} finally {
- prep.close();
+ conn.close();
}
- } finally {
- conn.close();
}
+ // System.out.println(" read block " + id + " blockLen: " +
data.length + " [0]: " + data[0]);
+ if (blockId.getPos() == 0) {
+ return data;
+ }
+ int len = (int) (data.length - blockId.getPos());
+ if (len < 0) {
+ return new byte[0];
+ }
+ byte[] d2 = new byte[len];
+ System.arraycopy(data, (int) blockId.getPos(), d2, 0, len);
+ return d2;
}
@Override
@@ -289,4 +294,5 @@ public class DbBlobStore extends Abstrac
}
};
}
+
}
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobStore.java?rev=1577184&r1=1577183&r2=1577184&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobStore.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobStore.java
Thu Mar 13 14:58:26 2014
@@ -19,6 +19,11 @@ package org.apache.jackrabbit.oak.plugin
import java.io.IOException;
import java.util.Iterator;
+import org.apache.jackrabbit.oak.commons.StringUtils;
+import org.apache.jackrabbit.oak.plugins.blob.CachingBlobStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.google.common.collect.AbstractIterator;
import com.mongodb.BasicDBObject;
import com.mongodb.Bytes;
@@ -31,11 +36,6 @@ import com.mongodb.QueryBuilder;
import com.mongodb.ReadPreference;
import com.mongodb.WriteResult;
-import org.apache.jackrabbit.oak.spi.blob.AbstractBlobStore;
-import org.apache.jackrabbit.oak.commons.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
* Implementation of blob store for the MongoDB extending from
* {@link AbstractBlobStore}. It saves blobs into a separate collection in
@@ -44,7 +44,7 @@ import org.slf4j.LoggerFactory;
* FIXME: -Do we need to create commands for retry etc.? -Not sure if this is
* going to work for multiple MKs talking to same MongoDB?
*/
-public class MongoBlobStore extends AbstractBlobStore {
+public class MongoBlobStore extends CachingBlobStore {
public static final String COLLECTION_BLOBS = "blobs";
@@ -52,7 +52,7 @@ public class MongoBlobStore extends Abst
private final DB db;
private long minLastModified;
-
+
/**
* Constructs a new {@code MongoBlobStore}
*
@@ -66,6 +66,7 @@ public class MongoBlobStore extends Abst
@Override
protected void storeBlock(byte[] digest, int level, byte[] data) throws
IOException {
String id = StringUtils.convertBytesToHex(digest);
+ cache.put(id, data);
// Check if it already exists?
MongoBlob mongoBlob = new MongoBlob();
mongoBlob.setId(id);
@@ -84,18 +85,20 @@ public class MongoBlobStore extends Abst
@Override
protected byte[] readBlockFromBackend(BlockId blockId) throws Exception {
String id = StringUtils.convertBytesToHex(blockId.getDigest());
- MongoBlob blobMongo = getBlob(id, 0);
- if (blobMongo == null) {
- String message = "Did not find block " + id;
- LOG.error(message);
- throw new IOException(message);
+ byte[] data = cache.get(id);
+ if (data == null) {
+ MongoBlob blobMongo = getBlob(id, 0);
+ if (blobMongo == null) {
+ String message = "Did not find block " + id;
+ LOG.error(message);
+ throw new IOException(message);
+ }
+ data = blobMongo.getData();
+ cache.put(id, data);
}
- byte[] data = blobMongo.getData();
-
if (blockId.getPos() == 0) {
return data;
}
-
int len = (int) (data.length - blockId.getPos());
if (len < 0) {
return new byte[0];
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBBlobStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBBlobStore.java?rev=1577184&r1=1577183&r2=1577184&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBBlobStore.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBBlobStore.java
Thu Mar 13 14:58:26 2014
@@ -32,12 +32,12 @@ import javax.sql.DataSource;
import com.google.common.collect.AbstractIterator;
import org.apache.jackrabbit.mk.api.MicroKernelException;
-import org.apache.jackrabbit.oak.spi.blob.AbstractBlobStore;
+import org.apache.jackrabbit.oak.plugins.blob.CachingBlobStore;
import org.apache.jackrabbit.oak.commons.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class RDBBlobStore extends AbstractBlobStore implements Closeable {
+public class RDBBlobStore extends CachingBlobStore implements Closeable {
/**
* Creates a {@linkplain RDBBlobStore} instance using an embedded H2
* database in in-memory mode.
@@ -142,8 +142,9 @@ public class RDBBlobStore extends Abstra
}
private void storeBlockInDatabase(byte[] digest, int level, byte[] data)
throws SQLException {
+ String id = StringUtils.convertBytesToHex(digest);
+ cache.put(id, data);
try {
- String id = StringUtils.convertBytesToHex(digest);
long now = System.currentTimeMillis();
PreparedStatement prep = connection.prepareStatement("update
datastore_meta set lastMod = ? where id = ?");
int count;
@@ -188,34 +189,36 @@ public class RDBBlobStore extends Abstra
@Override
protected byte[] readBlockFromBackend(BlockId blockId) throws Exception {
+ String id = StringUtils.convertBytesToHex(blockId.getDigest());
+ byte[] data = cache.get(id);
try {
PreparedStatement prep = connection.prepareStatement("select data
from datastore_data where id = ?");
try {
- String id = StringUtils.convertBytesToHex(blockId.getDigest());
prep.setString(1, id);
ResultSet rs = prep.executeQuery();
if (!rs.next()) {
throw new IOException("Datastore block " + id + " not
found");
}
- byte[] data = rs.getBytes(1);
- // System.out.println(" read block " + id + " blockLen: " +
- // data.length + " [0]: " + data[0]);
- if (blockId.getPos() == 0) {
- return data;
- }
- int len = (int) (data.length - blockId.getPos());
- if (len < 0) {
- return new byte[0];
- }
- byte[] d2 = new byte[len];
- System.arraycopy(data, (int) blockId.getPos(), d2, 0, len);
- return d2;
+ data = rs.getBytes(1);
} finally {
prep.close();
}
+ cache.put(id, data);
} finally {
connection.commit();
}
+ // System.out.println(" read block " + id + " blockLen: " +
+ // data.length + " [0]: " + data[0]);
+ if (blockId.getPos() == 0) {
+ return data;
+ }
+ int len = (int) (data.length - blockId.getPos());
+ if (len < 0) {
+ return new byte[0];
+ }
+ byte[] d2 = new byte[len];
+ System.arraycopy(data, (int) blockId.getPos(), d2, 0, len);
+ return d2;
}
@Override