Author: chetanm
Date: Thu Jul 16 08:51:46 2015
New Revision: 1691338

URL: http://svn.apache.org/r1691338
Log:
OAK-3110 - AsyncIndexer fails due to FileNotFoundException thrown by 
CopyOnWrite logic

Merging 1691331,1691332,1691333

Modified:
    jackrabbit/oak/branches/1.2/   (props changed)
    
jackrabbit/oak/branches/1.2/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
    
jackrabbit/oak/branches/1.2/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java

Propchange: jackrabbit/oak/branches/1.2/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jul 16 08:51:46 2015
@@ -1,3 +1,3 @@
 /jackrabbit/oak/branches/1.0:1665962
-/jackrabbit/oak/trunk:1672350,1672468,1672537,1672603,1672642,1672644,1672834-1672835,1673351,1673410,1673414-1673415,1673436,1673644,1673662-1673664,1673669,1673695,1673738,1673787,1673791,1674046,1674065,1674075,1674107,1674228,1674780,1674880,1675054-1675055,1675319,1675332,1675354,1675357,1675382,1675555,1675566,1675593,1676198,1676237,1676407,1676458,1676539,1676670,1676693,1676703,1676725,1677579,1677581,1677609,1677611,1677774,1677788,1677797,1677804,1677806,1677939,1677991,1678023,1678095-1678096,1678171,1678173,1678211,1678323,1678758,1678938,1678954,1679144,1679165,1679191,1679232,1679235,1679503,1679958,1679961,1680170,1680182,1680222,1680232,1680236,1680461,1680633,1680643,1680747,1680805-1680806,1680903,1681282,1681767,1681918,1682042,1682218,1682235,1682437,1682494,1682555,1682855,1682904,1683059,1683089,1683213,1683249,1683259,1683278,1683323,1683687,1683700,1684174-1684175,1684186,1684376,1684442,1684561,1684570,1684601,1684618,1684820,1684868,1685023,1685075,1685370
 
,1685552,1685589-1685590,1685840,1685964,1685977,1685989,1685999,1686023,1686032,1686097,1686162,1686229,1686234,1686253,1686414,1686780,1686854,1686857,1686971,1687053-1687055,1687175,1687196,1687198,1687220,1687239-1687240,1687301,1687441,1687553,1688089-1688090,1688172,1688179,1688349,1688421,1688436,1688453,1688616,1688622,1688634,1688636,1688817,1689003-1689004,1689008,1689577,1689581,1689623,1689810,1689828,1689833,1689903,1690017,1690043,1690047,1690057,1690247,1690249,1690634-1690637,1690650,1690669,1690674,1690885,1690941,1691151,1691167,1691183,1691210,1691307
+/jackrabbit/oak/trunk:1672350,1672468,1672537,1672603,1672642,1672644,1672834-1672835,1673351,1673410,1673414-1673415,1673436,1673644,1673662-1673664,1673669,1673695,1673738,1673787,1673791,1674046,1674065,1674075,1674107,1674228,1674780,1674880,1675054-1675055,1675319,1675332,1675354,1675357,1675382,1675555,1675566,1675593,1676198,1676237,1676407,1676458,1676539,1676670,1676693,1676703,1676725,1677579,1677581,1677609,1677611,1677774,1677788,1677797,1677804,1677806,1677939,1677991,1678023,1678095-1678096,1678171,1678173,1678211,1678323,1678758,1678938,1678954,1679144,1679165,1679191,1679232,1679235,1679503,1679958,1679961,1680170,1680182,1680222,1680232,1680236,1680461,1680633,1680643,1680747,1680805-1680806,1680903,1681282,1681767,1681918,1682042,1682218,1682235,1682437,1682494,1682555,1682855,1682904,1683059,1683089,1683213,1683249,1683259,1683278,1683323,1683687,1683700,1684174-1684175,1684186,1684376,1684442,1684561,1684570,1684601,1684618,1684820,1684868,1685023,1685075,1685370
 
