Author: amitj
Date: Wed Oct  3 04:29:24 2018
New Revision: 1842677

URL: http://svn.apache.org/viewvc?rev=1842677&view=rev
Log:
OAK-7798: Return stream for downloaded files rather than directly from backend

- Returning stream from a downloaded file rather than directly from backend

Modified:
    jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java
    jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CachingDataStoreTest.java

Modified: jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java?rev=1842677&r1=1842676&r2=1842677&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java (original)
+++ jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java Wed Oct  3 04:29:24 2018
@@ -37,6 +37,7 @@ import com.google.common.base.Stopwatch;
 import com.google.common.cache.CacheLoader;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
+import com.google.common.io.Closeables;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.FilenameUtils;
@@ -51,12 +52,14 @@ import org.apache.jackrabbit.oak.spi.blo
 import org.apache.jackrabbit.oak.spi.blob.AbstractSharedBackend;
 import org.apache.jackrabbit.oak.spi.blob.BlobOptions;
 import org.apache.jackrabbit.oak.stats.StatisticsProvider;
+import org.apache.jackrabbit.util.LazyFileInputStream;
 import org.apache.jackrabbit.util.TransientFileFactory;
 import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.jackrabbit.oak.commons.FileIOUtils.copyInputStreamToFile;
 import static 
org.apache.jackrabbit.oak.spi.blob.BlobOptions.UploadType.SYNCHRONOUS;
 
 /**
@@ -129,7 +132,7 @@ public abstract class AbstractSharedCach
     /**
      * DataStore cache
      */
