Author: amitj
Date: Wed Oct 3 04:29:24 2018
New Revision: 1842677
URL: http://svn.apache.org/viewvc?rev=1842677&view=rev
Log:
OAK-7798: Return stream for downloaded files rather than directly from backend
- Returning stream from a downloaded file rather than directly from backend
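[Editorial note] In substance, getStream() for a record that is not in the file cache now downloads the backend stream into a transient file under the data store's temp directory and serves the caller from that local copy, instead of handing out the backend stream directly. A minimal sketch of that pattern follows; the class name, method name and the backend/identifier/temp-directory parameters are placeholders standing in for the fields used by FileCacheDataRecord in the diff below, not part of the commit.

    import java.io.File;
    import java.io.IOException;
    import java.io.InputStream;
    import com.google.common.io.Closeables;
    import org.apache.jackrabbit.core.data.DataIdentifier;
    import org.apache.jackrabbit.core.data.DataStoreException;
    import org.apache.jackrabbit.oak.spi.blob.AbstractSharedBackend;
    import org.apache.jackrabbit.util.LazyFileInputStream;
    import org.apache.jackrabbit.util.TransientFileFactory;
    import static org.apache.jackrabbit.oak.commons.FileIOUtils.copyInputStreamToFile;

    // Hypothetical helper (not part of the commit) illustrating the pattern:
    // copy the backend stream to a transient temp file and serve the caller
    // from the local copy, so the backend stream is closed as soon as the
    // download completes.
    public final class BackendStreamDownloader {
        public static InputStream streamViaTransientFile(AbstractSharedBackend backend,
                DataIdentifier identifier, File tempDir)
                throws IOException, DataStoreException {
            InputStream in = null;
            try {
                // TransientFileFactory removes the file automatically once it
                // is no longer referenced.
                File tmpFile = TransientFileFactory.getInstance()
                    .createTransientFile("temp0cache", null, tempDir);
                in = backend.getRecord(identifier).getStream();
                copyInputStreamToFile(in, tmpFile);
                return new LazyFileInputStream(tmpFile);
            } finally {
                Closeables.close(in, false);
            }
        }
    }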
Modified:
jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java
jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CachingDataStoreTest.java
Modified: jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java?rev=1842677&r1=1842676&r2=1842677&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java (original)
+++ jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java Wed Oct 3 04:29:24 2018
@@ -37,6 +37,7 @@ import com.google.common.base.Stopwatch;
import com.google.common.cache.CacheLoader;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
+import com.google.common.io.Closeables;
import com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FilenameUtils;
@@ -51,12 +52,14 @@ import org.apache.jackrabbit.oak.spi.blo
import org.apache.jackrabbit.oak.spi.blob.AbstractSharedBackend;
import org.apache.jackrabbit.oak.spi.blob.BlobOptions;
import org.apache.jackrabbit.oak.stats.StatisticsProvider;
+import org.apache.jackrabbit.util.LazyFileInputStream;
import org.apache.jackrabbit.util.TransientFileFactory;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.jackrabbit.oak.commons.FileIOUtils.copyInputStreamToFile;
import static org.apache.jackrabbit.oak.spi.blob.BlobOptions.UploadType.SYNCHRONOUS;
/**
@@ -129,7 +132,7 @@ public abstract class AbstractSharedCach
/**
* DataStore cache
*/
- private CompositeDataStoreCache cache;
+ protected CompositeDataStoreCache cache;
/**
* The delegate backend
@@ -200,13 +203,13 @@ public abstract class AbstractSharedCach
File cached = cache.getIfPresent(dataIdentifier.toString());
if (cached != null && cached.exists()) {
return new FileCacheDataRecord(this, backend, dataIdentifier, cached.length(),
- cached.lastModified());
+ tmp, cached.lastModified());
} else {
// Return the metadata from backend and lazily load the stream
try {
DataRecord rec = backend.getRecord(dataIdentifier);
return new FileCacheDataRecord(this, backend, dataIdentifier, rec.getLength(),
- rec.getLastModified());
+ tmp, rec.getLastModified());
} catch (Exception e) {
LOG.error("Error retrieving record [{}]", dataIdentifier, e);
}
@@ -295,12 +298,14 @@ public abstract class AbstractSharedCach
private final long length;
private final long lastModified;
private final AbstractSharedCachingDataStore store;
+ private final File temp;
public FileCacheDataRecord(AbstractSharedCachingDataStore store,
AbstractSharedBackend backend,
- DataIdentifier identifier, long length,
+ DataIdentifier identifier, long length, File temp,
long lastModified) {
super(backend, identifier);
this.length = length;
+ this.temp = temp;
this.lastModified = lastModified;
this.store = store;
}
@@ -323,7 +328,16 @@ public abstract class AbstractSharedCach
try {
// If cache configured to 0 will return null
if (cached == null || !cached.exists()) {
- return backend.getRecord(getIdentifier()).getStream();
+ InputStream in = null;
+ try {
+ TransientFileFactory fileFactory = TransientFileFactory.getInstance();
+ File tmpFile = fileFactory.createTransientFile("temp0cache", null, temp);
+ in = backend.getRecord(getIdentifier()).getStream();
+ copyInputStreamToFile(in, tmpFile);
+ return new LazyFileInputStream(tmpFile);
+ } finally {
+ Closeables.close(in, false);
+ }
} else {
return new FileInputStream(cached);
}
Modified: jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CachingDataStoreTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CachingDataStoreTest.java?rev=1842677&r1=1842676&r2=1842677&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CachingDataStoreTest.java (original)
+++ jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CachingDataStoreTest.java Wed Oct 3 04:29:24 2018
@@ -23,6 +23,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
+import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -33,7 +34,9 @@ import com.google.common.collect.Iterato
import com.google.common.io.Closer;
import com.google.common.io.Files;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.FilenameUtils;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.filefilter.FileFilterUtils;
import org.apache.commons.io.output.NullOutputStream;
import org.apache.jackrabbit.core.data.DataIdentifier;
import org.apache.jackrabbit.core.data.DataRecord;
@@ -83,6 +86,8 @@ public class CachingDataStoreTest extend
private ScheduledExecutorService scheduledExecutor;
private AbstractSharedCachingDataStore dataStore;
private TestMemoryBackend backend;
+ private StatisticsProvider statsProvider;
+ private TestExecutor listeningExecutor;
@Before
public void setup() throws Exception {
@@ -97,12 +102,12 @@ public class CachingDataStoreTest extend
taskLatch = new CountDownLatch(1);
callbackLatch = new CountDownLatch(1);
afterExecuteLatch = new CountDownLatch(i);
- TestExecutor listeningExecutor = new TestExecutor(1, taskLatch, callbackLatch, afterExecuteLatch);
+ listeningExecutor = new TestExecutor(1, taskLatch, callbackLatch, afterExecuteLatch);
// stats
ScheduledExecutorService statsExecutor = Executors.newSingleThreadScheduledExecutor();
closer.register(new ExecutorCloser(statsExecutor, 500, TimeUnit.MILLISECONDS));
- StatisticsProvider statsProvider = new DefaultStatisticsProvider(statsExecutor);
+ statsProvider = new DefaultStatisticsProvider(statsExecutor);
scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
closer.register(new ExecutorCloser(scheduledExecutor, 500, TimeUnit.MILLISECONDS));
@@ -131,6 +136,64 @@ public class CachingDataStoreTest extend
LOG.info("Finished init");
}
+ @Test
+ public void loadCacheErrorDirectTemp() throws Exception {
+ LOG.info("Started loadCacheErrorDirectTemp");
+ loadDirectBackendTemp(64 * 1024 * 1024);
+ LOG.info("Finished loadCacheErrorDirectTemp");
+ }
+
+ @Test
+ public void cacheZeroDirectTemp() throws Exception {
+ LOG.info("Started cacheZeroDirectTemp");
+ loadDirectBackendTemp(0);
+ LOG.info("Finished cacheZeroDirectTemp");
+ }
+
+ public void loadDirectBackendTemp(long cacheSize) throws Exception {
+ LOG.info("Started loadDirectBackendTemp");
+ dataStore.close();
+ init(1, (int) cacheSize, 0);
+ String path = FilenameUtils
+ .normalizeNoEndSeparator(new File(root.getAbsolutePath() + "/repository/datastore").getAbsolutePath());
+ String home = FilenameUtils.normalizeNoEndSeparator(new File(root.getAbsolutePath()).getAbsolutePath());
+
+ dataStore.cache = new CompositeDataStoreCache(path , new File(home), cacheSize, 0,
+ 0,
+ new TestErrorCacheLoader(new File(path), 40), new StagingUploader() {
+ @Override public void write(String id, File file) throws DataStoreException {
+ backend.write(new DataIdentifier(id), file);
+ }
+
+ @Override public void adopt(File f, File moved) throws IOException {
+ FileUtils.moveFile(f, moved);
+ }
+ }, statsProvider, listeningExecutor, scheduledExecutor, dataStore.executor, 300,
+ 600);
+
+ File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile());
+ String id = getIdForInputStream(f);
+
+ DataRecord rec;
+ if (cacheSize != 0) {
+ backend.write(new DataIdentifier(id), f);
+ rec = dataStore.getRecordIfStored(new DataIdentifier(id));
+ } else {
+ FileInputStream fin = new FileInputStream(f);
+ closer.register(fin);
+ rec = dataStore.addRecord(fin);
+ }
+ assertEquals(id, rec.getIdentifier().toString());
+ assertFile(rec.getStream(), f, folder);
+
+ File tmp = new File(new File(path), "tmp");
+ Collection<File> temp0cacheFiles =
+ FileUtils.listFiles(tmp, FileFilterUtils.prefixFileFilter("temp0cache"), null);
+ assertEquals(1, temp0cacheFiles.size());
+
+ LOG.info("Finished loadDirectBackendTemp");
+ }
+
/**
* Add, get, delete when zero cache size.
* @throws Exception