Author: chetanm
Date: Mon Nov  7 08:46:13 2016
New Revision: 1768450

URL: http://svn.apache.org/viewvc?rev=1768450&view=rev
Log:
OAK-5075 - Refactor IndexCopier to make it more modular

Added:
    
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnReadDirectory.java
   (with props)
    
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnWriteDirectory.java
   (with props)
    
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/DirectoryUtils.java
   (with props)
    
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/LocalIndexFile.java
   (with props)
Modified:
    
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
    
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
    
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LucenePropertyIndexTest.java

Modified: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java?rev=1768450&r1=1768449&r2=1768450&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
 Mon Nov  7 08:46:13 2016
@@ -21,27 +21,15 @@ package org.apache.jackrabbit.oak.plugin
 
 import java.io.Closeable;
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 
 import javax.management.openmbean.CompositeDataSupport;
 import javax.management.openmbean.CompositeType;
@@ -54,26 +42,22 @@ import javax.management.openmbean.Tabula
 
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.commons.io.FileUtils;
-import org.apache.jackrabbit.oak.commons.IOUtils;
-import org.apache.jackrabbit.oak.commons.concurrent.NotifyingFutureTask;
+import 
org.apache.jackrabbit.oak.plugins.index.lucene.directory.CopyOnReadDirectory;
+import 
org.apache.jackrabbit.oak.plugins.index.lucene.directory.CopyOnWriteDirectory;
+import org.apache.jackrabbit.oak.plugins.index.lucene.directory.DirectoryUtils;
 import 
org.apache.jackrabbit.oak.plugins.index.lucene.directory.IndexRootDirectory;
 import org.apache.jackrabbit.oak.plugins.index.lucene.directory.LocalIndexDir;
+import org.apache.jackrabbit.oak.plugins.index.lucene.directory.LocalIndexFile;
 import org.apache.jackrabbit.oak.util.PerfLogger;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FSDirectory;
 import org.apache.lucene.store.FilterDirectory;
-import org.apache.lucene.store.IOContext;
-import org.apache.lucene.store.IndexInput;
-import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.store.NoLockFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.Iterables.toArray;
 import static com.google.common.collect.Iterables.transform;
@@ -82,12 +66,11 @@ import static com.google.common.collect.
 import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
 
 public class IndexCopier implements CopyOnReadStatsMBean, Closeable {
-    private static final Set<String> REMOTE_ONLY = 
ImmutableSet.of("segments.gen");
+    public static final Set<String> REMOTE_ONLY = 
ImmutableSet.of("segments.gen");
     private static final int MAX_FAILURE_ENTRIES = 10000;
     private static final String WORK_DIR_NAME = "indexWriterDir";
 
     private final Logger log = LoggerFactory.getLogger(getClass());
-    private final PerfLogger PERF_LOGGER = new 
PerfLogger(LoggerFactory.getLogger(log.getName() + ".perf"));
     private final Executor executor;
     private final File indexWorkDir;
 
@@ -134,13 +117,13 @@ public class IndexCopier implements Copy
     public Directory wrapForRead(String indexPath, IndexDefinition definition,
                                  Directory remote, String dirName) throws 
IOException {
         Directory local = createLocalDirForIndexReader(indexPath, definition, 
dirName);
-        return new CopyOnReadDirectory(remote, local, prefetchEnabled, 
indexPath, getSharedWorkingSet(indexPath));
+        return new CopyOnReadDirectory(this, remote, local, prefetchEnabled, 
indexPath, getSharedWorkingSet(indexPath), executor);
     }
 
     public Directory wrapForWrite(IndexDefinition definition, Directory 
remote, boolean reindexMode, String dirName) throws IOException {
         Directory local = createLocalDirForIndexWriter(definition, dirName);
-        return new CopyOnWriteDirectory(remote, local, reindexMode,
-                getIndexPathForLogging(definition), 
getSharedWorkingSet(definition.getIndexPathFromConfig()));
+        return new CopyOnWriteDirectory(this, remote, local, reindexMode,
+                getIndexPathForLogging(definition), 
getSharedWorkingSet(definition.getIndexPathFromConfig()), executor);
     }
 
     @Override
@@ -148,6 +131,10 @@ public class IndexCopier implements Copy
         this.closed = true;
     }
 
+    public boolean isClosed() {
+        return closed;
+    }
+
     File getIndexWorkDir() {
         return indexWorkDir;
     }
@@ -209,7 +196,7 @@ public class IndexCopier implements Copy
         }
 
         if (fileExisted){
-            garbageCollectedSize.addAndGet(file.size);
+            garbageCollectedSize.addAndGet(file.getSize());
             deletedFileCount.incrementAndGet();
         }
     }
@@ -262,762 +249,8 @@ public class IndexCopier implements Copy
         return indexPath.concat(dirName);
     }
 
