Author: thomasm
Date: Thu Mar 13 14:58:26 2014
New Revision: 1577184

URL: http://svn.apache.org/r1577184
Log:
OAK-1509 AbstractBlobStore: use a concurrent cache

Added:
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CachingBlobStore.java
Modified:
    
jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/AbstractBlobStore.java
    
jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/FileBlobStore.java
    
jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/MemoryBlobStore.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/cloud/CloudBlobStore.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/db/DbBlobStore.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobStore.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBBlobStore.java

Modified: 
jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/AbstractBlobStore.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/AbstractBlobStore.java?rev=1577184&r1=1577183&r2=1577184&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/AbstractBlobStore.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/AbstractBlobStore.java
 Thu Mar 13 14:58:26 2014
@@ -92,11 +92,6 @@ public abstract class AbstractBlobStore 
     private int blockSize = 2 * 1024 * 1024;
 
     /**
-     * The cache (16 MB).
-     */
-    private Cache<AbstractBlobStore.BlockId, Data> cache = 
Cache.newInstance(this, 16 * 1024 * 1024);
-
-    /**
      * The byte array is re-used if possible, to avoid having to create a new,
      * large byte array each time a (potentially very small) binary is stored.
      */
@@ -173,11 +168,6 @@ public abstract class AbstractBlobStore 
         inUse.clear();
     }
 
-    @Override
-    public void clearCache() {
-        cache.clear();
-    }
-
     private void convertBlobToId(InputStream in, ByteArrayOutputStream 
idStream, int level, long totalLength) throws IOException {
         int count = 0;
         // try to re-use the block (but not concurrently)
@@ -323,7 +313,7 @@ public abstract class AbstractBlobStore 
 
     byte[] readBlock(byte[] digest, long pos) {
         BlockId id = new BlockId(digest, pos);
-        return cache.get(id).data;
+        return load(id).data;
     }
 
     @Override
@@ -410,13 +400,12 @@ public abstract class AbstractBlobStore 
                 }
                 byte[] digest = new byte[IOUtils.readVarInt(idStream)];
                 IOUtils.readFully(idStream, digest, 0, digest.length);
+                BlockId id = new BlockId(digest, 0);
+                mark(id);
                 if (level > 0) {
                     byte[] block = readBlock(digest, 0);
                     idStream = new ByteArrayInputStream(block);
                     mark(idStream);
-                } else {
-                    BlockId id = new BlockId(digest, 0);
-                    mark(id);
                 }
             } else {
                 throw new IOException("Unknown blobs id type " + type);

Modified: 
jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/FileBlobStore.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/FileBlobStore.java?rev=1577184&r1=1577183&r2=1577184&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/FileBlobStore.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/FileBlobStore.java
 Thu Mar 13 14:58:26 2014
@@ -262,4 +262,9 @@ public class FileBlobStore extends Abstr
             }
         };
     }
+
+    @Override
+    public void clearCache() {
+        // no cache
+    }
 }
\ No newline at end of file

Modified: 
jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/MemoryBlobStore.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/MemoryBlobStore.java?rev=1577184&r1=1577183&r2=1577184&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/MemoryBlobStore.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/MemoryBlobStore.java
 Thu Mar 13 14:58:26 2014
@@ -117,5 +117,10 @@ public class MemoryBlobStore extends Abs
             }
         };
     }
+    
+    @Override
+    public void clearCache() {
+        // no cache
+    }
 
 }

Added: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CachingBlobStore.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CachingBlobStore.java?rev=1577184&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CachingBlobStore.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CachingBlobStore.java
 Thu Mar 13 14:58:26 2014
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import org.apache.jackrabbit.oak.cache.CacheLIRS;
+import org.apache.jackrabbit.oak.spi.blob.AbstractBlobStore;
+
+import com.google.common.cache.Weigher;
+
+/**
+ * A blob store with a cache.
+ */
+public abstract class CachingBlobStore extends AbstractBlobStore {
+    
+    protected final CacheLIRS<String, byte[]> cache = 
+            CacheLIRS.newBuilder().
+                maximumWeight(16 * 1024 * 1024).
+                averageWeight(getBlockSize() / 2).
+                weigher(new Weigher<String, byte[]>() {
+                    @Override
+                    public int weigh(String key, byte[] value) {
+                        return value.length;
+                    }
+                }).build();
+
+    @Override
+    public void clearCache() {
+        cache.invalidateAll();
+    }
+    
+}

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/cloud/CloudBlobStore.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/cloud/CloudBlobStore.java?rev=1577184&r1=1577183&r2=1577184&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/cloud/CloudBlobStore.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/cloud/CloudBlobStore.java
 Thu Mar 13 14:58:26 2014
