Author: thomasm
Date: Mon Nov 4 13:24:55 2013
New Revision: 1538589
URL: http://svn.apache.org/r1538589
Log:
OAK-1080: MongoMK: improved concurrency
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoBlob.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoBlob.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoBlob.java?rev=1538589&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoBlob.java
(added)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoBlob.java
Mon Nov 4 13:24:55 2013
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.mongomk;
+
+import java.io.InputStream;
+
+import javax.annotation.Nonnull;
+
+import org.apache.jackrabbit.mk.blobs.BlobStore;
+import org.apache.jackrabbit.mk.blobs.BlobStoreInputStream;
+import org.apache.jackrabbit.oak.api.Blob;
+
+/**
+ * A blob implementation.
+ */
+public class MongoBlob implements Blob {
+
+ private final BlobStore blobStore;
+ private final String id;
+
+ public MongoBlob(BlobStore blobStore, String id) {
+ this.blobStore = blobStore;
+ this.id = id;
+ }
+
+ @Override
+ @Nonnull
+ public InputStream getNewStream() {
+ return new BlobStoreInputStream(blobStore, id, 0);
+ }
+
+ @Override
+ public long length() {
+ try {
+ return blobStore.getBlobLength(id);
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Invalid blob id: " + id);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return id;
+ }
+
+}
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java?rev=1538589&r1=1538588&r2=1538589&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java
Mon Nov 4 13:24:55 2013
@@ -80,11 +80,6 @@ public class MongoMK implements MicroKer
protected final DocumentStore store;
/**
- * The MongoDB blob store.
- */
- private final BlobStore blobStore;
-
- /**
* Diff cache.
*/
private final Cache<String, Diff> diffCache;
@@ -93,7 +88,6 @@ public class MongoMK implements MicroKer
MongoMK(Builder builder) {
this.nodeStore = builder.getNodeStore();
this.store = nodeStore.getDocumentStore();
- this.blobStore = builder.getBlobStore();
diffCache = builder.buildCache(builder.getDiffCacheSize());
diffCacheStats = new CacheStats(diffCache, "MongoMk-DiffCache",
@@ -453,7 +447,7 @@ public class MongoMK implements MicroKer
@Override
public long getLength(String blobId) throws MicroKernelException {
try {
- return blobStore.getBlobLength(blobId);
+ return nodeStore.getBlob(blobId).length();
} catch (Exception e) {
throw new MicroKernelException(e);
}
@@ -463,7 +457,7 @@ public class MongoMK implements MicroKer
public int read(String blobId, long pos, byte[] buff, int off, int length)
throws MicroKernelException {
try {
- return blobStore.readBlob(blobId, pos, buff, off, length);
+ return nodeStore.getBlobStore().readBlob(blobId, pos, buff, off,
length);
} catch (Exception e) {
throw new MicroKernelException(e);
}
@@ -472,7 +466,7 @@ public class MongoMK implements MicroKer
@Override
public String write(InputStream in) throws MicroKernelException {
try {
- return blobStore.writeBlob(in);
+ return nodeStore.getBlobStore().writeBlob(in);
} catch (Exception e) {
throw new MicroKernelException(e);
}
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java?rev=1538589&r1=1538588&r2=1538589&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java
Mon Nov 4 13:24:55 2013
@@ -49,6 +49,7 @@ import com.google.common.cache.Cache;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import org.apache.jackrabbit.mk.api.MicroKernelException;
+import org.apache.jackrabbit.mk.blobs.BlobStore;
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.cache.CacheStats;
@@ -113,17 +114,17 @@ public final class MongoNodeStore
protected final ChangeDispatcher dispatcher;
/**
- * Whether this instance is disposed.
- */
- private final AtomicBoolean isDisposed = new AtomicBoolean();
-
- /**
* The delay for asynchronous operations (delayed commit propagation and
* cache update).
*/
protected int asyncDelay = 1000;
/**
+ * Whether this instance is disposed.
+ */
+ private final AtomicBoolean isDisposed = new AtomicBoolean();
+
+ /**
* The cluster instance info.
*/
private final ClusterNodeInfo clusterNodeInfo;
@@ -219,8 +220,14 @@ public final class MongoNodeStore
*/
private final Cache<String, NodeDocument.Children> docChildrenCache;
private final CacheStats docChildrenCacheStats;
+
+ /**
+ * The MongoDB blob store.
+ */
+ private final BlobStore blobStore;
public MongoNodeStore(MongoMK.Builder builder) {
+ this.blobStore = builder.getBlobStore();
if (builder.isUseSimpleRevision()) {
this.simpleRevisionCounter = new AtomicInteger(0);
}
@@ -475,7 +482,7 @@ public final class MongoNodeStore
* given revision.
*/
@CheckForNull
- Node getNode(final @Nonnull String path, final @Nonnull Revision rev) {
+ Node getNode(@Nonnull final String path, @Nonnull final Revision rev) {
checkRevisionAge(checkNotNull(rev), checkNotNull(path));
try {
String key = path + "@" + rev;
@@ -855,8 +862,7 @@ public final class MongoNodeStore
*/
@Nonnull
Blob getBlob(String blobId) {
- // TODO: implement blob handling
- return null;
+ return new MongoBlob(blobStore, blobId);
}
//------------------------< Observable
>------------------------------------
@@ -896,8 +902,15 @@ public final class MongoNodeStore
@Override
public Blob createBlob(InputStream inputStream) throws IOException {
- // TODO: implement
- return null;
+ String id;
+ try {
+ id = blobStore.writeBlob(inputStream);
+ } catch (IOException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new IOException("Could not write blob", e);
+ }
+ return new MongoBlob(blobStore, id);
}
@Nonnull
@@ -1229,4 +1242,9 @@ public final class MongoNodeStore
}
}
}
+
+ public BlobStore getBlobStore() {
+ return blobStore;
+ }
+
}