Author: chetanm
Date: Fri Mar 14 08:42:38 2014
New Revision: 1577449
URL: http://svn.apache.org/r1577449
Log:
OAK-1333 - SegmentMK: Support for Blobs in external storage
Add support for using BlobStore in SegmentStore for managing external blobs
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/http/HttpStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/memory/MemoryStore.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/ExternalBlobTest.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java?rev=1577449&r1=1577448&r2=1577449&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
Fri Mar 14 08:42:38 2014
@@ -23,22 +23,28 @@ import java.io.File;
import java.io.IOException;
import java.util.Dictionary;
+import org.apache.commons.io.FilenameUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.ConfigurationPolicy;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Property;
-import org.apache.felix.scr.annotations.Service;
import org.apache.jackrabbit.oak.osgi.ObserverTracker;
import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.commit.Observable;
import org.apache.jackrabbit.oak.spi.commit.Observer;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.apache.jackrabbit.oak.spi.state.ProxyNodeStore;
+import org.osgi.framework.ServiceReference;
+import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.ComponentContext;
+import org.osgi.util.tracker.ServiceTracker;
+import org.osgi.util.tracker.ServiceTrackerCustomizer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@Component(policy = ConfigurationPolicy.REQUIRE)
-@Service(NodeStore.class)
public class SegmentNodeStoreService extends ProxyNodeStore
implements Observable {
@@ -57,6 +63,13 @@ public class SegmentNodeStoreService ext
@Property(description="Cache size (MB)", intValue=256)
public static final String CACHE = "cache";
+ /**
+ * Boolean value indicating a blobStore is to be used
+ */
+ public static final String CUSTOM_BLOB_STORE = "customBlobStore";
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
private String name;
private SegmentStore store;
@@ -65,18 +78,33 @@ public class SegmentNodeStoreService ext
private ObserverTracker observerTracker;
+ private ComponentContext context;
+
+ private ServiceRegistration registration;
+
+ private ServiceTracker blobStoreTracker;
+
@Override
protected synchronized SegmentNodeStore getNodeStore() {
checkState(delegate != null, "service must be activated when used");
return delegate;
}
- public SegmentStore getSegmentStore() {
- return store;
+ @Activate
+ public void activate(ComponentContext context) throws IOException {
+ this.context = context;
+
+ if(Boolean.parseBoolean(lookup(context, CUSTOM_BLOB_STORE))){
+ log.info("BlobStore use enabled. SegmentNodeStore would be
initialized when BlobStore would be available");
+ blobStoreTracker = new ServiceTracker(context.getBundleContext(),
+ BlobStore.class.getName(), new BlobStoreTracker());
+ blobStoreTracker.open();
+ }else{
+ initialize(context, null);
+ }
}
- @Activate
- public synchronized void activate(ComponentContext context)
+ public synchronized void initialize(ComponentContext context, BlobStore
blobStore)
throws IOException {
Dictionary<?, ?> properties = context.getProperties();
name = "" + properties.get(NAME);
@@ -84,6 +112,8 @@ public class SegmentNodeStoreService ext
String directory = lookup(context, DIRECTORY);
if (directory == null) {
directory = "tarmk";
+ }else{
+ directory = FilenameUtils.concat(directory, "segmentstore");
}
String mode = lookup(context, MODE);
@@ -98,12 +128,15 @@ public class SegmentNodeStoreService ext
}
store = new FileStore(
+ blobStore,
new File(directory),
Integer.parseInt(size), "64".equals(mode));
delegate = new SegmentNodeStore(store);
observerTracker = new ObserverTracker(delegate);
observerTracker.start(context.getBundleContext());
+
+ registration =
context.getBundleContext().registerService(NodeStore.class.getName(), this,
null);
}
private static String lookup(ComponentContext context, String property) {
@@ -111,18 +144,30 @@ public class SegmentNodeStoreService ext
return context.getProperties().get(property).toString();
}
if (context.getBundleContext().getProperty(property) != null) {
- return context.getBundleContext().getProperty(property).toString();
+ return context.getBundleContext().getProperty(property);
}
return null;
}
@Deactivate
public synchronized void deactivate() {
+ unregisterNodeStore();
+
observerTracker.stop();
delegate = null;
store.close();
store = null;
+
+ blobStoreTracker.close();
+ blobStoreTracker = null;
+ }
+
+ private void unregisterNodeStore() {
+ if(registration != null){
+ registration.unregister();
+ registration = null;
+ }
}
//------------------------------------------------------------< Observable
>---
@@ -139,4 +184,29 @@ public class SegmentNodeStoreService ext
return name + ": " + delegate;
}
+ private class BlobStoreTracker implements ServiceTrackerCustomizer {
+
+ @Override
+ public Object addingService(ServiceReference reference) {
+ BlobStore blobStore = (BlobStore)
context.getBundleContext().getService(reference);
+ try {
+ initialize(context, blobStore);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return blobStore;
+ }
+
+ @Override
+ public void modifiedService(ServiceReference reference, Object
service) {
+
+ }
+
+ @Override
+ public void removedService(ServiceReference reference, Object service)
{
+ log.info("BlobStore services unregistered. Unregistered the
SegmentNodeStore");
+ unregisterNodeStore();
+ }
+ }
+
}
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStore.java?rev=1577449&r1=1577448&r2=1577449&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStore.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStore.java
Fri Mar 14 08:42:38 2014
@@ -20,6 +20,7 @@ import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
public interface SegmentStore {
@@ -74,4 +75,10 @@ public interface SegmentStore {
*/
Blob readBlob(String reference);
+ /**
+ * Returns the external BlobStore (if configured) with this store
+ */
+ @CheckForNull
+ BlobStore getBlobStore();
+
}
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java?rev=1577449&r1=1577448&r2=1577449&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
Fri Mar 14 08:42:38 2014
@@ -38,8 +38,10 @@ import static org.apache.jackrabbit.oak.
import static
org.apache.jackrabbit.oak.plugins.segment.Segment.MAX_SEGMENT_SIZE;
import static org.apache.jackrabbit.oak.plugins.segment.Segment.align;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.SequenceInputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
@@ -57,6 +59,7 @@ import org.apache.jackrabbit.oak.api.Blo
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
import org.apache.jackrabbit.oak.plugins.memory.ModifiedNodeState;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
import org.apache.jackrabbit.oak.spi.state.DefaultNodeStateDiff;
import org.apache.jackrabbit.oak.spi.state.NodeState;
@@ -712,6 +715,7 @@ public class SegmentWriter {
private RecordId internalWriteStream(InputStream stream)
throws IOException {
+ BlobStore blobStore = store.getBlobStore();
byte[] data = new byte[MAX_SEGMENT_SIZE];
int n = ByteStreams.read(stream, data, 0, data.length);
@@ -719,6 +723,9 @@ public class SegmentWriter {
// store them directly as small- or medium-sized value records
if (n < Segment.MEDIUM_LIMIT) {
return writeValueRecord(n, data);
+ }else if (blobStore != null){
+ String blobId = blobStore.writeBlob(new SequenceInputStream(new
ByteArrayInputStream(data, 0, n), stream));
+ return writeValueRecord(blobId, blobStore.getBlobLength(blobId));
}
long length = n;
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java?rev=1577449&r1=1577448&r2=1577449&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
Fri Mar 14 08:42:38 2014
@@ -33,6 +33,7 @@ import java.util.concurrent.CountDownLat
import java.util.concurrent.atomic.AtomicReference;
import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob;
import org.apache.jackrabbit.oak.plugins.segment.RecordId;
import org.apache.jackrabbit.oak.plugins.segment.Segment;
import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
@@ -40,6 +41,7 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
import org.apache.jackrabbit.oak.plugins.segment.SegmentWriter;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.slf4j.Logger;
@@ -63,6 +65,8 @@ public class FileStore implements Segmen
private final File directory;
+ private final BlobStore blobStore;
+
private final int maxFileSize;
private final boolean memoryMapping;
@@ -96,20 +100,26 @@ public class FileStore implements Segmen
*/
private final CountDownLatch timeToClose = new CountDownLatch(1);
+ public FileStore(BlobStore blobStore, File directory, int maxFileSizeMB,
boolean memoryMapping)
+ throws IOException {
+ this(blobStore, directory, EMPTY_NODE, maxFileSizeMB,
DEFAULT_MEMORY_CACHE_SIZE, memoryMapping);
+ }
+
public FileStore(File directory, int maxFileSizeMB, boolean memoryMapping)
throws IOException {
- this(directory, EMPTY_NODE, maxFileSizeMB, DEFAULT_MEMORY_CACHE_SIZE,
memoryMapping);
+ this(null, directory, EMPTY_NODE, maxFileSizeMB,
DEFAULT_MEMORY_CACHE_SIZE, memoryMapping);
}
public FileStore(File directory, int maxFileSizeMB, int cacheSizeMB,
boolean memoryMapping) throws IOException {
- this(directory, EMPTY_NODE, maxFileSizeMB, cacheSizeMB, memoryMapping);
+ this(null,directory, EMPTY_NODE, maxFileSizeMB, cacheSizeMB,
memoryMapping);
}
public FileStore(
- final File directory, NodeState initial, int maxFileSizeMB,
+ final BlobStore blobStore, final File directory, NodeState
initial, int maxFileSizeMB,
int cacheSizeMB, boolean memoryMapping) throws IOException {
checkNotNull(directory).mkdirs();
+ this.blobStore = blobStore;
this.directory = directory;
this.maxFileSize = maxFileSizeMB * MB;
this.memoryMapping = memoryMapping;
@@ -373,7 +383,15 @@ public class FileStore implements Segmen
@Override
public Blob readBlob(String reference) {
- return new FileBlob(reference); // FIXME: proper reference lookup
+ if(blobStore != null){
+ return new BlobStoreBlob(blobStore, reference);
+ }
+ throw new IllegalStateException("Attempt to read external reference
["+reference+"] " +
+ "without specifying BlobStore");
}
+ @Override
+ public BlobStore getBlobStore() {
+ return blobStore;
+ }
}
\ No newline at end of file
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/http/HttpStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/http/HttpStore.java?rev=1577449&r1=1577448&r2=1577449&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/http/HttpStore.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/http/HttpStore.java
Fri Mar 14 08:42:38 2014
@@ -40,6 +40,7 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.segment.SegmentWriter;
import com.google.common.io.ByteStreams;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
public class HttpStore implements SegmentStore {
@@ -163,4 +164,9 @@ public class HttpStore implements Segmen
return null;
}
+ @Override @CheckForNull
+ public BlobStore getBlobStore() {
+ return null;
+ }
+
}
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/memory/MemoryStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/memory/MemoryStore.java?rev=1577449&r1=1577448&r2=1577449&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/memory/MemoryStore.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/memory/MemoryStore.java
Fri Mar 14 08:42:38 2014
@@ -30,6 +30,7 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
import org.apache.jackrabbit.oak.plugins.segment.SegmentWriter;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
@@ -123,4 +124,9 @@ public class MemoryStore implements Segm
return null;
}
+ @Override
+ public BlobStore getBlobStore() {
+ return null;
+ }
+
}
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/ExternalBlobTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/ExternalBlobTest.java?rev=1577449&r1=1577448&r2=1577449&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/ExternalBlobTest.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/ExternalBlobTest.java
Fri Mar 14 08:42:38 2014
@@ -17,13 +17,15 @@
package org.apache.jackrabbit.oak.plugins.segment;
import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.FileDataStore;
import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
import org.apache.jackrabbit.oak.plugins.memory.AbstractBlob;
import org.apache.jackrabbit.oak.plugins.segment.file.FileBlob;
import org.apache.jackrabbit.oak.api.Type;
-import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore;
-import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
@@ -31,11 +33,15 @@ import org.apache.jackrabbit.oak.spi.sta
import org.junit.After;
import org.junit.Test;
+import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
import java.util.Random;
import static junit.framework.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
public class ExternalBlobTest {
@@ -44,9 +50,35 @@ public class ExternalBlobTest {
private FileBlob fileBlob;
@Test
- public void testCreateAndRead() throws Exception {
- SegmentNodeStore nodeStore = getNodeStore();
+ public void testFileBlob() throws Exception {
+ nodeStore = getNodeStore(new TestBlobStore());
+ testCreateAndRead(getFileBlob());
+ }
+
+ @Test
+ public void testDataStoreBlob() throws Exception {
+ FileDataStore fds = new FileDataStore();
+ fds.setMinRecordLength(4092);
+ fds.init(getWorkDir().getAbsolutePath());
+ DataStoreBlobStore dbs = new DataStoreBlobStore(fds);
+ nodeStore = getNodeStore(dbs);
+
+ //Test for Blob which get inlined
+ byte[] data = new byte[fds.getMinRecordLength()-2];
+ new Random().nextBytes(data);
+ Blob b1 = testCreateAndRead(nodeStore.createBlob(new
ByteArrayInputStream(data)));
+ assertTrue(b1 instanceof SegmentBlob);
+ assertNull(b1.getReference());
+
+ //Test for Blob which need to be pushed to BlobStore
+ byte[] data2 = new byte[Segment.MEDIUM_LIMIT + 1];
+ new Random().nextBytes(data2);
+ Blob b2 = testCreateAndRead(nodeStore.createBlob(new
ByteArrayInputStream(data2)));
+ assertNotNull(b2.getReference());
+ assertNotNull(dbs.getRecordIfStored(new
DataIdentifier(b2.getReference())));
+ }
+ public Blob testCreateAndRead(Blob blob) throws Exception {
NodeState state = nodeStore.getRoot().getChildNode("hello");
if (!state.exists()) {
NodeBuilder builder = nodeStore.getRoot().builder();
@@ -54,7 +86,6 @@ public class ExternalBlobTest {
nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
}
- Blob blob = getFileBlob();
NodeBuilder builder = nodeStore.getRoot().builder();
builder.getChildNode("hello").setProperty("world", blob);
nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
@@ -63,24 +94,30 @@ public class ExternalBlobTest {
blob = state.getProperty("world").getValue(Type.BINARY);
assertTrue("Blob written and read must be equal",
- AbstractBlob.equal(blob, getFileBlob()));
+ AbstractBlob.equal(blob, blob));
+ return blob;
}
@After
- public void close() {
+ public void close() throws IOException {
if (store != null) {
store.close();
}
+ FileUtils.cleanDirectory(getWorkDir());
}
- protected SegmentNodeStore getNodeStore() throws IOException {
+ protected SegmentNodeStore getNodeStore(BlobStore blobStore) throws
IOException {
if (nodeStore == null) {
- store = new FileStore(new File("target", "ExternalBlobTest"), 256,
false);
+ store = new FileStore(blobStore, getWorkDir(), 256, false);
nodeStore = new SegmentNodeStore(store);
}
return nodeStore;
}
+ private File getWorkDir(){
+ return new File("target", "ExternalBlobTest");
+ }
+
private FileBlob getFileBlob() throws IOException {
if (fileBlob == null) {
File file = File.createTempFile("blob", "tmp");
@@ -94,4 +131,29 @@ public class ExternalBlobTest {
}
return fileBlob;
}
+
+ private class TestBlobStore implements BlobStore {
+ @Override
+ public String writeBlob(InputStream in) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int readBlob(String blobId, long pos, byte[] buff, int off, int
length) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getBlobLength(String blobId) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public InputStream getInputStream(String blobId) throws IOException {
+ if(blobId.equals(fileBlob.getReference())){
+ return fileBlob.getNewStream();
+ }
+ return null;
+ }
+ }
}