Author: chetanm
Date: Mon Apr  6 06:47:41 2015
New Revision: 1671489

URL: http://svn.apache.org/r1671489
Log:
OAK-2709 - Misleading log message from IndexCopier

Made deletion of file more robust and exposed various stats as part of JMX  to 
get an insight into how much garbage is getting collected and how much is being 
accumulated

Modified:
    
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/CopyOnReadStatsMBean.java
    
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
    
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java

Modified: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/CopyOnReadStatsMBean.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/CopyOnReadStatsMBean.java?rev=1671489&r1=1671488&r2=1671489&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/CopyOnReadStatsMBean.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/CopyOnReadStatsMBean.java
 Mon Apr  6 06:47:41 2015
@@ -31,9 +31,29 @@ public interface CopyOnReadStatsMBean {
 
     int getRemoteReadCount();
 
+    int getScheduledForCopyCount();
+
+    int getCopyInProgressCount();
+
+    int getMaxCopyInProgressCount();
+
+    int getMaxScheduledForCopyCount();
+
+    String getCopyInProgressSize();
+
+    String[] getCopyInProgressDetails();
+
     String getDownloadSize();
 
     long getDownloadTime();
 
     String getLocalIndexSize();
+
+    String[] getGarbageDetails();
+
+    String getGarbageSize();
+
+    int getDeletedFilesCount();
+
+    String getGarbageCollectedSize();
 }

Modified: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java?rev=1671489&r1=1671488&r2=1671489&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
 Mon Apr  6 06:47:41 2015
@@ -22,11 +22,14 @@ package org.apache.jackrabbit.oak.plugin
 import java.io.File;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -40,6 +43,7 @@ import javax.management.openmbean.Tabula
 import javax.management.openmbean.TabularType;
 
 import com.google.common.base.Charsets;
