Author: amitj
Date: Mon Oct 24 04:52:52 2016
New Revision: 1766345
URL: http://svn.apache.org/viewvc?rev=1766345&view=rev
Log:
OAK-4979: Caching sub-system implementation for DataStore
- Ignore cache size when building on UploadStagingCache start to support cache
size changes and upgrades.
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java?rev=1766345&r1=1766344&r2=1766345&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
Mon Oct 24 04:52:52 2016
@@ -213,9 +213,12 @@ public class UploadStagingCache implemen
int count = 0;
while (iter.hasNext()) {
File toBeSyncedFile = iter.next();
- Optional<SettableFuture<Integer>> scheduled =
put(toBeSyncedFile.getName(), toBeSyncedFile);
+ Optional<SettableFuture<Integer>> scheduled =
+ putOptionalDisregardingSize(toBeSyncedFile.getName(),
toBeSyncedFile, true);
if (scheduled.isPresent()) {
count++;
+ } else {
+ LOG.info("File [{}] not setup for upload",
toBeSyncedFile.getName());
}
}
@@ -231,13 +234,29 @@ public class UploadStagingCache implemen
* @return An Optional SettableFuture.
*/
public Optional<SettableFuture<Integer>> put(String id, File input) {
+ return putOptionalDisregardingSize(id, input, false);
+ }
+
+ /**
+ * Puts the file into the staging cache if ignoreSize else if possible
+ * Returns an optional SettableFuture if staged for upload otherwise empty.
+ *
+ * @param id
+ * @param input
+ * @param ignoreSize
+ * @return
+ */
+ private Optional<SettableFuture<Integer>>
putOptionalDisregardingSize(String id, File input,
+ boolean ignoreSize) {
cacheStats.markRequest();
long length = input.length();
File uploadFile = DataStoreCacheUtils.getFile(id, uploadCacheSpace);
- // if size permits and not upload complete or already scheduled for
upload
- if (currentSize.addAndGet(length) <= size
+ // if ignoreSize update internal size else size permits
+ // and not upload complete or already scheduled for upload
+ if (((ignoreSize && currentSize.addAndGet(length) >= 0)
+ || currentSize.addAndGet(length) <= size)
&& !attic.containsKey(id)
&& map.putIfAbsent(id, uploadFile) == null ) {
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java?rev=1766345&r1=1766344&r2=1766345&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java
Mon Oct 24 04:52:52 2016
@@ -85,17 +85,17 @@ public class UploadStagingCacheTest exte
@Before
public void setup() throws IOException {
root = folder.newFolder();
- init();
+ init(0);
}
- private void init() {
+ private void init(int i) {
// uploader
uploader = new TestStagingUploader(root);
// create executor
taskLatch = new CountDownLatch(1);
callbackLatch = new CountDownLatch(1);
- afterExecuteLatch = new CountDownLatch(1);
+ afterExecuteLatch = new CountDownLatch(i);
executor = new TestExecutor(1, taskLatch, callbackLatch,
afterExecuteLatch);
// stats
@@ -502,7 +502,7 @@ public class UploadStagingCacheTest exte
closer.close();
// Start again
- init();
+ init(0);
taskLatch.countDown();
callbackLatch.countDown();
afterExecuteLatch.await();
@@ -515,6 +515,50 @@ public class UploadStagingCacheTest exte
assertCacheStats(stagingCache, 0, 0, 1, 1);
}
+ /**
+ * Test build on start with more files available in terms of total size in
the upload cache.
+ * @throws Exception
+ */
+ @Test
+ public void testBuildMoreThanCacheSize() throws Exception {
+ closer.close();
+
+ // create load greater than the cache size upgrades or cache size
changes
+ File f1 = copyToFile(randomStream(1, 4 * 1024),
+ DataStoreCacheUtils.getFile(ID_PREFIX + "1", new File(root,
"upload")));
+ File f2 = copyToFile(randomStream(2, 4 * 1024),
+ DataStoreCacheUtils.getFile(ID_PREFIX + "2", new File(root,
"upload")));
+ File f3 = copyToFile(randomStream(3, 4 * 1024),
+ DataStoreCacheUtils.getFile(ID_PREFIX + "3", new File(root,
"upload")));
+ // Directly add files to staging dir simulating an upgrade scenario
+
+ // Start staging cache
+ init(3);
+
+ List<ListenableFuture<Integer>> futures = put(folder);
+ // Not staged as already full
+ assertTrue(futures.isEmpty());
+
+ taskLatch.countDown();
+ callbackLatch.countDown();
+ afterExecuteLatch.await();
+
+ waitFinish(futures);
+
+ assertNull(stagingCache.getIfPresent(ID_PREFIX + 1));
+ assertNull(stagingCache.getIfPresent(ID_PREFIX + 2));
+ assertNull(stagingCache.getIfPresent(ID_PREFIX + 3));
+
+ // Initial files should have been uploaded
+ assertTrue(Files.equal(copyToFile(randomStream(1, 4 * 1024),
folder.newFile()),
+ uploader.read(ID_PREFIX + 1)));
+ assertTrue(Files.equal(copyToFile(randomStream(2, 4 * 1024),
folder.newFile()),
+ uploader.read(ID_PREFIX + 2)));
+ assertTrue(Files.equal(copyToFile(randomStream(3, 4 * 1024),
folder.newFile()),
+ uploader.read(ID_PREFIX + 3)));
+ assertCacheStats(stagingCache, 0, 0, 3, 4);
+ }
+
/** -------------------- Helper methods
----------------------------------------------------**/
private static SettableFuture<File>
copyStreamThread(ListeningExecutorService executor,