Author: chetanm
Date: Thu Jul 16 08:51:46 2015
New Revision: 1691338
URL: http://svn.apache.org/r1691338
Log:
OAK-3110 - AsyncIndexer fails due to FileNotFoundException thrown by
CopyOnWrite logic
Merging 1691331,1691332,1691333
Modified:
jackrabbit/oak/branches/1.2/ (props changed)
jackrabbit/oak/branches/1.2/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
jackrabbit/oak/branches/1.2/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
Propchange: jackrabbit/oak/branches/1.2/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jul 16 08:51:46 2015
@@ -1,3 +1,3 @@
/jackrabbit/oak/branches/1.0:1665962
-/jackrabbit/oak/trunk:1672350,1672468,1672537,1672603,1672642,1672644,1672834-1672835,1673351,1673410,1673414-1673415,1673436,1673644,1673662-1673664,1673669,1673695,1673738,1673787,1673791,1674046,1674065,1674075,1674107,1674228,1674780,1674880,1675054-1675055,1675319,1675332,1675354,1675357,1675382,1675555,1675566,1675593,1676198,1676237,1676407,1676458,1676539,1676670,1676693,1676703,1676725,1677579,1677581,1677609,1677611,1677774,1677788,1677797,1677804,1677806,1677939,1677991,1678023,1678095-1678096,1678171,1678173,1678211,1678323,1678758,1678938,1678954,1679144,1679165,1679191,1679232,1679235,1679503,1679958,1679961,1680170,1680182,1680222,1680232,1680236,1680461,1680633,1680643,1680747,1680805-1680806,1680903,1681282,1681767,1681918,1682042,1682218,1682235,1682437,1682494,1682555,1682855,1682904,1683059,1683089,1683213,1683249,1683259,1683278,1683323,1683687,1683700,1684174-1684175,1684186,1684376,1684442,1684561,1684570,1684601,1684618,1684820,1684868,1685023,1685075,1685370
,1685552,1685589-1685590,1685840,1685964,1685977,1685989,1685999,1686023,1686032,1686097,1686162,1686229,1686234,1686253,1686414,1686780,1686854,1686857,1686971,1687053-1687055,1687175,1687196,1687198,1687220,1687239-1687240,1687301,1687441,1687553,1688089-1688090,1688172,1688179,1688349,1688421,1688436,1688453,1688616,1688622,1688634,1688636,1688817,1689003-1689004,1689008,1689577,1689581,1689623,1689810,1689828,1689833,1689903,1690017,1690043,1690047,1690057,1690247,1690249,1690634-1690637,1690650,1690669,1690674,1690885,1690941,1691151,1691167,1691183,1691210,1691307
+/jackrabbit/oak/trunk:1672350,1672468,1672537,1672603,1672642,1672644,1672834-1672835,1673351,1673410,1673414-1673415,1673436,1673644,1673662-1673664,1673669,1673695,1673738,1673787,1673791,1674046,1674065,1674075,1674107,1674228,1674780,1674880,1675054-1675055,1675319,1675332,1675354,1675357,1675382,1675555,1675566,1675593,1676198,1676237,1676407,1676458,1676539,1676670,1676693,1676703,1676725,1677579,1677581,1677609,1677611,1677774,1677788,1677797,1677804,1677806,1677939,1677991,1678023,1678095-1678096,1678171,1678173,1678211,1678323,1678758,1678938,1678954,1679144,1679165,1679191,1679232,1679235,1679503,1679958,1679961,1680170,1680182,1680222,1680232,1680236,1680461,1680633,1680643,1680747,1680805-1680806,1680903,1681282,1681767,1681918,1682042,1682218,1682235,1682437,1682494,1682555,1682855,1682904,1683059,1683089,1683213,1683249,1683259,1683278,1683323,1683687,1683700,1684174-1684175,1684186,1684376,1684442,1684561,1684570,1684601,1684618,1684820,1684868,1685023,1685075,1685370
,1685552,1685589-1685590,1685840,1685964,1685977,1685989,1685999,1686023,1686032,1686097,1686162,1686229,1686234,1686253,1686414,1686780,1686854,1686857,1686971,1687053-1687055,1687175,1687196,1687198,1687220,1687239-1687240,1687301,1687441,1687553,1688089-1688090,1688172,1688179,1688349,1688421,1688436,1688453,1688616,1688622,1688634,1688636,1688817,1689003-1689004,1689008,1689577,1689581,1689623,1689810,1689828,1689833,1689903,1690017,1690043,1690047,1690057,1690247,1690249,1690634-1690637,1690650,1690669,1690674,1690885,1690941,1691151,1691167,1691183,1691210,1691307,1691331-1691333
/jackrabbit/trunk:1345480
Modified:
jackrabbit/oak/branches/1.2/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java?rev=1691338&r1=1691337&r2=1691338&view=diff
==============================================================================
---
jackrabbit/oak/branches/1.2/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
(original)
+++
jackrabbit/oak/branches/1.2/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
Thu Jul 16 08:51:46 2015
@@ -75,6 +75,7 @@ import static com.google.common.base.Pre
import static com.google.common.collect.Iterables.toArray;
import static com.google.common.collect.Iterables.transform;
import static com.google.common.collect.Maps.newConcurrentMap;
+import static com.google.common.collect.Maps.newHashMap;
import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
public class IndexCopier implements CopyOnReadStatsMBean, Closeable {
@@ -111,6 +112,7 @@ public class IndexCopier implements Copy
private final Map<String, String> indexPathMapping = newConcurrentMap();
+ private final Map<String, Set<String>> sharedWorkingSetMap = newHashMap();
private final Map<String, String> indexPathVersionMapping =
newConcurrentMap();
private final ConcurrentMap<String, LocalIndexFile> failedToDeleteFiles =
newConcurrentMap();
private final Set<LocalIndexFile> copyInProgressFiles =
Collections.newSetFromMap(new ConcurrentHashMap<LocalIndexFile, Boolean>());
@@ -131,12 +133,13 @@ public class IndexCopier implements Copy
public Directory wrapForRead(String indexPath, IndexDefinition definition,
Directory remote) throws IOException {
Directory local = createLocalDirForIndexReader(indexPath, definition);
- return new CopyOnReadDirectory(remote, local, prefetchEnabled,
indexPath);
+ return new CopyOnReadDirectory(remote, local, prefetchEnabled,
indexPath, getSharedWorkingSet(definition));
}
public Directory wrapForWrite(IndexDefinition definition, Directory
remote, boolean reindexMode) throws IOException {
Directory local = createLocalDirForIndexWriter(definition);
- return new CopyOnWriteDirectory(remote, local, reindexMode);
+ return new CopyOnWriteDirectory(remote, local, reindexMode,
+ getIndexPathForLogging(definition),
getSharedWorkingSet(definition));
}
@Override
@@ -238,6 +241,34 @@ public class IndexCopier implements Copy
}
/**
+ * Provide the corresponding shared state to enable COW inform COR
+ * about new files it is creating while indexing. This would allow COR to ignore
+ * such files while determining the deletion candidates.
+ *
+ * @param defn index definition for which the directory is being created
+ * @return a set to maintain the state of new files being created by the COW Directory
+ */
+ private Set<String> getSharedWorkingSet(IndexDefinition defn){
+ String indexPath = defn.getIndexPathFromConfig();
+
+ if (indexPath == null){
+ //With indexPath null the working directory would not
+ //be shared between COR and COW. So just return a new set
+ return new HashSet<String>();
+ }
+
+ Set<String> sharedSet;
+ synchronized (sharedWorkingSetMap){
+ sharedSet = sharedWorkingSetMap.get(indexPath);
+ if (sharedSet == null){
+ sharedSet = Sets.newConcurrentHashSet();
+ sharedWorkingSetMap.put(indexPath, sharedSet);
+ }
+ }
+ return sharedSet;
+ }
+
+ /**
* Creates the workDir. If it exists then it is cleaned
*
* @param indexRootDir root directory under which all indexing related files are managed
@@ -250,6 +281,14 @@ public class IndexCopier implements Copy
return workDir;
}
+ private static String getIndexPathForLogging(IndexDefinition defn){
+ String indexPath = defn.getIndexPathFromConfig();
+ if (indexPath == null){
+ return "UNKNOWN";
+ }
+ return indexPath;
+ }
+
/**
* Directory implementation which lazily copies the index files from a
* remote directory in background.
@@ -266,12 +305,17 @@ public class IndexCopier implements Copy
*/
private final Set<String> localFileNames = Sets.newConcurrentHashSet();
- public CopyOnReadDirectory(Directory remote, Directory local, boolean
prefetch, String indexPath) throws IOException {
+ public CopyOnReadDirectory(Directory remote, Directory local, boolean
prefetch,
+ String indexPath, Set<String>
sharedWorkingSet) throws IOException {
super(remote);
this.remote = remote;
this.local = local;
this.indexPath = indexPath;
+
this.localFileNames.addAll(Arrays.asList(local.listAll()));
+ //Remove files which are being worked upon by COW
+ this.localFileNames.removeAll(sharedWorkingSet);
+
if (prefetch) {
prefetchIndexFiles();
}
@@ -540,6 +584,8 @@ public class IndexCopier implements Copy
private final AtomicReference<Throwable> errorInCopy = new
AtomicReference<Throwable>();
private final CountDownLatch copyDone = new CountDownLatch(1);
private final boolean reindexMode;
+ private final String indexPathForLogging;
+ private final Set<String> sharedWorkingSet;
/**
* Current background task
@@ -558,7 +604,8 @@ public class IndexCopier implements Copy
Callable<Void> task = queue.poll();
if (task != null && task != STOP) {
if (errorInCopy.get() != null) {
- log.trace("Skipping task {} as some exception
occurred in previous run", task);
+ log.trace("[COW][{}] Skipping task {} as some
exception occurred in previous run",
+ indexPathForLogging, task);
} else {
task.call();
}
@@ -571,7 +618,8 @@ public class IndexCopier implements Copy
}
} catch (Throwable t) {
errorInCopy.set(t);
- log.debug("Error occurred while copying files. Further
processing would be skipped", t);
+ log.debug("[COW][{}] Error occurred while copying
files. Further processing would " +
+ "be skipped", indexPathForLogging, t);
currentTask.onComplete(completionHandler);
}
return null;
@@ -590,11 +638,14 @@ public class IndexCopier implements Copy
}
};
- public CopyOnWriteDirectory(Directory remote, Directory local, boolean
reindexMode) throws IOException {
+ public CopyOnWriteDirectory(Directory remote, Directory local, boolean
reindexMode,
+ String indexPathForLogging, Set<String>
sharedWorkingSet) throws IOException {
super(local);
this.remote = remote;
this.local = local;
+ this.indexPathForLogging = indexPathForLogging;
this.reindexMode = reindexMode;
+ this.sharedWorkingSet = sharedWorkingSet;
initialize();
}
@@ -610,7 +661,7 @@ public class IndexCopier implements Copy
@Override
public void deleteFile(String name) throws IOException {
- log.trace("[COW] Deleted file {}", name);
+ log.trace("[COW][{}] Deleted file {}", indexPathForLogging, name);
COWFileReference ref = fileMap.remove(name);
if (ref != null) {
ref.delete();
@@ -634,6 +685,7 @@ public class IndexCopier implements Copy
}
ref = new COWLocalFileReference(name);
fileMap.put(name, ref);
+ sharedWorkingSet.add(name);
return ref.createOutput(context);
}
@@ -674,7 +726,7 @@ public class IndexCopier implements Copy
"while processing copy task for" +
remote.toString());
}
}
- PERF_LOGGER.end(start, -1, "Completed pending copying task
{}", pendingCopies);
+ PERF_LOGGER.end(start, -1, "[COW][{}] Completed pending
copying task {}", indexPathForLogging, pendingCopies);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
@@ -682,7 +734,7 @@ public class IndexCopier implements Copy
Throwable t = errorInCopy.get();
if (t != null){
- throw new IOException("Error occurred while copying files", t);
+ throw new IOException("Error occurred while copying files for
" + indexPathForLogging, t);
}
//Sanity check
@@ -697,24 +749,25 @@ public class IndexCopier implements Copy
skippedFromUploadSize.addAndGet(skippedFilesSize);
- String msg = "CopyOnWrite stats : Skipped copying {} files with
total size {}";
+ String msg = "[COW][{}] CopyOnWrite stats : Skipped copying {}
files with total size {}";
if (reindexMode || skippedFilesSize > 10 * FileUtils.ONE_MB){
- log.info(msg, skippedFiles.size(),
humanReadableByteCount(skippedFilesSize));
+ log.info(msg, indexPathForLogging, skippedFiles.size(),
humanReadableByteCount(skippedFilesSize));
} else {
- log.debug(msg, skippedFiles.size(),
humanReadableByteCount(skippedFilesSize));
+ log.debug(msg,indexPathForLogging, skippedFiles.size(),
humanReadableByteCount(skippedFilesSize));
}
if (log.isTraceEnabled()){
- log.trace("File listing - Upon completion {}",
Arrays.toString(remote.listAll()));
+ log.trace("[COW][{}] File listing - Upon completion {}",
indexPathForLogging, Arrays.toString(remote.listAll()));
}
local.close();
remote.close();
+ sharedWorkingSet.clear();
}
@Override
public String toString() {
- return String.format("[COW] Local %s, Remote %s", local, remote);
+ return String.format("[COW][%s] Local %s, Remote %s",
indexPathForLogging, local, remote);
}
private long getSkippedFilesSize() {
@@ -741,7 +794,7 @@ public class IndexCopier implements Copy
}
if (log.isTraceEnabled()){
- log.trace("File listing - Start" +
Arrays.toString(remote.listAll()));
+ log.trace("[COW][{}] File listing - At start {}",
indexPathForLogging, Arrays.toString(remote.listAll()));
}
}
@@ -753,7 +806,7 @@ public class IndexCopier implements Copy
scheduledForCopyCount.decrementAndGet();
if (deletedFilesLocal.contains(name)){
skippedFiles.add(name);
- log.trace("[COW] Skip copying of deleted file {}",
name);
+ log.trace("[COW][{}] Skip copying of deleted file {}",
indexPathForLogging, name);
return null;
}
long fileSize = local.fileLength(name);
@@ -764,7 +817,7 @@ public class IndexCopier implements Copy
local.copy(remote, name, name, IOContext.DEFAULT);
doneCopy(file, start);
- PERF_LOGGER.end(perfStart, 0, "Copied to remote {} ",
name);
+ PERF_LOGGER.end(perfStart, 0, "[COW][{}] Copied to remote
{} ",indexPathForLogging, name);
return null;
}
@@ -780,7 +833,7 @@ public class IndexCopier implements Copy
@Override
public Void call() throws Exception {
if (!skippedFiles.contains(name)) {
- log.trace("[COW] Marking as deleted {}", name);
+ log.trace("[COW][{}] Marking as deleted {}",
indexPathForLogging, name);
remote.deleteFile(name);
}
return null;
@@ -895,7 +948,7 @@ public class IndexCopier implements Copy
@Override
public IndexOutput createOutput(IOContext context) throws
IOException {
- log.debug("[COW] Creating output {}", name);
+ log.debug("[COW][{}] Creating output {}", indexPathForLogging,
name);
return new CopyOnCloseIndexOutput(local.createOutput(name,
context));
}
@@ -981,7 +1034,7 @@ public class IndexCopier implements Copy
} catch (IOException e) {
failedToDelete(file);
log.debug("Error occurred while removing deleted file {} from
Local {}. " +
- "Attempt would be maid to delete it on next run ",
fileName, dir, e);
+ "Attempt would be made to delete it on next run ",
fileName, dir, e);
}
return successFullyDeleted;
}
Modified:
jackrabbit/oak/branches/1.2/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java?rev=1691338&r1=1691337&r2=1691338&view=diff
==============================================================================
---
jackrabbit/oak/branches/1.2/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
(original)
+++
jackrabbit/oak/branches/1.2/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
Thu Jul 16 08:51:46 2015
@@ -54,10 +54,12 @@ import org.apache.jackrabbit.oak.commons
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.RAMDirectory;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -894,6 +896,66 @@ public class IndexCopierTest {
assertNotNull("Close should have thrown an exception", error.get());
}
+ /**
+ * Test the interaction between COR and COW using same underlying directory
+ */
+ @Test
+ public void cowConcurrentAccess() throws Exception{
+ CollectingExecutor executor = new CollectingExecutor();
+ ExecutorService executorService = Executors.newFixedThreadPool(2);
+ executor.setForwardingExecutor(executorService);
+
+ Directory baseDir = new CloseSafeDir();
+ String indexPath = "/foo";
+ builder.setProperty(LuceneIndexConstants.INDEX_PATH, indexPath);
+ IndexDefinition defn = new IndexDefinition(root,
builder.getNodeState());
+ IndexCopier copier = new RAMIndexCopier(baseDir, executor,
getWorkDir(), true);
+
+ Directory remote = new CloseSafeDir();
+ byte[] f1 = writeFile(remote, "f1");
+
+ Directory cor1 = copier.wrapForRead(indexPath, defn, remote);
+ readAndAssert(cor1, "f1", f1);
+ cor1.close();
+
+ final CountDownLatch pauseCopyLatch = new CountDownLatch(1);
+ Directory remote2 = new FilterDirectory(remote) {
+ @Override
+ public IndexOutput createOutput(String name, IOContext context)
throws IOException {
+ try {
+ pauseCopyLatch.await();
+ } catch (InterruptedException ignore) {
+
+ }
+ return super.createOutput(name, context);
+ }
+ };
+
+ //Start copying a file to remote via COW
+ Directory cow1 = copier.wrapForWrite(defn, remote2, false);
+ byte[] f2 = writeFile(cow1, "f2");
+
+ //Before copy is done to remote lets delete f1 from remote and
+ //open a COR and close it such that it triggers delete of f1
+ remote.deleteFile("f1");
+ Directory cor2 = copier.wrapForRead(indexPath, defn, remote);
+
+ //Ensure that deletion task submitted to executor get processed immediately
+ executor.enableImmediateExecution();
+ cor2.close();
+ executor.enableDelayedExecution();
+
+ assertFalse(baseDir.fileExists("f1"));
+ assertFalse("f2 should not have been copied to remote so far",
remote.fileExists("f2"));
+ assertTrue("f2 should exist", baseDir.fileExists("f2"));
+
+ pauseCopyLatch.countDown();
+ cow1.close();
+ assertTrue("f2 should exist", remote.fileExists("f2"));
+
+ executorService.shutdown();
+ }
+
private byte[] writeFile(Directory dir, String name) throws IOException {
byte[] data = randomBytes(rnd.nextInt(maxFileSize) + 1);
IndexOutput o = dir.createOutput(name, IOContext.DEFAULT);
@@ -972,20 +1034,22 @@ public class IndexCopierTest {
private static class CollectingExecutor implements Executor {
final BlockingQueue<Runnable> commands = new
LinkedBlockingQueue<Runnable>();
- private boolean immediateExecution = false;
+ private volatile boolean immediateExecution = false;
private volatile Executor forwardingExecutor;
@Override
public void execute(Runnable command) {
+ if (immediateExecution){
+ command.run();
+ return;
+ }
+
if (forwardingExecutor != null){
forwardingExecutor.execute(command);
return;
}
- if (immediateExecution){
- command.run();
- } else {
- commands.add(command);
- }
+
+ commands.add(command);
}
void executeAll(){