,1685552,1685589-1685590,1685840,1685964,1685977,1685989,1685999,1686023,1686032,1686097,1686162,1686229,1686234,1686253,1686414,1686780,1686854,1686857,1686971,1687053-1687055,1687175,1687196,1687198,1687220,1687239-1687240,1687301,1687441,1687553,1688089-1688090,1688172,1688179,1688349,1688421,1688436,1688453,1688616,1688622,1688634,1688636,1688817,1689003-1689004,1689008,1689577,1689581,1689623,1689810,1689828,1689833,1689903,1690017,1690043,1690047,1690057,1690247,1690249,1690634-1690637,1690650,1690669,1690674,1690885,1690941,1691151,1691167,1691183,1691210,1691307,1691331-1691333
 /jackrabbit/trunk:1345480

Modified: 
jackrabbit/oak/branches/1.2/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java?rev=1691338&r1=1691337&r2=1691338&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.2/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
 (original)
+++ 
jackrabbit/oak/branches/1.2/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
 Thu Jul 16 08:51:46 2015
@@ -75,6 +75,7 @@ import static com.google.common.base.Pre
 import static com.google.common.collect.Iterables.toArray;
 import static com.google.common.collect.Iterables.transform;
 import static com.google.common.collect.Maps.newConcurrentMap;
+import static com.google.common.collect.Maps.newHashMap;
 import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
 
 public class IndexCopier implements CopyOnReadStatsMBean, Closeable {
@@ -111,6 +112,7 @@ public class IndexCopier implements Copy
 
 
     private final Map<String, String> indexPathMapping = newConcurrentMap();
+    private final Map<String, Set<String>> sharedWorkingSetMap = newHashMap();
     private final Map<String, String> indexPathVersionMapping = 
newConcurrentMap();
     private final ConcurrentMap<String, LocalIndexFile> failedToDeleteFiles = 
newConcurrentMap();
     private final Set<LocalIndexFile> copyInProgressFiles = 
Collections.newSetFromMap(new ConcurrentHashMap<LocalIndexFile, Boolean>());
@@ -131,12 +133,13 @@ public class IndexCopier implements Copy
     public Directory wrapForRead(String indexPath, IndexDefinition definition,
             Directory remote) throws IOException {
         Directory local = createLocalDirForIndexReader(indexPath, definition);
-        return new CopyOnReadDirectory(remote, local, prefetchEnabled, 
indexPath);
+        return new CopyOnReadDirectory(remote, local, prefetchEnabled, 
indexPath, getSharedWorkingSet(definition));
     }
 
     public Directory wrapForWrite(IndexDefinition definition, Directory 
remote, boolean reindexMode) throws IOException {
         Directory local = createLocalDirForIndexWriter(definition);
-        return new CopyOnWriteDirectory(remote, local, reindexMode);
+        return new CopyOnWriteDirectory(remote, local, reindexMode,
+                getIndexPathForLogging(definition), 
getSharedWorkingSet(definition));
     }
 
     @Override
@@ -238,6 +241,34 @@ public class IndexCopier implements Copy
     }
 
     /**
+     * Provide the corresponding shared state to enable COW inform COR
+     * about new files it is creating while indexing. This would allow COR to 
ignore
+     * such files while determining the deletion candidates.
+     *
+     * @param defn index definition for which the directory is being created
+     * @return a set to maintain the state of new files being created by the 
COW Directory
+     */
+    private Set<String> getSharedWorkingSet(IndexDefinition defn){
+        String indexPath = defn.getIndexPathFromConfig();
+
+        if (indexPath == null){
+            //With indexPath null the working directory would not
+            //be shared between COR and COW. So just return a new set
+            return new HashSet<String>();
+        }
+
+        Set<String> sharedSet;
+        synchronized (sharedWorkingSetMap){
+            sharedSet = sharedWorkingSetMap.get(indexPath);
+            if (sharedSet == null){
+                sharedSet = Sets.newConcurrentHashSet();
+                sharedWorkingSetMap.put(indexPath, sharedSet);
+            }
+        }
+        return sharedSet;
+    }
+
+    /**
      * Creates the workDir. If it exists then it is cleaned
      *
      * @param indexRootDir root directory under which all indexing related 
files are managed
@@ -250,6 +281,14 @@ public class IndexCopier implements Copy
         return workDir;
     }
 
+    private static String getIndexPathForLogging(IndexDefinition defn){
+        String indexPath = defn.getIndexPathFromConfig();
+        if (indexPath == null){
+            return "UNKNOWN";
+        }
+        return indexPath;
+    }
+
     /**
      * Directory implementation which lazily copies the index files from a
      * remote directory in background.
@@ -266,12 +305,17 @@ public class IndexCopier implements Copy
          */
         private final Set<String> localFileNames = Sets.newConcurrentHashSet();
 
