Author: chetanm
Date: Tue May 19 10:23:31 2015
New Revision: 1680232

URL: http://svn.apache.org/r1680232
Log:
OAK-2882 - Support migration without access to DataStore

Added:
    
jackrabbit/oak/trunk/oak-upgrade/src/main/java/org/apache/jackrabbit/oak/upgrade/blob/
    
jackrabbit/oak/trunk/oak-upgrade/src/main/java/org/apache/jackrabbit/oak/upgrade/blob/LengthCachingDataStore.java
   (with props)
    
jackrabbit/oak/trunk/oak-upgrade/src/test/java/org/apache/jackrabbit/oak/upgrade/blob/
    
jackrabbit/oak/trunk/oak-upgrade/src/test/java/org/apache/jackrabbit/oak/upgrade/blob/LengthCachingDataStoreTest.java
   (with props)

Added: 
jackrabbit/oak/trunk/oak-upgrade/src/main/java/org/apache/jackrabbit/oak/upgrade/blob/LengthCachingDataStore.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-upgrade/src/main/java/org/apache/jackrabbit/oak/upgrade/blob/LengthCachingDataStore.java?rev=1680232&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-upgrade/src/main/java/org/apache/jackrabbit/oak/upgrade/blob/LengthCachingDataStore.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-upgrade/src/main/java/org/apache/jackrabbit/oak/upgrade/blob/LengthCachingDataStore.java
 Tue May 19 10:23:31 2015
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.upgrade.blob;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+
+import javax.jcr.RepositoryException;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.LineIterator;
+import org.apache.jackrabbit.core.data.AbstractDataRecord;
+import org.apache.jackrabbit.core.data.AbstractDataStore;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStore;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.oak.commons.PropertiesUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * A DelegatingDataStore can avoid performing expensive file system access by 
making
+ * use of pre computed data related to files in DataStore.
+ * <p/>
+ * <p>During repository migration actual blob content is not accessed and 
instead
+ * only the blob length and blob references are accessed. DelegatingDataStore 
can be
+ * configured with a mapping file which would be used to determine the length 
of given
+ * blob reference.</p>
+ * <p/>
+ * Mapping file format
+ * <pre><![CDATA[
+ *     #< length >| < identifier >
+ *     4432|dd10bca036f3134352c63e534d4568a3d2ac2fdc
+ *     32167|dd10bca036f3134567c63e534d4568a3d2ac2fdc
+ * ]]></pre>
+ * <p/>
+ * The Configuration:
+ * <p/>
+ * <pre><![CDATA[
+ *  <DataStore 
class="org.apache.jackrabbit.oak.upgrade.blob.LengthCachingDataStore">
+ *      <param name="mappingFilePath" value="/path/to/mapping/file" />
+ *      <param name="delegateClass" 
value="org.apache.jackrabbit.core.data.FileDataStore" />
+ *  </DataStore>
+ * ]]></pre>
+ */
+public class LengthCachingDataStore extends AbstractDataStore {
+    private static final Logger log = 
LoggerFactory.getLogger(LengthCachingDataStore.class);
+    /**
+     * Separator used while writing length and identifier to the mapping file
+     */
+    public static final char SEPARATOR = '|';
+
+    //TODO For now using an in memory map. For very large repositories
+    //this might consume lots of memory. For such case we would need to switch 
to
+    //some off heap map
+    private Map<String, Long> existingMappings = Collections.emptyMap();
+    private Map<String, Long> newMappings = Maps.newConcurrentMap();
+
+    private String mappingFilePath = "datastore-list.txt";
+    private String delegateClass;
+    private String delegateConfigFilePath;
+    private DataStore delegate;
+    private boolean readOnly = true;
+    private File mappingFile;
+
+    @Override
+    public void init(String homeDir) throws RepositoryException {
+        initializeDelegate(homeDir);
+        initializeMappingData(homeDir);
+    }
+
+    @Override
+    public DataRecord getRecordIfStored(DataIdentifier dataIdentifier) throws 
DataStoreException {
+        if (existingMappings.containsKey(dataIdentifier.toString())) {
+            return new DelegateDataRecord(this, dataIdentifier, 
existingMappings);
+        } else if (newMappings.containsKey(dataIdentifier.toString())) {
+            return new DelegateDataRecord(this, dataIdentifier, newMappings);
+        }
+        DataRecord result = getDelegate().getRecordIfStored(dataIdentifier);
+        addNewMapping(result);
+        return result;
+    }
+
+    @Override
+    public DataRecord addRecord(InputStream inputStream) throws 
DataStoreException {
+        checkIfReadOnly();
+        DataRecord result = getDelegate().addRecord(inputStream);
+        addNewMapping(result);
+        return result;
+    }
+
+    @Override
+    public void updateModifiedDateOnAccess(long before) {
+        checkIfReadOnly();
+        getDelegate().updateModifiedDateOnAccess(before);
+
+    }
+
+    @Override
+    public int deleteAllOlderThan(long min) throws DataStoreException {
+        checkIfReadOnly();
+        return getDelegate().deleteAllOlderThan(min);
+    }
+
+    @Override
+    public Iterator<DataIdentifier> getAllIdentifiers() throws 
DataStoreException {
+        return getDelegate().getAllIdentifiers();
+    }
+
+    @Override
+    public int getMinRecordLength() {
+        return getDelegate().getMinRecordLength();
+    }
+
+    @Override
+    public void close() throws DataStoreException {
+        existingMappings.clear();
+        saveNewMappingsToFile();
+
+        if (delegate != null) {
+            delegate.close();
+        }
+    }
+
+    @Override
+    public void clearInUse() {
+        getDelegate().clearInUse();
+    }
+
+    File getMappingFile() {
+        return mappingFile;
+    }
+
+    //~---------------------------------< Setters >
+
+    public void setMappingFilePath(String mappingFilePath) {
+        this.mappingFilePath = mappingFilePath;
+    }
+
+    public void setReadOnly(boolean readOnly) {
+        this.readOnly = readOnly;
+    }
+
+    public void setDelegateClass(String delegateClass) {
+        this.delegateClass = delegateClass;
+    }
+
+    public void setDelegateConfigFilePath(String delegateConfigFilePath) {
+        this.delegateConfigFilePath = delegateConfigFilePath;
+    }
+
+    //~---------------------------------< DelegateDataRecord >
+
+    private class DelegateDataRecord extends AbstractDataRecord {
+        private final Map<String, Long> mapping;
+        private DataRecord delegateRecord;
+
+        public DelegateDataRecord(AbstractDataStore store, DataIdentifier 
identifier,
+                                  Map<String, Long> recordSizeMapping) {
+            super(store, identifier);
+            this.mapping = recordSizeMapping;
+        }
+
+        public long getLength() throws DataStoreException {
+            Long size = mapping.get(getIdentifier().toString());
+            if (size == null) {
+                log.info("No size mapping found for {}. Checking with 
delegate", getIdentifier());
+                return getDelegateRecord().getLength();
+            }
+            return size;
+        }
+
+        @Override
+        public InputStream getStream() throws DataStoreException {
+            return getDelegateRecord().getStream();
+        }
+
+        @Override
+        public long getLastModified() {
+            try {
+                return getDelegateRecord().getLastModified();
+            } catch (DataStoreException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        private DataRecord getDelegateRecord() throws DataStoreException {
+            //Lazily load the delegateRecord to avoid FS access
+            if (delegateRecord == null) {
+                delegateRecord = getDelegate().getRecord(getIdentifier());
+            }
+            return delegateRecord;
+        }
+    }
+
+    //~---------------------------------< internal >
+
+    private void checkIfReadOnly() {
+        checkState(!readOnly, "Read only DataStore in use");
+    }
+
+    private DataStore getDelegate() {
+        return checkNotNull(delegate, "Delegate DataStore not configured");
+    }
+
+    private void addNewMapping(DataRecord dr) throws DataStoreException {
+        if (dr != null) {
+            newMappings.put(dr.getIdentifier().toString(), dr.getLength());
+        }
+    }
+
+    private void initializeMappingData(String homeDir) {
+        mappingFile = new File(FilenameUtils.concat(homeDir, mappingFilePath));
+        if (mappingFile.exists()) {
+            try {
+                existingMappings = loadMappingData(mappingFile);
+            } catch (FileNotFoundException e) {
+                throw new RuntimeException("Failed to read mapping data from " 
+ mappingFile, e);
+            }
+        } else {
+            log.info("Mapping file {} not found. Would create a new one.", 
mappingFile);
+        }
+    }
+
+    private void initializeDelegate(String homeDir) throws RepositoryException 
{
+        checkNotNull(delegateClass, "No delegate DataStore class defined via 
'delegateClass' property");
+        try {
+            delegate = (DataStore) 
getClass().getClassLoader().loadClass(delegateClass).newInstance();
+        } catch (InstantiationException e) {
+            throw new RepositoryException("Cannot load delegate class " + 
delegateClass, e);
+        } catch (IllegalAccessException e) {
+            throw new RepositoryException("Cannot load delegate class " + 
delegateClass, e);
+        } catch (ClassNotFoundException e) {
+            throw new RepositoryException("Cannot load delegate class " + 
delegateClass, e);
+        }
+
+        log.info("Using {} as the delegating DataStore", delegateClass);
+        if (delegateConfigFilePath != null) {
+            File configFile = new File(delegateConfigFilePath);
+            checkArgument(configFile.exists(), "Delegate DataStore config file 
%s does not exist", configFile.getAbsolutePath());
+
+            InputStream is = null;
+            try {
+                Properties props = new Properties();
+                is = Files.newInputStreamSupplier(configFile).getInput();
+                props.load(is);
+                PropertiesUtil.populate(delegate, propsToMap(props), false);
+                log.info("Configured the delegating DataStore via {}", 
configFile.getAbsolutePath());
+            } catch (IOException e) {
+                throw new RepositoryException("Error reading from config file 
" + configFile.getAbsolutePath(), e);
+            } finally {
+                IOUtils.closeQuietly(is);
+            }
+        }
+
+        delegate.init(homeDir);
+    }
+
+    private void saveNewMappingsToFile() {
+        if (!newMappings.isEmpty()) {
+            BufferedWriter w = null;
+            try {
+                w = new BufferedWriter(
+                        new OutputStreamWriter(new 
FileOutputStream(mappingFile, true), Charsets.UTF_8));
+                for (Map.Entry<String, Long> e : newMappings.entrySet()) {
+                    w.write(String.valueOf(e.getValue()));
+                    w.write(SEPARATOR);
+                    w.write(e.getKey());
+                    w.newLine();
+                }
+                log.info("Added {} new entries to the mapping file {}", 
newMappings.size(), mappingFile);
+                newMappings.clear();
+            } catch (IOException e) {
+                log.warn("Error occurred while writing mapping data to {}", 
mappingFile, e);
+            } finally {
+                IOUtils.closeQuietly(w);
+            }
+        }
+    }
+
+    private static Map<String, Long> loadMappingData(File mappingFile) throws 
FileNotFoundException {
+        Map<String, Long> mapping = new HashMap<String, Long>();
+        log.info("Reading mapping data from {}", 
mappingFile.getAbsolutePath());
+        LineIterator itr = new LineIterator(Files.newReader(mappingFile, 
Charsets.UTF_8));
+        try {
+            while (itr.hasNext()) {
+                String line = itr.nextLine();
+                int indexOfBar = line.indexOf(SEPARATOR);
+                checkState(indexOfBar > 0, "Malformed entry found [%s]", line);
+                String length = line.substring(0, indexOfBar);
+                String id = line.substring(indexOfBar + 1);
+                mapping.put(id.trim(), Long.valueOf(length));
+            }
+            log.info("Total {} mapping entries found", mapping.size());
+        } finally {
+            itr.close();
+        }
+        return mapping;
+    }
+
+    private static Map<String, Object> propsToMap(Properties p) {
+        Map<String, Object> result = Maps.newHashMap();
+        for (String keyName : p.stringPropertyNames()) {
+            result.put(keyName, p.getProperty(keyName));
+        }
+        return result;
+    }
+}

Propchange: 
jackrabbit/oak/trunk/oak-upgrade/src/main/java/org/apache/jackrabbit/oak/upgrade/blob/LengthCachingDataStore.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
jackrabbit/oak/trunk/oak-upgrade/src/test/java/org/apache/jackrabbit/oak/upgrade/blob/LengthCachingDataStoreTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-upgrade/src/test/java/org/apache/jackrabbit/oak/upgrade/blob/LengthCachingDataStoreTest.java?rev=1680232&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-upgrade/src/test/java/org/apache/jackrabbit/oak/upgrade/blob/LengthCachingDataStoreTest.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-upgrade/src/test/java/org/apache/jackrabbit/oak/upgrade/blob/LengthCachingDataStoreTest.java
 Tue May 19 10:23:31 2015
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.upgrade.blob;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.Properties;
+import java.util.Random;
+
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+import com.google.common.io.InputSupplier;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.core.data.FileDataStore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class LengthCachingDataStoreTest {
+
+    @Rule
+    public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+    @Test
+    public void mappingFileData() throws Exception {
+        File root = tempFolder.getRoot();
+        File mappingFile = new File(root, "mapping.txt");
+        String text = "1000|foo\n2000|bar";
+        Files.write(text, mappingFile, Charset.defaultCharset());
+
+        LengthCachingDataStore fds = new LengthCachingDataStore();
+        fds.setDelegateClass(FileDataStore.class.getName());
+        fds.setMappingFilePath(mappingFile.getAbsolutePath());
+        fds.init(tempFolder.getRoot().getAbsolutePath());
+
+        DataRecord dr = fds.getRecord(new DataIdentifier("foo"));
+        assertNotNull(dr);
+        assertEquals(1000, dr.getLength());
+
+        assertEquals(2000, fds.getRecord(new 
DataIdentifier("bar")).getLength());
+    }
+
+    @Test
+    public void configDelegate() throws Exception{
+        //1. Store the config in a file
+        Properties p = new Properties();
+        p.setProperty("minRecordLength", "4972");
+        File configFile = tempFolder.newFile();
+        FileOutputStream fos = new FileOutputStream(configFile);
+        p.store(fos, null);
+        fos.close();
+
+        //2. Configure the delegate and config file
+        LengthCachingDataStore fds = new LengthCachingDataStore();
+        fds.setDelegateClass(FileDataStore.class.getName());
+        fds.setDelegateConfigFilePath(configFile.getAbsolutePath());
+        fds.init(tempFolder.getRoot().getAbsolutePath());
+
+        assertEquals(4972, fds.getMinRecordLength());
+    }
+
+    @Test
+    public void delegateRecordTest() throws Exception{
+        FileDataStore ds = new FileDataStore();
+        byte[] data = bytes(ds.getMinRecordLength() + 10);
+        ds.init(tempFolder.getRoot().getAbsolutePath());
+        DataRecord dr = ds.addRecord(new ByteArrayInputStream(data));
+
+        File mappingFile = new File(tempFolder.getRoot(), "mapping.txt");
+        String text = String.format("%s|%s", data.length, 
dr.getIdentifier().toString());
+        Files.write(text, mappingFile, Charset.defaultCharset());
+
+        LengthCachingDataStore fds = new LengthCachingDataStore();
+        fds.setDelegateClass(FileDataStore.class.getName());
+        fds.setMappingFilePath(mappingFile.getAbsolutePath());
+        fds.init(tempFolder.getRoot().getAbsolutePath());
+
+        DataRecord dr2 = fds.getRecordIfStored(dr.getIdentifier());
+        assertEquals(dr, dr2);
+
+        assertEquals(dr.getLength(), dr2.getLength());
+        assertTrue(ByteStreams.equal(supplier(dr), supplier(dr2)));
+    }
+
+    @Test
+    public void writeBackNewEntries() throws Exception{
+        //1. Add some entries to FDS
+        FileDataStore fds1 = new FileDataStore();
+        File fds1Dir = tempFolder.newFolder();
+        int minSize = fds1.getMinRecordLength();
+        fds1.init(fds1Dir.getAbsolutePath());
+        DataRecord dr1 = fds1.addRecord(byteStream(minSize + 10));
+        DataRecord dr2 = fds1.addRecord(byteStream(minSize + 100));
+
+        //2. Try reading them so as to populate the new mappings
+        LengthCachingDataStore fds2 = new LengthCachingDataStore();
+        fds2.setDelegateClass(FileDataStore.class.getName());
+        fds2.init(fds1Dir.getAbsolutePath());
+
+        fds2.getRecord(new DataIdentifier(dr1.getIdentifier().toString()));
+        fds2.getRecord(new DataIdentifier(dr2.getIdentifier().toString()));
+
+        File mappingFile = fds2.getMappingFile();
+
+        //3. Get the mappings pushed to file
+        fds2.close();
+
+        //4. Open a new FDS pointing to new directory. Read should still work 
fine
+        //as they would be served by the mapping data
+        LengthCachingDataStore fds3 = new LengthCachingDataStore();
+        fds3.setDelegateClass(FileDataStore.class.getName());
+        fds3.setMappingFilePath(mappingFile.getAbsolutePath());
+        fds3.init(tempFolder.newFolder().getAbsolutePath());
+        fds3.setReadOnly(false);
+
+        assertEquals(dr1.getLength(), 
fds3.getRecord(dr1.getIdentifier()).getLength());
+        assertEquals(dr2.getLength(), 
fds3.getRecord(dr2.getIdentifier()).getLength());
+
+        DataRecord dr3 = fds3.addRecord(byteStream(minSize + 200));
+        //5. Close again so see if update of existing file works
+        fds3.close();
+
+        LengthCachingDataStore fds4 = new LengthCachingDataStore();
+        fds4.setDelegateClass(FileDataStore.class.getName());
+        fds4.setMappingFilePath(mappingFile.getAbsolutePath());
+        fds4.init(tempFolder.newFolder().getAbsolutePath());
+
+        assertEquals(dr3.getLength(), 
fds4.getRecord(dr3.getIdentifier()).getLength());
+        assertEquals(dr2.getLength(), 
fds4.getRecord(dr2.getIdentifier()).getLength());
+
+
+    }
+
+    private InputStream byteStream(int size) {
+        return new ByteArrayInputStream(bytes(size));
+    }
+
+    private byte[] bytes(int size) {
+        byte[] data = new byte[size];
+        new Random().nextBytes(data);
+        return data;
+    }
+
+    private static InputSupplier<InputStream> supplier(final DataRecord dr) {
+        return new InputSupplier<InputStream>() {
+            @Override
+            public InputStream getInput() throws IOException {
+                try {
+                    return dr.getStream();
+                } catch (DataStoreException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        };
+    }
+}

Propchange: 
jackrabbit/oak/trunk/oak-upgrade/src/test/java/org/apache/jackrabbit/oak/upgrade/blob/LengthCachingDataStoreTest.java
------------------------------------------------------------------------------
    svn:eol-style = native


Reply via email to