Author: amitj Date: Thu Dec 1 08:42:02 2016 New Revision: 1772153 URL: http://svn.apache.org/viewvc?rev=1772153&view=rev Log: OAK-5201: Support upgrade of DataStore cache
- Upgrade utility class exposes methods to move older cached files to newer cache areas. - UploadStagingCache and FileCache now upgrade the previous version of cache files if available to their respective caches. Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheUpgradeUtils.java (with props) jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheUpgradeUtilsTest.java (with props) Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/FileCache.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/FileCacheTest.java jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java?rev=1772153&r1=1772152&r2=1772153&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java (original) +++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java Thu Dec 1 08:42:02 2016 @@ -159,8 +159,10 @@ public abstract class AbstractSharedCach this.backend = createBackend(); backend.init(); + String home = FilenameUtils.normalizeNoEndSeparator(new File(homeDir).getAbsolutePath()); this.cache = - new CompositeDataStoreCache(path, cacheSize, stagingSplitPercentage, uploadThreads, + new CompositeDataStoreCache(path, new File(home), cacheSize, stagingSplitPercentage, + uploadThreads, new CacheLoader<String, InputStream>() { @Override public InputStream load(String key) throws Exception { InputStream is = null; Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java?rev=1772153&r1=1772152&r2=1772153&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java (original) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java Thu Dec 1 08:42:02 2016 @@ -59,7 +59,7 @@ public class CompositeDataStoreCache ext */ private final File directory; - public CompositeDataStoreCache(String path, long size, int uploadSplitPercentage, + public CompositeDataStoreCache(String path, File home, long size, int uploadSplitPercentage, int uploadThreads, CacheLoader<String, InputStream> loader, final StagingUploader uploader, StatisticsProvider statsProvider, ListeningExecutorService executor, ScheduledExecutorService scheduledExecutor /* purge scheduled executor */, @@ -73,11 +73,11 @@ public class CompositeDataStoreCache ext long uploadSize = (size * uploadSplitPercentage) / 100; - this.downloadCache = 
FileCache.build((size - uploadSize), directory, loader, null); - this.stagingCache = UploadStagingCache - .build(directory, uploadThreads, uploadSize, uploader, downloadCache, statsProvider, + .build(directory, home, uploadThreads, uploadSize, uploader, null, statsProvider, executor, scheduledExecutor, purgeInterval, stagingRetryInterval); + this.downloadCache = FileCache.build((size - uploadSize), directory, loader, null); + stagingCache.setDownloadCache(downloadCache); } @Nullable Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheUpgradeUtils.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheUpgradeUtils.java?rev=1772153&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheUpgradeUtils.java (added) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheUpgradeUtils.java Thu Dec 1 08:42:02 2016 @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.jackrabbit.oak.plugins.blob; + +import java.io.BufferedInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.io.ObjectInput; +import java.io.ObjectInputStream; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Maps; +import com.google.common.io.Files; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.jackrabbit.oak.plugins.blob.DataStoreCacheUtils.recursiveDelete; + +/** + * Utility methods to upgrade Old DataStore cache + * {@link org.apache.jackrabbit.core.data.CachingDataStore}. + */ +public class DataStoreCacheUpgradeUtils { + private static final Logger LOG = LoggerFactory.getLogger(DataStoreCacheUpgradeUtils.class); + + static final String UPLOAD_MAP = "async-pending-uploads.ser"; + static final String UPLOAD_STAGING_DIR = UploadStagingCache.UPLOAD_STAGING_DIR; + static final String DOWNLOAD_DIR = FileCache.DOWNLOAD_DIR; + + private static Map<String, Long> deSerializeUploadMap(File homeDir) { + Map<String, Long> asyncUploadMap = Maps.newHashMap(); + + File asyncUploadMapFile = new File(homeDir, UPLOAD_MAP); + if (asyncUploadMapFile.exists()) { + String path = asyncUploadMapFile.getAbsolutePath(); + + InputStream fis = null; + try { + fis = new BufferedInputStream(new FileInputStream(path)); + ObjectInput input = new ObjectInputStream(fis); + asyncUploadMap = (Map<String, Long>) input.readObject(); + } catch (Exception e) { + LOG.warn("Error in reading pending uploads map [{}] from location [{}]", UPLOAD_MAP, + homeDir); + } finally { + IOUtils.closeQuietly(fis); + } + LOG.debug("AsyncUploadMap read [{}]", asyncUploadMap); + } + return asyncUploadMap; + } + + private static void deleteSerializedUploadMap(File 
homeDir) { + File uploadMap = new File(homeDir, UPLOAD_MAP); + FileUtils.deleteQuietly(uploadMap); + LOG.info("Deleted asyncUploadMap [{}] from [{}]", UPLOAD_MAP, homeDir); + } + + private static boolean notInExceptions(File file, List<String> exceptions) { + String parent = file.getParent(); + for (String exception : exceptions) { + if (parent.contains(exception)) { + return true; + } + } + return false; + } + + public static void moveDownloadCache(final File path) { + final List<String> exceptions = ImmutableList.of("tmp", UPLOAD_STAGING_DIR, DOWNLOAD_DIR); + File newDownloadDir = new File(path, DOWNLOAD_DIR); + + Iterator<File> iterator = + Files.fileTreeTraverser().postOrderTraversal(path) + .filter(new Predicate<File>() { + @Override public boolean apply(File input) { + return input.isFile() + && !input.getParentFile().equals(path) + && !notInExceptions(input, exceptions); + } + }).iterator(); + + while (iterator.hasNext()) { + File download = iterator.next(); + LOG.trace("Download cache file absolute pre-upgrade path " + download); + + File newDownload = new File(newDownloadDir, + download.getAbsolutePath().substring(path.getAbsolutePath().length())); + newDownload.getParentFile().mkdirs(); + LOG.trace("Downloaded cache file absolute post-upgrade path " + newDownload); + + try { + FileUtils.moveFile(download, newDownload); + LOG.info("Download cache file [{}] moved to [{}]", download, newDownload); + recursiveDelete(download, path); + } catch (Exception e) { + LOG.warn("Unable to move download cache file [{}] to [{}]", download, newDownload); + } + } + } + + public static void movePendingUploadsToStaging(File homeDir, File path, boolean deleteMap) { + File newUploadDir = new File(path, UPLOAD_STAGING_DIR); + + Map<String, Long> pendingUploads = deSerializeUploadMap(homeDir); + Iterator<String> pendingFileIter = pendingUploads.keySet().iterator(); + + while(pendingFileIter.hasNext()) { + String file = pendingFileIter.next(); + File upload = new File(path, 
file); + LOG.trace("Pending upload absolute path " + upload.getAbsolutePath()); + + File newUpload = new File(newUploadDir, file); + LOG.trace("Pending upload upgrade absolute path " + newUpload.getAbsolutePath()); + + newUpload.getParentFile().mkdirs(); + + if (upload.exists()) { + LOG.trace(upload + " File exists"); + try { + FileUtils.moveFile(upload, newUpload); + LOG.info("Pending upload file [{}] moved to [{}]", upload, newUpload); + recursiveDelete(upload, path); + } catch (Exception e) { + LOG.warn("Unable to move pending upload file [{}] to [{}]", upload, newUpload); + } + } else { + LOG.warn("File [{}] does not exist", upload); + } + } + + if (deleteMap) { + deleteSerializedUploadMap(homeDir); + } + } + + public static void upgrade(File homeDir, File path, boolean moveCache, boolean deleteMap) { + movePendingUploadsToStaging(homeDir, path, deleteMap); + + if (moveCache) { + moveDownloadCache(path); + } + } +} Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheUpgradeUtils.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/FileCache.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/FileCache.java?rev=1772153&r1=1772152&r2=1772153&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/FileCache.java (original) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/FileCache.java Thu Dec 1 08:42:02 2016 @@ -62,6 +62,13 @@ public class FileCache extends AbstractC */ private static final Logger LOG = LoggerFactory.getLogger(FileCache.class); + protected static final String DOWNLOAD_DIR = "download"; + + /** + * Parent of the cache root 
directory + */ + private File parent; + /** * The cacheRoot directory of the cache. */ @@ -91,7 +98,8 @@ public class FileCache extends AbstractC private FileCache(long maxSize /* bytes */, File root, final CacheLoader<String, InputStream> loader, @Nullable final ExecutorService executor) { - this.cacheRoot = new File(root, "download"); + this.parent = root; + this.cacheRoot = new File(root, DOWNLOAD_DIR); /* convert to 4 KB block */ long size = Math.round(maxSize / (1024L * 4)); @@ -286,6 +294,10 @@ public class FileCache extends AbstractC */ private int build() { int count = 0; + + // Move older generation cache downloaded files to the new folder + DataStoreCacheUpgradeUtils.moveDownloadCache(parent); + // Iterate over all files in the cache folder Iterator<File> iter = Files.fileTreeTraverser().postOrderTraversal(cacheRoot) .filter(new Predicate<File>() { Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java?rev=1772153&r1=1772152&r2=1772153&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java (original) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java Thu Dec 1 08:42:02 2016 @@ -63,6 +63,8 @@ import org.slf4j.LoggerFactory; import static com.google.common.base.Objects.toStringHelper; import static java.lang.String.format; import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount; +import static org.apache.jackrabbit.oak.plugins.blob.DataStoreCacheUpgradeUtils + .movePendingUploadsToStaging; /** * Cache for staging async uploads. 
This serves as a temporary cache for serving local @@ -78,6 +80,8 @@ public class UploadStagingCache implemen */ private static final Logger LOG = LoggerFactory.getLogger(UploadStagingCache.class); + protected static final String UPLOAD_STAGING_DIR = "upload"; + //Rough estimate of the in-memory key, value pair private final Weigher<String, File> memWeigher = new Weigher<String, File>() { @Override public int weigh(String key, File value) { @@ -141,7 +145,7 @@ public class UploadStagingCache implemen */ private LinkedBlockingQueue<String> retryQueue; - private UploadStagingCache(File dir, int uploadThreads, long size /* bytes */, + private UploadStagingCache(File dir, File home, int uploadThreads, long size /* bytes */, StagingUploader uploader, @Nullable FileCache cache, StatisticsProvider statisticsProvider, @Nullable ListeningExecutorService executor, @Nullable ScheduledExecutorService scheduledExecutor, @@ -169,7 +173,7 @@ public class UploadStagingCache implemen this.cacheStats = new StagingCacheStats(this, statisticsProvider, size); this.downloadCache = cache; - build(); + build(home, dir); this.scheduledExecutor .scheduleAtFixedRate(new RemoveJob(), purgeInterval, purgeInterval, TimeUnit.SECONDS); @@ -180,13 +184,13 @@ public class UploadStagingCache implemen private UploadStagingCache() { } - public static UploadStagingCache build(File dir, int uploadThreads, long size + public static UploadStagingCache build(File dir, File home, int uploadThreads, long size /* bytes */, StagingUploader uploader, @Nullable FileCache cache, StatisticsProvider statisticsProvider, @Nullable ListeningExecutorService executor, @Nullable ScheduledExecutorService scheduledExecutor, int purgeInterval /* secs */, int retryInterval /* secs */) { if (size > 0) { - return new UploadStagingCache(dir, uploadThreads, size, uploader, cache, + return new UploadStagingCache(dir, home, uploadThreads, size, uploader, cache, statisticsProvider, executor, scheduledExecutor, purgeInterval, 
retryInterval); } return new UploadStagingCache() { @@ -216,9 +220,13 @@ public class UploadStagingCache implemen /** * Retrieves all the files staged in the staging area and schedules them for uploads. + * @param home the home of the repo + * @param rootPath the parent of the cache */ - private void build() { + private void build(File home, File rootPath) { LOG.info("Scheduling pending uploads"); + // Move any older cache pending uploads + movePendingUploadsToStaging(home, rootPath, true); Iterator<File> iter = Files.fileTreeTraverser().postOrderTraversal(uploadCacheSpace) .filter(new Predicate<File>() { @@ -487,6 +495,10 @@ public class UploadStagingCache implemen new ExecutorCloser(scheduledExecutor).close(); } + protected void setDownloadCache(@Nullable FileCache downloadCache) { + this.downloadCache = downloadCache; + } + /** * Class which calls remove on all */ Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java?rev=1772153&r1=1772152&r2=1772153&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java (original) +++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java Thu Dec 1 08:42:02 2016 @@ -18,12 +18,17 @@ */ package org.apache.jackrabbit.oak.plugins.blob; +import java.io.BufferedOutputStream; import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; +import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.ObjectOutput; +import java.io.ObjectOutputStream; +import java.io.OutputStream; import java.util.Iterator; 
import java.util.List; import java.util.Map; @@ -47,6 +52,7 @@ import com.google.common.util.concurrent import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; import org.apache.jackrabbit.core.data.DataIdentifier; import org.apache.jackrabbit.core.data.DataRecord; import org.apache.jackrabbit.core.data.DataStoreException; @@ -334,7 +340,7 @@ public class AbstractDataStoreCacheTest return new ByteArrayInputStream(data); } - private static File getFile(String id, File root) { + protected static File getFile(String id, File root) { File file = root; file = new File(file, id.substring(0, 2)); file = new File(file, id.substring(2, 4)); @@ -345,4 +351,17 @@ public class AbstractDataStoreCacheTest FileUtils.copyInputStreamToFile(stream, file); return file; } + + static void serializeMap(Map<String,Long> pendingupload, File file) throws IOException { + OutputStream fos = new FileOutputStream(file); + OutputStream buffer = new BufferedOutputStream(fos); + ObjectOutput output = new ObjectOutputStream(buffer); + try { + output.writeObject(pendingupload); + output.flush(); + } finally { + output.close(); + IOUtils.closeQuietly(buffer); + } + } } Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java?rev=1772153&r1=1772152&r2=1772153&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java (original) +++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java Thu Dec 1 08:42:02 2016 @@ -81,8 +81,8 @@ public class 
CompositeDataStoreCacheTest @Before public void setup() throws IOException { root = folder.newFolder(); - loader = new TestCacheLoader<String, InputStream>(root); - uploader = new TestStagingUploader(root); + loader = new TestCacheLoader<String, InputStream>(folder.newFolder()); + uploader = new TestStagingUploader(folder.newFolder()); // create executor taskLatch = new CountDownLatch(1); @@ -99,9 +99,10 @@ public class CompositeDataStoreCacheTest closer.register(new ExecutorCloser(scheduledExecutor, 500, TimeUnit.MILLISECONDS)); //cache instance - cache = new CompositeDataStoreCache(root.getAbsolutePath(), - 80 * 1024 /* bytes */, 10, 1/*threads*/, loader, - uploader, statsProvider, executor, scheduledExecutor, 3000, 6000); + cache = + new CompositeDataStoreCache(root.getAbsolutePath(), null, 80 * 1024 /* bytes */, 10, + 1/*threads*/, + loader, uploader, statsProvider, executor, scheduledExecutor, 3000, 6000); closer.register(cache); } @@ -112,8 +113,8 @@ public class CompositeDataStoreCacheTest @Test public void zeroCache() throws IOException { - cache = new CompositeDataStoreCache(root.getAbsolutePath(), - 0 /* bytes */, 10, 1/*threads*/, loader, + cache = new CompositeDataStoreCache(root.getAbsolutePath(), null, 0 /* bytes + */, 10, 1/*threads*/, loader, uploader, statsProvider, executor, scheduledExecutor, 3000, 6000); closer.register(cache); @@ -188,8 +189,8 @@ public class CompositeDataStoreCacheTest */ @Test public void addCacheFull() throws IOException { - cache = new CompositeDataStoreCache(root.getAbsolutePath(), - 40 * 1024 /* bytes */, 10 /* staging % */, + cache = new CompositeDataStoreCache(root.getAbsolutePath(), null, 40 * 1024 /* + bytes */, 10 /* staging % */, 1/*threads*/, loader, uploader, statsProvider, executor, scheduledExecutor, 3000, 6000); closer.register(cache); @@ -224,8 +225,8 @@ public class CompositeDataStoreCacheTest callbackLatch = new CountDownLatch(2); afterExecuteLatch = new CountDownLatch(2); executor = new TestExecutor(1, 
taskLatch, callbackLatch, afterExecuteLatch); - cache = new CompositeDataStoreCache(root.getAbsolutePath(), - 80 * 1024 /* bytes */, 10 /* staging % */, + cache = new CompositeDataStoreCache(root.getAbsolutePath(), null, 80 * 1024 /* + bytes */, 10 /* staging % */, 1/*threads*/, loader, uploader, statsProvider, executor, scheduledExecutor, 3000, 6000); closer.register(cache); @@ -383,9 +384,11 @@ public class CompositeDataStoreCacheTest File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile()); loader.write(ID_PREFIX + 0, f); + assertTrue(f.exists()); File f2 = copyToFile(randomStream(1, 4 * 1024), folder.newFile()); loader.write(ID_PREFIX + 1, f2); + assertTrue(f2.exists()); CountDownLatch thread1Start = new CountDownLatch(1); SettableFuture<File> future1 = @@ -433,6 +436,7 @@ public class CompositeDataStoreCacheTest // Add file to backend File f2 = copyToFile(randomStream(1, 4 * 1024), folder.newFile()); loader.write(ID_PREFIX + 1, f2); + assertTrue(f2.exists()); // stage for upload File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile()); Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheUpgradeUtilsTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheUpgradeUtilsTest.java?rev=1772153&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheUpgradeUtilsTest.java (added) +++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheUpgradeUtilsTest.java Thu Dec 1 08:42:02 2016 @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.blob; + +import java.io.File; +import java.io.IOException; +import java.util.Map; + +import com.google.common.collect.Maps; +import com.google.common.io.Files; +import org.apache.commons.io.FileUtils; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import static org.apache.jackrabbit.oak.plugins.blob.DataStoreCacheUpgradeUtils.DOWNLOAD_DIR; +import static org.apache.jackrabbit.oak.plugins.blob.DataStoreCacheUpgradeUtils.UPLOAD_STAGING_DIR; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Tests for {@link DataStoreCacheUpgradeUtils} + */ +public class DataStoreCacheUpgradeUtilsTest extends AbstractDataStoreCacheTest { + @Rule + public TemporaryFolder folder = new TemporaryFolder(new File("target")); + + File homeDir; + File path; + File pendingUploads; + + @Before + public void setup() throws IOException { + homeDir = folder.getRoot(); + path = folder.newFolder("repository", "datastore"); + pendingUploads = new File(homeDir + "/" + DataStoreCacheUpgradeUtils.UPLOAD_MAP); + } + + @Test + public void upgradeNoDownloads() throws Exception { + setupUploads("1111110", "2222220", "3333330"); + + DataStoreCacheUpgradeUtils.upgrade(homeDir, path, true, true); + + assertFiles(UPLOAD_STAGING_DIR, "1111110", "2222220", "3333330"); + 
assertFalse(pendingUploads.exists()); + } + + @Test + public void upgradeNoDownloadsDelPendingFileFalse() throws Exception { + setupUploads("1111110", "2222220", "3333330"); + + DataStoreCacheUpgradeUtils.upgrade(homeDir, path, true, false); + + assertFiles(UPLOAD_STAGING_DIR, "1111110", "2222220", "3333330"); + assertTrue(pendingUploads.exists()); + } + + @Test + public void upgradeMoveDownloadsFalse() throws Exception { + setupUploads("1111110", "2222220", "3333330"); + setupDownloads("4444440", "5555550", "6666660"); + + DataStoreCacheUpgradeUtils.upgrade(homeDir, path, false, true); + + assertFiles(UPLOAD_STAGING_DIR, "1111110", "2222220", "3333330"); + assertFalse(pendingUploads.exists()); + assertFilesNoMove(DOWNLOAD_DIR, "4444440", "5555550", "6666660"); + } + + @Test + public void upgradeNoUploads() throws Exception { + setupDownloads("1111110", "2222220", "3333330"); + + DataStoreCacheUpgradeUtils.upgrade(homeDir, path, true, true); + + assertFiles(DOWNLOAD_DIR, "1111110", "2222220", "3333330"); + } + + @Test + public void upgradeNoUploadMap() throws Exception { + setupUploads("1111110", "2222220", "3333330"); + FileUtils.deleteQuietly(pendingUploads); + + DataStoreCacheUpgradeUtils.upgrade(homeDir, path, true, true); + + assertFiles(DOWNLOAD_DIR, "1111110", "2222220", "3333330"); + assertFalse(pendingUploads.exists()); + } + + @Test + public void upgrade() throws Exception { + upgrade(true); + } + + @Test + public void upgradeDelPendingFileFalse() throws Exception { + upgrade(false); + } + + private void upgrade(boolean pendingFileDelete) throws Exception { + setupUploads("1111110", "2222220", "3333330"); + setupDownloads("4444440", "5555550", "6666660"); + + DataStoreCacheUpgradeUtils.upgrade(homeDir, path, true, pendingFileDelete); + + assertFiles(UPLOAD_STAGING_DIR, "1111110", "2222220", "3333330"); + if (pendingFileDelete) { + assertFalse(pendingUploads.exists()); + } else { + assertTrue(pendingUploads.exists()); + } + assertFiles(DOWNLOAD_DIR, 
"4444440", "5555550", "6666660"); + } + + private void setupUploads(String... ids) throws IOException { + Map<String, Long> pendingMap = Maps.newHashMap(); + + for (String id : ids) { + File f1 = copyToFile(randomStream(Integer.parseInt(id), 4 * 1024), getFile(id, path)); + pendingMap.put(getFileName(id), System.currentTimeMillis()); + } + serializeMap(pendingMap, pendingUploads); + } + + private void setupDownloads(String... ids) throws IOException { + for (String id : ids) { + copyToFile(randomStream(Integer.parseInt(id), 4 * 1024), getFile(id, path)); + } + } + + private void assertFiles(String moveFolder, String... ids) throws Exception { + for (String id : ids) { + File file = getFile(id, path); + assertFalse(file.exists()); + file = getFile(id, new File(path, moveFolder)); + assertTrue(file.exists()); + assertTrue(Files.equal(file, + copyToFile(randomStream(Integer.parseInt(id), 4 * 1024), folder.newFile()))); + } + } + + private void assertFilesNoMove(String moveFolder, String... 
ids) throws Exception { + for (String id : ids) { + File file = getFile(id, path); + assertTrue(file.exists()); + assertTrue(Files.equal(file, + copyToFile(randomStream(Integer.parseInt(id), 4 * 1024), folder.newFile()))); + file = getFile(id, new File(path, moveFolder)); + assertFalse(file.exists()); + } + } + + private static String getFileName(String name) { + return name.substring(0, 2) + "/" + name.substring(2, 4) + "/" + name; + } +} Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheUpgradeUtilsTest.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/FileCacheTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/FileCacheTest.java?rev=1772153&r1=1772152&r2=1772153&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/FileCacheTest.java (original) +++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/FileCacheTest.java Thu Dec 1 08:42:02 2016 @@ -64,26 +64,24 @@ public class FileCacheTest extends Abstr @Rule public TestName testName = new TestName(); + CountDownLatch afterExecuteLatch; @Before public void setup() throws Exception { root = folder.newFolder(); closer = Closer.create(); - loader = new TestCacheLoader<String, InputStream>(root); + loader = new TestCacheLoader<String, InputStream>(folder.newFolder()); - if (!testName.getMethodName().equals("rebuild")) { - - CountDownLatch beforeLatch = new CountDownLatch(1); - CountDownLatch afterLatch = new CountDownLatch(1); - CountDownLatch afterExecuteLatch = new CountDownLatch(1); + CountDownLatch beforeLatch = new CountDownLatch(1); + CountDownLatch afterLatch = new 
CountDownLatch(1); + afterExecuteLatch = new CountDownLatch(1); - TestExecutor executor = new TestExecutor(1, beforeLatch, afterLatch, afterExecuteLatch); - beforeLatch.countDown(); - afterLatch.countDown(); - cache = FileCache.build(4 * 1024/* KB */, root, loader, executor); - Futures.successfulAsList((Iterable<? extends ListenableFuture<?>>) executor.futures).get(); + TestExecutor executor = new TestExecutor(1, beforeLatch, afterLatch, afterExecuteLatch); + beforeLatch.countDown(); + afterLatch.countDown(); + cache = FileCache.build(4 * 1024/* KB */, root, loader, executor); + Futures.successfulAsList((Iterable<? extends ListenableFuture<?>>) executor.futures).get(); - closer.register(cache); - } + closer.register(cache); } @After @@ -354,30 +352,53 @@ public class FileCacheTest extends Abstr @Test public void rebuild() throws Exception { LOG.info("Started rebuild"); + afterExecuteLatch.await(); + LOG.info("Cache built"); + + File f = createFile(0, loader, cache, folder); + assertCache(0, cache, f); + cache.close(); - root = folder.newFolder(); CountDownLatch beforeLatch = new CountDownLatch(1); CountDownLatch afterLatch = new CountDownLatch(1); - CountDownLatch afterExecuteLatch = new CountDownLatch(1); + afterExecuteLatch = new CountDownLatch(1); TestExecutor executor = new TestExecutor(1, beforeLatch, afterLatch, afterExecuteLatch); beforeLatch.countDown(); afterLatch.countDown(); cache = FileCache.build(4 * 1024/* bytes */, root, loader, executor); - + closer.register(cache); afterExecuteLatch.await(); Futures.successfulAsList((Iterable<? extends ListenableFuture<?>>) executor.futures).get(); - LOG.info("Cache built"); + LOG.info("Cache rebuilt"); + + assertCacheIfPresent(0, cache, f); + assertCacheStats(cache, 1, 4 * 1024, 0, 0); + + LOG.info("Finished rebuild"); + } + + /** + * Trigger upgrade cache on start. 
+ * @throws Exception + */ + @Test + public void upgrade() throws Exception { + LOG.info("Started upgrade"); + + afterExecuteLatch.await(); File f = createFile(0, loader, cache, folder); assertCache(0, cache, f); cache.close(); - beforeLatch = new CountDownLatch(1); - afterLatch = new CountDownLatch(1); + copyToFile(randomStream(1, 4 * 1024), getFile(ID_PREFIX + 1, root)); + + CountDownLatch beforeLatch = new CountDownLatch(1); + CountDownLatch afterLatch = new CountDownLatch(1); afterExecuteLatch = new CountDownLatch(1); - executor = new TestExecutor(1, beforeLatch, afterLatch, afterExecuteLatch); + TestExecutor executor = new TestExecutor(1, beforeLatch, afterLatch, afterExecuteLatch); beforeLatch.countDown(); afterLatch.countDown(); cache = FileCache.build(4 * 1024/* bytes */, root, loader, executor); @@ -387,9 +408,12 @@ public class FileCacheTest extends Abstr LOG.info("Cache rebuilt"); assertCacheIfPresent(0, cache, f); - assertCacheStats(cache, 1, 4 * 1024, 0, 0); + assertCacheIfPresent(1, cache, copyToFile(randomStream(1, 4 * 1024), folder.newFile())); + assertFalse(getFile(ID_PREFIX + 1, root).exists()); - LOG.info("Finished rebuild"); + assertCacheStats(cache, 2, 8 * 1024, 0, 0); + + LOG.info("Finished upgrade"); } /**------------------------------ Helper methods --------------------------------------------**/ Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java?rev=1772153&r1=1772152&r2=1772153&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java (original) +++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java Thu Dec 1 08:42:02 2016 @@ 
-24,6 +24,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -34,6 +35,7 @@ import java.util.concurrent.atomic.Atomi import com.google.common.base.Optional; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.io.Closer; import com.google.common.io.Files; import com.google.common.util.concurrent.Futures; @@ -86,11 +88,11 @@ public class UploadStagingCacheTest exte init(0); } - private void init(int i) { - init(i, new TestStagingUploader(root)); + private void init(int i) throws IOException { + init(i, new TestStagingUploader(folder.newFolder()), null); } - private void init(int i, TestStagingUploader testUploader) { + private void init(int i, TestStagingUploader testUploader, File homeDir) { // uploader uploader = testUploader; @@ -110,7 +112,7 @@ public class UploadStagingCacheTest exte //cache instance stagingCache = - UploadStagingCache.build(root, 1/*threads*/, 8 * 1024 /* bytes */, + UploadStagingCache.build(root, homeDir, 1/*threads*/, 8 * 1024 /* bytes */, uploader, null/*cache*/, statsProvider, executor, null, 3000, 6000); closer.register(stagingCache); } @@ -123,7 +125,7 @@ public class UploadStagingCacheTest exte @Test public void testZeroCache() throws IOException { stagingCache = - UploadStagingCache.build(root, 1/*threads*/, 0 /* bytes */, + UploadStagingCache.build(root, null, 1/*threads*/, 0 /* bytes */, uploader, null/*cache*/, statsProvider, executor, null, 3000, 6000); closer.register(stagingCache); @@ -160,7 +162,7 @@ public class UploadStagingCacheTest exte @Test public void testAddUploadException() throws Exception { final AtomicInteger count = new AtomicInteger(0); - TestStagingUploader secondTimeUploader = new 
TestStagingUploader(root) { + TestStagingUploader secondTimeUploader = new TestStagingUploader(folder.newFolder()) { @Override public void write(String id, File f) throws DataStoreException { if (count.get() == 0) { @@ -171,7 +173,7 @@ public class UploadStagingCacheTest exte }; // initialize staging cache using the mocked uploader - init(2, secondTimeUploader); + init(2, secondTimeUploader, null); // Add load List<ListenableFuture<Integer>> futures = put(folder); @@ -290,7 +292,7 @@ public class UploadStagingCacheTest exte public void testCacheFullAdd() throws Exception { // initialize cache to have restricted size stagingCache = - UploadStagingCache.build(root, 1/*threads*/, 4 * 1024 /* bytes */, + UploadStagingCache.build(root, null, 1/*threads*/, 4 * 1024 /* bytes */, uploader, null/*cache*/, statsProvider, executor, null, 3000, 6000); closer.register(stagingCache); @@ -578,8 +580,58 @@ public class UploadStagingCacheTest exte assertCacheStats(stagingCache, 0, 0, 3, 4); } + /** + * Test upgrade with build on start. 
+ * @throws Exception + */ + @Test + public void testUpgrade() throws Exception { + // Add load + List<ListenableFuture<Integer>> futures = put(folder); + // Close before uploading finished + closer.close(); + + // Create pre-upgrade load + File home = folder.newFolder(); + File pendingUploadsFile = new File(home, DataStoreCacheUpgradeUtils.UPLOAD_MAP); + createUpgradeLoad(home, pendingUploadsFile); + + // Start again + init(1, new TestStagingUploader(folder.newFolder()), home); + + taskLatch.countDown(); + callbackLatch.countDown(); + afterExecuteLatch.await(); + + waitFinish(futures); + + assertNull(stagingCache.getIfPresent(ID_PREFIX + 0)); + assertTrue(Files.equal(copyToFile(randomStream(0, 4 * 1024), folder.newFile()), + uploader.read(ID_PREFIX + 0))); + + assertUpgrade(pendingUploadsFile); + + assertCacheStats(stagingCache, 0, 0, 2, 2); + } + /** -------------------- Helper methods ----------------------------------------------------**/ + private void createUpgradeLoad(File home, File pendingUploadFile) throws IOException { + String id = ID_PREFIX + 1; + copyToFile(randomStream(1, 4 * 1024), getFile(id, root)); + String name = id.substring(0, 2) + "/" + id.substring(2, 4) + "/" + id; + Map<String, Long> pendingUploads = Maps.newHashMap(); + pendingUploads.put(name, System.currentTimeMillis()); + serializeMap(pendingUploads, pendingUploadFile); + } + + private void assertUpgrade(File pendingUploadFile) throws IOException { + assertNull(stagingCache.getIfPresent(ID_PREFIX + 1)); + assertTrue(Files.equal(copyToFile(randomStream(1, 4 * 1024), folder.newFile()), + uploader.read(ID_PREFIX + 1))); + assertFalse(pendingUploadFile.exists()); + } + private static SettableFuture<File> copyStreamThread(ListeningExecutorService executor, final InputStream fStream, final File temp, final CountDownLatch start) { final SettableFuture<File> future = SettableFuture.create();