+import com.google.common.base.Function;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -58,10 +62,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.Iterables.toArray;
+import static com.google.common.collect.Iterables.transform;
 import static com.google.common.collect.Maps.newConcurrentMap;
 
 class IndexCopier implements CopyOnReadStatsMBean {
     private static final Set<String> REMOTE_ONLY = 
ImmutableSet.of("segments.gen");
+    private static final int MAX_FAILURE_ENTRIES = 10000;
 
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final Executor executor;
@@ -70,11 +77,21 @@ class IndexCopier implements CopyOnReadS
     private final AtomicInteger localReadCount = new AtomicInteger();
     private final AtomicInteger remoteReadCount = new AtomicInteger();
     private final AtomicInteger invalidFileCount = new AtomicInteger();
+    private final AtomicInteger deletedFileCount = new AtomicInteger();
+    private final AtomicInteger scheduledForCopyCount = new AtomicInteger();
+    private final AtomicInteger copyInProgressCount = new AtomicInteger();
+    private final AtomicInteger maxCopyInProgressCount = new AtomicInteger();
+    private final AtomicInteger maxScheduledForCopyCount = new AtomicInteger();
+    private final AtomicLong copyInProgressSize = new AtomicLong();
     private final AtomicLong downloadSize = new AtomicLong();
+    private final AtomicLong garbageCollectedSize = new AtomicLong();
     private final AtomicLong downloadTime = new AtomicLong();
 
+
     private final Map<String, String> indexPathMapping = 
Maps.newConcurrentMap();
     private final Map<String, String> indexPathVersionMapping = 
Maps.newConcurrentMap();
+    private final ConcurrentMap<String, LocalIndexFile> failedToDeleteFiles = 
Maps.newConcurrentMap();
+    private final Set<LocalIndexFile> copyInProgressFiles = 
Collections.newSetFromMap(new ConcurrentHashMap<LocalIndexFile, Boolean>());
 
     public IndexCopier(Executor executor, File indexRootDir) {
         this.executor = executor;
@@ -108,6 +125,36 @@ class IndexCopier implements CopyOnReadS
         return new File(indexRootDir, subDir);
     }
 
+    Map<String, LocalIndexFile> getFailedToDeleteFiles() {
+        return Collections.unmodifiableMap(failedToDeleteFiles);
+    }
+
+    private void failedToDelete(LocalIndexFile file){
+        //Limit the size on best effort basis
+        if (failedToDeleteFiles.size() < MAX_FAILURE_ENTRIES) {
+            LocalIndexFile failedToDeleteFile = 
failedToDeleteFiles.putIfAbsent(file.getKey(), file);
+            if (failedToDeleteFile == null){
+                failedToDeleteFile = file;
+            }
+            failedToDeleteFile.incrementAttemptToDelete();
+        } else {
+            log.warn("Not able to delete {}. Currently more than {} file with 
total size {} are pending delete.",
+                    file.deleteLog(), failedToDeleteFiles.size(), 
getGarbageSize());
+        }
+    }
+
+    private void successfullyDeleted(LocalIndexFile file, boolean fileExisted){
+        LocalIndexFile failedToDeleteFile = 
failedToDeleteFiles.remove(file.getKey());
+        if (failedToDeleteFile != null){
+            log.debug("Deleted : {}", failedToDeleteFile.deleteLog());
+        }
+
+        if (fileExisted){
+            garbageCollectedSize.addAndGet(file.size);
+            deletedFileCount.incrementAndGet();
+        }
+    }
+
     /**
      * Directory implementation which lazily copies the index files from a
      * remote directory in background.
@@ -184,6 +231,7 @@ class IndexCopier implements CopyOnReadS
         }
 
         private void copy(final FileReference reference) {
+            updateMaxScheduled(scheduledForCopyCount.incrementAndGet());
             executor.execute(new Runnable() {
                 @Override
                 public void run() {
@@ -191,13 +239,17 @@ class IndexCopier implements CopyOnReadS
                     boolean success = false;
                     boolean copyAttempted = false;
                     try {
+                        scheduledForCopyCount.decrementAndGet();
                         if (!local.fileExists(name)) {
-                            long start = System.currentTimeMillis();
+                            long fileSize = remote.fileLength(name);
+                            LocalIndexFile file = new LocalIndexFile(local, 
name, fileSize);
+                            long start = startCopy(file);
                             copyAttempted = true;
+
                             remote.copy(local, name, name, IOContext.READ);
                             reference.markValid();
-                            downloadTime.addAndGet(System.currentTimeMillis() 
- start);
-                            downloadSize.addAndGet(remote.fileLength(name));
+
+                            doneCopy(file, start);
                         } else {
                             long localLength = local.fileLength(name);
                             long remoteLength = remote.fileLength(name);
@@ -289,18 +341,22 @@ class IndexCopier implements CopyOnReadS
             Set<String> failedToDelete = Sets.newHashSet();
 
             for (String fileName : filesToBeDeleted) {
+                LocalIndexFile file = new LocalIndexFile(local, fileName);
                 try {
-                    local.deleteFile(fileName);
+                    boolean fileExisted = false;
+                    if (local.fileExists(fileName)) {
+                        fileExisted = true;
+                        local.deleteFile(fileName);
+                    }
+                    successfullyDeleted(file, fileExisted);
                 } catch (IOException e) {
                     failedToDelete.add(fileName);
-                    log.debug("Error occurred while removing deleted file {} 
from Local {} ", fileName, local, e);
+                    failedToDelete(file);
+                    log.debug("Error occurred while removing deleted file {} 
from Local {}. " +
+                            "Attempt would be maid to delete it on next run ", 
fileName, local, e);
                 }
             }
 
-            log.info("Error occurred while deleting following files from the 
local index directory [{}]. " +
-                    "This can happen on Windows based system. Attempt would be 
made to remove them " +
-                    "in next attempt ", local, failedToDelete);
-
             filesToBeDeleted = new HashSet<String>(filesToBeDeleted);
             filesToBeDeleted.removeAll(failedToDelete);
             if(!filesToBeDeleted.isEmpty()) {
@@ -332,6 +388,40 @@ class IndexCopier implements CopyOnReadS
         }
     }
 
+    private long startCopy(LocalIndexFile file) {
+        updateMaxInProgress(copyInProgressCount.incrementAndGet());
+        copyInProgressSize.addAndGet(file.size);
+        copyInProgressFiles.add(file);
+        return System.currentTimeMillis();
+    }
+
+    private void doneCopy(LocalIndexFile file, long start) {
+        copyInProgressFiles.remove(file);
+        copyInProgressCount.decrementAndGet();
+        copyInProgressSize.addAndGet(-file.size);
+
+        downloadTime.addAndGet(System.currentTimeMillis() - start);
+        downloadSize.addAndGet(file.size);
+    }
+
+    private void updateMaxScheduled(int val) {
+        synchronized (maxScheduledForCopyCount){
+            int current = maxScheduledForCopyCount.get();
+            if (val > current){
+                maxScheduledForCopyCount.set(val);
+            }
+        }
+    }
+
+    private void updateMaxInProgress(int val) {
+        synchronized (maxCopyInProgressCount){
+            int current = maxCopyInProgressCount.get();
+            if (val > current){
+                maxCopyInProgressCount.set(val);
+            }
+        }
+    }
+
     private class DeleteOldDirOnClose extends FilterDirectory {
         private final File oldIndexDir;
 
@@ -351,6 +441,107 @@ class IndexCopier implements CopyOnReadS
             super.close();
         }
     }
+    
+    static final class LocalIndexFile {
+        final File dir;
+        final String name;
+        final long size;
+        private volatile int deleteAttemptCount;
+        final long creationTime = System.currentTimeMillis();
+        
+        public LocalIndexFile(Directory dir, String fileName, long size){
+            this.dir = getFSDir(dir);
+            this.name = fileName;
+            this.size = size;
+        }
+
+        public LocalIndexFile(Directory dir, String fileName){
+            this(dir, fileName, getFileLength(dir, fileName));
+        }
+
+        public String getKey(){
+            if (dir != null){
+                return new File(dir, name).getAbsolutePath();
+            }
+            return name;
+        }
+
+        public void incrementAttemptToDelete(){
+            deleteAttemptCount++;
+        }
+
+        public int getDeleteAttemptCount() {
+            return deleteAttemptCount;
+        }
+
+        public String deleteLog(){
+            return String.format("%s (%s, %d attempts, %d s)", name,
+                    IOUtils.humanReadableByteCount(size), deleteAttemptCount, 
timeTaken());
+        }
+
+        public String copyLog(){
+            return String.format("%s (%s, %1.1f%%, %s, %d s)", name,
+                    IOUtils.humanReadableByteCount(actualSize()),
+                    copyProgress(),
+                    IOUtils.humanReadableByteCount(size), timeTaken());
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            LocalIndexFile localIndexFile = (LocalIndexFile) o;
+
+            if (dir != null ? !dir.equals(localIndexFile.dir) : 
localIndexFile.dir != null)
+                return false;
+            return name.equals(localIndexFile.name);
+
+        }
+
+        @Override
+        public int hashCode() {
+            int result = dir != null ? dir.hashCode() : 0;
+            result = 31 * result + name.hashCode();
+            return result;
+        }
+
+        private long timeTaken(){
+            return TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() 
- creationTime);
+        }
+
+        private float copyProgress(){
+            return actualSize() * 1.0f / size * 100;
+        }
+
+        private long actualSize(){
+            return dir != null ? new File(dir, name).length() : 0;
+        }
+    }
+
+    static File getFSDir(Directory dir) {
+        if (dir instanceof FilterDirectory){
+            dir = ((FilterDirectory) dir).getDelegate();
+        }
+
+        if (dir instanceof FSDirectory){
+            return ((FSDirectory) dir).getDirectory();
+        }
+
+        return null;
+    }
+
+    /**
+     * Get the file length in best effort basis.
+     * @return actual fileLength. -1 if cannot determine
+     */
+    private static long getFileLength(Directory dir, String fileName){
+        try{
+            return dir.fileLength(fileName);
+        } catch (Exception e){
+            return -1;
+        }
+    }
 
     //~------------------------------------------< CopyOnReadStatsMBean >
 
@@ -362,9 +553,10 @@ class IndexCopier implements CopyOnReadS
                     "Lucene Index Stats", IndexMappingData.TYPE, new 
String[]{"jcrPath"});
             tds = new TabularDataSupport(tt);
             for (Map.Entry<String, String> e : indexPathMapping.entrySet()){
+                String size = 
IOUtils.humanReadableByteCount(FileUtils.sizeOfDirectory(new 
File(e.getValue())));
                 tds.put(new CompositeDataSupport(IndexMappingData.TYPE,
                         IndexMappingData.FIELD_NAMES,
-                        new String[] {e.getKey(), e.getValue()}));
+                        new String[]{e.getKey(), e.getValue(), size}));
             }
         } catch (OpenDataException e){
             throw new IllegalStateException(e);
@@ -401,20 +593,89 @@ class IndexCopier implements CopyOnReadS
         return 
IOUtils.humanReadableByteCount(FileUtils.sizeOfDirectory(indexRootDir));
     }
 