-    private CompositeDataStoreCache cache;
+    protected CompositeDataStoreCache cache;
 
     /**
      * The delegate backend
@@ -200,13 +203,13 @@ public abstract class AbstractSharedCach
         File cached = cache.getIfPresent(dataIdentifier.toString());
         if (cached != null && cached.exists()) {
             return new FileCacheDataRecord(this, backend, dataIdentifier, 
cached.length(),
-                cached.lastModified());
+                tmp, cached.lastModified());
         } else {
             // Return the metadata from backend and lazily load the stream
             try {
                 DataRecord rec = backend.getRecord(dataIdentifier);
                 return new FileCacheDataRecord(this, backend, dataIdentifier, 
rec.getLength(),
-                    rec.getLastModified());
+                    tmp, rec.getLastModified());
             } catch (Exception e) {
                 LOG.error("Error retrieving record [{}]", dataIdentifier, e);
             }
@@ -295,12 +298,14 @@ public abstract class AbstractSharedCach
         private final long length;
         private final long lastModified;
         private final AbstractSharedCachingDataStore store;
+        private final File temp;
 
         public FileCacheDataRecord(AbstractSharedCachingDataStore store, 
AbstractSharedBackend backend,
-            DataIdentifier identifier, long length,
+            DataIdentifier identifier, long length, File temp,
             long lastModified) {
             super(backend, identifier);
             this.length = length;
+            this.temp = temp;
             this.lastModified = lastModified;
             this.store = store;
         }
@@ -323,7 +328,16 @@ public abstract class AbstractSharedCach
             try {
                 // If cache configured to 0 will return null
                 if (cached == null || !cached.exists()) {
-                    return backend.getRecord(getIdentifier()).getStream();
+                    InputStream in = null;
+                    try {
+                        TransientFileFactory fileFactory = 
TransientFileFactory.getInstance();
+                        File tmpFile = 
fileFactory.createTransientFile("temp0cache", null, temp);
+                        in = backend.getRecord(getIdentifier()).getStream();
+                        copyInputStreamToFile(in, tmpFile);
+                        return new LazyFileInputStream(tmpFile);
+                    } finally {
+                        Closeables.close(in, false);
+                    }
                 } else {
                     return new FileInputStream(cached);
                 }

Modified: jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CachingDataStoreTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CachingDataStoreTest.java?rev=1842677&r1=1842676&r2=1842677&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CachingDataStoreTest.java (original)
+++ jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CachingDataStoreTest.java Wed Oct  3 04:29:24 2018
@@ -23,6 +23,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.security.DigestOutputStream;
 import java.security.MessageDigest;
+import java.util.Collection;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -33,7 +34,9 @@ import com.google.common.collect.Iterato
 import com.google.common.io.Closer;
 import com.google.common.io.Files;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.filefilter.FileFilterUtils;
 import org.apache.commons.io.output.NullOutputStream;
 import org.apache.jackrabbit.core.data.DataIdentifier;
 import org.apache.jackrabbit.core.data.DataRecord;
@@ -83,6 +86,8 @@ public class CachingDataStoreTest extend
     private ScheduledExecutorService scheduledExecutor;
     private AbstractSharedCachingDataStore dataStore;
     private TestMemoryBackend backend;
+    private StatisticsProvider statsProvider;
+    private TestExecutor listeningExecutor;
 
     @Before
     public void setup() throws Exception {
@@ -97,12 +102,12 @@ public class CachingDataStoreTest extend
         taskLatch = new CountDownLatch(1);
         callbackLatch = new CountDownLatch(1);
         afterExecuteLatch = new CountDownLatch(i);
-        TestExecutor listeningExecutor = new TestExecutor(1, taskLatch, 
callbackLatch, afterExecuteLatch);
+        listeningExecutor = new TestExecutor(1, taskLatch, callbackLatch, 
afterExecuteLatch);
 
         // stats
         ScheduledExecutorService statsExecutor = 
Executors.newSingleThreadScheduledExecutor();
         closer.register(new ExecutorCloser(statsExecutor, 500, 
TimeUnit.MILLISECONDS));
-        StatisticsProvider statsProvider = new 
DefaultStatisticsProvider(statsExecutor);
+        statsProvider = new DefaultStatisticsProvider(statsExecutor);
 
         scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
         closer.register(new ExecutorCloser(scheduledExecutor, 500, 
TimeUnit.MILLISECONDS));
@@ -131,6 +136,64 @@ public class CachingDataStoreTest extend
         LOG.info("Finished init");
     }
 
+    @Test
+    public void loadCacheErrorDirectTemp() throws Exception {
+        LOG.info("Started loadCacheErrorDirectTemp");
+        loadDirectBackendTemp(64 * 1024 * 1024);
+        LOG.info("Finished loadCacheErrorDirectTemp");
+    }
+
+    @Test
+    public void cacheZeroDirectTemp() throws Exception {
+        LOG.info("Started cacheZeroDirectTemp");
+        loadDirectBackendTemp(0);
+        LOG.info("Finished cacheZeroDirectTemp");
+    }
+
+    public void loadDirectBackendTemp(long cacheSize) throws Exception {
+        LOG.info("Started loadDirectBackendTemp");
+        dataStore.close();
+        init(1, (int) cacheSize, 0);
+        String path = FilenameUtils
+            .normalizeNoEndSeparator(new File(root.getAbsolutePath() + 
"/repository/datastore").getAbsolutePath());
+        String home = FilenameUtils.normalizeNoEndSeparator(new 
File(root.getAbsolutePath()).getAbsolutePath());
+
+        dataStore.cache = new CompositeDataStoreCache(path , new File(home), 
cacheSize, 0,
+            0,
+            new TestErrorCacheLoader(new File(path), 40), new 
StagingUploader() {
+            @Override public void write(String id, File file) throws 
DataStoreException {
+                backend.write(new DataIdentifier(id), file);
+            }
+
+            @Override public void adopt(File f, File moved) throws IOException 
{
+                FileUtils.moveFile(f, moved);
+            }
+        }, statsProvider, listeningExecutor, scheduledExecutor, 
dataStore.executor, 300,
+            600);
+
+        File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile());
+        String id = getIdForInputStream(f);
+
+        DataRecord rec;
+        if (cacheSize != 0) {
+            backend.write(new DataIdentifier(id), f);
+            rec = dataStore.getRecordIfStored(new DataIdentifier(id));
+        } else {
+            FileInputStream fin = new FileInputStream(f);
+            closer.register(fin);
+            rec = dataStore.addRecord(fin);
+        }
+        assertEquals(id, rec.getIdentifier().toString());
+        assertFile(rec.getStream(), f, folder);
+
+        File tmp = new File(new File(path), "tmp");
+        Collection<File> temp0cacheFiles =
+            FileUtils.listFiles(tmp, 
FileFilterUtils.prefixFileFilter("temp0cache"), null);
+        assertEquals(1, temp0cacheFiles.size());
+
+        LOG.info("Finished loadDirectBackendTemp");
+    }
+
     /**
      * Add, get, delete when zero cache size.
      * @throws Exception


Reply via email to