-        public CopyOnReadDirectory(Directory remote, Directory local, boolean 
prefetch, String indexPath) throws IOException {
+        public CopyOnReadDirectory(Directory remote, Directory local, boolean 
prefetch,
+                                   String indexPath, Set<String> 
sharedWorkingSet) throws IOException {
             super(remote);
             this.remote = remote;
             this.local = local;
             this.indexPath = indexPath;
+
             this.localFileNames.addAll(Arrays.asList(local.listAll()));
+            //Remove files which are being worked upon by COW
+            this.localFileNames.removeAll(sharedWorkingSet);
+
             if (prefetch) {
                 prefetchIndexFiles();
             }
@@ -540,6 +584,8 @@ public class IndexCopier implements Copy
         private final AtomicReference<Throwable> errorInCopy = new 
AtomicReference<Throwable>();
         private final CountDownLatch copyDone = new CountDownLatch(1);
         private final boolean reindexMode;
+        private final String indexPathForLogging;
+        private final Set<String> sharedWorkingSet;
 
         /**
          * Current background task
@@ -558,7 +604,8 @@ public class IndexCopier implements Copy
                         Callable<Void> task = queue.poll();
                         if (task != null && task != STOP) {
                             if (errorInCopy.get() != null) {
-                                log.trace("Skipping task {} as some exception 
occurred in previous run", task);
+                                log.trace("[COW][{}] Skipping task {} as some 
exception occurred in previous run",
+                                        indexPathForLogging, task);
                             } else {
                                 task.call();
                             }
@@ -571,7 +618,8 @@ public class IndexCopier implements Copy
                         }
                     } catch (Throwable t) {
                         errorInCopy.set(t);
-                        log.debug("Error occurred while copying files. Further 
processing would be skipped", t);
+                        log.debug("[COW][{}] Error occurred while copying 
files. Further processing would " +
+                                "be skipped", indexPathForLogging, t);
                         currentTask.onComplete(completionHandler);
                     }
                     return null;
@@ -590,11 +638,14 @@ public class IndexCopier implements Copy
             }
         };
 
-        public CopyOnWriteDirectory(Directory remote, Directory local, boolean 
reindexMode) throws IOException {
+        public CopyOnWriteDirectory(Directory remote, Directory local, boolean 
reindexMode,
+                                    String indexPathForLogging, Set<String> 
sharedWorkingSet) throws IOException {
             super(local);
             this.remote = remote;
             this.local = local;
+            this.indexPathForLogging = indexPathForLogging;
             this.reindexMode = reindexMode;
+            this.sharedWorkingSet = sharedWorkingSet;
             initialize();
         }
 
@@ -610,7 +661,7 @@ public class IndexCopier implements Copy
 
         @Override
         public void deleteFile(String name) throws IOException {
-            log.trace("[COW] Deleted file {}", name);
+            log.trace("[COW][{}] Deleted file {}", indexPathForLogging, name);
             COWFileReference ref = fileMap.remove(name);
             if (ref != null) {
                 ref.delete();
@@ -634,6 +685,7 @@ public class IndexCopier implements Copy
             }
             ref = new COWLocalFileReference(name);
             fileMap.put(name, ref);
+            sharedWorkingSet.add(name);
             return ref.createOutput(context);
         }
 
@@ -674,7 +726,7 @@ public class IndexCopier implements Copy
                                 "while processing copy task for" + 
remote.toString());
                     }
                 }
-                PERF_LOGGER.end(start, -1, "Completed pending copying task 
{}", pendingCopies);
+                PERF_LOGGER.end(start, -1, "[COW][{}] Completed pending 
copying task {}", indexPathForLogging, pendingCopies);
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
                 throw new IOException(e);
@@ -682,7 +734,7 @@ public class IndexCopier implements Copy
 
             Throwable t = errorInCopy.get();
             if (t != null){
-                throw new IOException("Error occurred while copying files", t);
+                throw new IOException("Error occurred while copying files for 
" + indexPathForLogging, t);
             }
 
             //Sanity check
@@ -697,24 +749,25 @@ public class IndexCopier implements Copy
 
             skippedFromUploadSize.addAndGet(skippedFilesSize);
 
-            String msg = "CopyOnWrite stats : Skipped copying {} files with 
total size {}";
+            String msg = "[COW][{}] CopyOnWrite stats : Skipped copying {} 
files with total size {}";
             if (reindexMode || skippedFilesSize > 10 * FileUtils.ONE_MB){
-                log.info(msg, skippedFiles.size(), 
humanReadableByteCount(skippedFilesSize));
+                log.info(msg, indexPathForLogging, skippedFiles.size(), 
humanReadableByteCount(skippedFilesSize));
             } else {
-                log.debug(msg, skippedFiles.size(), 
humanReadableByteCount(skippedFilesSize));
+                log.debug(msg,indexPathForLogging, skippedFiles.size(), 
humanReadableByteCount(skippedFilesSize));
             }
 
             if (log.isTraceEnabled()){
-                log.trace("File listing - Upon completion {}", 
Arrays.toString(remote.listAll()));
+                log.trace("[COW][{}] File listing - Upon completion {}", 
indexPathForLogging, Arrays.toString(remote.listAll()));
             }
 
             local.close();
             remote.close();
+            sharedWorkingSet.clear();
         }
 
         @Override
         public String toString() {
-            return String.format("[COW] Local %s, Remote %s", local, remote);
+            return String.format("[COW][%s] Local %s, Remote %s", 
indexPathForLogging, local, remote);
         }
 
         private long getSkippedFilesSize() {
@@ -741,7 +794,7 @@ public class IndexCopier implements Copy
             }
 
             if (log.isTraceEnabled()){
-                log.trace("File listing - Start" + 
Arrays.toString(remote.listAll()));
+                log.trace("[COW][{}] File listing - At start {}", 
indexPathForLogging, Arrays.toString(remote.listAll()));
             }
         }
 
@@ -753,7 +806,7 @@ public class IndexCopier implements Copy
                     scheduledForCopyCount.decrementAndGet();
                     if (deletedFilesLocal.contains(name)){
                         skippedFiles.add(name);
-                        log.trace("[COW] Skip copying of deleted file {}", 
name);
+                        log.trace("[COW][{}] Skip copying of deleted file {}", 
indexPathForLogging, name);
                         return null;
                     }
                     long fileSize = local.fileLength(name);
@@ -764,7 +817,7 @@ public class IndexCopier implements Copy
                     local.copy(remote, name, name, IOContext.DEFAULT);
 
                     doneCopy(file, start);
-                    PERF_LOGGER.end(perfStart, 0, "Copied to remote {} ", 
name);
+                    PERF_LOGGER.end(perfStart, 0, "[COW][{}] Copied to remote 
{} ",indexPathForLogging, name);
                     return null;
                 }
 
@@ -780,7 +833,7 @@ public class IndexCopier implements Copy
                 @Override
                 public Void call() throws Exception {
                     if (!skippedFiles.contains(name)) {
-                        log.trace("[COW] Marking as deleted {}", name);
+                        log.trace("[COW][{}] Marking as deleted {}", 
indexPathForLogging, name);
                         remote.deleteFile(name);
                     }
                     return null;
@@ -895,7 +948,7 @@ public class IndexCopier implements Copy
 
             @Override
             public IndexOutput createOutput(IOContext context) throws 
IOException {
-                log.debug("[COW] Creating output {}", name);
+                log.debug("[COW][{}] Creating output {}", indexPathForLogging, 
name);
                 return new CopyOnCloseIndexOutput(local.createOutput(name, 
context));
             }
 
@@ -981,7 +1034,7 @@ public class IndexCopier implements Copy
         } catch (IOException e) {
             failedToDelete(file);
             log.debug("Error occurred while removing deleted file {} from 
Local {}. " +
-                    "Attempt would be maid to delete it on next run ", 
fileName, dir, e);
+                    "Attempt would be made to delete it on next run ", 
fileName, dir, e);
         }
         return successFullyDeleted;
     }

Modified: 
jackrabbit/oak/branches/1.2/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java?rev=1691338&r1=1691337&r2=1691338&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.2/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
 (original)
+++ 
jackrabbit/oak/branches/1.2/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
 Thu Jul 16 08:51:46 2015
@@ -54,10 +54,12 @@ import org.apache.jackrabbit.oak.commons
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FilterDirectory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.store.RAMDirectory;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -894,6 +896,66 @@ public class IndexCopierTest {
         assertNotNull("Close should have thrown an exception", error.get());
     }
 
+    /**
+     * Test the interaction between COR and COW using same underlying directory
+     */
+    @Test
+    public void cowConcurrentAccess() throws Exception{
+        CollectingExecutor executor = new CollectingExecutor();
+        ExecutorService executorService = Executors.newFixedThreadPool(2);
+        executor.setForwardingExecutor(executorService);
+
+        Directory baseDir = new CloseSafeDir();
+        String indexPath = "/foo";
+        builder.setProperty(LuceneIndexConstants.INDEX_PATH, indexPath);
+        IndexDefinition defn = new IndexDefinition(root, 
builder.getNodeState());
+        IndexCopier copier = new RAMIndexCopier(baseDir, executor, 
getWorkDir(), true);
+
+        Directory remote = new CloseSafeDir();
+        byte[] f1 = writeFile(remote, "f1");
+
+        Directory cor1 = copier.wrapForRead(indexPath, defn, remote);
+        readAndAssert(cor1, "f1", f1);
+        cor1.close();
+
+        final CountDownLatch pauseCopyLatch = new CountDownLatch(1);
+        Directory remote2 = new FilterDirectory(remote) {
+            @Override
+            public IndexOutput createOutput(String name, IOContext context) 
throws IOException {
+                try {
+                    pauseCopyLatch.await();
+                } catch (InterruptedException ignore) {
+
+                }
+                return super.createOutput(name, context);
+            }
+        };
+
+        //Start copying a file to remote via COW
+        Directory cow1 = copier.wrapForWrite(defn, remote2, false);
+        byte[] f2 = writeFile(cow1, "f2");
+
+        //Before copy is done to remote lets delete f1 from remote and
+        //open a COR and close it such that it triggers delete of f1
+        remote.deleteFile("f1");
+        Directory cor2 = copier.wrapForRead(indexPath, defn, remote);
+
+        //Ensure that deletion task submitted to executor get processed 
immediately
+        executor.enableImmediateExecution();
+        cor2.close();
+        executor.enableDelayedExecution();
+
+        assertFalse(baseDir.fileExists("f1"));
+        assertFalse("f2 should not have been copied to remote so far", 
remote.fileExists("f2"));
+        assertTrue("f2 should exist", baseDir.fileExists("f2"));
+
+        pauseCopyLatch.countDown();
+        cow1.close();
+        assertTrue("f2 should exist", remote.fileExists("f2"));
+
+        executorService.shutdown();
+    }
+
     private byte[] writeFile(Directory dir, String name) throws IOException {
         byte[] data = randomBytes(rnd.nextInt(maxFileSize) + 1);
         IndexOutput o = dir.createOutput(name, IOContext.DEFAULT);
@@ -972,20 +1034,22 @@ public class IndexCopierTest {
 
     private static class CollectingExecutor implements Executor {
         final BlockingQueue<Runnable> commands = new 
LinkedBlockingQueue<Runnable>();
-        private boolean immediateExecution = false;
+        private volatile boolean immediateExecution = false;
         private volatile Executor forwardingExecutor;
 
         @Override
         public void execute(Runnable command) {
+            if (immediateExecution){
+                command.run();
+                return;
+            }
+
             if (forwardingExecutor != null){
                 forwardingExecutor.execute(command);
                 return;
             }
-            if (immediateExecution){
-                command.run();
-            } else {
-                commands.add(command);
-            }
+
+            commands.add(command);
         }
 
         void executeAll(){


Reply via email to