Author: chetanm
Date: Mon Apr 6 06:47:41 2015
New Revision: 1671489
URL: http://svn.apache.org/r1671489
Log:
OAK-2709 - Misleading log message from IndexCopier
Made deletion of files more robust and exposed various stats via JMX to give
insight into how much garbage is being collected and how much is
accumulating.
Modified:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/CopyOnReadStatsMBean.java
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
Modified:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/CopyOnReadStatsMBean.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/CopyOnReadStatsMBean.java?rev=1671489&r1=1671488&r2=1671489&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/CopyOnReadStatsMBean.java
(original)
+++
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/CopyOnReadStatsMBean.java
Mon Apr 6 06:47:41 2015
@@ -31,9 +31,29 @@ public interface CopyOnReadStatsMBean {
int getRemoteReadCount();
+ int getScheduledForCopyCount();
+
+ int getCopyInProgressCount();
+
+ int getMaxCopyInProgressCount();
+
+ int getMaxScheduledForCopyCount();
+
+ String getCopyInProgressSize();
+
+ String[] getCopyInProgressDetails();
+
String getDownloadSize();
long getDownloadTime();
String getLocalIndexSize();
+
+ String[] getGarbageDetails();
+
+ String getGarbageSize();
+
+ int getDeletedFilesCount();
+
+ String getGarbageCollectedSize();
}
Modified:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java?rev=1671489&r1=1671488&r2=1671489&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
(original)
+++
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
Mon Apr 6 06:47:41 2015
@@ -22,11 +22,14 @@ package org.apache.jackrabbit.oak.plugin
import java.io.File;
import java.io.IOException;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -40,6 +43,7 @@ import javax.management.openmbean.Tabula
import javax.management.openmbean.TabularType;
import com.google.common.base.Charsets;
+import com.google.common.base.Function;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -58,10 +62,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.Iterables.toArray;
+import static com.google.common.collect.Iterables.transform;
import static com.google.common.collect.Maps.newConcurrentMap;
class IndexCopier implements CopyOnReadStatsMBean {
private static final Set<String> REMOTE_ONLY =
ImmutableSet.of("segments.gen");
+ private static final int MAX_FAILURE_ENTRIES = 10000;
private final Logger log = LoggerFactory.getLogger(getClass());
private final Executor executor;
@@ -70,11 +77,21 @@ class IndexCopier implements CopyOnReadS
private final AtomicInteger localReadCount = new AtomicInteger();
private final AtomicInteger remoteReadCount = new AtomicInteger();
private final AtomicInteger invalidFileCount = new AtomicInteger();
+ private final AtomicInteger deletedFileCount = new AtomicInteger();
+ private final AtomicInteger scheduledForCopyCount = new AtomicInteger();
+ private final AtomicInteger copyInProgressCount = new AtomicInteger();
+ private final AtomicInteger maxCopyInProgressCount = new AtomicInteger();
+ private final AtomicInteger maxScheduledForCopyCount = new AtomicInteger();
+ private final AtomicLong copyInProgressSize = new AtomicLong();
private final AtomicLong downloadSize = new AtomicLong();
+ private final AtomicLong garbageCollectedSize = new AtomicLong();
private final AtomicLong downloadTime = new AtomicLong();
+
private final Map<String, String> indexPathMapping =
Maps.newConcurrentMap();
private final Map<String, String> indexPathVersionMapping =
Maps.newConcurrentMap();
+ private final ConcurrentMap<String, LocalIndexFile> failedToDeleteFiles =
Maps.newConcurrentMap();
+ private final Set<LocalIndexFile> copyInProgressFiles =
Collections.newSetFromMap(new ConcurrentHashMap<LocalIndexFile, Boolean>());
public IndexCopier(Executor executor, File indexRootDir) {
this.executor = executor;
@@ -108,6 +125,36 @@ class IndexCopier implements CopyOnReadS
return new File(indexRootDir, subDir);
}
+ Map<String, LocalIndexFile> getFailedToDeleteFiles() {
+ return Collections.unmodifiableMap(failedToDeleteFiles);
+ }
+
+ private void failedToDelete(LocalIndexFile file){
+ //Limit the size on best effort basis
+ if (failedToDeleteFiles.size() < MAX_FAILURE_ENTRIES) {
+ LocalIndexFile failedToDeleteFile =
failedToDeleteFiles.putIfAbsent(file.getKey(), file);
+ if (failedToDeleteFile == null){
+ failedToDeleteFile = file;
+ }
+ failedToDeleteFile.incrementAttemptToDelete();
+ } else {
+ log.warn("Not able to delete {}. Currently more than {} file with
total size {} are pending delete.",
+ file.deleteLog(), failedToDeleteFiles.size(),
getGarbageSize());
+ }
+ }
+
+ private void successfullyDeleted(LocalIndexFile file, boolean fileExisted){
+ LocalIndexFile failedToDeleteFile =
failedToDeleteFiles.remove(file.getKey());
+ if (failedToDeleteFile != null){
+ log.debug("Deleted : {}", failedToDeleteFile.deleteLog());
+ }
+
+ if (fileExisted){
+ garbageCollectedSize.addAndGet(file.size);
+ deletedFileCount.incrementAndGet();
+ }
+ }
+
/**
* Directory implementation which lazily copies the index files from a
* remote directory in background.
@@ -184,6 +231,7 @@ class IndexCopier implements CopyOnReadS
}
private void copy(final FileReference reference) {
+ updateMaxScheduled(scheduledForCopyCount.incrementAndGet());
executor.execute(new Runnable() {
@Override
public void run() {
@@ -191,13 +239,17 @@ class IndexCopier implements CopyOnReadS
boolean success = false;
boolean copyAttempted = false;
try {
+ scheduledForCopyCount.decrementAndGet();
if (!local.fileExists(name)) {
- long start = System.currentTimeMillis();
+ long fileSize = remote.fileLength(name);
+ LocalIndexFile file = new LocalIndexFile(local,
name, fileSize);
+ long start = startCopy(file);
copyAttempted = true;
+
remote.copy(local, name, name, IOContext.READ);
reference.markValid();
- downloadTime.addAndGet(System.currentTimeMillis()
- start);
- downloadSize.addAndGet(remote.fileLength(name));
+
+ doneCopy(file, start);
} else {
long localLength = local.fileLength(name);
long remoteLength = remote.fileLength(name);
@@ -289,18 +341,22 @@ class IndexCopier implements CopyOnReadS
Set<String> failedToDelete = Sets.newHashSet();
for (String fileName : filesToBeDeleted) {
+ LocalIndexFile file = new LocalIndexFile(local, fileName);
try {
- local.deleteFile(fileName);
+ boolean fileExisted = false;
+ if (local.fileExists(fileName)) {
+ fileExisted = true;
+ local.deleteFile(fileName);
+ }
+ successfullyDeleted(file, fileExisted);
} catch (IOException e) {
failedToDelete.add(fileName);
- log.debug("Error occurred while removing deleted file {}
from Local {} ", fileName, local, e);
+ failedToDelete(file);
+ log.debug("Error occurred while removing deleted file {}
from Local {}. " +
+ "Attempt would be maid to delete it on next run ",
fileName, local, e);
}
}
- log.info("Error occurred while deleting following files from the
local index directory [{}]. " +
- "This can happen on Windows based system. Attempt would be
made to remove them " +
- "in next attempt ", local, failedToDelete);
-
filesToBeDeleted = new HashSet<String>(filesToBeDeleted);
filesToBeDeleted.removeAll(failedToDelete);
if(!filesToBeDeleted.isEmpty()) {
@@ -332,6 +388,40 @@ class IndexCopier implements CopyOnReadS
}
}
+ private long startCopy(LocalIndexFile file) {
+ updateMaxInProgress(copyInProgressCount.incrementAndGet());
+ copyInProgressSize.addAndGet(file.size);
+ copyInProgressFiles.add(file);
+ return System.currentTimeMillis();
+ }
+
+ private void doneCopy(LocalIndexFile file, long start) {
+ copyInProgressFiles.remove(file);
+ copyInProgressCount.decrementAndGet();
+ copyInProgressSize.addAndGet(-file.size);
+
+ downloadTime.addAndGet(System.currentTimeMillis() - start);
+ downloadSize.addAndGet(file.size);
+ }
+
+ private void updateMaxScheduled(int val) {
+ synchronized (maxScheduledForCopyCount){
+ int current = maxScheduledForCopyCount.get();
+ if (val > current){
+ maxScheduledForCopyCount.set(val);
+ }
+ }
+ }
+
+ private void updateMaxInProgress(int val) {
+ synchronized (maxCopyInProgressCount){
+ int current = maxCopyInProgressCount.get();
+ if (val > current){
+ maxCopyInProgressCount.set(val);
+ }
+ }
+ }
+
private class DeleteOldDirOnClose extends FilterDirectory {
private final File oldIndexDir;
@@ -351,6 +441,107 @@ class IndexCopier implements CopyOnReadS
super.close();
}
}
+
+ static final class LocalIndexFile {
+ final File dir;
+ final String name;
+ final long size;
+ private volatile int deleteAttemptCount;
+ final long creationTime = System.currentTimeMillis();
+
+ public LocalIndexFile(Directory dir, String fileName, long size){
+ this.dir = getFSDir(dir);
+ this.name = fileName;
+ this.size = size;
+ }
+
+ public LocalIndexFile(Directory dir, String fileName){
+ this(dir, fileName, getFileLength(dir, fileName));
+ }
+
+ public String getKey(){
+ if (dir != null){
+ return new File(dir, name).getAbsolutePath();
+ }
+ return name;
+ }
+
+ public void incrementAttemptToDelete(){
+ deleteAttemptCount++;
+ }
+
+ public int getDeleteAttemptCount() {
+ return deleteAttemptCount;
+ }
+
+ public String deleteLog(){
+ return String.format("%s (%s, %d attempts, %d s)", name,
+ IOUtils.humanReadableByteCount(size), deleteAttemptCount,
timeTaken());
+ }
+
+ public String copyLog(){
+ return String.format("%s (%s, %1.1f%%, %s, %d s)", name,
+ IOUtils.humanReadableByteCount(actualSize()),
+ copyProgress(),
+ IOUtils.humanReadableByteCount(size), timeTaken());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ LocalIndexFile localIndexFile = (LocalIndexFile) o;
+
+ if (dir != null ? !dir.equals(localIndexFile.dir) :
localIndexFile.dir != null)
+ return false;
+ return name.equals(localIndexFile.name);
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result = dir != null ? dir.hashCode() : 0;
+ result = 31 * result + name.hashCode();
+ return result;
+ }
+
+ private long timeTaken(){
+ return TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()
- creationTime);
+ }
+
+ private float copyProgress(){
+ return actualSize() * 1.0f / size * 100;
+ }
+
+ private long actualSize(){
+ return dir != null ? new File(dir, name).length() : 0;
+ }
+ }
+
+ static File getFSDir(Directory dir) {
+ if (dir instanceof FilterDirectory){
+ dir = ((FilterDirectory) dir).getDelegate();
+ }
+
+ if (dir instanceof FSDirectory){
+ return ((FSDirectory) dir).getDirectory();
+ }
+
+ return null;
+ }
+
+ /**
+ * Get the file length in best effort basis.
+ * @return actual fileLength. -1 if cannot determine
+ */
+ private static long getFileLength(Directory dir, String fileName){
+ try{
+ return dir.fileLength(fileName);
+ } catch (Exception e){
+ return -1;
+ }
+ }
//~------------------------------------------< CopyOnReadStatsMBean >
@@ -362,9 +553,10 @@ class IndexCopier implements CopyOnReadS
"Lucene Index Stats", IndexMappingData.TYPE, new
String[]{"jcrPath"});
tds = new TabularDataSupport(tt);
for (Map.Entry<String, String> e : indexPathMapping.entrySet()){
+ String size =
IOUtils.humanReadableByteCount(FileUtils.sizeOfDirectory(new
File(e.getValue())));
tds.put(new CompositeDataSupport(IndexMappingData.TYPE,
IndexMappingData.FIELD_NAMES,
- new String[] {e.getKey(), e.getValue()}));
+ new String[]{e.getKey(), e.getValue(), size}));
}
} catch (OpenDataException e){
throw new IllegalStateException(e);
@@ -401,20 +593,89 @@ class IndexCopier implements CopyOnReadS
return
IOUtils.humanReadableByteCount(FileUtils.sizeOfDirectory(indexRootDir));
}
+ @Override
+ public String[] getGarbageDetails() {
+ return toArray(transform(failedToDeleteFiles.values(),
+ new Function<LocalIndexFile, String>() {
+ @Override
+ public String apply(LocalIndexFile input) {
+ return input.deleteLog();
+ }
+ }), String.class);
+ }
+
+ @Override
+ public String getGarbageSize() {
+ long garbageSize = 0;
+ for (LocalIndexFile failedToDeleteFile : failedToDeleteFiles.values()){
+ garbageSize += failedToDeleteFile.size;
+ }
+ return IOUtils.humanReadableByteCount(garbageSize);
+ }
+
+ @Override
+ public int getScheduledForCopyCount() {
+ return scheduledForCopyCount.get();
+ }
+
+ @Override
+ public int getCopyInProgressCount() {
+ return copyInProgressCount.get();
+ }
+
+ @Override
+ public String getCopyInProgressSize() {
+ return IOUtils.humanReadableByteCount(copyInProgressSize.get());
+ }
+
+ @Override
+ public int getMaxCopyInProgressCount() {
+ return maxCopyInProgressCount.get();
+ }
+
+ @Override
+ public int getMaxScheduledForCopyCount() {
+ return maxScheduledForCopyCount.get();
+ }
+
+ @Override
+ public String[] getCopyInProgressDetails() {
+ return toArray(transform(copyInProgressFiles,
+ new Function<LocalIndexFile, String>() {
+ @Override
+ public String apply(LocalIndexFile input) {
+ return input.copyLog();
+ }
+ }), String.class);
+ }
+
+ @Override
+ public int getDeletedFilesCount() {
+ return deletedFileCount.get();
+ }
+
+ @Override
+ public String getGarbageCollectedSize() {
+ return IOUtils.humanReadableByteCount(garbageCollectedSize.get());
+ }
+
private static class IndexMappingData {
static final String[] FIELD_NAMES = new String[]{
"jcrPath",
"fsPath",
+ "size",
};
static final String[] FIELD_DESCRIPTIONS = new String[]{
"JCR Path",
"Filesystem Path",
+ "Size",
};
static final OpenType[] FIELD_TYPES = new OpenType[]{
SimpleType.STRING,
SimpleType.STRING,
+ SimpleType.STRING,
};
static final CompositeType TYPE = createCompositeType();
Modified:
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java?rev=1671489&r1=1671488&r2=1671489&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
(original)
+++
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
Mon Apr 6 06:47:41 2015
@@ -22,11 +22,24 @@ package org.apache.jackrabbit.oak.plugin
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
-import org.apache.commons.io.FileUtils;
+import javax.management.openmbean.TabularData;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ForwardingListeningExecutorService;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.jackrabbit.oak.commons.IOUtils;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.lucene.store.Directory;
@@ -34,11 +47,13 @@ import org.apache.lucene.store.IOContext
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.RAMDirectory;
-import org.junit.After;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import static com.google.common.collect.Lists.newArrayList;
import static
com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
+import static
org.apache.jackrabbit.oak.plugins.index.IndexConstants.REINDEX_COUNT;
import static
org.apache.jackrabbit.oak.plugins.nodetype.write.InitialContent.INITIAL_CONTENT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -51,6 +66,9 @@ public class IndexCopierTest {
private NodeState root = INITIAL_CONTENT;
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
private NodeBuilder builder = root.builder();
@Test
@@ -80,6 +98,72 @@ public class IndexCopierTest {
}
@Test
+ public void basicTestWithFS() throws Exception{
+ IndexDefinition defn = new IndexDefinition(root,
builder.getNodeState());
+ IndexCopier c1 = new IndexCopier(sameThreadExecutor(), getWorkDir());
+
+ Directory remote = new RAMDirectory();
+ Directory wrapped = c1.wrap("/foo" , defn, remote);
+
+ byte[] t1 = writeFile(remote, "t1");
+ byte[] t2 = writeFile(remote , "t2");
+
+ assertEquals(2, wrapped.listAll().length);
+
+ assertTrue(wrapped.fileExists("t1"));
+ assertTrue(wrapped.fileExists("t2"));
+
+ assertEquals(t1.length, wrapped.fileLength("t1"));
+ assertEquals(t2.length, wrapped.fileLength("t2"));
+
+ readAndAssert(wrapped, "t1", t1);
+
+ //t1 should now be added to testDir
+ File indexBaseDir = c1.getIndexDir("/foo");
+ File indexDir = new File(indexBaseDir, "0");
+ assertTrue(new File(indexDir, "t1").exists());
+
+ TabularData td = c1.getIndexPathMapping();
+ assertEquals(1, td.size());
+ }
+
+ @Test
+ public void deleteOldPostReindex() throws Exception{
+ IndexDefinition defn = new IndexDefinition(root,
builder.getNodeState());
+ IndexCopier c1 = new IndexCopier(sameThreadExecutor(), getWorkDir());
+
+ Directory remote = new CloseSafeDir();
+ Directory w1 = c1.wrap("/foo" , defn, remote);
+
+ byte[] t1 = writeFile(remote , "t1");
+ byte[] t2 = writeFile(remote , "t2");
+
+ readAndAssert(w1, "t1", t1);
+ readAndAssert(w1, "t2", t2);
+
+ //t1 should now be added to testDir
+ File indexBaseDir = c1.getIndexDir("/foo");
+ File indexDir = new File(indexBaseDir, "0");
+ assertTrue(new File(indexDir, "t1").exists());
+
+ builder.setProperty(REINDEX_COUNT, 1);
+ defn = new IndexDefinition(root, builder.getNodeState());
+
+ //Close old version
+ w1.close();
+ //Get a new one with updated reindexCount
+ Directory w2 = c1.wrap("/foo" , defn, remote);
+
+ readAndAssert(w2, "t1", t1);
+
+ w2.close();
+ assertFalse("Old index directory should have been removed",
indexDir.exists());
+
+ File indexDir2 = new File(indexBaseDir, "1");
+ assertTrue(new File(indexDir2, "t1").exists());
+ }
+
+ @Test
public void concurrentRead() throws Exception{
Directory baseDir = new RAMDirectory();
IndexDefinition defn = new IndexDefinition(root,
builder.getNodeState());
@@ -88,18 +172,20 @@ public class IndexCopierTest {
IndexCopier c1 = new RAMIndexCopier(baseDir, executor, getWorkDir());
TestRAMDirectory remote = new TestRAMDirectory();
- Directory wrapped = c1.wrap("/foo" , defn, remote);
+ Directory wrapped = c1.wrap("/foo", defn, remote);
byte[] t1 = writeFile(remote , "t1");
//1. Trigger a read which should go to remote
readAndAssert(wrapped, "t1", t1);
+ assertEquals(1, c1.getScheduledForCopyCount());
assertEquals(1, remote.openedFiles.size());
assertEquals(1, executor.commands.size());
//2. Trigger another read and this should also be
//served from remote
readAndAssert(wrapped, "t1", t1);
+ assertEquals(1, c1.getScheduledForCopyCount());
assertEquals(2, remote.openedFiles.size());
//Second read should not add a new copy task
assertEquals(1, executor.commands.size());
@@ -112,6 +198,74 @@ public class IndexCopierTest {
readAndAssert(wrapped, "t1", t1);
// Now read should be served from local and not from remote
assertEquals(0, remote.openedFiles.size());
+ assertEquals(0, c1.getScheduledForCopyCount());
+ }
+
+ @Test
+ public void copyInProgressStats() throws Exception{
+ Directory baseDir = new RAMDirectory();
+ IndexDefinition defn = new IndexDefinition(root,
builder.getNodeState());
+
+ final List<ListenableFuture<?>> submittedTasks = Lists.newArrayList();
+ ExecutorService executor = new ForwardingListeningExecutorService() {
+ @Override
+ protected ListeningExecutorService delegate() {
+ return
MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ submittedTasks.add(super.submit(command));
+ }
+ };
+
+ IndexCopier c1 = new RAMIndexCopier(baseDir, executor, getWorkDir());
+
+ final CountDownLatch copyProceed = new CountDownLatch(1);
+ final CountDownLatch copyRequestArrived = new CountDownLatch(1);
+ TestRAMDirectory remote = new TestRAMDirectory(){
+ @Override
+ public void copy(Directory to, String src, String dest, IOContext
context) throws IOException {
+ copyRequestArrived.countDown();
+ try {
+ copyProceed.await();
+ } catch (InterruptedException e) {
+
+ }
+ super.copy(to, src, dest, context);
+ }
+ };
+ Directory wrapped = c1.wrap("/foo", defn, remote);
+
+ byte[] t1 = writeFile(remote , "t1");
+
+ //1. Trigger a read which should go to remote
+ readAndAssert(wrapped, "t1", t1);
+ copyRequestArrived.await();
+ assertEquals(1, c1.getCopyInProgressCount());
+ assertEquals(1, remote.openedFiles.size());
+
+ //2. Trigger another read and this should also be
+ //served from remote
+ readAndAssert(wrapped, "t1", t1);
+ assertEquals(1, c1.getCopyInProgressCount());
+ assertEquals(IOUtils.humanReadableByteCount(t1.length),
c1.getCopyInProgressSize());
+ assertEquals(1, c1.getCopyInProgressDetails().length);
+ System.out.println(Arrays.toString(c1.getCopyInProgressDetails()));
+ assertEquals(2, remote.openedFiles.size());
+
+ //3. Perform copy
+ copyProceed.countDown();
+ Futures.allAsList(submittedTasks).get();
+ remote.reset();
+
+ //4. Now read again after copy is done
+ readAndAssert(wrapped, "t1", t1);
+ // Now read should be served from local and not from remote
+ assertEquals(0, remote.openedFiles.size());
+ assertEquals(0, c1.getCopyInProgressCount());
+
+ executor.shutdown();
}
/**
@@ -127,14 +281,14 @@ public class IndexCopierTest {
TestRAMDirectory remote = new TestRAMDirectory();
Directory wrapped = c1.wrap("/foo" , defn, remote);
- byte[] t1 = writeFile(remote , "t1");
+ byte[] t1 = writeFile(remote, "t1");
//1. Read for the first time should be served from remote
readAndAssert(wrapped, "t1", t1);
assertEquals(1, remote.openedFiles.size());
//2. Reuse the testDir and read again
- Directory wrapped2 = c1.wrap("/foo" , defn, remote);
+ Directory wrapped2 = c1.wrap("/foo", defn, remote);
remote.reset();
//3. Now read should be served from local
@@ -219,9 +373,65 @@ public class IndexCopierTest {
assertTrue(baseDir.fileExists("t2"));
}
- @After
- public void close() throws IOException {
- FileUtils.deleteQuietly(getWorkDir());
+
+ @Test
+ public void failureInDelete() throws Exception{
+ final Set<String> testFiles = new HashSet<String>();
+ Directory baseDir = new CloseSafeDir() {
+ @Override
+ public void deleteFile(String name) throws IOException {
+ if (testFiles.contains(name)){
+ throw new IOException("Not allowed to delete " + name);
+ }
+ super.deleteFile(name);
+ }
+ };
+
+ IndexDefinition defn = new IndexDefinition(root,
builder.getNodeState());
+ IndexCopier c1 = new RAMIndexCopier(baseDir, sameThreadExecutor(),
getWorkDir());
+
+ Directory r1 = new RAMDirectory();
+
+ byte[] t1 = writeFile(r1, "t1");
+ byte[] t2 = writeFile(r1 , "t2");
+
+ Directory w1 = c1.wrap("/foo" , defn, r1);
+ readAndAssert(w1, "t1", t1);
+ readAndAssert(w1, "t2", t2);
+
+ // t1 and t2 should now be present in local (base dir which back local)
+ assertTrue(baseDir.fileExists("t1"));
+ assertTrue(baseDir.fileExists("t2"));
+
+ Directory r2 = new CloseSafeDir();
+ copy(r1, r2);
+ r2.deleteFile("t1");
+
+ Directory w2 = c1.wrap("/foo" , defn, r2);
+
+ //Close would trigger removal of file which are not present in remote
+ testFiles.add("t1");
+ w2.close();
+
+ assertEquals(1, c1.getFailedToDeleteFiles().size());
+ IndexCopier.LocalIndexFile testFile =
c1.getFailedToDeleteFiles().values().iterator().next();
+
+ assertEquals(1, testFile.getDeleteAttemptCount());
+ assertEquals(IOUtils.humanReadableByteCount(t1.length),
c1.getGarbageSize());
+ assertEquals(1, c1.getGarbageDetails().length);
+
+ Directory w3 = c1.wrap("/foo" , defn, r2);
+ w3.close();
+ assertEquals(2, testFile.getDeleteAttemptCount());
+
+ //Now let the file to be deleted
+ testFiles.clear();
+
+ Directory w4 = c1.wrap("/foo" , defn, r2);
+ w4.close();
+
+ //No pending deletes left
+ assertEquals(0, c1.getFailedToDeleteFiles().size());
}
private byte[] writeFile(Directory dir, String name) throws IOException {
@@ -239,7 +449,7 @@ public class IndexCopierTest {
}
private File getWorkDir(){
- return new File("target", "IndexClonerTest");
+ return temporaryFolder.getRoot();
}
private static void readAndAssert(Directory wrapped, String fileName,
byte[] expectedData) throws IOException {