Author: amitj
Date: Tue Jul 17 05:04:08 2018
New Revision: 1836082

URL: http://svn.apache.org/viewvc?rev=1836082&view=rev
Log:
OAK-7638: Race condition when simultaneous request to stage file for async 
upload

- Fixed the entry condition to check for file existence and create if required

Modified:
    
jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java
    
jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
    
jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java
    
jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java

Modified: 
jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java?rev=1836082&r1=1836081&r2=1836082&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java
 Tue Jul 17 05:04:08 2018
@@ -21,6 +21,7 @@ package org.apache.jackrabbit.oak.plugin
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
+import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.security.DigestOutputStream;
@@ -38,8 +39,8 @@ import com.google.common.base.Stopwatch;
 import com.google.common.cache.CacheLoader;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
-import com.google.common.io.Closeables;
 import com.google.common.util.concurrent.ListeningExecutorService;
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.jackrabbit.core.data.AbstractDataStore;
@@ -169,6 +170,10 @@ public abstract class AbstractSharedCach
                     @Override public void write(String id, File file) throws 
DataStoreException {
                         backend.write(new DataIdentifier(id), file);
                     }
+
+                @Override public void adopt(File f, File moved) throws 
IOException {
+                    FileUtils.moveFile(f, moved);
+                }
             }, statisticsProvider, listeningExecutor, schedulerExecutor, 
executor, stagingPurgeInterval,
                 stagingRetryInterval);
     }

Modified: 
jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java?rev=1836082&r1=1836081&r2=1836082&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
 Tue Jul 17 05:04:08 2018
@@ -294,14 +294,10 @@ public class UploadStagingCache implemen
         if (((ignoreSize && currentSize.addAndGet(length) >= 0)
                 || currentSize.addAndGet(length) <= size)
             && !attic.containsKey(id)
+            && existsOrNotExistsMoveFile(input, uploadFile, currentSize, 
length)
             && map.putIfAbsent(id, uploadFile) == null ) {
 
             try {
-                if (!uploadFile.exists()) {
-                    FileUtils.moveFile(input, uploadFile);
-                    LOG.trace("File [{}] moved to staging cache [{}]", input, 
uploadFile);
-                }
-
                 // update stats
                 cacheStats.markHit();
                 cacheStats.incrementCount();
@@ -328,6 +324,23 @@ public class UploadStagingCache implemen
         return Optional.absent();
     }
 
+    private synchronized boolean existsOrNotExistsMoveFile(File source, File 
destination, AtomicLong currentSize,
+        long length) {
+        if (!destination.exists()) {
+            try {
+                uploader.adopt(source, destination);
+                LOG.trace("Moved file to staging");
+            } catch (IOException e) {
+                LOG.info("Error moving file to staging", e);
+                currentSize.addAndGet(-length);
+                return false;
+            }
+            LOG.trace("File [{}] moved to staging cache [{}]", source, 
destination);
+            return true;
+        }
+        return true;
+    }
+
     /**
      * Stages the file for async upload.
      * * Puts the file into the stage caching file system directory
@@ -809,4 +822,6 @@ class StagingCacheStats extends Annotate
  */
 interface StagingUploader {
     void write(String id, File f) throws DataStoreException;
+
+    void adopt(File f, File moved) throws IOException;
 }

Modified: 
jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java?rev=1836082&r1=1836081&r2=1836082&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java
 Tue Jul 17 05:04:08 2018
@@ -70,12 +70,19 @@ public class AbstractDataStoreCacheTest
 
     static class TestStagingUploader implements StagingUploader {
         private final File root;
+        private CountDownLatch adoptLatch;
 
         public TestStagingUploader(File dir) {
             this.root = new File(dir, "datastore");
             root.mkdirs();
         }
 
+        public TestStagingUploader(File dir, CountDownLatch adoptLatch) {
+            this.root = new File(dir, "datastore");
+            root.mkdirs();
+            this.adoptLatch = adoptLatch;
+        }
+
         @Override public void write(String id, File f) throws 
DataStoreException {
             try {
                 File move = getFile(id, root);
@@ -87,6 +94,17 @@ public class AbstractDataStoreCacheTest
             }
         }
 
+        @Override public void adopt(File f, File moved) throws IOException {
+            try {
+                if (adoptLatch != null) {
+                    adoptLatch.await();
+                }
+            } catch (Exception e) {
+                LOG.info("Error in adopt", e);
+            }
+            FileUtils.moveFile(f, moved);
+        }
+
         public File read(String id) {
             return getFile(id, root);
         }

Modified: 
jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java?rev=1836082&r1=1836081&r2=1836082&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java
 Tue Jul 17 05:04:08 2018
@@ -25,6 +25,7 @@ import java.io.InputStream;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -401,6 +402,8 @@ public class UploadStagingCacheTest exte
      */
     @Test
     public void testConcurrentSameAdd() throws Exception {
+        LOG.info("Starting testConcurrentSameAdd");
+
         // Add load
         List<ListenableFuture<Integer>> futures = put(folder);
 
@@ -415,6 +418,44 @@ public class UploadStagingCacheTest exte
         assertFuture(futures, 0);
 
         assertCacheStats(stagingCache, 0, 0, 1, 2);
+
+        LOG.info("Finished testConcurrentSameAdd");
+    }
+
+    /**
+     * Stage request same file concurrently.
+     * @throws Exception
+     */
+    @Test
+    public void testConcurrentSameAddRequest() throws Exception {
+        LOG.info("Starting testConcurrentSameAddRequest");
+
+        closer.close();
+
+        List<ListenableFuture<Integer>> futures = Lists.newArrayList();
+        CountDownLatch moveLatch = new CountDownLatch(1);
+        init(1, new TestStagingUploader(folder.newFolder(), moveLatch), null);
+
+        //1st request
+        ListenableFuture<Boolean> resultReq1 = putThread(folder, futures);
+
+        //2nd Request
+        ListenableFuture<Boolean> resultReq2 = putThread(folder, futures);
+        Thread.sleep(200);
+
+        // Allow any thread to start moving
+        moveLatch.countDown();
+
+        assertTrue(resultReq1.get());
+        assertTrue(resultReq2.get());
+
+        taskLatch.countDown();
+        callbackLatch.countDown();
+
+        assertFuture(futures, 0);
+        assertCacheStats(stagingCache, 0, 0, 1, 2);
+
+        LOG.info("Finished testConcurrentSameAddRequest");
     }
 
     /**
@@ -742,6 +783,31 @@ public class UploadStagingCacheTest exte
         }
     }
 
+
+    private ListenableFuture<Boolean> putThread(TemporaryFolder folder, 
List<ListenableFuture<Integer>> futures) {
+        ListeningExecutorService executorService =
+            MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
+        closer.register(new ExecutorCloser(executorService));
+
+        ListenableFuture<Boolean> result = executorService.submit(new 
Callable<Boolean>() {
+            @Override public Boolean call() {
+                try {
+                    LOG.info("Starting put");
+                    futures.addAll(put(folder));
+                    LOG.info("Finished put");
+                    File f = stagingCache.getIfPresent(ID_PREFIX + 0);
+                    LOG.info("Retrieved file {}, {}", f, f.exists());
+                    return f != null && f.exists();
+                } catch (Exception e) {
+                    LOG.info("Exception in get", e);
+                }
+                return false;
+            }
+        });
+
+        return result;
+    }
+
     private List<ListenableFuture<Integer>> put(TemporaryFolder folder)
         throws IOException {
         File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile());


Reply via email to