-    /**
-     * Directory implementation which lazily copies the index files from a
-     * remote directory in background.
-     */
-    class CopyOnReadDirectory extends FilterDirectory {
-        private final Directory remote;
-        private final Directory local;
-        private final String indexPath;
-
-        private final ConcurrentMap<String, CORFileReference> files = 
newConcurrentMap();
-        /**
-         * Set of fileNames bound to current local dir. It is updated with any 
new file
-         * which gets added by this directory
-         */
-        private final Set<String> localFileNames = Sets.newConcurrentHashSet();
-
-        public CopyOnReadDirectory(Directory remote, Directory local, boolean 
prefetch,
-                                   String indexPath, Set<String> 
sharedWorkingSet) throws IOException {
-            super(remote);
-            this.remote = remote;
-            this.local = local;
-            this.indexPath = indexPath;
-
-            this.localFileNames.addAll(Arrays.asList(local.listAll()));
-            //Remove files which are being worked upon by COW
-            this.localFileNames.removeAll(sharedWorkingSet);
-
-            if (prefetch) {
-                prefetchIndexFiles();
-            }
-        }
-
-        @Override
-        public void deleteFile(String name) throws IOException {
-            throw new UnsupportedOperationException("Cannot delete in a 
ReadOnly directory");
-        }
-
-        @Override
-        public IndexOutput createOutput(String name, IOContext context) throws 
IOException {
-            throw new UnsupportedOperationException("Cannot write in a 
ReadOnly directory");
-        }
-
-        @Override
-        public IndexInput openInput(String name, IOContext context) throws 
IOException {
-            if (REMOTE_ONLY.contains(name)) {
-                log.trace("[{}] opening remote only file {}", indexPath, name);
-                return remote.openInput(name, context);
-            }
-
-            CORFileReference ref = files.get(name);
-            if (ref != null) {
-                if (ref.isLocalValid()) {
-                    log.trace("[{}] opening existing local file {}", 
indexPath, name);
-                    return files.get(name).openLocalInput(context);
-                } else {
-                    readerRemoteReadCount.incrementAndGet();
-                    log.trace(
-                            "[{}] opening existing remote file as local 
version is not valid {}",
-                            indexPath, name);
-                    return remote.openInput(name, context);
-                }
-            }
-
-            //If file does not exist then just delegate to remote and not
-            //schedule a copy task
-            if (!remote.fileExists(name)){
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] Looking for non existent file {}. Current 
known files {}",
-                            indexPath, name, 
Arrays.toString(remote.listAll()));
-                }
-                return remote.openInput(name, context);
-            }
-
-            CORFileReference toPut = new CORFileReference(name);
-            CORFileReference old = files.putIfAbsent(name, toPut);
-            if (old == null) {
-                log.trace("[{}] scheduled local copy for {}", indexPath, name);
-                copy(toPut);
-            }
-
-            //If immediate executor is used the result would be ready right 
away
-            if (toPut.isLocalValid()) {
-                log.trace("[{}] opening new local file {}", indexPath, name);
-                return toPut.openLocalInput(context);
-            }
-
-            log.trace("[{}] opening new remote file {}", indexPath, name);
-            readerRemoteReadCount.incrementAndGet();
-            return remote.openInput(name, context);
-        }
-
-        Directory getLocal() {
-            return local;
-        }
-
-        private void copy(final CORFileReference reference) {
-            updateMaxScheduled(scheduledForCopyCount.incrementAndGet());
-            executor.execute(new Runnable() {
-                @Override
-                public void run() {
-                    scheduledForCopyCount.decrementAndGet();
-                    copyFilesToLocal(reference, true, true);
-                }
-            });
-        }
-
-        private void prefetchIndexFiles() throws IOException {
-            long start = PERF_LOGGER.start();
-            long totalSize = 0;
-            int copyCount = 0;
-            List<String> copiedFileNames = Lists.newArrayList();
-            for (String name : remote.listAll()) {
-                if (REMOTE_ONLY.contains(name)) {
-                    continue;
-                }
-                CORFileReference fileRef = new CORFileReference(name);
-                files.putIfAbsent(name, fileRef);
-                long fileSize = copyFilesToLocal(fileRef, false, false);
-                if (fileSize > 0) {
-                    copyCount++;
-                    totalSize += fileSize;
-                    copiedFileNames.add(name);
-                }
-            }
-
-            local.sync(copiedFileNames);
-            PERF_LOGGER.end(start, -1, "[{}] Copied {} files totaling {}", 
indexPath, copyCount, humanReadableByteCount(totalSize));
-        }
-
-        private long copyFilesToLocal(CORFileReference reference, boolean 
sync, boolean logDuration) {
-            String name = reference.name;
-            boolean success = false;
-            boolean copyAttempted = false;
-            long fileSize = 0;
-            try {
-                if (!local.fileExists(name)) {
-                    long perfStart = -1;
-                    if (logDuration) {
-                        perfStart = PERF_LOGGER.start();
-                    }
-
-                    fileSize = remote.fileLength(name);
-                    LocalIndexFile file = new LocalIndexFile(local, name, 
fileSize, true);
-                    long start = startCopy(file);
-                    copyAttempted = true;
-
-                    remote.copy(local, name, name, IOContext.READ);
-                    reference.markValid();
-
-                    if (sync) {
-                        local.sync(Collections.singleton(name));
-                    }
-
-                    doneCopy(file, start);
-                    if (logDuration) {
-                        PERF_LOGGER.end(perfStart, 0,
-                                "[{}] Copied file {} of size {}", indexPath,
-                                name, humanReadableByteCount(fileSize));
-                    }
-                } else {
-                    long localLength = local.fileLength(name);
-                    long remoteLength = remote.fileLength(name);
-
-                    //Do a simple consistency check. Ideally Lucene index 
files are never
-                    //updated but still do a check if the copy is consistent
-                    if (localLength != remoteLength) {
-                        log.warn("[{}] Found local copy for {} in {} but size 
of local {} differs from remote {}. " +
-                                        "Content would be read from remote 
file only",
-                                indexPath, name, local, localLength, 
remoteLength);
-                        invalidFileCount.incrementAndGet();
-                    } else {
-                        reference.markValid();
-                        log.trace("[{}] found local copy of file {}",
-                                indexPath, name);
-                    }
-                }
-                success = true;
-            } catch (IOException e) {
-                //TODO In case of exception there would not be any other 
attempt
-                //to download the file. Look into support for retry
-                log.warn("[{}] Error occurred while copying file [{}] from {} 
to {}", indexPath, name, remote, local, e);
-            } finally {
-                if (copyAttempted && !success){
-                    try {
-                        if (local.fileExists(name)) {
-                            local.deleteFile(name);
-                        }
-                    } catch (IOException e) {
-                        log.warn("[{}] Error occurred while deleting corrupted 
file [{}] from [{}]", indexPath, name, local, e);
-                    }
-                }
-            }
-            return fileSize;
-        }
-
-        /**
-         * On close file which are not present in remote are removed from 
local.
-         * CopyOnReadDir is opened at different revisions of the index state
-         *
-         * CDir1 - V1
-         * CDir2 - V2
-         *
-         * Its possible that two different IndexSearcher are opened at same 
local
-         * directory but pinned to different revisions. So while removing it 
must
-         * be ensured that any currently opened IndexSearcher does not get 
affected.
-         * The way IndexSearchers get created in IndexTracker it ensures that 
new searcher
-         * pinned to newer revision gets opened first and then existing ones 
are closed.
-         *
-         *
-         * @throws IOException
-         */
-        @Override
-        public void close() throws IOException {
-            //Always remove old index file on close as it ensures that
-            //no other IndexSearcher are opened with previous revision of 
Index due to
-            //way IndexTracker closes IndexNode. At max there would be only 
two IndexNode
-            //opened pinned to different revision of same Lucene index
-            executor.execute(new Runnable() {
-                @Override
-                public void run() {
-                    try{
-                        removeDeletedFiles();
-                    } catch (IOException e) {
-                        log.warn(
-                                "[{}] Error occurred while removing deleted 
files from Local {}, Remote {}",
-                                indexPath, local, remote, e);
-                    }
-
-                    try {
-                        //This would also remove old index files if current
-                        //directory was based on newerRevision as local would
-                        //be of type DeleteOldDirOnClose
-                        local.close();
-                        remote.close();
-                    } catch (IOException e) {
-                        log.warn(
-                                "[{}] Error occurred while closing directory ",
-                                indexPath, e);
-                    }
-                }
-            });
-        }
-
-        @Override
-        public String toString() {
-            return String.format("[COR] Local %s, Remote %s", local, remote);
-        }
-
-        private void removeDeletedFiles() throws IOException {
-            //Files present in dest but not present in source have to be 
deleted
-            Set<String> filesToBeDeleted = Sets.difference(
-                    ImmutableSet.copyOf(localFileNames),
-                    ImmutableSet.copyOf(remote.listAll())
-            );
-
-            Set<String> failedToDelete = Sets.newHashSet();
-
-            for (String fileName : filesToBeDeleted) {
-                boolean deleted = IndexCopier.this.deleteFile(local, fileName, 
true);
-                if (!deleted){
-                    failedToDelete.add(fileName);
-                }
-            }
-
-            filesToBeDeleted = new HashSet<String>(filesToBeDeleted);
-            filesToBeDeleted.removeAll(failedToDelete);
-            if(!filesToBeDeleted.isEmpty()) {
-                log.debug(
-                        "[{}] Following files have been removed from Lucene 
index directory {}",
-                        indexPath, filesToBeDeleted);
-            }
-        }
-
-        private class CORFileReference {
-            final String name;
-            private volatile boolean valid;
-
-            private CORFileReference(String name) {
-                this.name = name;
-            }
-
-            boolean isLocalValid(){
-                return valid;
-            }
-
-            IndexInput openLocalInput( IOContext context) throws IOException {
-                readerLocalReadCount.incrementAndGet();
-                return local.openInput(name, context);
-            }
-
-            void markValid(){
-                this.valid = true;
-                localFileNames.add(name);
-            }
-        }
-    }
-
-    private class CopyOnWriteDirectory extends FilterDirectory {
-        /**
-         * Signal for the background thread to stop processing changes.
-         */
-        private final Callable<Void> STOP = new Callable<Void>() {
-            @Override
-            public Void call() throws Exception {
-                return null;
-            }
-        };
-        private final Directory remote;
-        private final Directory local;
-        private final ConcurrentMap<String, COWFileReference> fileMap = 
newConcurrentMap();
-        private final Set<String> deletedFilesLocal = 
Sets.newConcurrentHashSet();
-        private final Set<String> skippedFiles = Sets.newConcurrentHashSet();
-
-        private final BlockingQueue<Callable<Void>> queue = new 
LinkedBlockingQueue<Callable<Void>>();
-        private final AtomicReference<Throwable> errorInCopy = new 
AtomicReference<Throwable>();
-        private final CountDownLatch copyDone = new CountDownLatch(1);
-        private final boolean reindexMode;
-        private final String indexPathForLogging;
-        private final Set<String> sharedWorkingSet;
-
-        /**
-         * Current background task
-         */
-        private volatile NotifyingFutureTask currentTask =  
NotifyingFutureTask.completed();
-
-        /**
-         * Completion handler: set the current task to the next task and 
schedules that one
-         * on the background thread.
-         */
-        private final Runnable completionHandler = new Runnable() {
-            Callable<Void> task = new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
-                    try {
-                        Callable<Void> task = queue.poll();
-                        if (task != null && task != STOP) {
-                            if (errorInCopy.get() != null) {
-                                log.trace("[COW][{}] Skipping task {} as some 
exception occurred in previous run",
-                                        indexPathForLogging, task);
-                            } else {
-                                task.call();
-                            }
-                            currentTask.onComplete(completionHandler);
-                        }
-
-                        //Signal that all tasks completed
-                        if (task == STOP){
-                            copyDone.countDown();
-                        }
-                    } catch (Throwable t) {
-                        errorInCopy.set(t);
-                        log.debug("[COW][{}] Error occurred while copying 
files. Further processing would " +
-                                "be skipped", indexPathForLogging, t);
-                        currentTask.onComplete(completionHandler);
-                    }
-                    return null;
-                }
-            };
-
-            @Override
-            public void run() {
-                currentTask = new NotifyingFutureTask(task);
-                try {
-                    executor.execute(currentTask);
-                } catch (RejectedExecutionException e){
-                    checkIfClosed(false);
-                    throw e;
-                }
-            }
-        };
-
-        public CopyOnWriteDirectory(Directory remote, Directory local, boolean 
reindexMode,
-                                    String indexPathForLogging, Set<String> 
sharedWorkingSet) throws IOException {
-            super(local);
-            this.remote = remote;
-            this.local = local;
-            this.indexPathForLogging = indexPathForLogging;
-            this.reindexMode = reindexMode;
-            this.sharedWorkingSet = sharedWorkingSet;
-            initialize();
-        }
-
-        @Override
-        public String[] listAll() throws IOException {
-            return Iterables.toArray(fileMap.keySet(), String.class);
-        }
-
-        @Override
-        public boolean fileExists(String name) throws IOException {
-            return fileMap.containsKey(name);
-        }
-
-        @Override
-        public void deleteFile(String name) throws IOException {
-            log.trace("[COW][{}] Deleted file {}", indexPathForLogging, name);
-            COWFileReference ref = fileMap.remove(name);
-            if (ref != null) {
-                ref.delete();
-            }
-        }
-
-        @Override
-        public long fileLength(String name) throws IOException {
-            COWFileReference ref = fileMap.get(name);
-            if (ref == null) {
-                throw new FileNotFoundException(name);
-            }
-            return ref.fileLength();
-        }
-
-        @Override
-        public IndexOutput createOutput(String name, IOContext context) throws 
IOException {
-            COWFileReference ref = fileMap.remove(name);
-            if (ref != null) {
-                ref.delete();
-            }
-            ref = new COWLocalFileReference(name);
-            fileMap.put(name, ref);
-            sharedWorkingSet.add(name);
-            return ref.createOutput(context);
-        }
-
-        @Override
-        public void sync(Collection<String> names) throws IOException {
-            for (String name : names){
-                COWFileReference file = fileMap.get(name);
-                if (file != null){
-                    file.sync();
-                }
-            }
-        }
-
-        @Override
-        public IndexInput openInput(String name, IOContext context) throws 
IOException {
-            COWFileReference ref = fileMap.get(name);
-            if (ref == null) {
-                throw new FileNotFoundException(name);
-            }
-            return ref.openInput(context);
-        }
-
-        @Override
-        public void close() throws IOException {
-            int pendingCopies = queue.size();
-            addTask(STOP);
-
-            //Wait for all pending copy task to finish
-            try {
-                long start = PERF_LOGGER.start();
-
-                //Loop untill queue finished or IndexCopier
-                //found to be closed. Doing it with timeout to
-                //prevent any bug causing the thread to wait indefinitely
-                while (!copyDone.await(10, TimeUnit.SECONDS)) {
-                    if (closed) {
-                        throw new IndexCopierClosedException("IndexCopier 
found to be closed " +
-                                "while processing copy task for" + 
remote.toString());
-                    }
-                }
-                PERF_LOGGER.end(start, -1, "[COW][{}] Completed pending 
copying task {}", indexPathForLogging, pendingCopies);
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new IOException(e);
-            }
-
-            Throwable t = errorInCopy.get();
-            if (t != null){
-                throw new IOException("Error occurred while copying files for 
" + indexPathForLogging, t);
-            }
-
-            //Sanity check
-            checkArgument(queue.isEmpty(), "Copy queue still " +
-                    "has pending task left [%d]. %s", queue.size(), queue);
-
-            long skippedFilesSize = getSkippedFilesSize();
-
-            for (String fileName : deletedFilesLocal){
-                deleteLocalFile(fileName);
-            }
-
-            skippedFromUploadSize.addAndGet(skippedFilesSize);
-
-            String msg = "[COW][{}] CopyOnWrite stats : Skipped copying {} 
files with total size {}";
-            if ((reindexMode && skippedFilesSize > 0) || skippedFilesSize > 10 
* FileUtils.ONE_MB){
-                log.info(msg, indexPathForLogging, skippedFiles.size(), 
humanReadableByteCount(skippedFilesSize));
-            } else {
-                log.debug(msg,indexPathForLogging, skippedFiles.size(), 
humanReadableByteCount(skippedFilesSize));
-            }
-
-            if (log.isTraceEnabled()){
-                log.trace("[COW][{}] File listing - Upon completion {}", 
indexPathForLogging, Arrays.toString(remote.listAll()));
-            }
-
-            local.close();
-            remote.close();
-            sharedWorkingSet.clear();
-        }
-
-        @Override
-        public String toString() {
-            return String.format("[COW][%s] Local %s, Remote %s", 
indexPathForLogging, local, remote);
-        }
-
-        private long getSkippedFilesSize() {
-            long size = 0;
-            for (String name : skippedFiles){
-                try{
-                    if (local.fileExists(name)){
-                        size += local.fileLength(name);
-                    }
-                } catch (Exception ignore){
-
-                }
-            }
-            return size;
-        }
-
-        private void deleteLocalFile(String fileName) {
-            IndexCopier.this.deleteFile(local, fileName, false);
-        }
-
-        private void initialize() throws IOException {
-            for (String name : remote.listAll()) {
-                fileMap.put(name, new COWRemoteFileReference(name));
-            }
-
-            if (log.isTraceEnabled()){
-                log.trace("[COW][{}] File listing - At start {}", 
indexPathForLogging, Arrays.toString(remote.listAll()));
-            }
-        }
-
-        private void addCopyTask(final String name){
-            updateMaxScheduled(scheduledForCopyCount.incrementAndGet());
-            addTask(new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
-                    scheduledForCopyCount.decrementAndGet();
-                    if (deletedFilesLocal.contains(name)){
-                        skippedFiles.add(name);
-                        log.trace("[COW][{}] Skip copying of deleted file {}", 
indexPathForLogging, name);
-                        return null;
-                    }
-                    long fileSize = local.fileLength(name);
-                    LocalIndexFile file = new LocalIndexFile(local, name, 
fileSize, false);
-                    long perfStart = PERF_LOGGER.start();
-                    long start = startCopy(file);
-
-                    local.copy(remote, name, name, IOContext.DEFAULT);
-
-                    doneCopy(file, start);
-                    PERF_LOGGER.end(perfStart, 0, "[COW][{}] Copied to remote 
{} -- size: {}",
-                        indexPathForLogging, name, 
IOUtils.humanReadableByteCount(fileSize));
-                    return null;
-                }
-
-                @Override
-                public String toString() {
-                    return "Copy: " + name;
-                }
-            });
-        }
-
-        private void addDeleteTask(final String name){
-            addTask(new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
-                    if (!skippedFiles.contains(name)) {
-                        log.trace("[COW][{}] Marking as deleted {}", 
indexPathForLogging, name);
-                        remote.deleteFile(name);
-                    }
-                    return null;
-                }
-
-                @Override
-                public String toString() {
-                    return "Delete : " + name;
-                }
-            });
-        }
-
-        private void addTask(Callable<Void> task){
-            checkIfClosed(true);
-            queue.add(task);
-            currentTask.onComplete(completionHandler);
-        }
-
-        private void checkIfClosed(boolean throwException) {
-            if (closed) {
-                IndexCopierClosedException e = new 
IndexCopierClosedException("IndexCopier found to be closed " +
-                        "while processing" +remote.toString());
-                errorInCopy.set(e);
-                copyDone.countDown();
-
-                if (throwException) {
-                    throw e;
-                }
-            }
-        }
-
-        private abstract class COWFileReference {
-            protected final String name;
-
-            public COWFileReference(String name) {
-                this.name = name;
-            }
-
-            public abstract long fileLength() throws IOException;
-
-            public abstract IndexInput openInput(IOContext context) throws 
IOException;
-
-            public abstract IndexOutput createOutput(IOContext context) throws 
IOException;
-
-            public abstract void delete() throws IOException;
-
-            public void sync() throws IOException {
-
-            }
-        }
-
-        private class COWRemoteFileReference extends COWFileReference {
-            private boolean validLocalCopyPresent;
-            private final long length;
-
-            public COWRemoteFileReference(String name) throws IOException {
-                super(name);
-                this.length = remote.fileLength(name);
-            }
-
-            @Override
-            public long fileLength() throws IOException {
-                return length;
-            }
-
-            @Override
-            public IndexInput openInput(IOContext context) throws IOException {
-                checkIfLocalValid();
-                if (validLocalCopyPresent && !REMOTE_ONLY.contains(name)) {
-                    writerLocalReadCount.incrementAndGet();
-                    return local.openInput(name, context);
-                }
-                writerRemoteReadCount.incrementAndGet();
-                return remote.openInput(name, context);
-            }
-
-            @Override
-            public IndexOutput createOutput(IOContext context) throws 
IOException {
-                throw new UnsupportedOperationException("Cannot create output 
for existing remote file " + name);
-            }
-
-            @Override
-            public void delete() throws IOException {
-                //Remote file should not be deleted locally as it might be
-                //in use by existing opened IndexSearcher. It would anyway
-                //get deleted by CopyOnRead later
-                //For now just record that these need to be deleted to avoid
-                //potential concurrent access of the NodeBuilder
-                addDeleteTask(name);
-            }
-
-            private void checkIfLocalValid() throws IOException {
-                validLocalCopyPresent = local.fileExists(name)
-                        && local.fileLength(name) == remote.fileLength(name);
-            }
-        }
-
-        private class COWLocalFileReference extends COWFileReference {
-            public COWLocalFileReference(String name) {
-                super(name);
-            }
-
-            @Override
-            public long fileLength() throws IOException {
-                return local.fileLength(name);
-            }
-
-            @Override
-            public IndexInput openInput(IOContext context) throws IOException {
-                return local.openInput(name, context);
-            }
-
-            @Override
-            public IndexOutput createOutput(IOContext context) throws 
IOException {
-                log.debug("[COW][{}] Creating output {}", indexPathForLogging, 
name);
-                return new CopyOnCloseIndexOutput(local.createOutput(name, 
context));
-            }
-
-            @Override
-            public void delete() throws IOException {
-                addDeleteTask(name);
-                deletedFilesLocal.add(name);
-            }
-
-            @Override
-            public void sync() throws IOException {
-                local.sync(Collections.singleton(name));
-            }
-
-            /**
-             * Implementation note - As we are decorating existing 
implementation
-             * we would need to ensure that we also override methods (non 
abstract)
-             * which might be implemented in say FSIndexInput like setLength
-             */
-            private class CopyOnCloseIndexOutput extends IndexOutput {
-                private final IndexOutput delegate;
-
-                public CopyOnCloseIndexOutput(IndexOutput delegate) {
-                    this.delegate = delegate;
-                }
-
-                @Override
-                public void flush() throws IOException {
-                    delegate.flush();
-                }
-
-                @Override
-                public void close() throws IOException {
-                    delegate.close();
-                    //Schedule this file to be copied in background
-                    addCopyTask(name);
-                }
-
-                @Override
-                public long getFilePointer() {
-                    return delegate.getFilePointer();
-                }
-
-                @Override
-                public void seek(long pos) throws IOException {
-                    delegate.seek(pos);
-                }
-
-                @Override
-                public long length() throws IOException {
-                    return delegate.length();
-                }
-
-                @Override
-                public void writeByte(byte b) throws IOException {
-                    delegate.writeByte(b);
-                }
-
-                @Override
-                public void writeBytes(byte[] b, int offset, int length) 
throws IOException {
-                    delegate.writeBytes(b, offset, length);
-                }
-
-                @Override
-                public void setLength(long length) throws IOException {
-                    delegate.setLength(length);
-                }
-            }
-        }
-    }
-
-    private boolean deleteFile(Directory dir, String fileName, boolean 
copiedFromRemote){
-        LocalIndexFile file = new LocalIndexFile(dir, fileName, 
getFileLength(dir, fileName), copiedFromRemote);
+    public boolean deleteFile(Directory dir, String fileName, boolean 
copiedFromRemote){
+        LocalIndexFile file = new LocalIndexFile(dir, fileName, 
DirectoryUtils.getFileLength(dir, fileName), copiedFromRemote);
         boolean successFullyDeleted = false;
         try {
             boolean fileExisted = false;
@@ -1035,24 +268,24 @@ public class IndexCopier implements Copy
         return successFullyDeleted;
     }
 
-    private long startCopy(LocalIndexFile file) {
+    public long startCopy(LocalIndexFile file) {
         updateMaxInProgress(copyInProgressCount.incrementAndGet());
-        copyInProgressSize.addAndGet(file.size);
+        copyInProgressSize.addAndGet(file.getSize());
         copyInProgressFiles.add(file);
         return System.currentTimeMillis();
     }
 
-    private void doneCopy(LocalIndexFile file, long start) {
+    public void doneCopy(LocalIndexFile file, long start) {
         copyInProgressFiles.remove(file);
         copyInProgressCount.decrementAndGet();
-        copyInProgressSize.addAndGet(-file.size);
+        copyInProgressSize.addAndGet(-file.getSize());
 
-        if(file.copyFromRemote) {
+        if(file.isCopyFromRemote()) {
             downloadTime.addAndGet(System.currentTimeMillis() - start);
-            downloadSize.addAndGet(file.size);
+            downloadSize.addAndGet(file.getSize());
             downloadCount.incrementAndGet();
         } else {
-            uploadSize.addAndGet(file.size);
+            uploadSize.addAndGet(file.getSize());
             uploadTime.addAndGet(System.currentTimeMillis() - start);
             uploadCount.incrementAndGet();
         }
@@ -1077,6 +310,7 @@ public class IndexCopier implements Copy
         }
     }
 
+
     private class DeleteOldDirOnClose extends FilterDirectory {
         private final File oldIndexDir;
 
@@ -1109,114 +343,39 @@ public class IndexCopier implements Copy
             return "DeleteOldDirOnClose wrapper for " + getDelegate();
         }
     }
-    
-    static final class LocalIndexFile {
-        final File dir;
-        final String name;
-        final long size;
-        final boolean copyFromRemote;
-        private volatile int deleteAttemptCount;
-        final long creationTime = System.currentTimeMillis();
-        
-        public LocalIndexFile(Directory dir, String fileName,
-                              long size, boolean copyFromRemote){
-            this.copyFromRemote = copyFromRemote;
-            this.dir = getFSDir(dir);
-            this.name = fileName;
-            this.size = size;
-        }
 
-        public LocalIndexFile(Directory dir, String fileName){
-            this(dir, fileName, getFileLength(dir, fileName), true);
-        }
+    //~------------------------------------------< Stats Collection >
 
-        public String getKey(){
-            if (dir != null){
-                return new File(dir, name).getAbsolutePath();
-            }
-            return name;
-        }
-
-        public void incrementAttemptToDelete(){
-            deleteAttemptCount++;
-        }
-
-        public int getDeleteAttemptCount() {
-            return deleteAttemptCount;
-        }
-
-        public String deleteLog(){
-            return String.format("%s (%s, %d attempts, %d s)", name,
-                    humanReadableByteCount(size), deleteAttemptCount, 
timeTaken());
-        }
-
-        public String copyLog(){
-            return String.format("%s (%s, %1.1f%%, %s, %d s)", name,
-                    humanReadableByteCount(actualSize()),
-                    copyProgress(),
-                    humanReadableByteCount(size), timeTaken());
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-
-            LocalIndexFile localIndexFile = (LocalIndexFile) o;
-
-            if (dir != null ? !dir.equals(localIndexFile.dir) : 
localIndexFile.dir != null)
-                return false;
-            return name.equals(localIndexFile.name);
-
-        }
-
-        @Override
-        public int hashCode() {
-            int result = dir != null ? dir.hashCode() : 0;
-            result = 31 * result + name.hashCode();
-            return result;
-        }
-
-        private long timeTaken(){
-            return TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() 
- creationTime);
-        }
+    public void skippedUpload(long skippedFilesSize) {
+        skippedFromUploadSize.addAndGet(skippedFilesSize);
+    }
 
-        private float copyProgress(){
-            return actualSize() * 1.0f / size * 100;
-        }
+    public void scheduledForCopy() {
+        updateMaxScheduled(scheduledForCopyCount.incrementAndGet());
+    }
 
-        private long actualSize(){
-            return dir != null ? new File(dir, name).length() : 0;
-        }
+    public void copyDone(){
+        scheduledForCopyCount.decrementAndGet();
     }
 
-    static File getFSDir(Directory dir) {
-        if (dir instanceof FilterDirectory){
-            dir = ((FilterDirectory) dir).getDelegate();
+    public void readFromRemote(boolean reader) {
+        if (reader) {
+            readerRemoteReadCount.incrementAndGet();
+        } else {
+            writerRemoteReadCount.incrementAndGet();
         }
+    }
 
-        if (dir instanceof FSDirectory){
-            return ((FSDirectory) dir).getDirectory();
+    public void readFromLocal(boolean reader) {
+        if (reader) {
+            readerLocalReadCount.incrementAndGet();
+        } else {
+            writerLocalReadCount.incrementAndGet();
         }
-
-        return null;
     }
 
-    /**
-     * Get the file length in best effort basis.
-     * @return actual fileLength. -1 if cannot determine
-     */
-    private static long getFileLength(Directory dir, String fileName){
-        try{
-            //Check for file presence otherwise internally it results in
-            //an exception to be created
-            if (dir.fileExists(fileName)) {
-                return dir.fileLength(fileName);
-            }
-        } catch (Exception ignore){
-
-        }
-        return -1;
+    public void foundInvalidFile(){
+        invalidFileCount.incrementAndGet();
     }
 
     //~------------------------------------------< CopyOnReadStatsMBean >
@@ -1321,7 +480,7 @@ public class IndexCopier implements Copy
     public String getGarbageSize() {
         long garbageSize = 0;
         for (LocalIndexFile failedToDeleteFile : failedToDeleteFiles.values()){
-            garbageSize += failedToDeleteFile.size;
+            garbageSize += failedToDeleteFile.getSize();
         }
         return humanReadableByteCount(garbageSize);
     }

Added: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnReadDirectory.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnReadDirectory.java?rev=1768450&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnReadDirectory.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnReadDirectory.java
 Mon Nov  7 08:46:13 2016
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene.directory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.jackrabbit.oak.plugins.index.lucene.IndexCopier;
+import org.apache.jackrabbit.oak.util.PerfLogger;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FilterDirectory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.collect.Maps.newConcurrentMap;
+import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
+
+/**
+ * Directory implementation which lazily copies the index files from a
+ * remote directory in background.
+ */
+public class CopyOnReadDirectory extends FilterDirectory {
+    private static final Logger log = 
LoggerFactory.getLogger(CopyOnReadDirectory.class);
+    private static final PerfLogger PERF_LOGGER = new 
PerfLogger(LoggerFactory.getLogger(log.getName() + ".perf"));
+    private final IndexCopier indexCopier;
+    private final Directory remote;
+    private final Directory local;
+    private final String indexPath;
+    private final Executor executor;
+
+    private final ConcurrentMap<String, CORFileReference> files = 
newConcurrentMap();
+    /**
+     * Set of fileNames bound to current local dir. It is updated with any new 
file
+     * which gets added by this directory
+     */
+    private final Set<String> localFileNames = Sets.newConcurrentHashSet();
+
+    public CopyOnReadDirectory(IndexCopier indexCopier, Directory remote, 
Directory local, boolean prefetch,
+                               String indexPath, Set<String> sharedWorkingSet, 
Executor executor) throws IOException {
+        super(remote);
+        this.indexCopier = indexCopier;
+        this.executor = executor;
+        this.remote = remote;
+        this.local = local;
+        this.indexPath = indexPath;
+
+        this.localFileNames.addAll(Arrays.asList(local.listAll()));
+        //Remove files which are being worked upon by COW
+        this.localFileNames.removeAll(sharedWorkingSet);
+
+        if (prefetch) {
+            prefetchIndexFiles();
+        }
+    }
+
+    @Override
+    public void deleteFile(String name) throws IOException {
+        throw new UnsupportedOperationException("Cannot delete in a ReadOnly 
directory");
+    }
+
+    @Override
+    public IndexOutput createOutput(String name, IOContext context) throws 
IOException {
+        throw new UnsupportedOperationException("Cannot write in a ReadOnly 
directory");
+    }
+
+    @Override
+    public IndexInput openInput(String name, IOContext context) throws 
IOException {
+        if (IndexCopier.REMOTE_ONLY.contains(name)) {
+            log.trace("[{}] opening remote only file {}", indexPath, name);
+            return remote.openInput(name, context);
+        }
+
+        CORFileReference ref = files.get(name);
+        if (ref != null) {
+            if (ref.isLocalValid()) {
+                log.trace("[{}] opening existing local file {}", indexPath, 
name);
+                return files.get(name).openLocalInput(context);
+            } else {
+                indexCopier.readFromRemote(true);
+                log.trace(
+                        "[{}] opening existing remote file as local version is 
not valid {}",
+                        indexPath, name);
+                return remote.openInput(name, context);
+            }
+        }
+
+        //If file does not exist then just delegate to remote and not
+        //schedule a copy task
+        if (!remote.fileExists(name)){
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Looking for non existent file {}. Current 
known files {}",
+                        indexPath, name, Arrays.toString(remote.listAll()));
+            }
+            return remote.openInput(name, context);
+        }
+
+        CORFileReference toPut = new CORFileReference(name);
+        CORFileReference old = files.putIfAbsent(name, toPut);
+        if (old == null) {
+            log.trace("[{}] scheduled local copy for {}", indexPath, name);
+            copy(toPut);
+        }
+
+        //If immediate executor is used the result would be ready right away
+        if (toPut.isLocalValid()) {
+            log.trace("[{}] opening new local file {}", indexPath, name);
+            return toPut.openLocalInput(context);
+        }
+
+        log.trace("[{}] opening new remote file {}", indexPath, name);
+        indexCopier.readFromRemote(true);
+        return remote.openInput(name, context);
+    }
+
+    public Directory getLocal() {
+        return local;
+    }
+
+    private void copy(final CORFileReference reference) {
+        indexCopier.scheduledForCopy();
+        executor.execute(new Runnable() {
+            @Override
+            public void run() {
+                indexCopier.copyDone();
+                copyFilesToLocal(reference, true, true);
+            }
+        });
+    }
+
+    private void prefetchIndexFiles() throws IOException {
+        long start = PERF_LOGGER.start();
+        long totalSize = 0;
+        int copyCount = 0;
+        List<String> copiedFileNames = Lists.newArrayList();
+        for (String name : remote.listAll()) {
+            if (IndexCopier.REMOTE_ONLY.contains(name)) {
+                continue;
+            }
+            CORFileReference fileRef = new CORFileReference(name);
+            files.putIfAbsent(name, fileRef);
+            long fileSize = copyFilesToLocal(fileRef, false, false);
+            if (fileSize > 0) {
+                copyCount++;
+                totalSize += fileSize;
+                copiedFileNames.add(name);
+            }
+        }
+
+        local.sync(copiedFileNames);
+        PERF_LOGGER.end(start, -1, "[{}] Copied {} files totaling {}", 
indexPath, copyCount, humanReadableByteCount(totalSize));
+    }
+
+    private long copyFilesToLocal(CORFileReference reference, boolean sync, 
boolean logDuration) {
+        String name = reference.name;
+        boolean success = false;
+        boolean copyAttempted = false;
+        long fileSize = 0;
+        try {
+            if (!local.fileExists(name)) {
+                long perfStart = -1;
+                if (logDuration) {
+                    perfStart = PERF_LOGGER.start();
+                }
+
+                fileSize = remote.fileLength(name);
+                LocalIndexFile file = new LocalIndexFile(local, name, 
fileSize, true);
+                long start = indexCopier.startCopy(file);
+                copyAttempted = true;
+
+                remote.copy(local, name, name, IOContext.READ);
+                reference.markValid();
+
+                if (sync) {
+                    local.sync(Collections.singleton(name));
+                }
+
+                indexCopier.doneCopy(file, start);
+                if (logDuration) {
+                    PERF_LOGGER.end(perfStart, 0,
+                            "[{}] Copied file {} of size {}", indexPath,
+                            name, humanReadableByteCount(fileSize));
+                }
+            } else {
+                long localLength = local.fileLength(name);
+                long remoteLength = remote.fileLength(name);
+
+                //Do a simple consistency check. Ideally Lucene index files 
are never
+                //updated but still do a check if the copy is consistent
+                if (localLength != remoteLength) {
+                    log.warn("[{}] Found local copy for {} in {} but size of 
local {} differs from remote {}. " +
+                                    "Content would be read from remote file 
only",
+                            indexPath, name, local, localLength, remoteLength);
+                    indexCopier.foundInvalidFile();
+                } else {
+                    reference.markValid();
+                    log.trace("[{}] found local copy of file {}",
+                            indexPath, name);
+                }
+            }
+            success = true;
+        } catch (IOException e) {
+            //TODO In case of exception there would not be any other attempt
+            //to download the file. Look into support for retry
+            log.warn("[{}] Error occurred while copying file [{}] from {} to 
{}", indexPath, name, remote, local, e);
+        } finally {
+            if (copyAttempted && !success){
+                try {
+                    if (local.fileExists(name)) {
+                        local.deleteFile(name);
+                    }
+                } catch (IOException e) {
+                    log.warn("[{}] Error occurred while deleting corrupted 
file [{}] from [{}]", indexPath, name, local, e);
+                }
+            }
+        }
+        return fileSize;
+    }
+
+    /**
+     * On close file which are not present in remote are removed from local.
+     * CopyOnReadDir is opened at different revisions of the index state
+     *
+     * CDir1 - V1
+     * CDir2 - V2
+     *
+     * Its possible that two different IndexSearcher are opened at same local
+     * directory but pinned to different revisions. So while removing it must
+     * be ensured that any currently opened IndexSearcher does not get 
affected.
+     * The way IndexSearchers get created in IndexTracker it ensures that new 
searcher
+     * pinned to newer revision gets opened first and then existing ones are 
closed.
+     *
+     *
+     */
+    @Override
+    public void close() throws IOException {
+        //Always remove old index file on close as it ensures that
+        //no other IndexSearcher are opened with previous revision of Index 
due to
+        //way IndexTracker closes IndexNode. At max there would be only two 
IndexNode
+        //opened pinned to different revision of same Lucene index
+        executor.execute(new Runnable() {
+            @Override
+            public void run() {
+                try{
+                    removeDeletedFiles();
+                } catch (IOException e) {
+                    log.warn(
+                            "[{}] Error occurred while removing deleted files 
from Local {}, Remote {}",
+                            indexPath, local, remote, e);
+                }
+
+                try {
+                    //This would also remove old index files if current
+                    //directory was based on newerRevision as local would
+                    //be of type DeleteOldDirOnClose
+                    local.close();
+                    remote.close();
+                } catch (IOException e) {
+                    log.warn(
+                            "[{}] Error occurred while closing directory ",
+                            indexPath, e);
+                }
+            }
+        });
+    }
+
+    @Override
+    public String toString() {
+        return String.format("[COR] Local %s, Remote %s", local, remote);
+    }
+
+    private void removeDeletedFiles() throws IOException {
+        //Files present in dest but not present in source have to be deleted
+        Set<String> filesToBeDeleted = Sets.difference(
+                ImmutableSet.copyOf(localFileNames),
+                ImmutableSet.copyOf(remote.listAll())
+        );
+
+        Set<String> failedToDelete = Sets.newHashSet();
+
+        for (String fileName : filesToBeDeleted) {
+            boolean deleted = indexCopier.deleteFile(local, fileName, true);
+            if (!deleted){
+                failedToDelete.add(fileName);
+            }
+        }
+
+        filesToBeDeleted = new HashSet<String>(filesToBeDeleted);
+        filesToBeDeleted.removeAll(failedToDelete);
+        if(!filesToBeDeleted.isEmpty()) {
+            log.debug(
+                    "[{}] Following files have been removed from Lucene index 
directory {}",
+                    indexPath, filesToBeDeleted);
+        }
+    }
+
+    private class CORFileReference {
+        final String name;
+        private volatile boolean valid;
+
+        private CORFileReference(String name) {
+            this.name = name;
+        }
+
+        boolean isLocalValid(){
+            return valid;
+        }
+
+        IndexInput openLocalInput( IOContext context) throws IOException {
+            indexCopier.readFromLocal(true);
+            return local.openInput(name, context);
+        }
+
+        void markValid(){
+            this.valid = true;
+            localFileNames.add(name);
+        }
+    }
+}

Propchange: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnReadDirectory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnWriteDirectory.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnWriteDirectory.java?rev=1768450&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnWriteDirectory.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnWriteDirectory.java
 Mon Nov  7 08:46:13 2016
@@ -0,0 +1,520 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene.directory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.commons.IOUtils;
+import org.apache.jackrabbit.oak.commons.concurrent.NotifyingFutureTask;
+import org.apache.jackrabbit.oak.plugins.index.lucene.IndexCopier;
+import 
org.apache.jackrabbit.oak.plugins.index.lucene.IndexCopierClosedException;
+import org.apache.jackrabbit.oak.util.PerfLogger;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FilterDirectory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.collect.Maps.newConcurrentMap;
+import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
+
+public class CopyOnWriteDirectory extends FilterDirectory {
+    private static final Logger log = 
LoggerFactory.getLogger(CopyOnWriteDirectory.class);
+    private static final PerfLogger PERF_LOGGER = new 
PerfLogger(LoggerFactory.getLogger(log.getName() + ".perf"));
+    private IndexCopier indexCopier;
+    /**
+     * Signal for the background thread to stop processing changes.
+     */
+    private final Callable<Void> STOP = new Callable<Void>() {
+        @Override
+        public Void call() throws Exception {
+            return null;
+        }
+    };
+    private final Directory remote;
+    private final Directory local;
+    private final Executor executor;
+    private final ConcurrentMap<String, COWFileReference> fileMap = 
newConcurrentMap();
+    private final Set<String> deletedFilesLocal = Sets.newConcurrentHashSet();
+    private final Set<String> skippedFiles = Sets.newConcurrentHashSet();
+
+    private final BlockingQueue<Callable<Void>> queue = new 
LinkedBlockingQueue<Callable<Void>>();
+    private final AtomicReference<Throwable> errorInCopy = new 
AtomicReference<Throwable>();
+    private final CountDownLatch copyDone = new CountDownLatch(1);
+    private final boolean reindexMode;
+    private final String indexPathForLogging;
+    private final Set<String> sharedWorkingSet;
+
+    /**
+     * Current background task
+     */
+    private volatile NotifyingFutureTask currentTask =  
NotifyingFutureTask.completed();
+
+    /**
+     * Completion handler: set the current task to the next task and schedules 
that one
+     * on the background thread.
+     */
+    private final Runnable completionHandler = new Runnable() {
+        Callable<Void> task = new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                try {
+                    Callable<Void> task = queue.poll();
+                    if (task != null && task != STOP) {
+                        if (errorInCopy.get() != null) {
+                            log.trace("[COW][{}] Skipping task {} as some 
exception occurred in previous run",
+                                    indexPathForLogging, task);
+                        } else {
+                            task.call();
+                        }
+                        currentTask.onComplete(completionHandler);
+                    }
+
+                    //Signal that all tasks completed
+                    if (task == STOP){
+                        copyDone.countDown();
+                    }
+                } catch (Throwable t) {
+                    errorInCopy.set(t);
+                    log.debug("[COW][{}] Error occurred while copying files. 
Further processing would " +
+                            "be skipped", indexPathForLogging, t);
+                    currentTask.onComplete(completionHandler);
+                }
+                return null;
+            }
+        };
+
+        @Override
+        public void run() {
+            currentTask = new NotifyingFutureTask(task);
+            try {
+                executor.execute(currentTask);
+            } catch (RejectedExecutionException e){
+                checkIfClosed(false);
+                throw e;
+            }
+        }
+    };
+
+    public CopyOnWriteDirectory(IndexCopier indexCopier, Directory remote, 
Directory local, boolean reindexMode,
+                                String indexPathForLogging, Set<String> 
sharedWorkingSet, Executor executor) throws
+            IOException {
+        super(local);
+        this.indexCopier = indexCopier;
+        this.remote = remote;
+        this.local = local;
+        this.executor = executor;
+        this.indexPathForLogging = indexPathForLogging;
+        this.reindexMode = reindexMode;
+        this.sharedWorkingSet = sharedWorkingSet;
+        initialize();
+    }
+
+    @Override
+    public String[] listAll() throws IOException {
+        return Iterables.toArray(fileMap.keySet(), String.class);
+    }
+
+    @Override
+    public boolean fileExists(String name) throws IOException {
+        return fileMap.containsKey(name);
+    }
+
+    @Override
+    public void deleteFile(String name) throws IOException {
+        log.trace("[COW][{}] Deleted file {}", indexPathForLogging, name);
+        COWFileReference ref = fileMap.remove(name);
+        if (ref != null) {
+            ref.delete();
+        }
+    }
+
+    @Override
+    public long fileLength(String name) throws IOException {
+        COWFileReference ref = fileMap.get(name);
+        if (ref == null) {
+            throw new FileNotFoundException(name);
+        }
+        return ref.fileLength();
+    }
+
+    @Override
+    public IndexOutput createOutput(String name, IOContext context) throws 
IOException {
+        COWFileReference ref = fileMap.remove(name);
+        if (ref != null) {
+            ref.delete();
+        }
+        ref = new COWLocalFileReference(name);
+        fileMap.put(name, ref);
+        sharedWorkingSet.add(name);
+        return ref.createOutput(context);
+    }
+
+    @Override
+    public void sync(Collection<String> names) throws IOException {
+        for (String name : names){
+            COWFileReference file = fileMap.get(name);
+            if (file != null){
+                file.sync();
+            }
+        }
+    }
+
+    @Override
+    public IndexInput openInput(String name, IOContext context) throws 
IOException {
+        COWFileReference ref = fileMap.get(name);
+        if (ref == null) {
+            throw new FileNotFoundException(name);
+        }
+        return ref.openInput(context);
+    }
+
+    @Override
+    public void close() throws IOException {
+        int pendingCopies = queue.size();
+        addTask(STOP);
+
+        //Wait for all pending copy task to finish
+        try {
+            long start = PERF_LOGGER.start();
+
+            //Loop untill queue finished or IndexCopier
+            //found to be closed. Doing it with timeout to
+            //prevent any bug causing the thread to wait indefinitely
+            while (!copyDone.await(10, TimeUnit.SECONDS)) {
+                if (indexCopier.isClosed()) {
+                    throw new IndexCopierClosedException("IndexCopier found to 
be closed " +
+                            "while processing copy task for" + 
remote.toString());
+                }
+            }
+            PERF_LOGGER.end(start, -1, "[COW][{}] Completed pending copying 
task {}", indexPathForLogging, pendingCopies);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException(e);
+        }
+
+        Throwable t = errorInCopy.get();
+        if (t != null){
+            throw new IOException("Error occurred while copying files for " + 
indexPathForLogging, t);
+        }
+
+        //Sanity check
+        checkArgument(queue.isEmpty(), "Copy queue still " +
+                "has pending task left [%d]. %s", queue.size(), queue);
+
+        long skippedFilesSize = getSkippedFilesSize();
+
+        for (String fileName : deletedFilesLocal){
+            deleteLocalFile(fileName);
+        }
+
+        indexCopier.skippedUpload(skippedFilesSize);
+
+        String msg = "[COW][{}] CopyOnWrite stats : Skipped copying {} files 
with total size {}";
+        if ((reindexMode && skippedFilesSize > 0) || skippedFilesSize > 10 * 
FileUtils.ONE_MB){
+            log.info(msg, indexPathForLogging, skippedFiles.size(), 
humanReadableByteCount(skippedFilesSize));
+        } else {
+            log.debug(msg,indexPathForLogging, skippedFiles.size(), 
humanReadableByteCount(skippedFilesSize));
+        }
+
+        if (log.isTraceEnabled()){
+            log.trace("[COW][{}] File listing - Upon completion {}", 
indexPathForLogging, Arrays.toString(remote.listAll()));
+        }
+
+        local.close();
+        remote.close();
+        sharedWorkingSet.clear();
+    }
+
+    @Override
+    public String toString() {
+        return String.format("[COW][%s] Local %s, Remote %s", 
indexPathForLogging, local, remote);
+    }
+
+    private long getSkippedFilesSize() {
+        long size = 0;
+        for (String name : skippedFiles){
+            try{
+                if (local.fileExists(name)){
+                    size += local.fileLength(name);
+                }
+            } catch (Exception ignore){
+
+            }
+        }
+        return size;
+    }
+
+    private void deleteLocalFile(String fileName) {
+        indexCopier.deleteFile(local, fileName, false);
+    }
+
+    private void initialize() throws IOException {
+        for (String name : remote.listAll()) {
+            fileMap.put(name, new COWRemoteFileReference(name));
+        }
+
+        if (log.isTraceEnabled()){
+            log.trace("[COW][{}] File listing - At start {}", 
indexPathForLogging, Arrays.toString(remote.listAll()));
+        }
+    }
+
+    private void addCopyTask(final String name){
+        indexCopier.scheduledForCopy();
+        addTask(new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                indexCopier.copyDone();
+                if (deletedFilesLocal.contains(name)){
+                    skippedFiles.add(name);
+                    log.trace("[COW][{}] Skip copying of deleted file {}", 
indexPathForLogging, name);
+                    return null;
+                }
+                long fileSize = local.fileLength(name);
+                LocalIndexFile file = new LocalIndexFile(local, name, 
fileSize, false);
+                long perfStart = PERF_LOGGER.start();
+                long start = indexCopier.startCopy(file);
+
+                local.copy(remote, name, name, IOContext.DEFAULT);
+
+                indexCopier.doneCopy(file, start);
+                PERF_LOGGER.end(perfStart, 0, "[COW][{}] Copied to remote {} 
-- size: {}",
+                    indexPathForLogging, name, 
IOUtils.humanReadableByteCount(fileSize));
+                return null;
+            }
+
+            @Override
+            public String toString() {
+                return "Copy: " + name;
+            }
+        });
+    }
+
+    private void addDeleteTask(final String name){
+        addTask(new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                if (!skippedFiles.contains(name)) {
+                    log.trace("[COW][{}] Marking as deleted {}", 
indexPathForLogging, name);
+                    remote.deleteFile(name);
+                }
+                return null;
+            }
+
+            @Override
+            public String toString() {
+                return "Delete : " + name;
+            }
+        });
+    }
+
+    private void addTask(Callable<Void> task){
+        checkIfClosed(true);
+        queue.add(task);
+        currentTask.onComplete(completionHandler);
+    }
+
+    private void checkIfClosed(boolean throwException) {
+        if (indexCopier.isClosed()) {
+            IndexCopierClosedException e = new 
IndexCopierClosedException("IndexCopier found to be closed " +
+                    "while processing" +remote.toString());
+            errorInCopy.set(e);
+            copyDone.countDown();
+
+            if (throwException) {
+                throw e;
+            }
+        }
+    }
+
+    private abstract class COWFileReference {
+        protected final String name;
+
+        public COWFileReference(String name) {
+            this.name = name;
+        }
+
+        public abstract long fileLength() throws IOException;
+
+        public abstract IndexInput openInput(IOContext context) throws 
IOException;
+
+        public abstract IndexOutput createOutput(IOContext context) throws 
IOException;
+
+        public abstract void delete() throws IOException;
+
+        public void sync() throws IOException {
+
+        }
+    }
+
+    private class COWRemoteFileReference extends COWFileReference {
+        private boolean validLocalCopyPresent;
+        private final long length;
+
+        public COWRemoteFileReference(String name) throws IOException {
+            super(name);
+            this.length = remote.fileLength(name);
+        }
+
+        @Override
+        public long fileLength() throws IOException {
+            return length;
+        }
+
+        @Override
+        public IndexInput openInput(IOContext context) throws IOException {
+            checkIfLocalValid();
+            if (validLocalCopyPresent && 
!IndexCopier.REMOTE_ONLY.contains(name)) {
+                indexCopier.readFromLocal(false);
+                return local.openInput(name, context);
+            }
+            indexCopier.readFromRemote(false);
+            return remote.openInput(name, context);
+        }
+
+        @Override
+        public IndexOutput createOutput(IOContext context) throws IOException {
+            throw new UnsupportedOperationException("Cannot create output for 
existing remote file " + name);
+        }
+
+        @Override
+        public void delete() throws IOException {
+            //Remote file should not be deleted locally as it might be
+            //in use by existing opened IndexSearcher. It would anyway
+            //get deleted by CopyOnRead later
+            //For now just record that these need to be deleted to avoid
+            //potential concurrent access of the NodeBuilder
+            addDeleteTask(name);
+        }
+
+        private void checkIfLocalValid() throws IOException {
+            validLocalCopyPresent = local.fileExists(name)
+                    && local.fileLength(name) == remote.fileLength(name);
+        }
+    }
+
+    private class COWLocalFileReference extends COWFileReference {
+        public COWLocalFileReference(String name) {
+            super(name);
+        }
+
+        @Override
+        public long fileLength() throws IOException {
+            return local.fileLength(name);
+        }
+
+        @Override
+        public IndexInput openInput(IOContext context) throws IOException {
+            return local.openInput(name, context);
+        }
+
+        @Override
+        public IndexOutput createOutput(IOContext context) throws IOException {
+            log.debug("[COW][{}] Creating output {}", indexPathForLogging, 
name);
+            return new 
COWLocalFileReference.CopyOnCloseIndexOutput(local.createOutput(name, context));
+        }
+
+        @Override
+        public void delete() throws IOException {
+            addDeleteTask(name);
+            deletedFilesLocal.add(name);
+        }
+
+        @Override
+        public void sync() throws IOException {
+            local.sync(Collections.singleton(name));
+        }
+
+        /**
+         * Implementation note - As we are decorating existing implementation
+         * we would need to ensure that we also override methods (non abstract)
+         * which might be implemented in say FSIndexInput like setLength
+         */
+        private class CopyOnCloseIndexOutput extends IndexOutput {
+            private final IndexOutput delegate;
+
+            public CopyOnCloseIndexOutput(IndexOutput delegate) {
+                this.delegate = delegate;
+            }
+
+            @Override
+            public void flush() throws IOException {
+                delegate.flush();
+            }
+
+            @Override
+            public void close() throws IOException {
+                delegate.close();
+                //Schedule this file to be copied in background
+                addCopyTask(name);
+            }
+
+            @Override
+            public long getFilePointer() {
+                return delegate.getFilePointer();
+            }
+
+            @Override
+            public void seek(long pos) throws IOException {
+                delegate.seek(pos);
+            }
+
+            @Override
+            public long length() throws IOException {
+                return delegate.length();
+            }
+
+            @Override
+            public void writeByte(byte b) throws IOException {
+                delegate.writeByte(b);
+            }
+
+            @Override
+            public void writeBytes(byte[] b, int offset, int length) throws 
IOException {
+                delegate.writeBytes(b, offset, length);
+            }
+
+            @Override
+            public void setLength(long length) throws IOException {
+                delegate.setLength(length);
+            }
+        }
+    }
+}

Propchange: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnWriteDirectory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/DirectoryUtils.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/DirectoryUtils.java?rev=1768450&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/DirectoryUtils.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/DirectoryUtils.java
 Mon Nov  7 08:46:13 2016
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene.directory;
+
+import org.apache.lucene.store.Directory;
+
+public class DirectoryUtils {
+    /**
+     * Get the file length in best effort basis.
+     * @return actual fileLength. -1 if cannot determine
+     */
+    public static long getFileLength(Directory dir, String fileName){
+        try{
+            //Check for file presence otherwise internally it results in
+            //an exception to be created
+            if (dir.fileExists(fileName)) {
+                return dir.fileLength(fileName);
+            }
+        } catch (Exception ignore){
+
+        }
+        return -1;
+    }
+}

Propchange: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/DirectoryUtils.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/LocalIndexFile.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/LocalIndexFile.java?rev=1768450&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/LocalIndexFile.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/LocalIndexFile.java
 Mon Nov  7 08:46:13 2016
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene.directory;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.jackrabbit.oak.plugins.index.lucene.IndexCopier;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.FilterDirectory;
+
+import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
+
+public final class LocalIndexFile {
+    final File dir;
+    final String name;
+    final long size;
+    final boolean copyFromRemote;
+    private volatile int deleteAttemptCount;
+    final long creationTime = System.currentTimeMillis();
+
+    public LocalIndexFile(Directory dir, String fileName,
+                          long size, boolean copyFromRemote){
+        this.copyFromRemote = copyFromRemote;
+        this.dir = getFSDir(dir);
+        this.name = fileName;
+        this.size = size;
+    }
+
+    public LocalIndexFile(Directory dir, String fileName){
+        this(dir, fileName, DirectoryUtils.getFileLength(dir, fileName), true);
+    }
+
+    public String getKey(){
+        if (dir != null){
+            return new File(dir, name).getAbsolutePath();
+        }
+        return name;
+    }
+
+    public boolean isCopyFromRemote() {
+        return copyFromRemote;
+    }
+
+    public long getSize() {
+        return size;
+    }
+
+    public void incrementAttemptToDelete(){
+        deleteAttemptCount++;
+    }
+
+    public int getDeleteAttemptCount() {
+        return deleteAttemptCount;
+    }
+
+    public String deleteLog(){
+        return String.format("%s (%s, %d attempts, %d s)", name,
+                humanReadableByteCount(size), deleteAttemptCount, timeTaken());
+    }
+
+    public String copyLog(){
+        return String.format("%s (%s, %1.1f%%, %s, %d s)", name,
+                humanReadableByteCount(actualSize()),
+                copyProgress(),
+                humanReadableByteCount(size), timeTaken());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        LocalIndexFile localIndexFile = (LocalIndexFile) o;
+
+        if (dir != null ? !dir.equals(localIndexFile.dir) : localIndexFile.dir 
!= null)
+            return false;
+        return name.equals(localIndexFile.name);
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = dir != null ? dir.hashCode() : 0;
+        result = 31 * result + name.hashCode();
+        return result;
+    }
+
+    private long timeTaken(){
+        return TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - 
creationTime);
+    }
+
+    private float copyProgress(){
+        return actualSize() * 1.0f / size * 100;
+    }
+
+    private long actualSize(){
+        return dir != null ? new File(dir, name).length() : 0;
+    }
+
+    static File getFSDir(Directory dir) {
+        if (dir instanceof FilterDirectory){
+            dir = ((FilterDirectory) dir).getDelegate();
+        }
+
+        if (dir instanceof FSDirectory){
+            return ((FSDirectory) dir).getDirectory();
+        }
+
+        return null;
+    }
+}

Propchange: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/LocalIndexFile.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java?rev=1768450&r1=1768449&r2=1768450&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
 Mon Nov  7 08:46:13 2016
@@ -52,6 +52,7 @@ import com.google.common.util.concurrent
 import org.apache.commons.io.FileUtils;
 import org.apache.jackrabbit.oak.commons.IOUtils;
 import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
+import org.apache.jackrabbit.oak.plugins.index.lucene.directory.LocalIndexFile;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.lucene.store.Directory;
@@ -521,7 +522,7 @@ public class IndexCopierTest {
         w2.close();
 
         assertEquals(1, c1.getFailedToDeleteFiles().size());
-        IndexCopier.LocalIndexFile testFile = 
c1.getFailedToDeleteFiles().values().iterator().next();
+        LocalIndexFile testFile = 
c1.getFailedToDeleteFiles().values().iterator().next();
 
         assertEquals(1, testFile.getDeleteAttemptCount());
         assertEquals(IOUtils.humanReadableByteCount(t1.length), 
c1.getGarbageSize());

Modified: 
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LucenePropertyIndexTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LucenePropertyIndexTest.java?rev=1768450&r1=1768449&r2=1768450&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LucenePropertyIndexTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LucenePropertyIndexTest.java
 Mon Nov  7 08:46:13 2016
@@ -107,6 +107,7 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.index.fulltext.ExtractedText;
 import 
org.apache.jackrabbit.oak.plugins.index.fulltext.ExtractedText.ExtractionResult;
 import 
org.apache.jackrabbit.oak.plugins.index.fulltext.PreExtractedTextProvider;
+import 
org.apache.jackrabbit.oak.plugins.index.lucene.directory.CopyOnReadDirectory;
 import org.apache.jackrabbit.oak.plugins.index.nodetype.NodeTypeIndexProvider;
 import 
org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider;
 import org.apache.jackrabbit.oak.plugins.memory.ArrayBasedBlob;
@@ -196,7 +197,7 @@ public class LucenePropertyIndexTest ext
                 }
 
                 private String getFSDirPath(Directory dir){
-                    if (dir instanceof IndexCopier.CopyOnReadDirectory){
+                    if (dir instanceof CopyOnReadDirectory){
                         dir = ((CopyOnReadDirectory) dir).getLocal();
                     }
 



Reply via email to