Author: thomasm Date: Thu Sep 17 10:24:07 2015 New Revision: 1703555 URL: http://svn.apache.org/r1703555 Log: OAK-3148 Online migration process for the binaries
Added: jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/BlobStoreWrapper.java jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/osgi/SplitBlobStoreService.java jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/ jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/BlobIdSet.java jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/DefaultSplitBlobStore.java jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/SplitBlobStore.java jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/WrappingSplitBlobStore.java jackrabbit/oak/trunk/oak-blob/src/test/java/org/apache/jackrabbit/oak/spi/blob/split/ jackrabbit/oak/trunk/oak-blob/src/test/java/org/apache/jackrabbit/oak/spi/blob/split/SplitBlobStoreTest.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/BlobMigration.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/BlobMigrationMBean.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/BlobMigrator.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/DepthFirstNodeIterator.java jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/AbstractMigratorTest.java jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/DepthFirstNodeIteratorTest.java jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/DocumentToExternalMigrationTest.java jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/ExternalToExternalMigrationTest.java jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/SegmentToExternalMigrationTest.java Modified: jackrabbit/oak/trunk/oak-blob/pom.xml jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/osgi/FileBlobStoreService.java jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/package-info.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/AbstractDataStoreService.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java Modified: jackrabbit/oak/trunk/oak-blob/pom.xml URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob/pom.xml?rev=1703555&r1=1703554&r2=1703555&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-blob/pom.xml (original) +++ jackrabbit/oak/trunk/oak-blob/pom.xml Thu Sep 17 10:24:07 2015 @@ -45,7 +45,8 @@ * </Import-Package> <Export-Package> - org.apache.jackrabbit.oak.spi.blob + org.apache.jackrabbit.oak.spi.blob, + org.apache.jackrabbit.oak.spi.blob.split </Export-Package> </instructions> </configuration> Added: jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/BlobStoreWrapper.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/BlobStoreWrapper.java?rev=1703555&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/BlobStoreWrapper.java (added) +++ jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/BlobStoreWrapper.java Thu Sep 17 10:24:07 2015 @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.spi.blob; + +public interface BlobStoreWrapper extends BlobStore { + + void setBlobStore(BlobStore blobStore); + +} \ No newline at end of file Modified: jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/osgi/FileBlobStoreService.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/osgi/FileBlobStoreService.java?rev=1703555&r1=1703554&r2=1703555&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/osgi/FileBlobStoreService.java (original) +++ jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/osgi/FileBlobStoreService.java Thu Sep 17 10:24:07 2015 @@ -19,7 +19,10 @@ package org.apache.jackrabbit.oak.spi.blob.osgi; +import static org.apache.jackrabbit.oak.spi.blob.osgi.SplitBlobStoreService.PROP_SPLIT_BLOBSTORE; +import java.util.Dictionary; +import java.util.Hashtable; import java.util.Map; import org.apache.commons.io.FilenameUtils; @@ -54,10 +57,14 @@ public class FileBlobStoreService { } BlobStore blobStore = new FileBlobStore(FilenameUtils.concat(homeDir,"datastore")); PropertiesUtil.populate(blobStore, config, false); + Dictionary<String, Object> props = new Hashtable<String, Object>(); + if (context.getProperties().get(PROP_SPLIT_BLOBSTORE) != null) { + props.put(PROP_SPLIT_BLOBSTORE, context.getProperties().get(PROP_SPLIT_BLOBSTORE)); + } reg = context.getBundleContext().registerService(new String[]{ BlobStore.class.getName(), GarbageCollectableBlobStore.class.getName() - }, blobStore, null); + }, blobStore, props); } @Deactivate Added: jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/osgi/SplitBlobStoreService.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/osgi/SplitBlobStoreService.java?rev=1703555&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/osgi/SplitBlobStoreService.java (added) +++ jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/osgi/SplitBlobStoreService.java Thu Sep 17 10:24:07 2015 @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.spi.blob.osgi; + +import java.util.Dictionary; +import java.util.Hashtable; +import java.util.Map; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.ConfigurationPolicy; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Property; +import org.apache.felix.scr.annotations.PropertyOption; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.apache.felix.scr.annotations.ReferencePolicy; +import org.apache.jackrabbit.oak.spi.blob.BlobStore; +import org.apache.jackrabbit.oak.spi.blob.split.DefaultSplitBlobStore; +import org.apache.jackrabbit.oak.spi.blob.split.WrappingSplitBlobStore; +import org.osgi.framework.BundleContext; +import org.osgi.framework.InvalidSyntaxException; +import org.osgi.framework.ServiceRegistration; +import org.osgi.service.component.ComponentContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.jackrabbit.oak.spi.blob.osgi.SplitBlobStoreService.BlobStoreType.*; + +@Component(policy = ConfigurationPolicy.REQUIRE) +public class SplitBlobStoreService { + private static final Logger log = LoggerFactory.getLogger(SplitBlobStoreService.class); + + @Property + private static final String PROP_HOME = "repository.home"; + + @Property(options = { @PropertyOption(name = "External", value = "EXTERNAL"), + @PropertyOption(name = "Internal - Segment", value = "SEGMENT"), + @PropertyOption(name = "Internal - Document", value = "DOCUMENT") }) + private static final String PROP_OLD_BLOB_STORE_TYPE = "split.old.blobstore.type"; + + public static final String PROP_SPLIT_BLOBSTORE = "split.blobstore"; + + public static final String ONLY_STANDALONE_TARGET = "(&(!(split.blobstore=old))(!(split.blobstore=new)))"; + + @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY, policy = ReferencePolicy.DYNAMIC, target = "(split.blobstore=old)") + private BlobStore oldBlobStore; + + @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY, policy = ReferencePolicy.DYNAMIC, target = "(split.blobstore=new)") + private BlobStore newBlobStore; + + private BundleContext ctx; + + private ServiceRegistration reg; + + private String homeDir; + + private BlobStoreType oldBlobStoreType; + + @Activate + protected void activate(ComponentContext context, Map<String, Object> config) throws InvalidSyntaxException { + String oldTypeName = lookup(context, PROP_OLD_BLOB_STORE_TYPE); + if (oldTypeName == null) { + oldBlobStoreType = BlobStoreType.EXTERNAL; + } else { + oldBlobStoreType = BlobStoreType.valueOf(oldTypeName); + } + homeDir = lookup(context, PROP_HOME); + if (homeDir != null) { + log.info("Initializing the SplitBlobStore with home [{}]", homeDir); + } else { + log.warn("Can't initialize SplitBlobStore - empty {}", PROP_HOME); + return; + } + ctx = context.getBundleContext(); + registerSplitBlobStore(); + } + + @Deactivate + protected void deactivate() { + unregisterSplitBlobStore(); + ctx = null; + } + + private void registerSplitBlobStore() { + if (oldBlobStore == null && oldBlobStoreType == BlobStoreType.EXTERNAL) { + log.info("No BlobStore with ({}=old)", PROP_SPLIT_BLOBSTORE); + return; + } + if (newBlobStore == null) { + log.info("No BlobStore with ({}=new)", PROP_SPLIT_BLOBSTORE); + return; + } + if (reg != null) { + log.info("SplitBlobStore already registered"); + return; + } + if (ctx == null) { + log.info("Component not activated yet"); + return; + } + log.info("Registering SplitBlobStore with old={} ({}) and new={}", oldBlobStore, oldBlobStoreType, + newBlobStore); + BlobStore blobStore; + if (oldBlobStoreType == EXTERNAL || oldBlobStoreType == SEGMENT) { + blobStore = new DefaultSplitBlobStore(homeDir, oldBlobStore, newBlobStore); + } else if (oldBlobStoreType == DOCUMENT) { + blobStore = new WrappingSplitBlobStore(homeDir, newBlobStore); + } else { + throw new IllegalStateException("Illegal blob store type value: " + oldBlobStoreType); + } + Dictionary<String, Object> props = new Hashtable<String, Object>(); + props.put("service.pid", "org.apache.jackrabbit.oak.spi.blob.split.SplitBlobStore"); + reg = ctx.registerService(new String[] { BlobStore.class.getName() }, blobStore, props); + } + + private void unregisterSplitBlobStore() { + if (reg != null) { + reg.unregister(); + } + reg = null; + } + + private static String lookup(ComponentContext context, String property) { + // Prefer property from BundleContext first + if (context.getBundleContext().getProperty(property) != null) { + return context.getBundleContext().getProperty(property); + } + + if (context.getProperties().get(property) != null) { + return context.getProperties().get(property).toString(); + } + return null; + } + + protected void bindOldBlobStore(BlobStore blobStore) { + this.oldBlobStore = blobStore; + registerSplitBlobStore(); + } + + protected void unbindOldBlobStore(BlobStore blobStore) { + this.oldBlobStore = null; + unregisterSplitBlobStore(); + } + + protected void bindNewBlobStore(BlobStore blobStore) { + this.newBlobStore = blobStore; + registerSplitBlobStore(); + } + + protected void unbindNewBlobStore(BlobStore blobStore) { + this.newBlobStore = null; + unregisterSplitBlobStore(); + } + + enum BlobStoreType { + EXTERNAL, DOCUMENT, SEGMENT + } +} Modified: jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/package-info.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/package-info.java?rev=1703555&r1=1703554&r2=1703555&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/package-info.java (original) +++ jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/package-info.java Thu Sep 17 10:24:07 2015 @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -@Version("1.1") +@Version("1.2") @Export(optional = "provide:=true") package org.apache.jackrabbit.oak.spi.blob; Added: jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/BlobIdSet.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/BlobIdSet.java?rev=1703555&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/BlobIdSet.java (added) +++ jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/BlobIdSet.java Thu Sep 17 10:24:07 2015 @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.spi.blob.split; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; + +import org.apache.commons.io.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Charsets; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.hash.BloomFilter; +import com.google.common.hash.Funnels; + +class BlobIdSet { + + private static final Logger log = LoggerFactory.getLogger(BlobIdSet.class); + + private final File store; + + private final BloomFilter<CharSequence> bloomFilter; + + private final Cache<String, Boolean> cache; + + BlobIdSet(String repositoryDir, String filename) { + store = new File(new File(repositoryDir), filename); + bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8), 9000000); // about 8MB + cache = CacheBuilder.newBuilder().maximumSize(1000).build(); + fillBloomFilter(); + } + + synchronized boolean contains(String blobId) throws IOException { + if (!bloomFilter.apply(blobId)) { + return false; + } + Boolean cached = cache.getIfPresent(blobId); + if (cached != null) { + return cached; + } + + if (isPresentInStore(blobId)) { + cache.put(blobId, Boolean.TRUE); + bloomFilter.put(blobId); + return true; + } else { + cache.put(blobId, Boolean.FALSE); + return false; + } + } + + synchronized void add(String blobId) throws IOException { + addToStore(blobId); + bloomFilter.put(blobId); + cache.put(blobId, Boolean.TRUE); + } + + private boolean isPresentInStore(String blobId) throws FileNotFoundException, IOException { + if (!store.exists()) { + return false; + } + BufferedReader reader = new BufferedReader(new FileReader(store)); + try { + String line; + while ((line = reader.readLine()) != null) { + if (line.equals(blobId)) { + return true; + } + } + } finally { + reader.close(); + } + return false; + } + + private void addToStore(String blobId) throws IOException { + FileWriter writer = new FileWriter(store.getPath(), true); + try { + writer.append(blobId).append('\n'); + } finally { + writer.close(); + } + } + + private void fillBloomFilter() { + if (!store.exists()) { + return; + } + BufferedReader reader = null; + try { + reader = new BufferedReader(new FileReader(store)); + String line; + while ((line = reader.readLine()) != null) { + bloomFilter.put(line); + } + } catch (IOException e) { + log.error("Can't fill bloom filter", e); + } finally { + IOUtils.closeQuietly(reader); + } + } +} Added: jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/DefaultSplitBlobStore.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/DefaultSplitBlobStore.java?rev=1703555&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/DefaultSplitBlobStore.java (added) +++ jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/DefaultSplitBlobStore.java Thu Sep 17 10:24:07 2015 @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.spi.blob.split; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.jackrabbit.oak.spi.blob.BlobStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DefaultSplitBlobStore implements SplitBlobStore { + + private static final Logger log = LoggerFactory.getLogger(BlobIdSet.class); + + private static final String OLD_BLOBSTORE_PREFIX = "o_"; + + private static final String NEW_BLOBSTORE_PREFIX = "n_"; + + private final BlobStore oldBlobStore; + + private final BlobStore newBlobStore; + + private final BlobIdSet migratedBlobs; + + public DefaultSplitBlobStore(String repositoryDir, BlobStore oldBlobStore, BlobStore newBlobStore) { + this.oldBlobStore = oldBlobStore; + this.newBlobStore = newBlobStore; + this.migratedBlobs = new BlobIdSet(repositoryDir, "migrated_blobs.txt"); + } + + public boolean isMigrated(String blobId) throws IOException { + return migratedBlobs.contains(blobId); + } + + @Override + public String writeBlob(InputStream in) throws IOException { + String blobId = newBlobStore.writeBlob(in); + migratedBlobs.add(blobId); + return blobId; + } + + @Override + public int readBlob(String blobId, long pos, byte[] buff, int off, int length) throws IOException { + return chooseBlobStoreByBlobId(blobId).readBlob(blobId, pos, buff, off, length); + } + + @Override + public long getBlobLength(String blobId) throws IOException { + return chooseBlobStoreByBlobId(blobId).getBlobLength(blobId); + } + + @Override + public InputStream getInputStream(String blobId) throws IOException { + return chooseBlobStoreByBlobId(blobId).getInputStream(blobId); + } + + @Override + public String getBlobId(String reference) { + if (reference.startsWith(NEW_BLOBSTORE_PREFIX)) { + return newBlobStore.getBlobId(reference.substring(NEW_BLOBSTORE_PREFIX.length())); + } else if (reference.startsWith(OLD_BLOBSTORE_PREFIX)) { + return oldBlobStore.getBlobId(reference.substring(OLD_BLOBSTORE_PREFIX.length())); + } else { + log.error("Invalid reference: {}", reference); + return null; + } + } + + @Override + public String getReference(String blobId) { + try { + if (isMigrated(blobId)) { + return NEW_BLOBSTORE_PREFIX + newBlobStore.getReference(blobId); + } else { + return OLD_BLOBSTORE_PREFIX + oldBlobStore.getReference(blobId); + } + } catch (IOException e) { + log.error("Can't get reference", e); + return null; + } + } + + private BlobStore chooseBlobStoreByBlobId(String blobId) throws IOException { + if (isMigrated(blobId) || oldBlobStore == null) { + return newBlobStore; + } else { + return oldBlobStore; + } + } + + @Override + public String toString() { + return String.format("SplitBlobStore[old={}, new={}]", oldBlobStore, newBlobStore); + } +} Added: jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/SplitBlobStore.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/SplitBlobStore.java?rev=1703555&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/SplitBlobStore.java (added) +++ jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/SplitBlobStore.java Thu Sep 17 10:24:07 2015 @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.spi.blob.split; + +import java.io.IOException; + +import org.apache.jackrabbit.oak.spi.blob.BlobStore; + +public interface SplitBlobStore extends BlobStore { + + boolean isMigrated(String blobId) throws IOException; + +} Added: jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/WrappingSplitBlobStore.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/WrappingSplitBlobStore.java?rev=1703555&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/WrappingSplitBlobStore.java (added) +++ jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/WrappingSplitBlobStore.java Thu Sep 17 10:24:07 2015 @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.spi.blob.split; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.jackrabbit.oak.spi.blob.BlobStore; +import org.apache.jackrabbit.oak.spi.blob.BlobStoreWrapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class WrappingSplitBlobStore implements BlobStoreWrapper, SplitBlobStore { + + private static final Logger log = LoggerFactory.getLogger(WrappingSplitBlobStore.class); + + private DefaultSplitBlobStore splitBlobStore; + + private final String repositoryDir; + + private final BlobStore newBlobStore; + + public WrappingSplitBlobStore(String repositoryDir, BlobStore newBlobStore) { + this.repositoryDir = repositoryDir; + this.newBlobStore = newBlobStore; + } + + @Override + public void setBlobStore(BlobStore blobStore) { + log.info("Internal blob store set: {}", blobStore); + splitBlobStore = new DefaultSplitBlobStore(repositoryDir, blobStore, newBlobStore); + } + + private SplitBlobStore getSplitBlobStore() { + if (splitBlobStore == null) { + throw new IllegalStateException("The old blob store hasn't been set yet."); + } + return splitBlobStore; + } + + @Override + public String writeBlob(InputStream in) throws IOException { + return getSplitBlobStore().writeBlob(in); + } + + @Override + public int readBlob(String blobId, long pos, byte[] buff, int off, int length) throws IOException { + return getSplitBlobStore().readBlob(blobId, pos, buff, off, length); + } + + @Override + public long getBlobLength(String blobId) throws IOException { + return getSplitBlobStore().getBlobLength(blobId); + } + + @Override + public InputStream getInputStream(String blobId) throws IOException { + return getSplitBlobStore().getInputStream(blobId); + } + + @Override + public String getBlobId(String reference) { + return getSplitBlobStore().getBlobId(reference); + } + + @Override + public String getReference(String blobId) { + return getSplitBlobStore().getReference(blobId); + } + + @Override + public boolean isMigrated(String blobId) throws IOException { + return getSplitBlobStore().isMigrated(blobId); + } + +} Added: jackrabbit/oak/trunk/oak-blob/src/test/java/org/apache/jackrabbit/oak/spi/blob/split/SplitBlobStoreTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob/src/test/java/org/apache/jackrabbit/oak/spi/blob/split/SplitBlobStoreTest.java?rev=1703555&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-blob/src/test/java/org/apache/jackrabbit/oak/spi/blob/split/SplitBlobStoreTest.java (added) +++ jackrabbit/oak/trunk/oak-blob/src/test/java/org/apache/jackrabbit/oak/spi/blob/split/SplitBlobStoreTest.java Thu Sep 17 10:24:07 2015 @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.spi.blob.split; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.apache.commons.io.FileUtils; +import org.apache.jackrabbit.oak.spi.blob.BlobStore; +import org.apache.jackrabbit.oak.spi.blob.FileBlobStore; +import org.apache.jackrabbit.oak.spi.blob.split.DefaultSplitBlobStore; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.io.Files; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertFalse; +import static junit.framework.Assert.assertTrue; + +public class SplitBlobStoreTest { + + private static final int LENGTH = 1024; + + private final Random random = new Random(); + + private File repository; + + private BlobStore oldBlobStore; + + private BlobStore newBlobStore; + + private DefaultSplitBlobStore splitBlobStore; + + private List<String> oldBlobIds; + + private List<String> newBlobIds; + + @Before + public void setup() throws IOException { + repository = Files.createTempDir(); + oldBlobStore = new FileBlobStore(repository.getPath() + "/old"); + newBlobStore = new FileBlobStore(repository.getPath() + "/new"); + splitBlobStore = new DefaultSplitBlobStore(repository.getPath(), oldBlobStore, newBlobStore); + oldBlobIds = addBlobs(oldBlobStore); + newBlobIds = addBlobs(splitBlobStore); + } + + @After + public void teardown() throws IOException { + FileUtils.deleteDirectory(repository); + } + + @Test + public void testLength() throws IOException { + for (String id : oldBlobIds) { + assertEquals(LENGTH, splitBlobStore.getBlobLength(id)); + } + for (String id : newBlobIds) { + assertEquals(LENGTH, newBlobStore.getBlobLength(id)); + assertEquals(LENGTH, splitBlobStore.getBlobLength(id)); + } + } + + @Test + public void testIsMigrated() throws IOException { + for (String id : oldBlobIds) { + assertFalse(splitBlobStore.isMigrated(id)); + } + for (String id : newBlobIds) { + assertTrue(splitBlobStore.isMigrated(id)); + } + } + + @Test + public void testGetInputStream() throws IOException { + for (String id : oldBlobIds) { + assertStreamEquals(oldBlobStore.getInputStream(id), splitBlobStore.getInputStream(id)); + } + for (String id : newBlobIds) { + assertStreamEquals(newBlobStore.getInputStream(id), splitBlobStore.getInputStream(id)); + } + } + + @Test + public void testReadByte() throws IOException { + byte[] expected = new byte[LENGTH]; + byte[] actual = new byte[LENGTH]; + for (String id : oldBlobIds) { + oldBlobStore.readBlob(id, 0, expected, 0, LENGTH); + splitBlobStore.readBlob(id, 0, actual, 0, LENGTH); + assertArrayEquals(expected, actual); + } + for (String id : newBlobIds) { + newBlobStore.readBlob(id, 0, expected, 0, LENGTH); + splitBlobStore.readBlob(id, 0, actual, 0, LENGTH); + assertArrayEquals(expected, actual); + } + } + + @Test + public void testReferences() throws IOException { + for (String id : oldBlobIds) { + String reference = splitBlobStore.getReference(id); + assertEquals(id, splitBlobStore.getBlobId(reference)); + } + for (String id : newBlobIds) { + String reference = splitBlobStore.getReference(id); + assertEquals(id, splitBlobStore.getBlobId(reference)); + } + } + + private List<String> addBlobs(BlobStore blobStore) throws IOException { + List<String> ids = new ArrayList<String>(); + for (int i = 0; i < 5; i++) { + byte[] buffer = new byte[LENGTH]; + random.nextBytes(buffer); + InputStream bis = new ByteArrayInputStream(buffer); + ids.add(blobStore.writeBlob(bis)); + } + return ids; + } + + private static void assertStreamEquals(InputStream expected, InputStream actual) throws IOException { + while (true) { + int expectedByte = expected.read(); + int actualByte = actual.read(); + assertEquals(expectedByte, actualByte); + if (expectedByte == -1) { + break; + } + } + } + + private static void assertArrayEquals(byte[] expected, byte[] actual) throws IOException { + assertEquals(expected.length, actual.length); + for (int i = 0; i < expected.length; i++) { + assertEquals(expected[i], actual[i]); + } + } + +} \ No newline at end of file Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/AbstractDataStoreService.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/AbstractDataStoreService.java?rev=1703555&r1=1703554&r2=1703555&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/AbstractDataStoreService.java (original) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/AbstractDataStoreService.java Thu Sep 17 10:24:07 2015 @@ -36,6 +36,8 @@ import org.osgi.service.component.Compon import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.jackrabbit.oak.spi.blob.osgi.SplitBlobStoreService.PROP_SPLIT_BLOBSTORE; + public abstract class AbstractDataStoreService { private static final String PROP_HOME = "repository.home"; @@ -65,6 +67,9 @@ public abstract class AbstractDataStoreS Dictionary<String, Object> props = new Hashtable<String, Object>(); props.put(Constants.SERVICE_PID, ds.getClass().getName()); props.put(DESCRIPTION, getDescription()); + if (context.getProperties().get(PROP_SPLIT_BLOBSTORE) != null) { + props.put(PROP_SPLIT_BLOBSTORE, context.getProperties().get(PROP_SPLIT_BLOBSTORE)); + } reg = context.getBundleContext().registerService(new String[]{ BlobStore.class.getName(), Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/BlobMigration.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/BlobMigration.java?rev=1703555&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/BlobMigration.java (added) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/BlobMigration.java Thu Sep 17 10:24:07 2015 @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.plugins.blob.migration; + +import static java.lang.System.nanoTime; +import static org.apache.jackrabbit.oak.management.ManagementOperation.done; +import static org.apache.jackrabbit.oak.management.ManagementOperation.newManagementOperation; +import static org.apache.jackrabbit.oak.management.ManagementOperation.Status.formatTime; +import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; + +import javax.annotation.Nonnull; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.OpenType; +import javax.management.openmbean.SimpleType; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.jackrabbit.oak.api.jmx.RepositoryManagementMBean.StatusCode; +import org.apache.jackrabbit.oak.commons.jmx.AnnotatedStandardMBean; +import org.apache.jackrabbit.oak.management.ManagementOperation; +import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard; +import org.apache.jackrabbit.oak.spi.blob.BlobStore; +import org.apache.jackrabbit.oak.spi.blob.split.SplitBlobStore; +import org.apache.jackrabbit.oak.spi.state.NodeStore; +import org.apache.jackrabbit.oak.spi.whiteboard.Registration; +import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard; +import org.osgi.framework.BundleContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Component +public class BlobMigration extends AnnotatedStandardMBean implements BlobMigrationMBean { + + public static final String OP_NAME = "Blob migration"; + + private static final Logger log = LoggerFactory.getLogger(BlobMigrator.class); + + private ManagementOperation<String> migrationOp = done(OP_NAME, ""); + + private static final CompositeType TYPE; + + static { + CompositeType type; + try { + type = new CompositeType("BlobMigrationStatus", "Status of the blob migraiton", + new String[] { "isRunning", "migratedNodes", "lastProcessedPath", "operationStatus" }, + new String[] { "Migration in progress", "Total number of migrated nodes", "Last processed path", "Status of the operation" }, + new OpenType[] { SimpleType.BOOLEAN, SimpleType.INTEGER, SimpleType.STRING, ManagementOperation.Status.ITEM_TYPES }); + } catch (OpenDataException e) { + type = null; + log.error("Can't create a CompositeType", e); + } + TYPE = type; + } + + @Reference(target = "(service.pid=org.apache.jackrabbit.oak.spi.blob.split.SplitBlobStore)") + private BlobStore splitBlobStore; + + @Reference + private NodeStore nodeStore; + + private Executor executor = Executors.newSingleThreadExecutor(); + + private BlobMigrator migrator; + + private Registration mbeanReg; + + public BlobMigration() { + super(BlobMigrationMBean.class); + } + + @Activate + private void activate(BundleContext ctx) { + Whiteboard wb = new OsgiWhiteboard(ctx); + migrator = new BlobMigrator((SplitBlobStore) splitBlobStore, nodeStore); + mbeanReg = registerMBean(wb, BlobMigrationMBean.class, this, BlobMigrationMBean.TYPE, OP_NAME); + } + + @Deactivate + private void deactivate() throws InterruptedException { + if (migrator != null) { + migrator.stop(); + migrator = null; + } + if (mbeanReg != null) { + mbeanReg.unregister(); + mbeanReg = null; + } + } + + @Nonnull + @Override + public String startBlobMigration(final boolean resume) { + if (migrationOp.isDone()) { + migrationOp = newManagementOperation(OP_NAME, new Callable<String>() { + @Override + public String call() throws Exception { + long t0 = nanoTime(); + boolean finished; + if (resume) { + finished = migrator.migrate(); + } else { + finished = migrator.start(); + } + String duration = formatTime(nanoTime() - t0); + if (finished) { + return "All blobs migrated in " + duration; + } else { + return "Migration stopped manually after " + duration; + } + } + }); + executor.execute(migrationOp); + return "Migration started"; + } else { + return "Migration is already in progress"; + } + } + + @Nonnull + @Override + public String stopBlobMigration() { + migrator.stop(); + return "Migration will be stopped"; + } + + @Nonnull + @Override + public CompositeData getBlobMigrationStatus() throws OpenDataException { + Map<String, Object> status = new HashMap<String, Object>(); + status.put("isRunning", migrationOp.getStatus().getCode() == StatusCode.RUNNING); + status.put("migratedNodes", migrator.getTotalMigratedNodes()); + status.put("lastProcessedPath", migrator.getLastProcessedPath()); + status.put("operationStatus", migrationOp.getStatus().toCompositeData()); + return new CompositeDataSupport(TYPE, status); + } +} Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/BlobMigrationMBean.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/BlobMigrationMBean.java?rev=1703555&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/BlobMigrationMBean.java (added) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/BlobMigrationMBean.java Thu Sep 17 10:24:07 2015 @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.plugins.blob.migration; + +import javax.annotation.Nonnull; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.OpenDataException; + +import org.apache.jackrabbit.oak.commons.jmx.Description; +import org.apache.jackrabbit.oak.commons.jmx.Name; + +public interface BlobMigrationMBean { + + String TYPE = "BlobMigration"; + + @Nonnull + @Description("Start or resume the blob migration") + String startBlobMigration( + @Name("resume") @Description("true to resume stopped migration or false to start it from scratch") boolean resume); + + @Nonnull + @Description("Stop the blob migration") + String stopBlobMigration(); + + @Nonnull + CompositeData getBlobMigrationStatus() throws OpenDataException; + +} Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/BlobMigrator.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/BlobMigrator.java?rev=1703555&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/BlobMigrator.java (added) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/BlobMigrator.java Thu Sep 17 10:24:07 2015 @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.plugins.blob.migration; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.jackrabbit.oak.api.Blob; +import org.apache.jackrabbit.oak.api.CommitFailedException; +import org.apache.jackrabbit.oak.api.PropertyState; +import org.apache.jackrabbit.oak.api.Type; +import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob; +import org.apache.jackrabbit.oak.plugins.memory.PropertyBuilder; +import org.apache.jackrabbit.oak.spi.blob.split.SplitBlobStore; +import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.commit.EmptyHook; +import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.apache.jackrabbit.oak.spi.state.NodeState; +import org.apache.jackrabbit.oak.spi.state.NodeStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BlobMigrator { + + private static final Logger log = LoggerFactory.getLogger(BlobMigrator.class); + + private static final int MERGE_LIMIT = 100; + + private static final int MERGE_TIMEOUT = 30; + + private final SplitBlobStore blobStore; + + private final NodeStore nodeStore; + + private final AtomicBoolean stopMigration = new AtomicBoolean(false); + + private DepthFirstNodeIterator nodeIterator; + + private NodeBuilder rootBuilder; + + private long lastCommit; + + private int migratedNodes; + + private volatile String lastPath; + + private volatile int totalMigratedNodes; + + public BlobMigrator(SplitBlobStore blobStore, NodeStore nodeStore) { + this.blobStore = blobStore; + this.nodeStore = nodeStore; + refreshAndReset(); + } + + public boolean start() throws IOException { + totalMigratedNodes = 0; + refreshAndReset(); + return migrate(); + } + + public boolean migrate() throws IOException { + do { + while (nodeIterator.hasNext()) { + lastPath = nodeIterator.getPath(); + if (stopMigration.getAndSet(false)) { + if (migratedNodes > 0) { + tryCommit(); + } + return false; + } + migrateNode(rootBuilder, nodeIterator); + if (timeToCommit()) { + tryCommit(); + } + } + // at this point we iterated over the whole repository + // the last thing to do is to check if we don't have + // any nodes waiting to be migrated. if the operation + // fails we have to start from the beginning + } while (migratedNodes > 0 && !tryCommit()); + return true; + } + + private boolean tryCommit() { + try { + nodeStore.merge(rootBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY); + totalMigratedNodes += migratedNodes; + log.info("{} nodes merged succesfully. Nodes migrated in this session: {}", migratedNodes, totalMigratedNodes); + lastCommit = System.currentTimeMillis(); + migratedNodes = 0; + return true; + } catch (CommitFailedException e) { + log.error("Can't commit. Resetting the migrator", e); + refreshAndReset(); + return false; + } + } + + private boolean timeToCommit() { + long changesMerged = (System.currentTimeMillis() - lastCommit) / 1000; + if (migratedNodes >= MERGE_LIMIT) { + log.info("Migrated nodes count: {}. Merging changes.", migratedNodes); + return true; + } else if (migratedNodes > 0 && changesMerged >= MERGE_TIMEOUT) { + log.info("Changes have been merged {}s ago. Merging {} nodes.", changesMerged, migratedNodes); + return true; + } + return false; + } + + public void stop() { + stopMigration.set(true); + } + + public String getLastProcessedPath() { + return lastPath; + } + + public int getTotalMigratedNodes() { + return totalMigratedNodes; + } + + private void refreshAndReset() { + NodeState rootState = nodeStore.getRoot(); + rootBuilder = rootState.builder(); + nodeIterator = new DepthFirstNodeIterator(rootState); + lastPath = null; + lastCommit = System.currentTimeMillis(); + migratedNodes = 0; + } + + private void migrateNode(NodeBuilder rootBuilder, DepthFirstNodeIterator iterator) throws IOException { + ChildNodeEntry node = iterator.next(); + NodeState state = node.getNodeState(); + for (PropertyState property : state.getProperties()) { + PropertyState newProperty; + if (property.getType() == Type.BINARY) { + newProperty = migrateProperty(property); + } else if (property.getType() == Type.BINARIES) { + newProperty = migrateMultiProperty(property); + } else { + newProperty = null; + } + if (newProperty != null) { + NodeBuilder builder = iterator.getBuilder(rootBuilder); + if (builder.exists()) { + builder.setProperty(newProperty); + migratedNodes++; + log.debug("Migrated property {}/{}", lastPath, property.getName()); + } else { + log.warn("Can't migrate blobs for a non-existing node: {}", lastPath); + } + } + } + } + + private PropertyState migrateProperty(PropertyState propertyState) throws IOException { + Blob oldBlob = propertyState.getValue(Type.BINARY); + String blobId = oldBlob.getContentIdentity(); + if (blobStore.isMigrated(blobId)) { + return null; + } + + String newBlobId = blobStore.writeBlob(oldBlob.getNewStream()); + Blob newBlob = new BlobStoreBlob(blobStore, newBlobId); + PropertyBuilder<Blob> builder = new PropertyBuilder<Blob>(Type.BINARY); + builder.assignFrom(propertyState); + builder.setValue(newBlob); + return builder.getPropertyState(); + } + + private PropertyState migrateMultiProperty(PropertyState propertyState) throws IOException { + Iterable<Blob> oldBlobs = propertyState.getValue(Type.BINARIES); + List<Blob> newBlobs = new ArrayList<Blob>(); + PropertyBuilder<Blob> builder = new PropertyBuilder<Blob>(Type.BINARY); + builder.assignFrom(propertyState); + boolean blobUpdated = false; + for (Blob oldBlob : oldBlobs) { + String blobId = oldBlob.getContentIdentity(); + if (blobStore.isMigrated(blobId)) { + newBlobs.add(new BlobStoreBlob(blobStore, blobId)); + } else { + String newBlobId = blobStore.writeBlob(oldBlob.getNewStream()); + Blob newBlob = new BlobStoreBlob(blobStore, newBlobId); + newBlobs.add(newBlob); + blobUpdated = true; + } + } + if (blobUpdated) { + builder.setValues(newBlobs); + return builder.getPropertyState(); + } else { + return null; + } + } +} \ No newline at end of file Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/DepthFirstNodeIterator.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/DepthFirstNodeIterator.java?rev=1703555&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/DepthFirstNodeIterator.java (added) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/DepthFirstNodeIterator.java Thu Sep 17 10:24:07 2015 @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.plugins.blob.migration; + +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.Iterator; + +import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.apache.jackrabbit.oak.spi.state.NodeState; + +import com.google.common.base.Joiner; +import com.google.common.collect.AbstractIterator; + +public class DepthFirstNodeIterator extends AbstractIterator<ChildNodeEntry> { + + private final Deque<Iterator<? extends ChildNodeEntry>> itQueue = new ArrayDeque<Iterator<? extends ChildNodeEntry>>(); + + private final Deque<String> nameQueue = new ArrayDeque<String>(); + + private final NodeState root; + + public DepthFirstNodeIterator(NodeState root) { + this.root = root; + reset(); + } + + public void reset() { + itQueue.clear(); + nameQueue.clear(); + itQueue.add(root.getChildNodeEntries().iterator()); + } + + @Override + protected ChildNodeEntry computeNext() { + if (itQueue.isEmpty()) { + return endOfData(); + } + if (itQueue.peekLast().hasNext()) { + ChildNodeEntry next = itQueue.peekLast().next(); + itQueue.add(next.getNodeState().getChildNodeEntries().iterator()); + nameQueue.add(next.getName()); + return next; + } else { + itQueue.pollLast(); + if (!nameQueue.isEmpty()) { + nameQueue.pollLast(); + } + return computeNext(); + } + } + + public NodeBuilder getBuilder(NodeBuilder rootBuilder) { + NodeBuilder builder = rootBuilder; + for (String name : nameQueue) { + builder = builder.getChildNode(name); + } + return builder; + } + + public String getPath() { + StringBuilder path = new StringBuilder("/"); + return Joiner.on('/').appendTo(path, nameQueue).toString(); + } + +} \ No newline at end of file Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java?rev=1703555&r1=1703554&r2=1703555&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java (original) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java Thu Sep 17 10:24:07 2015 @@ -27,6 +27,7 @@ import static org.apache.jackrabbit.oak. import static org.apache.jackrabbit.oak.plugins.document.DocumentMK.Builder.DEFAULT_DIFF_CACHE_PERCENTAGE; import static org.apache.jackrabbit.oak.plugins.document.DocumentMK.Builder.DEFAULT_DOC_CHILDREN_CACHE_PERCENTAGE; import static org.apache.jackrabbit.oak.plugins.document.DocumentMK.Builder.DEFAULT_NODE_CACHE_PERCENTAGE; +import static org.apache.jackrabbit.oak.spi.blob.osgi.SplitBlobStoreService.ONLY_STANDALONE_TARGET; import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean; import java.io.ByteArrayInputStream; @@ -70,6 +71,7 @@ import org.apache.jackrabbit.oak.plugins import org.apache.jackrabbit.oak.plugins.document.util.MongoConnection; import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo; import org.apache.jackrabbit.oak.spi.blob.BlobStore; +import org.apache.jackrabbit.oak.spi.blob.BlobStoreWrapper; import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore; import org.apache.jackrabbit.oak.spi.state.NodeStore; import org.apache.jackrabbit.oak.spi.state.RevisionGC; @@ -240,7 +242,7 @@ public class DocumentNodeStoreService { private WhiteboardExecutor executor; @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY, - policy = ReferencePolicy.DYNAMIC) + policy = ReferencePolicy.DYNAMIC, target = ONLY_STANDALONE_TARGET) private volatile BlobStore blobStore; @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY, @@ -389,8 +391,10 @@ public class DocumentNodeStoreService { mkBuilder.setPersistentCache(persistentCache); } + boolean wrappingCustomBlobStore = customBlobStore && blobStore instanceof BlobStoreWrapper; + //Set blobstore before setting the DB - if (customBlobStore) { + if (customBlobStore && !wrappingCustomBlobStore) { checkNotNull(blobStore, "Use of custom BlobStore enabled via [%s] but blobStore reference not " + "initialized", CUSTOM_BLOB_STORE); mkBuilder.setBlobStore(blobStore); @@ -431,6 +435,12 @@ public class DocumentNodeStoreService { log.info("Connected to database {}", mongoDB); } + //Set wrapping blob store after setting the DB + if (wrappingCustomBlobStore) { + ((BlobStoreWrapper) blobStore).setBlobStore(mkBuilder.getBlobStore()); + mkBuilder.setBlobStore(blobStore); + } + mkBuilder.setExecutor(executor); mk = mkBuilder.open(); Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java?rev=1703555&r1=1703554&r2=1703555&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java (original) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java Thu Sep 17 10:24:07 2015 @@ -18,6 +18,7 @@ package org.apache.jackrabbit.oak.plugin import static com.google.common.base.Preconditions.checkState; import static java.util.Collections.emptyMap; +import static org.apache.jackrabbit.oak.spi.blob.osgi.SplitBlobStoreService.ONLY_STANDALONE_TARGET; import static org.apache.jackrabbit.oak.commons.PropertiesUtil.toBoolean; import static org.apache.jackrabbit.oak.commons.PropertiesUtil.toInteger; import static org.apache.jackrabbit.oak.commons.PropertiesUtil.toLong; @@ -250,7 +251,7 @@ public class SegmentNodeStoreService ext private ComponentContext context; @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY, - policy = ReferencePolicy.DYNAMIC) + policy = ReferencePolicy.DYNAMIC, target = ONLY_STANDALONE_TARGET) private volatile BlobStore blobStore; private ServiceRegistration storeRegistration; Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/AbstractMigratorTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/AbstractMigratorTest.java?rev=1703555&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/AbstractMigratorTest.java (added) +++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/AbstractMigratorTest.java Thu Sep 17 10:24:07 2015 @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.plugins.blob.migration; + +import static junit.framework.Assert.assertEquals; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.util.Random; + +import org.apache.commons.io.FileUtils; +import org.apache.jackrabbit.oak.api.Blob; +import org.apache.jackrabbit.oak.api.CommitFailedException; +import org.apache.jackrabbit.oak.api.PropertyState; +import org.apache.jackrabbit.oak.api.Type; +import org.apache.jackrabbit.oak.plugins.memory.ArrayBasedBlob; +import org.apache.jackrabbit.oak.plugins.memory.PropertyBuilder; +import org.apache.jackrabbit.oak.spi.blob.BlobStore; +import org.apache.jackrabbit.oak.spi.blob.split.DefaultSplitBlobStore; +import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.commit.EmptyHook; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.apache.jackrabbit.oak.spi.state.NodeState; +import org.apache.jackrabbit.oak.spi.state.NodeStore; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.io.Files; + +public abstract class AbstractMigratorTest { + + private static final int LENGTH = 1024 * 16; + + private static final Random RANDOM = new Random(); + + private File repository; + + private NodeStore nodeStore; + + private BlobStore newBlobStore; + + private BlobMigrator migrator; + + @Before + public void setup() throws CommitFailedException, IllegalArgumentException, IOException { + repository = Files.createTempDir(); + BlobStore oldBlobStore = createOldBlobStore(repository); + NodeStore originalNodeStore = createNodeStore(oldBlobStore, repository); + createContent(originalNodeStore); + closeNodeStore(); + + newBlobStore = createNewBlobStore(repository); + DefaultSplitBlobStore splitBlobStore = new DefaultSplitBlobStore(repository.getPath(), oldBlobStore, newBlobStore); + nodeStore = createNodeStore(splitBlobStore, repository); + migrator = new BlobMigrator(splitBlobStore, nodeStore); + } + + protected abstract NodeStore createNodeStore(BlobStore blobStore, File repository) throws IOException; + + protected abstract void closeNodeStore(); + + protected abstract BlobStore createOldBlobStore(File repository); + + protected abstract BlobStore createNewBlobStore(File repository); + + @After + public void teardown() throws IOException { + closeNodeStore(); + FileUtils.deleteDirectory(repository); + } + + @Test + public void blobsExistsOnTheNewBlobStore() throws IOException, CommitFailedException { + migrator.migrate(); + NodeState root = nodeStore.getRoot(); + for (int i = 1; i <= 3; i++) { + assertPropertyOnTheNewStore(root.getChildNode("node" + i).getProperty("prop")); + } + } + + @Test + public void blobsCanBeReadAfterSwitchingBlobStore() throws IOException, CommitFailedException { + migrator.migrate(); + closeNodeStore(); + + nodeStore = createNodeStore(newBlobStore, repository); + NodeState root = nodeStore.getRoot(); + for (int i = 1; i <= 3; i++) { + assertPropertyExists(root.getChildNode("node" + i).getProperty("prop")); + } + } + + private void assertPropertyExists(PropertyState property) { + if (property.isArray()) { + for (Blob blob : property.getValue(Type.BINARIES)) { + assertEquals(LENGTH, blob.length()); + } + } else { + assertEquals(LENGTH, property.getValue(Type.BINARY).length()); + } + } + + private void assertPropertyOnTheNewStore(PropertyState property) throws IOException { + if (property.isArray()) { + for (Blob blob : property.getValue(Type.BINARIES)) { + assertPropertyOnTheNewStore(blob); + } + } else { + assertPropertyOnTheNewStore(property.getValue(Type.BINARY)); + } + } + + private void assertPropertyOnTheNewStore(Blob blob) throws IOException { + String blobId = blob.getContentIdentity(); + assertStreamEquals(blob.getNewStream(), newBlobStore.getInputStream(blobId)); + } + + private static void createContent(NodeStore nodeStore) throws IOException, CommitFailedException { + NodeBuilder rootBuilder = nodeStore.getRoot().builder(); + rootBuilder.child("node1").setProperty("prop", createBlob(nodeStore)); + rootBuilder.child("node2").setProperty("prop", createBlob(nodeStore)); + PropertyBuilder<Blob> builder = PropertyBuilder.array(Type.BINARY, "prop"); + builder.addValue(createBlob(nodeStore)); + builder.addValue(createBlob(nodeStore)); + builder.addValue(createBlob(nodeStore)); + rootBuilder.child("node3").setProperty(builder.getPropertyState()); + nodeStore.merge(rootBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY); + } + + private static Blob createBlob(NodeStore nodeStore) throws IOException { + byte[] buffer = new byte[LENGTH]; + RANDOM.nextBytes(buffer); + return new ArrayBasedBlob(buffer); + } + + private static void assertStreamEquals(InputStream expected, InputStream actual) throws IOException { + while (true) { + int expectedByte = expected.read(); + int actualByte = actual.read(); + assertEquals(expectedByte, actualByte); + if (expectedByte == -1) { + break; + } + } + } +} Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/DepthFirstNodeIteratorTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/DepthFirstNodeIteratorTest.java?rev=1703555&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/DepthFirstNodeIteratorTest.java (added) +++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/DepthFirstNodeIteratorTest.java Thu Sep 17 10:24:07 2015 @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.plugins.blob.migration; + +import org.apache.jackrabbit.oak.api.CommitFailedException; +import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore; +import org.apache.jackrabbit.oak.plugins.segment.memory.MemoryStore; +import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.commit.EmptyHook; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.apache.jackrabbit.oak.spi.state.NodeStore; +import org.junit.Before; +import org.junit.Test; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertFalse; +import static junit.framework.Assert.assertTrue; + +import java.util.HashMap; +import java.util.Map; + +public class DepthFirstNodeIteratorTest { + + private NodeStore store; + + @Before + public void setup() throws CommitFailedException { + store = SegmentNodeStore.newSegmentNodeStore(new MemoryStore()).create(); + NodeBuilder rootBuilder = store.getRoot().builder(); + NodeBuilder countries = rootBuilder.child("countries"); + countries.child("uk").child("cities").child("london").child("districts").child("frognal"); + countries.child("germany"); + countries.child("france").child("cities").child("paris"); + store.merge(rootBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY); + } + + // The order of the returned nodes is not defined, that's why we have to + // create 3 subtrees. + @Test + public void testIterate() { + Map<String, String[]> subtrees = new HashMap<String, String[]>(); + subtrees.put("uk", new String[] { "cities", "london", "districts", "frognal" }); + subtrees.put("germany", new String[] {}); + subtrees.put("france", new String[] { "cities", "paris" }); + + DepthFirstNodeIterator iterator = new DepthFirstNodeIterator(store.getRoot()); + assertTrue(iterator.hasNext()); + assertEquals("countries", iterator.next().getName()); + + for (int i = 0; i < 3; i++) { + assertTrue(iterator.hasNext()); + String country = iterator.next().getName(); + for (String node : subtrees.remove(country)) { + assertTrue(iterator.hasNext()); + assertEquals(node, iterator.next().getName()); + } + } + assertFalse(iterator.hasNext()); + assertTrue(subtrees.isEmpty()); + } + + @Test + public void testGetPath() { + Map<String, String> nameToPath = new HashMap<String, String>(); + nameToPath.put("countries", "/countries"); + nameToPath.put("uk", "/countries/uk"); + nameToPath.put("frognal", "/countries/uk/cities/london/districts/frognal"); + nameToPath.put("paris", "/countries/france/cities/paris"); + + DepthFirstNodeIterator iterator = new DepthFirstNodeIterator(store.getRoot()); + while (iterator.hasNext()) { + String expectedPath = nameToPath.remove(iterator.next().getName()); + if (expectedPath == null) { + continue; + } + assertEquals(expectedPath, iterator.getPath()); + } + assertTrue(nameToPath.isEmpty()); + } +} Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/DocumentToExternalMigrationTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/DocumentToExternalMigrationTest.java?rev=1703555&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/DocumentToExternalMigrationTest.java (added) +++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/DocumentToExternalMigrationTest.java Thu Sep 17 10:24:07 2015 @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.plugins.blob.migration; + +import java.io.File; +import java.io.IOException; + +import org.apache.jackrabbit.oak.plugins.document.DocumentMK; +import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore; +import org.apache.jackrabbit.oak.plugins.document.MongoUtils; +import org.apache.jackrabbit.oak.plugins.document.mongo.MongoBlobStore; +import org.apache.jackrabbit.oak.plugins.document.util.MongoConnection; +import org.apache.jackrabbit.oak.spi.blob.BlobStore; +import org.apache.jackrabbit.oak.spi.blob.FileBlobStore; +import org.apache.jackrabbit.oak.spi.state.NodeStore; +import org.junit.Assume; +import org.junit.BeforeClass; + +public class DocumentToExternalMigrationTest extends AbstractMigratorTest { + + private DocumentNodeStore nodeStore;; + + @Override + protected NodeStore createNodeStore(BlobStore blobStore, File repository) throws IOException { + MongoConnection connection = MongoUtils.getConnection(); + Assume.assumeNotNull(connection); + DocumentMK.Builder builder = new DocumentMK.Builder(); + if (blobStore != null) { + builder.setBlobStore(blobStore); + } + builder.setMongoDB(connection.getDB()); + return nodeStore = builder.getNodeStore(); + } + + @Override + protected void closeNodeStore() { + if (nodeStore != null) { + nodeStore.dispose(); + nodeStore = null; + } + } + + @BeforeClass + public static void checkMongoDbAvailable() { + Assume.assumeNotNull(MongoUtils.getConnection()); + } + + @Override + protected BlobStore createOldBlobStore(File repository) { + MongoConnection connection = MongoUtils.getConnection(); + return new MongoBlobStore(connection.getDB()); + } + + @Override + protected BlobStore createNewBlobStore(File repository) { + return new FileBlobStore(repository.getPath() + "/new"); + } + +} Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/ExternalToExternalMigrationTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/ExternalToExternalMigrationTest.java?rev=1703555&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/ExternalToExternalMigrationTest.java (added) +++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/ExternalToExternalMigrationTest.java Thu Sep 17 10:24:07 2015 @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.plugins.blob.migration; + +import java.io.File; +import java.io.IOException; + +import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore; +import org.apache.jackrabbit.oak.plugins.segment.SegmentStore; +import org.apache.jackrabbit.oak.plugins.segment.file.FileStore; +import org.apache.jackrabbit.oak.spi.blob.BlobStore; +import org.apache.jackrabbit.oak.spi.blob.FileBlobStore; +import org.apache.jackrabbit.oak.spi.state.NodeStore; + +public class ExternalToExternalMigrationTest extends AbstractMigratorTest { + + private SegmentStore segmentStore; + + @Override + protected NodeStore createNodeStore(BlobStore blobStore, File repository) throws IOException { + File segmentDir = new File(repository, "segmentstore"); + segmentStore = FileStore.newFileStore(segmentDir).withBlobStore(blobStore).create(); + return SegmentNodeStore.newSegmentNodeStore(segmentStore).create(); + } + + @Override + protected void closeNodeStore() { + segmentStore.close(); + } + + @Override + protected BlobStore createOldBlobStore(File repository) { + return new FileBlobStore(repository.getPath() + "/old"); + } + + @Override + protected BlobStore createNewBlobStore(File repository) { + return new FileBlobStore(repository.getPath() + "/new"); + } + +} Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/SegmentToExternalMigrationTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/SegmentToExternalMigrationTest.java?rev=1703555&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/SegmentToExternalMigrationTest.java (added) +++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/SegmentToExternalMigrationTest.java Thu Sep 17 10:24:07 2015 @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.plugins.blob.migration; + +import java.io.File; +import java.io.IOException; + +import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore; +import org.apache.jackrabbit.oak.plugins.segment.SegmentStore; +import org.apache.jackrabbit.oak.plugins.segment.file.FileStore; +import org.apache.jackrabbit.oak.spi.blob.BlobStore; +import org.apache.jackrabbit.oak.spi.blob.FileBlobStore; +import org.apache.jackrabbit.oak.spi.state.NodeStore; + +public class SegmentToExternalMigrationTest extends AbstractMigratorTest { + + private SegmentStore segmentStore; + + @Override + protected NodeStore createNodeStore(BlobStore blobStore, File repository) throws IOException { + File segmentDir = new File(repository, "segmentstore"); + FileStore.Builder builder = FileStore.newFileStore(segmentDir); + if (blobStore != null) { + builder.withBlobStore(blobStore); + } + segmentStore = builder.create(); + return SegmentNodeStore.newSegmentNodeStore(segmentStore).create(); + } + + @Override + protected void closeNodeStore() { + segmentStore.close(); + } + + @Override + protected BlobStore createOldBlobStore(File repository) { + return null; + } + + @Override + protected BlobStore createNewBlobStore(File repository) { + return new FileBlobStore(repository.getPath() + "/new"); + } + +} \ No newline at end of file