Author: amitj
Date: Mon Oct 24 04:52:52 2016
New Revision: 1766345

URL: http://svn.apache.org/viewvc?rev=1766345&view=rev
Log:
OAK-4979: Caching sub-system implementation for DataStore

- Ignore the cache size limit when building the UploadStagingCache on start, to 
support cache size changes and upgrades.

Modified:
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java?rev=1766345&r1=1766344&r2=1766345&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
 Mon Oct 24 04:52:52 2016
@@ -213,9 +213,12 @@ public class UploadStagingCache implemen
         int count = 0;
         while (iter.hasNext()) {
             File toBeSyncedFile = iter.next();
-            Optional<SettableFuture<Integer>> scheduled = 
put(toBeSyncedFile.getName(), toBeSyncedFile);
+            Optional<SettableFuture<Integer>> scheduled =
+                putOptionalDisregardingSize(toBeSyncedFile.getName(), 
toBeSyncedFile, true);
             if (scheduled.isPresent()) {
                 count++;
+            } else {
+                LOG.info("File [{}] not setup for upload", 
toBeSyncedFile.getName());
             }
         }
 
@@ -231,13 +234,29 @@ public class UploadStagingCache implemen
      * @return An Optional SettableFuture.
      */
     public Optional<SettableFuture<Integer>> put(String id, File input) {
+        return putOptionalDisregardingSize(id, input, false);
+    }
+
+    /**
+     * Puts the file into the staging cache, ignoring the size limit if ignoreSize is set, otherwise only if size permits.
+     * Returns an optional SettableFuture if staged for upload, otherwise empty.
+     *
+     * @param id
+     * @param input
+     * @param ignoreSize
+     * @return
+     */
+    private Optional<SettableFuture<Integer>> 
putOptionalDisregardingSize(String id, File input,
+        boolean ignoreSize) {
         cacheStats.markRequest();
 
         long length = input.length();
         File uploadFile = DataStoreCacheUtils.getFile(id, uploadCacheSpace);
 
-        // if size permits and not upload complete or already scheduled for 
upload
-        if (currentSize.addAndGet(length) <= size
+        // if ignoreSize, only update the internal size; otherwise require that size permits,
+        // and in both cases that the upload is not complete or already scheduled
+        if (((ignoreSize && currentSize.addAndGet(length) >= 0)
+                || currentSize.addAndGet(length) <= size)
             && !attic.containsKey(id)
             && map.putIfAbsent(id, uploadFile) == null ) {
 

Modified: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java?rev=1766345&r1=1766344&r2=1766345&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java
 Mon Oct 24 04:52:52 2016
@@ -85,17 +85,17 @@ public class UploadStagingCacheTest exte
     @Before
     public void setup() throws IOException {
         root = folder.newFolder();
-        init();
+        init(0);
     }
 
-    private void init() {
+    private void init(int i) {
         // uploader
         uploader = new TestStagingUploader(root);
 
         // create executor
         taskLatch = new CountDownLatch(1);
         callbackLatch = new CountDownLatch(1);
-        afterExecuteLatch = new CountDownLatch(1);
+        afterExecuteLatch = new CountDownLatch(i);
         executor = new TestExecutor(1, taskLatch, callbackLatch, 
afterExecuteLatch);
 
         // stats
@@ -502,7 +502,7 @@ public class UploadStagingCacheTest exte
         closer.close();
 
         // Start again
-        init();
+        init(0);
         taskLatch.countDown();
         callbackLatch.countDown();
         afterExecuteLatch.await();
@@ -515,6 +515,50 @@ public class UploadStagingCacheTest exte
         assertCacheStats(stagingCache, 0, 0, 1, 1);
     }
 
+    /**
+     * Test build on start with more files available in terms of total size in 
the upload cache.
+     * @throws Exception
+     */
+    @Test
+    public void testBuildMoreThanCacheSize() throws Exception {
+        closer.close();
+
+        // create load greater than the cache size, simulating upgrades or cache size 
changes
+        File f1 = copyToFile(randomStream(1, 4 * 1024),
+            DataStoreCacheUtils.getFile(ID_PREFIX + "1", new File(root, 
"upload")));
+        File f2 = copyToFile(randomStream(2, 4 * 1024),
+            DataStoreCacheUtils.getFile(ID_PREFIX + "2", new File(root, 
"upload")));
+        File f3 = copyToFile(randomStream(3, 4 * 1024),
+            DataStoreCacheUtils.getFile(ID_PREFIX + "3", new File(root, 
"upload")));
+        // Directly add files to staging dir simulating an upgrade scenario
+
+        // Start staging cache
+        init(3);
+
+        List<ListenableFuture<Integer>> futures = put(folder);
+        // Not staged as already full
+        assertTrue(futures.isEmpty());
+
+        taskLatch.countDown();
+        callbackLatch.countDown();
+        afterExecuteLatch.await();
+
+        waitFinish(futures);
+
+        assertNull(stagingCache.getIfPresent(ID_PREFIX + 1));
+        assertNull(stagingCache.getIfPresent(ID_PREFIX + 2));
+        assertNull(stagingCache.getIfPresent(ID_PREFIX + 3));
+
+        // Initial files should have been uploaded
+        assertTrue(Files.equal(copyToFile(randomStream(1, 4 * 1024), 
folder.newFile()),
+            uploader.read(ID_PREFIX + 1)));
+        assertTrue(Files.equal(copyToFile(randomStream(2, 4 * 1024), 
folder.newFile()),
+            uploader.read(ID_PREFIX + 2)));
+        assertTrue(Files.equal(copyToFile(randomStream(3, 4 * 1024), 
folder.newFile()),
+            uploader.read(ID_PREFIX + 3)));
+        assertCacheStats(stagingCache, 0, 0, 3, 4);
+    }
+
     /** -------------------- Helper methods 
----------------------------------------------------**/
 
     private static SettableFuture<File> 
copyStreamThread(ListeningExecutorService executor,


Reply via email to