+    @Override
+    public String[] getGarbageDetails() {
+        return toArray(transform(failedToDeleteFiles.values(),
+                new Function<LocalIndexFile, String>() {
+                    @Override
+                    public String apply(LocalIndexFile input) {
+                        return input.deleteLog();
+                    }
+                }), String.class);
+    }
+
+    @Override
+    public String getGarbageSize() {
+        long garbageSize = 0;
+        for (LocalIndexFile failedToDeleteFile : failedToDeleteFiles.values()){
+            garbageSize += failedToDeleteFile.size;
+        }
+        return IOUtils.humanReadableByteCount(garbageSize);
+    }
+
+    @Override
+    public int getScheduledForCopyCount() {
+        return scheduledForCopyCount.get();
+    }
+
+    @Override
+    public int getCopyInProgressCount() {
+        return copyInProgressCount.get();
+    }
+
+    @Override
+    public String getCopyInProgressSize() {
+        return IOUtils.humanReadableByteCount(copyInProgressSize.get());
+    }
+
+    @Override
+    public int getMaxCopyInProgressCount() {
+        return maxCopyInProgressCount.get();
+    }
+
+    @Override
+    public int getMaxScheduledForCopyCount() {
+        return maxScheduledForCopyCount.get();
+    }
+
+    @Override
+    public String[] getCopyInProgressDetails() {
+        return toArray(transform(copyInProgressFiles,
+                new Function<LocalIndexFile, String>() {
+                    @Override
+                    public String apply(LocalIndexFile input) {
+                        return input.copyLog();
+                    }
+                }), String.class);
+    }
+
+    @Override
+    public int getDeletedFilesCount() {
+        return deletedFileCount.get();
+    }
+
+    @Override
+    public String getGarbageCollectedSize() {
+        return IOUtils.humanReadableByteCount(garbageCollectedSize.get());
+    }
+
     private static class IndexMappingData {
         static final String[] FIELD_NAMES = new String[]{
                 "jcrPath",
                 "fsPath",
+                "size",
         };
 
         static final String[] FIELD_DESCRIPTIONS = new String[]{
                 "JCR Path",
                 "Filesystem Path",
+                "Size",
         };
 
         static final OpenType[] FIELD_TYPES = new OpenType[]{
                 SimpleType.STRING,
                 SimpleType.STRING,
+                SimpleType.STRING,
         };
 
         static final CompositeType TYPE = createCompositeType();

Modified: 
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java?rev=1671489&r1=1671488&r2=1671489&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
 Mon Apr  6 06:47:41 2015
@@ -22,11 +22,24 @@ package org.apache.jackrabbit.oak.plugin
 import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
-import org.apache.commons.io.FileUtils;
+import javax.management.openmbean.TabularData;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ForwardingListeningExecutorService;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.jackrabbit.oak.commons.IOUtils;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.lucene.store.Directory;
@@ -34,11 +47,13 @@ import org.apache.lucene.store.IOContext
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.store.RAMDirectory;
-import org.junit.After;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import static com.google.common.collect.Lists.newArrayList;
 import static 
com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
+import static 
org.apache.jackrabbit.oak.plugins.index.IndexConstants.REINDEX_COUNT;
 import static 
org.apache.jackrabbit.oak.plugins.nodetype.write.InitialContent.INITIAL_CONTENT;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -51,6 +66,9 @@ public class IndexCopierTest {
 
     private NodeState root = INITIAL_CONTENT;
 
+    @Rule
+    public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
     private NodeBuilder builder = root.builder();
 
     @Test
@@ -80,6 +98,72 @@ public class IndexCopierTest {
     }
 
     @Test
+    public void basicTestWithFS() throws Exception{
+        IndexDefinition defn = new IndexDefinition(root, 
builder.getNodeState());
+        IndexCopier c1 = new IndexCopier(sameThreadExecutor(), getWorkDir());
+
+        Directory remote = new RAMDirectory();
+        Directory wrapped = c1.wrap("/foo" , defn, remote);
+
+        byte[] t1 = writeFile(remote, "t1");
+        byte[] t2 = writeFile(remote , "t2");
+
+        assertEquals(2, wrapped.listAll().length);
+
+        assertTrue(wrapped.fileExists("t1"));
+        assertTrue(wrapped.fileExists("t2"));
+
+        assertEquals(t1.length, wrapped.fileLength("t1"));
+        assertEquals(t2.length, wrapped.fileLength("t2"));
+
+        readAndAssert(wrapped, "t1", t1);
+
+        //t1 should now be added to testDir
+        File indexBaseDir = c1.getIndexDir("/foo");
+        File indexDir = new File(indexBaseDir, "0");
+        assertTrue(new File(indexDir, "t1").exists());
+
+        TabularData td = c1.getIndexPathMapping();
+        assertEquals(1, td.size());
+    }
+
+    @Test
+    public void deleteOldPostReindex() throws Exception{
+        IndexDefinition defn = new IndexDefinition(root, 
builder.getNodeState());
+        IndexCopier c1 = new IndexCopier(sameThreadExecutor(), getWorkDir());
+
+        Directory remote = new CloseSafeDir();
+        Directory w1 = c1.wrap("/foo" , defn, remote);
+
+        byte[] t1 = writeFile(remote , "t1");
+        byte[] t2 = writeFile(remote , "t2");
+
+        readAndAssert(w1, "t1", t1);
+        readAndAssert(w1, "t2", t2);
+
+        //t1 should now be added to testDir
+        File indexBaseDir = c1.getIndexDir("/foo");
+        File indexDir = new File(indexBaseDir, "0");
+        assertTrue(new File(indexDir, "t1").exists());
+
+        builder.setProperty(REINDEX_COUNT, 1);
+        defn = new IndexDefinition(root, builder.getNodeState());
+
+        //Close old version
+        w1.close();
+        //Get a new one with updated reindexCount
+        Directory w2 = c1.wrap("/foo" , defn, remote);
+
+        readAndAssert(w2, "t1", t1);
+
+        w2.close();
+        assertFalse("Old index directory should have been removed", 
indexDir.exists());
+
+        File indexDir2 = new File(indexBaseDir, "1");
+        assertTrue(new File(indexDir2, "t1").exists());
+    }
+
+    @Test
     public void concurrentRead() throws Exception{
         Directory baseDir = new RAMDirectory();
         IndexDefinition defn = new IndexDefinition(root, 
builder.getNodeState());
@@ -88,18 +172,20 @@ public class IndexCopierTest {
         IndexCopier c1 = new RAMIndexCopier(baseDir, executor, getWorkDir());
 
         TestRAMDirectory remote = new TestRAMDirectory();
-        Directory wrapped = c1.wrap("/foo" , defn, remote);
+        Directory wrapped = c1.wrap("/foo", defn, remote);
 
         byte[] t1 = writeFile(remote , "t1");
 
         //1. Trigger a read which should go to remote
         readAndAssert(wrapped, "t1", t1);
+        assertEquals(1, c1.getScheduledForCopyCount());
         assertEquals(1, remote.openedFiles.size());
         assertEquals(1, executor.commands.size());
 
         //2. Trigger another read and this should also be
         //served from remote
         readAndAssert(wrapped, "t1", t1);
+        assertEquals(1, c1.getScheduledForCopyCount());
         assertEquals(2, remote.openedFiles.size());
         //Second read should not add a new copy task
         assertEquals(1, executor.commands.size());
@@ -112,6 +198,74 @@ public class IndexCopierTest {
         readAndAssert(wrapped, "t1", t1);
         // Now read should be served from local and not from remote
         assertEquals(0, remote.openedFiles.size());
+        assertEquals(0, c1.getScheduledForCopyCount());
+    }
+
+    @Test
+    public void copyInProgressStats() throws Exception{
+        Directory baseDir = new RAMDirectory();
+        IndexDefinition defn = new IndexDefinition(root, 
builder.getNodeState());
+
+        final List<ListenableFuture<?>> submittedTasks = Lists.newArrayList();
+        ExecutorService executor = new ForwardingListeningExecutorService() {
+            @Override
+            protected ListeningExecutorService delegate() {
+                return 
MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
+            }
+
+            @Override
+            public void execute(Runnable command) {
+                submittedTasks.add(super.submit(command));
+            }
+        };
+
+        IndexCopier c1 = new RAMIndexCopier(baseDir, executor, getWorkDir());
+
+        final CountDownLatch copyProceed = new CountDownLatch(1);
+        final CountDownLatch copyRequestArrived = new CountDownLatch(1);
+        TestRAMDirectory remote = new TestRAMDirectory(){
+            @Override
+            public void copy(Directory to, String src, String dest, IOContext 
context) throws IOException {
+                copyRequestArrived.countDown();
+                try {
+                    copyProceed.await();
+                } catch (InterruptedException e) {
+
+                }
+                super.copy(to, src, dest, context);
+            }
+        };
+        Directory wrapped = c1.wrap("/foo", defn, remote);
+
+        byte[] t1 = writeFile(remote , "t1");
+
+        //1. Trigger a read which should go to remote
+        readAndAssert(wrapped, "t1", t1);
+        copyRequestArrived.await();
+        assertEquals(1, c1.getCopyInProgressCount());
+        assertEquals(1, remote.openedFiles.size());
+
+        //2. Trigger another read and this should also be
+        //served from remote
+        readAndAssert(wrapped, "t1", t1);
+        assertEquals(1, c1.getCopyInProgressCount());
+        assertEquals(IOUtils.humanReadableByteCount(t1.length), 
c1.getCopyInProgressSize());
+        assertEquals(1, c1.getCopyInProgressDetails().length);
+        System.out.println(Arrays.toString(c1.getCopyInProgressDetails()));
+        assertEquals(2, remote.openedFiles.size());
+
+        //3. Perform copy
+        copyProceed.countDown();
+        Futures.allAsList(submittedTasks).get();
+        remote.reset();
+
+        //4. Now read again after copy is done
+        readAndAssert(wrapped, "t1", t1);
+        // Now read should be served from local and not from remote
+        assertEquals(0, remote.openedFiles.size());
+        assertEquals(0, c1.getCopyInProgressCount());
+
+        executor.shutdown();
     }
 
     /**
@@ -127,14 +281,14 @@ public class IndexCopierTest {
         TestRAMDirectory remote = new TestRAMDirectory();
         Directory wrapped = c1.wrap("/foo" , defn, remote);
 
-        byte[] t1 = writeFile(remote , "t1");
+        byte[] t1 = writeFile(remote, "t1");
 
         //1. Read for the first time should be served from remote
         readAndAssert(wrapped, "t1", t1);
         assertEquals(1, remote.openedFiles.size());
 
         //2. Reuse the testDir and read again
-        Directory wrapped2 = c1.wrap("/foo" , defn, remote);
+        Directory wrapped2 = c1.wrap("/foo", defn, remote);
         remote.reset();
 
         //3. Now read should be served from local
@@ -219,9 +373,65 @@ public class IndexCopierTest {
         assertTrue(baseDir.fileExists("t2"));
     }
 
-    @After
-    public void close() throws IOException {
-        FileUtils.deleteQuietly(getWorkDir());
+
+    @Test
+    public void failureInDelete() throws Exception{
+        final Set<String> testFiles = new HashSet<String>();
+        Directory baseDir = new CloseSafeDir() {
+            @Override
+            public void deleteFile(String name) throws IOException {
+                if (testFiles.contains(name)){
+                    throw new IOException("Not allowed to delete " + name);
+                }
+                super.deleteFile(name);
+            }
+        };
+
+        IndexDefinition defn = new IndexDefinition(root, 
builder.getNodeState());
+        IndexCopier c1 = new RAMIndexCopier(baseDir, sameThreadExecutor(), 
getWorkDir());
+
+        Directory r1 = new RAMDirectory();
+
+        byte[] t1 = writeFile(r1, "t1");
+        byte[] t2 = writeFile(r1 , "t2");
+
+        Directory w1 = c1.wrap("/foo" , defn, r1);
+        readAndAssert(w1, "t1", t1);
+        readAndAssert(w1, "t2", t2);
+
+        // t1 and t2 should now be present in local (base dir which back local)
+        assertTrue(baseDir.fileExists("t1"));
+        assertTrue(baseDir.fileExists("t2"));
+
+        Directory r2 = new CloseSafeDir();
+        copy(r1, r2);
+        r2.deleteFile("t1");
+
+        Directory w2 = c1.wrap("/foo" , defn, r2);
+
+        //Close would trigger removal of file which are not present in remote
+        testFiles.add("t1");
+        w2.close();
+
+        assertEquals(1, c1.getFailedToDeleteFiles().size());
+        IndexCopier.LocalIndexFile testFile = 
c1.getFailedToDeleteFiles().values().iterator().next();
+
+        assertEquals(1, testFile.getDeleteAttemptCount());
+        assertEquals(IOUtils.humanReadableByteCount(t1.length), 
c1.getGarbageSize());
+        assertEquals(1, c1.getGarbageDetails().length);
+
+        Directory w3 = c1.wrap("/foo" , defn, r2);
+        w3.close();
+        assertEquals(2, testFile.getDeleteAttemptCount());
+
+        //Now let the file to be deleted
+        testFiles.clear();
+
+        Directory w4 = c1.wrap("/foo" , defn, r2);
+        w4.close();
+
+        //No pending deletes left
+        assertEquals(0, c1.getFailedToDeleteFiles().size());
     }
 
     private byte[] writeFile(Directory dir, String name) throws IOException {
@@ -239,7 +449,7 @@ public class IndexCopierTest {
     }
 
     private File getWorkDir(){
-        return new File("target", "IndexClonerTest");
+        return temporaryFolder.getRoot();
     }
 
     private static void readAndAssert(Directory wrapped, String fileName, 
byte[] expectedData) throws IOException {


Reply via email to