@@ -26,7 +26,7 @@ import com.google.common.base.Preconditi
 import com.google.common.collect.Maps;
 import com.google.common.io.ByteStreams;
 import org.apache.jackrabbit.oak.commons.StringUtils;
-import org.apache.jackrabbit.oak.spi.blob.AbstractBlobStore;
+import org.apache.jackrabbit.oak.plugins.blob.CachingBlobStore;
 import org.jclouds.ContextBuilder;
 import org.jclouds.blobstore.BlobStoreContext;
 import org.jclouds.blobstore.domain.Blob;
@@ -44,7 +44,7 @@ import static org.jclouds.blobstore.opti
  * <p>
  * Extends {@link AbstractBlobStore} and breaks the the binary to chunks for 
easier management.
  */
-public class CloudBlobStore extends AbstractBlobStore {
+public class CloudBlobStore extends CachingBlobStore {
     /**
      * Logger instance.
      */
@@ -130,7 +130,8 @@ public class CloudBlobStore extends Abst
         Preconditions.checkNotNull(context);
 
         String id = StringUtils.convertBytesToHex(digest);
-
+        cache.put(id, data);
+        
         org.jclouds.blobstore.BlobStore blobStore = context.getBlobStore();
 
         if (!blobStore.blobExists(cloudContainer, id)) {
@@ -156,32 +157,33 @@ public class CloudBlobStore extends Abst
         Preconditions.checkNotNull(context);
 
         String id = StringUtils.convertBytesToHex(blockId.getDigest());
-
-        Blob cloudBlob = context.getBlobStore().getBlob(cloudContainer, id);
-        if (cloudBlob == null) {
-            String message = "Did not find block " + id;
-            LOG.error(message);
-            throw new IOException(message);
-        }
-
-        Payload payload = cloudBlob.getPayload();
-        try {
-            byte[] data = ByteStreams.toByteArray(payload.getInput());
-
-            if (blockId.getPos() == 0) {
-                return data;
+        byte[] data = cache.get(id);
+        if (data == null) {
+            Blob cloudBlob = context.getBlobStore().getBlob(cloudContainer, 
id);
+            if (cloudBlob == null) {
+                String message = "Did not find block " + id;
+                LOG.error(message);
+                throw new IOException(message);
             }
-
-            int len = (int) (data.length - blockId.getPos());
-            if (len < 0) {
-                return new byte[0];
+    
+            Payload payload = cloudBlob.getPayload();
+            try {
+                data = ByteStreams.toByteArray(payload.getInput());
+                cache.put(id, data);        
+            } finally {
+                payload.close();
             }
-            byte[] d2 = new byte[len];
-            System.arraycopy(data, (int) blockId.getPos(), d2, 0, len);
-            return d2;
-        } finally {
-            payload.close();
         }
+        if (blockId.getPos() == 0) {
+            return data;
+        }
+        int len = (int) (data.length - blockId.getPos());
+        if (len < 0) {
+            return new byte[0];
+        }
+        byte[] d2 = new byte[len];
+        System.arraycopy(data, (int) blockId.getPos(), d2, 0, len);
+        return d2;
     }
 
     /**

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/db/DbBlobStore.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/db/DbBlobStore.java?rev=1577184&r1=1577183&r2=1577184&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/db/DbBlobStore.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/db/DbBlobStore.java
 Thu Mar 13 14:58:26 2014
@@ -25,16 +25,16 @@ import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Iterator;
 
-import com.google.common.collect.AbstractIterator;
-
 import org.apache.jackrabbit.oak.commons.StringUtils;
-import org.apache.jackrabbit.oak.spi.blob.AbstractBlobStore;
+import org.apache.jackrabbit.oak.plugins.blob.CachingBlobStore;
 import org.h2.jdbcx.JdbcConnectionPool;
 
+import com.google.common.collect.AbstractIterator;
+
 /**
  * A database blob store.
  */
-public class DbBlobStore extends AbstractBlobStore {
+public class DbBlobStore extends CachingBlobStore {
 
     private JdbcConnectionPool cp;
     private long minLastModified;
@@ -61,9 +61,10 @@ public class DbBlobStore extends Abstrac
     }
     
     private void storeBlockToDatabase(byte[] digest, int level, byte[] data) 
throws SQLException {
+        String id = StringUtils.convertBytesToHex(digest);
+        cache.put(id, data);
         Connection conn = cp.getConnection();
         try {
-            String id = StringUtils.convertBytesToHex(digest);
             long now = System.currentTimeMillis();
             PreparedStatement prep = conn.prepareStatement(
                     "update datastore_meta set lastMod = ? where id = ?");
@@ -111,35 +112,39 @@ public class DbBlobStore extends Abstrac
 
     @Override
     protected byte[] readBlockFromBackend(BlockId blockId) throws Exception {
-        Connection conn = cp.getConnection();
-        try {
-            PreparedStatement prep = conn.prepareStatement(
-                    "select data from datastore_data where id = ?");
+        String id = StringUtils.convertBytesToHex(blockId.getDigest());
+        byte[] data = cache.get(id);
+        if (data == null) {        
+            Connection conn = cp.getConnection();
             try {
-                String id = StringUtils.convertBytesToHex(blockId.getDigest());
-                prep.setString(1, id);
-                ResultSet rs = prep.executeQuery();
-                if (!rs.next()) {
-                    throw new IOException("Datastore block " + id + " not 
found");
-                }
-                byte[] data = rs.getBytes(1);
-                // System.out.println("    read block " + id + " blockLen: " + 
data.length + " [0]: " + data[0]);
-                if (blockId.getPos() == 0) {
-                    return data;
-                }
-                int len = (int) (data.length - blockId.getPos());
-                if (len < 0) {
-                    return new byte[0];
+                PreparedStatement prep = conn.prepareStatement(
+                        "select data from datastore_data where id = ?");
+                try {
+                    prep.setString(1, id);
+                    ResultSet rs = prep.executeQuery();
+                    if (!rs.next()) {
+                        throw new IOException("Datastore block " + id + " not 
found");
+                    }
+                    data = rs.getBytes(1);
+                } finally {
+                    prep.close();
                 }
-                byte[] d2 = new byte[len];
-                System.arraycopy(data, (int) blockId.getPos(), d2, 0, len);
-                return d2;
+                cache.put(id, data);
             } finally {
-                prep.close();
+                conn.close();
             }
-        } finally {
-            conn.close();
         }
+        // System.out.println("    read block " + id + " blockLen: " + 
data.length + " [0]: " + data[0]);
+        if (blockId.getPos() == 0) {
+            return data;
+        }
+        int len = (int) (data.length - blockId.getPos());
+        if (len < 0) {
+            return new byte[0];
+        }
+        byte[] d2 = new byte[len];
+        System.arraycopy(data, (int) blockId.getPos(), d2, 0, len);
+        return d2;
     }
 
     @Override
@@ -289,4 +294,5 @@ public class DbBlobStore extends Abstrac
             }
         };
     }
+    
 }

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobStore.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobStore.java?rev=1577184&r1=1577183&r2=1577184&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobStore.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobStore.java
 Thu Mar 13 14:58:26 2014
@@ -19,6 +19,11 @@ package org.apache.jackrabbit.oak.plugin
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.jackrabbit.oak.commons.StringUtils;
+import org.apache.jackrabbit.oak.plugins.blob.CachingBlobStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.common.collect.AbstractIterator;
 import com.mongodb.BasicDBObject;
 import com.mongodb.Bytes;
@@ -31,11 +36,6 @@ import com.mongodb.QueryBuilder;
 import com.mongodb.ReadPreference;
 import com.mongodb.WriteResult;
 
-import org.apache.jackrabbit.oak.spi.blob.AbstractBlobStore;
-import org.apache.jackrabbit.oak.commons.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * Implementation of blob store for the MongoDB extending from
  * {@link AbstractBlobStore}. It saves blobs into a separate collection in
@@ -44,7 +44,7 @@ import org.slf4j.LoggerFactory;
  * FIXME: -Do we need to create commands for retry etc.? -Not sure if this is
  * going to work for multiple MKs talking to same MongoDB?
  */
-public class MongoBlobStore extends AbstractBlobStore {
+public class MongoBlobStore extends CachingBlobStore {
 
     public static final String COLLECTION_BLOBS = "blobs";
 
@@ -52,7 +52,7 @@ public class MongoBlobStore extends Abst
 
     private final DB db;
     private long minLastModified;
-
+    
     /**
      * Constructs a new {@code MongoBlobStore}
      *
@@ -66,6 +66,7 @@ public class MongoBlobStore extends Abst
     @Override
     protected void storeBlock(byte[] digest, int level, byte[] data) throws 
IOException {
         String id = StringUtils.convertBytesToHex(digest);
+        cache.put(id, data);
         // Check if it already exists?
         MongoBlob mongoBlob = new MongoBlob();
         mongoBlob.setId(id);
@@ -84,18 +85,20 @@ public class MongoBlobStore extends Abst
     @Override
     protected byte[] readBlockFromBackend(BlockId blockId) throws Exception {
         String id = StringUtils.convertBytesToHex(blockId.getDigest());
-        MongoBlob blobMongo = getBlob(id, 0);
-        if (blobMongo == null) {
-            String message = "Did not find block " + id;
-            LOG.error(message);
-            throw new IOException(message);
+        byte[] data = cache.get(id);
+        if (data == null) {
+            MongoBlob blobMongo = getBlob(id, 0);
+            if (blobMongo == null) {
+                String message = "Did not find block " + id;
+                LOG.error(message);
+                throw new IOException(message);
+            }
+            data = blobMongo.getData();
+            cache.put(id, data);
         }
-        byte[] data = blobMongo.getData();
-
         if (blockId.getPos() == 0) {
             return data;
         }
-
         int len = (int) (data.length - blockId.getPos());
         if (len < 0) {
             return new byte[0];

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBBlobStore.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBBlobStore.java?rev=1577184&r1=1577183&r2=1577184&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBBlobStore.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBBlobStore.java
 Thu Mar 13 14:58:26 2014
@@ -32,12 +32,12 @@ import javax.sql.DataSource;
 import com.google.common.collect.AbstractIterator;
 
 import org.apache.jackrabbit.mk.api.MicroKernelException;
-import org.apache.jackrabbit.oak.spi.blob.AbstractBlobStore;
+import org.apache.jackrabbit.oak.plugins.blob.CachingBlobStore;
 import org.apache.jackrabbit.oak.commons.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class RDBBlobStore extends AbstractBlobStore implements Closeable {
+public class RDBBlobStore extends CachingBlobStore implements Closeable {
     /**
      * Creates a {@linkplain RDBBlobStore} instance using an embedded H2
      * database in in-memory mode.
@@ -142,8 +142,9 @@ public class RDBBlobStore extends Abstra
     }
 
     private void storeBlockInDatabase(byte[] digest, int level, byte[] data) 
throws SQLException {
+        String id = StringUtils.convertBytesToHex(digest);
+        cache.put(id, data);
         try {
-            String id = StringUtils.convertBytesToHex(digest);
             long now = System.currentTimeMillis();
             PreparedStatement prep = connection.prepareStatement("update 
datastore_meta set lastMod = ? where id = ?");
             int count;
@@ -188,34 +189,36 @@ public class RDBBlobStore extends Abstra
 
     @Override
     protected byte[] readBlockFromBackend(BlockId blockId) throws Exception {
+        String id = StringUtils.convertBytesToHex(blockId.getDigest());
+        byte[] data = cache.get(id);
         try {
             PreparedStatement prep = connection.prepareStatement("select data 
from datastore_data where id = ?");
             try {
-                String id = StringUtils.convertBytesToHex(blockId.getDigest());
                 prep.setString(1, id);
                 ResultSet rs = prep.executeQuery();
                 if (!rs.next()) {
                     throw new IOException("Datastore block " + id + " not 
found");
                 }
-                byte[] data = rs.getBytes(1);
-                // System.out.println("    read block " + id + " blockLen: " +
-                // data.length + " [0]: " + data[0]);
-                if (blockId.getPos() == 0) {
-                    return data;
-                }
-                int len = (int) (data.length - blockId.getPos());
-                if (len < 0) {
-                    return new byte[0];
-                }
-                byte[] d2 = new byte[len];
-                System.arraycopy(data, (int) blockId.getPos(), d2, 0, len);
-                return d2;
+                data = rs.getBytes(1);
             } finally {
                 prep.close();
             }
+            cache.put(id, data);
         } finally {
             connection.commit();
         }
+        // System.out.println("    read block " + id + " blockLen: " +
+        // data.length + " [0]: " + data[0]);
+        if (blockId.getPos() == 0) {
+            return data;
+        }
+        int len = (int) (data.length - blockId.getPos());
+        if (len < 0) {
+            return new byte[0];
+        }
+        byte[] d2 = new byte[len];
+        System.arraycopy(data, (int) blockId.getPos(), d2, 0, len);
+        return d2;
     }
 
     @Override


Reply via email to