Author: chetanm
Date: Mon Nov 7 08:46:13 2016
New Revision: 1768450
URL: http://svn.apache.org/viewvc?rev=1768450&view=rev
Log:
OAK-5075 - Refactor IndexCopier to make it more modular
Added:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnReadDirectory.java
(with props)
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnWriteDirectory.java
(with props)
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/DirectoryUtils.java
(with props)
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/LocalIndexFile.java
(with props)
Modified:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LucenePropertyIndexTest.java
Modified:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java?rev=1768450&r1=1768449&r2=1768450&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
(original)
+++
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
Mon Nov 7 08:46:13 2016
@@ -21,27 +21,15 @@ package org.apache.jackrabbit.oak.plugin
import java.io.Closeable;
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.CompositeType;
@@ -54,26 +42,22 @@ import javax.management.openmbean.Tabula
import com.google.common.base.Function;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.commons.io.FileUtils;
-import org.apache.jackrabbit.oak.commons.IOUtils;
-import org.apache.jackrabbit.oak.commons.concurrent.NotifyingFutureTask;
+import
org.apache.jackrabbit.oak.plugins.index.lucene.directory.CopyOnReadDirectory;
+import
org.apache.jackrabbit.oak.plugins.index.lucene.directory.CopyOnWriteDirectory;
+import org.apache.jackrabbit.oak.plugins.index.lucene.directory.DirectoryUtils;
import
org.apache.jackrabbit.oak.plugins.index.lucene.directory.IndexRootDirectory;
import org.apache.jackrabbit.oak.plugins.index.lucene.directory.LocalIndexDir;
+import org.apache.jackrabbit.oak.plugins.index.lucene.directory.LocalIndexFile;
import org.apache.jackrabbit.oak.util.PerfLogger;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.FilterDirectory;
-import org.apache.lucene.store.IOContext;
-import org.apache.lucene.store.IndexInput;
-import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.NoLockFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Iterables.toArray;
import static com.google.common.collect.Iterables.transform;
@@ -82,12 +66,11 @@ import static com.google.common.collect.
import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
public class IndexCopier implements CopyOnReadStatsMBean, Closeable {
- private static final Set<String> REMOTE_ONLY =
ImmutableSet.of("segments.gen");
+ public static final Set<String> REMOTE_ONLY =
ImmutableSet.of("segments.gen");
private static final int MAX_FAILURE_ENTRIES = 10000;
private static final String WORK_DIR_NAME = "indexWriterDir";
private final Logger log = LoggerFactory.getLogger(getClass());
- private final PerfLogger PERF_LOGGER = new
PerfLogger(LoggerFactory.getLogger(log.getName() + ".perf"));
private final Executor executor;
private final File indexWorkDir;
@@ -134,13 +117,13 @@ public class IndexCopier implements Copy
public Directory wrapForRead(String indexPath, IndexDefinition definition,
Directory remote, String dirName) throws
IOException {
Directory local = createLocalDirForIndexReader(indexPath, definition,
dirName);
- return new CopyOnReadDirectory(remote, local, prefetchEnabled,
indexPath, getSharedWorkingSet(indexPath));
+ return new CopyOnReadDirectory(this, remote, local, prefetchEnabled,
indexPath, getSharedWorkingSet(indexPath), executor);
}
public Directory wrapForWrite(IndexDefinition definition, Directory
remote, boolean reindexMode, String dirName) throws IOException {
Directory local = createLocalDirForIndexWriter(definition, dirName);
- return new CopyOnWriteDirectory(remote, local, reindexMode,
- getIndexPathForLogging(definition),
getSharedWorkingSet(definition.getIndexPathFromConfig()));
+ return new CopyOnWriteDirectory(this, remote, local, reindexMode,
+ getIndexPathForLogging(definition),
getSharedWorkingSet(definition.getIndexPathFromConfig()), executor);
}
@Override
@@ -148,6 +131,10 @@ public class IndexCopier implements Copy
this.closed = true;
}
+ public boolean isClosed() {
+ return closed;
+ }
+
File getIndexWorkDir() {
return indexWorkDir;
}
@@ -209,7 +196,7 @@ public class IndexCopier implements Copy
}
if (fileExisted){
- garbageCollectedSize.addAndGet(file.size);
+ garbageCollectedSize.addAndGet(file.getSize());
deletedFileCount.incrementAndGet();
}
}
@@ -262,762 +249,8 @@ public class IndexCopier implements Copy
return indexPath.concat(dirName);
}
- /**
- * Directory implementation which lazily copies the index files from a
- * remote directory in background.
- */
- class CopyOnReadDirectory extends FilterDirectory {
- private final Directory remote;
- private final Directory local;
- private final String indexPath;
-
- private final ConcurrentMap<String, CORFileReference> files =
newConcurrentMap();
- /**
- * Set of fileNames bound to current local dir. It is updated with any
new file
- * which gets added by this directory
- */
- private final Set<String> localFileNames = Sets.newConcurrentHashSet();
-
- public CopyOnReadDirectory(Directory remote, Directory local, boolean
prefetch,
- String indexPath, Set<String>
sharedWorkingSet) throws IOException {
- super(remote);
- this.remote = remote;
- this.local = local;
- this.indexPath = indexPath;
-
- this.localFileNames.addAll(Arrays.asList(local.listAll()));
- //Remove files which are being worked upon by COW
- this.localFileNames.removeAll(sharedWorkingSet);
-
- if (prefetch) {
- prefetchIndexFiles();
- }
- }
-
- @Override
- public void deleteFile(String name) throws IOException {
- throw new UnsupportedOperationException("Cannot delete in a
ReadOnly directory");
- }
-
- @Override
- public IndexOutput createOutput(String name, IOContext context) throws
IOException {
- throw new UnsupportedOperationException("Cannot write in a
ReadOnly directory");
- }
-
- @Override
- public IndexInput openInput(String name, IOContext context) throws
IOException {
- if (REMOTE_ONLY.contains(name)) {
- log.trace("[{}] opening remote only file {}", indexPath, name);
- return remote.openInput(name, context);
- }
-
- CORFileReference ref = files.get(name);
- if (ref != null) {
- if (ref.isLocalValid()) {
- log.trace("[{}] opening existing local file {}",
indexPath, name);
- return files.get(name).openLocalInput(context);
- } else {
- readerRemoteReadCount.incrementAndGet();
- log.trace(
- "[{}] opening existing remote file as local
version is not valid {}",
- indexPath, name);
- return remote.openInput(name, context);
- }
- }
-
- //If file does not exist then just delegate to remote and not
- //schedule a copy task
- if (!remote.fileExists(name)){
- if (log.isDebugEnabled()) {
- log.debug("[{}] Looking for non existent file {}. Current
known files {}",
- indexPath, name,
Arrays.toString(remote.listAll()));
- }
- return remote.openInput(name, context);
- }
-
- CORFileReference toPut = new CORFileReference(name);
- CORFileReference old = files.putIfAbsent(name, toPut);
- if (old == null) {
- log.trace("[{}] scheduled local copy for {}", indexPath, name);
- copy(toPut);
- }
-
- //If immediate executor is used the result would be ready right
away
- if (toPut.isLocalValid()) {
- log.trace("[{}] opening new local file {}", indexPath, name);
- return toPut.openLocalInput(context);
- }
-
- log.trace("[{}] opening new remote file {}", indexPath, name);
- readerRemoteReadCount.incrementAndGet();
- return remote.openInput(name, context);
- }
-
- Directory getLocal() {
- return local;
- }
-
- private void copy(final CORFileReference reference) {
- updateMaxScheduled(scheduledForCopyCount.incrementAndGet());
- executor.execute(new Runnable() {
- @Override
- public void run() {
- scheduledForCopyCount.decrementAndGet();
- copyFilesToLocal(reference, true, true);
- }
- });
- }
-
- private void prefetchIndexFiles() throws IOException {
- long start = PERF_LOGGER.start();
- long totalSize = 0;
- int copyCount = 0;
- List<String> copiedFileNames = Lists.newArrayList();
- for (String name : remote.listAll()) {
- if (REMOTE_ONLY.contains(name)) {
- continue;
- }
- CORFileReference fileRef = new CORFileReference(name);
- files.putIfAbsent(name, fileRef);
- long fileSize = copyFilesToLocal(fileRef, false, false);
- if (fileSize > 0) {
- copyCount++;
- totalSize += fileSize;
- copiedFileNames.add(name);
- }
- }
-
- local.sync(copiedFileNames);
- PERF_LOGGER.end(start, -1, "[{}] Copied {} files totaling {}",
indexPath, copyCount, humanReadableByteCount(totalSize));
- }
-
- private long copyFilesToLocal(CORFileReference reference, boolean
sync, boolean logDuration) {
- String name = reference.name;
- boolean success = false;
- boolean copyAttempted = false;
- long fileSize = 0;
- try {
- if (!local.fileExists(name)) {
- long perfStart = -1;
- if (logDuration) {
- perfStart = PERF_LOGGER.start();
- }
-
- fileSize = remote.fileLength(name);
- LocalIndexFile file = new LocalIndexFile(local, name,
fileSize, true);
- long start = startCopy(file);
- copyAttempted = true;
-
- remote.copy(local, name, name, IOContext.READ);
- reference.markValid();
-
- if (sync) {
- local.sync(Collections.singleton(name));
- }
-
- doneCopy(file, start);
- if (logDuration) {
- PERF_LOGGER.end(perfStart, 0,
- "[{}] Copied file {} of size {}", indexPath,
- name, humanReadableByteCount(fileSize));
- }
- } else {
- long localLength = local.fileLength(name);
- long remoteLength = remote.fileLength(name);
-
- //Do a simple consistency check. Ideally Lucene index
files are never
- //updated but still do a check if the copy is consistent
- if (localLength != remoteLength) {
- log.warn("[{}] Found local copy for {} in {} but size
of local {} differs from remote {}. " +
- "Content would be read from remote
file only",
- indexPath, name, local, localLength,
remoteLength);
- invalidFileCount.incrementAndGet();
- } else {
- reference.markValid();
- log.trace("[{}] found local copy of file {}",
- indexPath, name);
- }
- }
- success = true;
- } catch (IOException e) {
- //TODO In case of exception there would not be any other
attempt
- //to download the file. Look into support for retry
- log.warn("[{}] Error occurred while copying file [{}] from {}
to {}", indexPath, name, remote, local, e);
- } finally {
- if (copyAttempted && !success){
- try {
- if (local.fileExists(name)) {
- local.deleteFile(name);
- }
- } catch (IOException e) {
- log.warn("[{}] Error occurred while deleting corrupted
file [{}] from [{}]", indexPath, name, local, e);
- }
- }
- }
- return fileSize;
- }
-
- /**
- * On close file which are not present in remote are removed from
local.
- * CopyOnReadDir is opened at different revisions of the index state
- *
- * CDir1 - V1
- * CDir2 - V2
- *
- * Its possible that two different IndexSearcher are opened at same
local
- * directory but pinned to different revisions. So while removing it
must
- * be ensured that any currently opened IndexSearcher does not get
affected.
- * The way IndexSearchers get created in IndexTracker it ensures that
new searcher
- * pinned to newer revision gets opened first and then existing ones
are closed.
- *
- *
- * @throws IOException
- */
- @Override
- public void close() throws IOException {
- //Always remove old index file on close as it ensures that
- //no other IndexSearcher are opened with previous revision of
Index due to
- //way IndexTracker closes IndexNode. At max there would be only
two IndexNode
- //opened pinned to different revision of same Lucene index
- executor.execute(new Runnable() {
- @Override
- public void run() {
- try{
- removeDeletedFiles();
- } catch (IOException e) {
- log.warn(
- "[{}] Error occurred while removing deleted
files from Local {}, Remote {}",
- indexPath, local, remote, e);
- }
-
- try {
- //This would also remove old index files if current
- //directory was based on newerRevision as local would
- //be of type DeleteOldDirOnClose
- local.close();
- remote.close();
- } catch (IOException e) {
- log.warn(
- "[{}] Error occurred while closing directory ",
- indexPath, e);
- }
- }
- });
- }
-
- @Override
- public String toString() {
- return String.format("[COR] Local %s, Remote %s", local, remote);
- }
-
- private void removeDeletedFiles() throws IOException {
- //Files present in dest but not present in source have to be
deleted
- Set<String> filesToBeDeleted = Sets.difference(
- ImmutableSet.copyOf(localFileNames),
- ImmutableSet.copyOf(remote.listAll())
- );
-
- Set<String> failedToDelete = Sets.newHashSet();
-
- for (String fileName : filesToBeDeleted) {
- boolean deleted = IndexCopier.this.deleteFile(local, fileName,
true);
- if (!deleted){
- failedToDelete.add(fileName);
- }
- }
-
- filesToBeDeleted = new HashSet<String>(filesToBeDeleted);
- filesToBeDeleted.removeAll(failedToDelete);
- if(!filesToBeDeleted.isEmpty()) {
- log.debug(
- "[{}] Following files have been removed from Lucene
index directory {}",
- indexPath, filesToBeDeleted);
- }
- }
-
- private class CORFileReference {
- final String name;
- private volatile boolean valid;
-
- private CORFileReference(String name) {
- this.name = name;
- }
-
- boolean isLocalValid(){
- return valid;
- }
-
- IndexInput openLocalInput( IOContext context) throws IOException {
- readerLocalReadCount.incrementAndGet();
- return local.openInput(name, context);
- }
-
- void markValid(){
- this.valid = true;
- localFileNames.add(name);
- }
- }
- }
-
- private class CopyOnWriteDirectory extends FilterDirectory {
- /**
- * Signal for the background thread to stop processing changes.
- */
- private final Callable<Void> STOP = new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- return null;
- }
- };
- private final Directory remote;
- private final Directory local;
- private final ConcurrentMap<String, COWFileReference> fileMap =
newConcurrentMap();
- private final Set<String> deletedFilesLocal =
Sets.newConcurrentHashSet();
- private final Set<String> skippedFiles = Sets.newConcurrentHashSet();
-
- private final BlockingQueue<Callable<Void>> queue = new
LinkedBlockingQueue<Callable<Void>>();
- private final AtomicReference<Throwable> errorInCopy = new
AtomicReference<Throwable>();
- private final CountDownLatch copyDone = new CountDownLatch(1);
- private final boolean reindexMode;
- private final String indexPathForLogging;
- private final Set<String> sharedWorkingSet;
-
- /**
- * Current background task
- */
- private volatile NotifyingFutureTask currentTask =
NotifyingFutureTask.completed();
-
- /**
- * Completion handler: set the current task to the next task and
schedules that one
- * on the background thread.
- */
- private final Runnable completionHandler = new Runnable() {
- Callable<Void> task = new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- try {
- Callable<Void> task = queue.poll();
- if (task != null && task != STOP) {
- if (errorInCopy.get() != null) {
- log.trace("[COW][{}] Skipping task {} as some
exception occurred in previous run",
- indexPathForLogging, task);
- } else {
- task.call();
- }
- currentTask.onComplete(completionHandler);
- }
-
- //Signal that all tasks completed
- if (task == STOP){
- copyDone.countDown();
- }
- } catch (Throwable t) {
- errorInCopy.set(t);
- log.debug("[COW][{}] Error occurred while copying
files. Further processing would " +
- "be skipped", indexPathForLogging, t);
- currentTask.onComplete(completionHandler);
- }
- return null;
- }
- };
-
- @Override
- public void run() {
- currentTask = new NotifyingFutureTask(task);
- try {
- executor.execute(currentTask);
- } catch (RejectedExecutionException e){
- checkIfClosed(false);
- throw e;
- }
- }
- };
-
- public CopyOnWriteDirectory(Directory remote, Directory local, boolean
reindexMode,
- String indexPathForLogging, Set<String>
sharedWorkingSet) throws IOException {
- super(local);
- this.remote = remote;
- this.local = local;
- this.indexPathForLogging = indexPathForLogging;
- this.reindexMode = reindexMode;
- this.sharedWorkingSet = sharedWorkingSet;
- initialize();
- }
-
- @Override
- public String[] listAll() throws IOException {
- return Iterables.toArray(fileMap.keySet(), String.class);
- }
-
- @Override
- public boolean fileExists(String name) throws IOException {
- return fileMap.containsKey(name);
- }
-
- @Override
- public void deleteFile(String name) throws IOException {
- log.trace("[COW][{}] Deleted file {}", indexPathForLogging, name);
- COWFileReference ref = fileMap.remove(name);
- if (ref != null) {
- ref.delete();
- }
- }
-
- @Override
- public long fileLength(String name) throws IOException {
- COWFileReference ref = fileMap.get(name);
- if (ref == null) {
- throw new FileNotFoundException(name);
- }
- return ref.fileLength();
- }
-
- @Override
- public IndexOutput createOutput(String name, IOContext context) throws
IOException {
- COWFileReference ref = fileMap.remove(name);
- if (ref != null) {
- ref.delete();
- }
- ref = new COWLocalFileReference(name);
- fileMap.put(name, ref);
- sharedWorkingSet.add(name);
- return ref.createOutput(context);
- }
-
- @Override
- public void sync(Collection<String> names) throws IOException {
- for (String name : names){
- COWFileReference file = fileMap.get(name);
- if (file != null){
- file.sync();
- }
- }
- }
-
- @Override
- public IndexInput openInput(String name, IOContext context) throws
IOException {
- COWFileReference ref = fileMap.get(name);
- if (ref == null) {
- throw new FileNotFoundException(name);
- }
- return ref.openInput(context);
- }
-
- @Override
- public void close() throws IOException {
- int pendingCopies = queue.size();
- addTask(STOP);
-
- //Wait for all pending copy task to finish
- try {
- long start = PERF_LOGGER.start();
-
- //Loop untill queue finished or IndexCopier
- //found to be closed. Doing it with timeout to
- //prevent any bug causing the thread to wait indefinitely
- while (!copyDone.await(10, TimeUnit.SECONDS)) {
- if (closed) {
- throw new IndexCopierClosedException("IndexCopier
found to be closed " +
- "while processing copy task for" +
remote.toString());
- }
- }
- PERF_LOGGER.end(start, -1, "[COW][{}] Completed pending
copying task {}", indexPathForLogging, pendingCopies);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException(e);
- }
-
- Throwable t = errorInCopy.get();
- if (t != null){
- throw new IOException("Error occurred while copying files for
" + indexPathForLogging, t);
- }
-
- //Sanity check
- checkArgument(queue.isEmpty(), "Copy queue still " +
- "has pending task left [%d]. %s", queue.size(), queue);
-
- long skippedFilesSize = getSkippedFilesSize();
-
- for (String fileName : deletedFilesLocal){
- deleteLocalFile(fileName);
- }
-
- skippedFromUploadSize.addAndGet(skippedFilesSize);
-
- String msg = "[COW][{}] CopyOnWrite stats : Skipped copying {}
files with total size {}";
- if ((reindexMode && skippedFilesSize > 0) || skippedFilesSize > 10
* FileUtils.ONE_MB){
- log.info(msg, indexPathForLogging, skippedFiles.size(),
humanReadableByteCount(skippedFilesSize));
- } else {
- log.debug(msg,indexPathForLogging, skippedFiles.size(),
humanReadableByteCount(skippedFilesSize));
- }
-
- if (log.isTraceEnabled()){
- log.trace("[COW][{}] File listing - Upon completion {}",
indexPathForLogging, Arrays.toString(remote.listAll()));
- }
-
- local.close();
- remote.close();
- sharedWorkingSet.clear();
- }
-
- @Override
- public String toString() {
- return String.format("[COW][%s] Local %s, Remote %s",
indexPathForLogging, local, remote);
- }
-
- private long getSkippedFilesSize() {
- long size = 0;
- for (String name : skippedFiles){
- try{
- if (local.fileExists(name)){
- size += local.fileLength(name);
- }
- } catch (Exception ignore){
-
- }
- }
- return size;
- }
-
- private void deleteLocalFile(String fileName) {
- IndexCopier.this.deleteFile(local, fileName, false);
- }
-
- private void initialize() throws IOException {
- for (String name : remote.listAll()) {
- fileMap.put(name, new COWRemoteFileReference(name));
- }
-
- if (log.isTraceEnabled()){
- log.trace("[COW][{}] File listing - At start {}",
indexPathForLogging, Arrays.toString(remote.listAll()));
- }
- }
-
- private void addCopyTask(final String name){
- updateMaxScheduled(scheduledForCopyCount.incrementAndGet());
- addTask(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- scheduledForCopyCount.decrementAndGet();
- if (deletedFilesLocal.contains(name)){
- skippedFiles.add(name);
- log.trace("[COW][{}] Skip copying of deleted file {}",
indexPathForLogging, name);
- return null;
- }
- long fileSize = local.fileLength(name);
- LocalIndexFile file = new LocalIndexFile(local, name,
fileSize, false);
- long perfStart = PERF_LOGGER.start();
- long start = startCopy(file);
-
- local.copy(remote, name, name, IOContext.DEFAULT);
-
- doneCopy(file, start);
- PERF_LOGGER.end(perfStart, 0, "[COW][{}] Copied to remote
{} -- size: {}",
- indexPathForLogging, name,
IOUtils.humanReadableByteCount(fileSize));
- return null;
- }
-
- @Override
- public String toString() {
- return "Copy: " + name;
- }
- });
- }
-
- private void addDeleteTask(final String name){
- addTask(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- if (!skippedFiles.contains(name)) {
- log.trace("[COW][{}] Marking as deleted {}",
indexPathForLogging, name);
- remote.deleteFile(name);
- }
- return null;
- }
-
- @Override
- public String toString() {
- return "Delete : " + name;
- }
- });
- }
-
- private void addTask(Callable<Void> task){
- checkIfClosed(true);
- queue.add(task);
- currentTask.onComplete(completionHandler);
- }
-
- private void checkIfClosed(boolean throwException) {
- if (closed) {
- IndexCopierClosedException e = new
IndexCopierClosedException("IndexCopier found to be closed " +
- "while processing" +remote.toString());
- errorInCopy.set(e);
- copyDone.countDown();
-
- if (throwException) {
- throw e;
- }
- }
- }
-
- private abstract class COWFileReference {
- protected final String name;
-
- public COWFileReference(String name) {
- this.name = name;
- }
-
- public abstract long fileLength() throws IOException;
-
- public abstract IndexInput openInput(IOContext context) throws
IOException;
-
- public abstract IndexOutput createOutput(IOContext context) throws
IOException;
-
- public abstract void delete() throws IOException;
-
- public void sync() throws IOException {
-
- }
- }
-
- private class COWRemoteFileReference extends COWFileReference {
- private boolean validLocalCopyPresent;
- private final long length;
-
- public COWRemoteFileReference(String name) throws IOException {
- super(name);
- this.length = remote.fileLength(name);
- }
-
- @Override
- public long fileLength() throws IOException {
- return length;
- }
-
- @Override
- public IndexInput openInput(IOContext context) throws IOException {
- checkIfLocalValid();
- if (validLocalCopyPresent && !REMOTE_ONLY.contains(name)) {
- writerLocalReadCount.incrementAndGet();
- return local.openInput(name, context);
- }
- writerRemoteReadCount.incrementAndGet();
- return remote.openInput(name, context);
- }
-
- @Override
- public IndexOutput createOutput(IOContext context) throws
IOException {
- throw new UnsupportedOperationException("Cannot create output
for existing remote file " + name);
- }
-
- @Override
- public void delete() throws IOException {
- //Remote file should not be deleted locally as it might be
- //in use by existing opened IndexSearcher. It would anyway
- //get deleted by CopyOnRead later
- //For now just record that these need to be deleted to avoid
- //potential concurrent access of the NodeBuilder
- addDeleteTask(name);
- }
-
- private void checkIfLocalValid() throws IOException {
- validLocalCopyPresent = local.fileExists(name)
- && local.fileLength(name) == remote.fileLength(name);
- }
- }
-
- private class COWLocalFileReference extends COWFileReference {
- public COWLocalFileReference(String name) {
- super(name);
- }
-
- @Override
- public long fileLength() throws IOException {
- return local.fileLength(name);
- }
-
- @Override
- public IndexInput openInput(IOContext context) throws IOException {
- return local.openInput(name, context);
- }
-
- @Override
- public IndexOutput createOutput(IOContext context) throws
IOException {
- log.debug("[COW][{}] Creating output {}", indexPathForLogging,
name);
- return new CopyOnCloseIndexOutput(local.createOutput(name,
context));
- }
-
- @Override
- public void delete() throws IOException {
- addDeleteTask(name);
- deletedFilesLocal.add(name);
- }
-
- @Override
- public void sync() throws IOException {
- local.sync(Collections.singleton(name));
- }
-
- /**
- * Implementation note - As we are decorating existing
implementation
- * we would need to ensure that we also override methods (non
abstract)
- * which might be implemented in say FSIndexInput like setLength
- */
- private class CopyOnCloseIndexOutput extends IndexOutput {
- private final IndexOutput delegate;
-
- public CopyOnCloseIndexOutput(IndexOutput delegate) {
- this.delegate = delegate;
- }
-
- @Override
- public void flush() throws IOException {
- delegate.flush();
- }
-
- @Override
- public void close() throws IOException {
- delegate.close();
- //Schedule this file to be copied in background
- addCopyTask(name);
- }
-
- @Override
- public long getFilePointer() {
- return delegate.getFilePointer();
- }
-
- @Override
- public void seek(long pos) throws IOException {
- delegate.seek(pos);
- }
-
- @Override
- public long length() throws IOException {
- return delegate.length();
- }
-
- @Override
- public void writeByte(byte b) throws IOException {
- delegate.writeByte(b);
- }
-
- @Override
- public void writeBytes(byte[] b, int offset, int length)
throws IOException {
- delegate.writeBytes(b, offset, length);
- }
-
- @Override
- public void setLength(long length) throws IOException {
- delegate.setLength(length);
- }
- }
- }
- }
-
- private boolean deleteFile(Directory dir, String fileName, boolean
copiedFromRemote){
- LocalIndexFile file = new LocalIndexFile(dir, fileName,
getFileLength(dir, fileName), copiedFromRemote);
+ public boolean deleteFile(Directory dir, String fileName, boolean
copiedFromRemote){
+ LocalIndexFile file = new LocalIndexFile(dir, fileName,
DirectoryUtils.getFileLength(dir, fileName), copiedFromRemote);
boolean successFullyDeleted = false;
try {
boolean fileExisted = false;
@@ -1035,24 +268,24 @@ public class IndexCopier implements Copy
return successFullyDeleted;
}
- private long startCopy(LocalIndexFile file) {
+ public long startCopy(LocalIndexFile file) {
updateMaxInProgress(copyInProgressCount.incrementAndGet());
- copyInProgressSize.addAndGet(file.size);
+ copyInProgressSize.addAndGet(file.getSize());
copyInProgressFiles.add(file);
return System.currentTimeMillis();
}
- private void doneCopy(LocalIndexFile file, long start) {
+ public void doneCopy(LocalIndexFile file, long start) {
copyInProgressFiles.remove(file);
copyInProgressCount.decrementAndGet();
- copyInProgressSize.addAndGet(-file.size);
+ copyInProgressSize.addAndGet(-file.getSize());
- if(file.copyFromRemote) {
+ if(file.isCopyFromRemote()) {
downloadTime.addAndGet(System.currentTimeMillis() - start);
- downloadSize.addAndGet(file.size);
+ downloadSize.addAndGet(file.getSize());
downloadCount.incrementAndGet();
} else {
- uploadSize.addAndGet(file.size);
+ uploadSize.addAndGet(file.getSize());
uploadTime.addAndGet(System.currentTimeMillis() - start);
uploadCount.incrementAndGet();
}
@@ -1077,6 +310,7 @@ public class IndexCopier implements Copy
}
}
+
private class DeleteOldDirOnClose extends FilterDirectory {
private final File oldIndexDir;
@@ -1109,114 +343,39 @@ public class IndexCopier implements Copy
return "DeleteOldDirOnClose wrapper for " + getDelegate();
}
}
-
- static final class LocalIndexFile {
- final File dir;
- final String name;
- final long size;
- final boolean copyFromRemote;
- private volatile int deleteAttemptCount;
- final long creationTime = System.currentTimeMillis();
-
- public LocalIndexFile(Directory dir, String fileName,
- long size, boolean copyFromRemote){
- this.copyFromRemote = copyFromRemote;
- this.dir = getFSDir(dir);
- this.name = fileName;
- this.size = size;
- }
- public LocalIndexFile(Directory dir, String fileName){
- this(dir, fileName, getFileLength(dir, fileName), true);
- }
+ //~------------------------------------------< Stats Collection >
- public String getKey(){
- if (dir != null){
- return new File(dir, name).getAbsolutePath();
- }
- return name;
- }
-
- public void incrementAttemptToDelete(){
- deleteAttemptCount++;
- }
-
- public int getDeleteAttemptCount() {
- return deleteAttemptCount;
- }
-
- public String deleteLog(){
- return String.format("%s (%s, %d attempts, %d s)", name,
- humanReadableByteCount(size), deleteAttemptCount,
timeTaken());
- }
-
- public String copyLog(){
- return String.format("%s (%s, %1.1f%%, %s, %d s)", name,
- humanReadableByteCount(actualSize()),
- copyProgress(),
- humanReadableByteCount(size), timeTaken());
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- LocalIndexFile localIndexFile = (LocalIndexFile) o;
-
- if (dir != null ? !dir.equals(localIndexFile.dir) :
localIndexFile.dir != null)
- return false;
- return name.equals(localIndexFile.name);
-
- }
-
- @Override
- public int hashCode() {
- int result = dir != null ? dir.hashCode() : 0;
- result = 31 * result + name.hashCode();
- return result;
- }
-
- private long timeTaken(){
- return TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()
- creationTime);
- }
+ public void skippedUpload(long skippedFilesSize) {
+ skippedFromUploadSize.addAndGet(skippedFilesSize);
+ }
- private float copyProgress(){
- return actualSize() * 1.0f / size * 100;
- }
+ public void scheduledForCopy() {
+ updateMaxScheduled(scheduledForCopyCount.incrementAndGet());
+ }
- private long actualSize(){
- return dir != null ? new File(dir, name).length() : 0;
- }
+ public void copyDone(){
+ scheduledForCopyCount.decrementAndGet();
}
- static File getFSDir(Directory dir) {
- if (dir instanceof FilterDirectory){
- dir = ((FilterDirectory) dir).getDelegate();
+ public void readFromRemote(boolean reader) {
+ if (reader) {
+ readerRemoteReadCount.incrementAndGet();
+ } else {
+ writerRemoteReadCount.incrementAndGet();
}
+ }
- if (dir instanceof FSDirectory){
- return ((FSDirectory) dir).getDirectory();
+ public void readFromLocal(boolean reader) {
+ if (reader) {
+ readerLocalReadCount.incrementAndGet();
+ } else {
+ writerLocalReadCount.incrementAndGet();
}
-
- return null;
}
- /**
- * Get the file length in best effort basis.
- * @return actual fileLength. -1 if cannot determine
- */
- private static long getFileLength(Directory dir, String fileName){
- try{
- //Check for file presence otherwise internally it results in
- //an exception to be created
- if (dir.fileExists(fileName)) {
- return dir.fileLength(fileName);
- }
- } catch (Exception ignore){
-
- }
- return -1;
+ public void foundInvalidFile(){
+ invalidFileCount.incrementAndGet();
}
//~------------------------------------------< CopyOnReadStatsMBean >
@@ -1321,7 +480,7 @@ public class IndexCopier implements Copy
public String getGarbageSize() {
long garbageSize = 0;
for (LocalIndexFile failedToDeleteFile : failedToDeleteFiles.values()){
- garbageSize += failedToDeleteFile.size;
+ garbageSize += failedToDeleteFile.getSize();
}
return humanReadableByteCount(garbageSize);
}
Added:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnReadDirectory.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnReadDirectory.java?rev=1768450&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnReadDirectory.java
(added)
+++
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnReadDirectory.java
Mon Nov 7 08:46:13 2016
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene.directory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.jackrabbit.oak.plugins.index.lucene.IndexCopier;
+import org.apache.jackrabbit.oak.util.PerfLogger;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FilterDirectory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.collect.Maps.newConcurrentMap;
+import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
+
+/**
+ * Directory implementation which lazily copies the index files from a
+ * remote directory in background.
+ */
+public class CopyOnReadDirectory extends FilterDirectory {
+ private static final Logger log =
LoggerFactory.getLogger(CopyOnReadDirectory.class);
+ private static final PerfLogger PERF_LOGGER = new
PerfLogger(LoggerFactory.getLogger(log.getName() + ".perf"));
+ private final IndexCopier indexCopier;
+ private final Directory remote;
+ private final Directory local;
+ private final String indexPath;
+ private final Executor executor;
+
+ private final ConcurrentMap<String, CORFileReference> files =
newConcurrentMap();
+ /**
+ * Set of fileNames bound to current local dir. It is updated with any new file
+ * which gets added by this directory
+ */
+ private final Set<String> localFileNames = Sets.newConcurrentHashSet();
+
+ public CopyOnReadDirectory(IndexCopier indexCopier, Directory remote,
Directory local, boolean prefetch,
+ String indexPath, Set<String> sharedWorkingSet,
Executor executor) throws IOException {
+ super(remote);
+ this.indexCopier = indexCopier;
+ this.executor = executor;
+ this.remote = remote;
+ this.local = local;
+ this.indexPath = indexPath;
+
+ this.localFileNames.addAll(Arrays.asList(local.listAll()));
+ //Remove files which are being worked upon by COW
+ this.localFileNames.removeAll(sharedWorkingSet);
+
+ if (prefetch) {
+ prefetchIndexFiles();
+ }
+ }
+
+ @Override
+ public void deleteFile(String name) throws IOException {
+ throw new UnsupportedOperationException("Cannot delete in a ReadOnly
directory");
+ }
+
+ @Override
+ public IndexOutput createOutput(String name, IOContext context) throws
IOException {
+ throw new UnsupportedOperationException("Cannot write in a ReadOnly
directory");
+ }
+
+ /**
+  * Opens {@code name} for reading. The local copy is used whenever it is
+  * known to be valid; otherwise the call is served by the remote directory.
+  * The first request for a file that exists in remote also schedules a copy
+  * of that file to the local directory.
+  *
+  * @param name name of the index file to open
+  * @param context IO context passed on to the directory serving the read
+  * @return input backed by either the local or the remote directory
+  * @throws IOException if the underlying directory fails to open the file
+  */
+ @Override
+ public IndexInput openInput(String name, IOContext context) throws IOException {
+     if (IndexCopier.REMOTE_ONLY.contains(name)) {
+         log.trace("[{}] opening remote only file {}", indexPath, name);
+         return remote.openInput(name, context);
+     }
+
+     CORFileReference ref = files.get(name);
+     if (ref != null) {
+         if (ref.isLocalValid()) {
+             log.trace("[{}] opening existing local file {}", indexPath, name);
+             //Use the reference already looked up instead of a second map lookup
+             return ref.openLocalInput(context);
+         } else {
+             indexCopier.readFromRemote(true);
+             log.trace(
+                     "[{}] opening existing remote file as local version is not valid {}",
+                     indexPath, name);
+             return remote.openInput(name, context);
+         }
+     }
+
+     //If file does not exist then just delegate to remote and not
+     //schedule a copy task
+     if (!remote.fileExists(name)){
+         if (log.isDebugEnabled()) {
+             log.debug("[{}] Looking for non existent file {}. Current known files {}",
+                     indexPath, name, Arrays.toString(remote.listAll()));
+         }
+         return remote.openInput(name, context);
+     }
+
+     CORFileReference toPut = new CORFileReference(name);
+     CORFileReference old = files.putIfAbsent(name, toPut);
+     if (old == null) {
+         log.trace("[{}] scheduled local copy for {}", indexPath, name);
+         copy(toPut);
+     }
+
+     //If another thread registered the file first then its reference is the
+     //one which gets marked valid, so check that one and not the discarded toPut
+     CORFileReference current = old != null ? old : toPut;
+
+     //If immediate executor is used the result would be ready right away
+     if (current.isLocalValid()) {
+         log.trace("[{}] opening new local file {}", indexPath, name);
+         return current.openLocalInput(context);
+     }
+
+     log.trace("[{}] opening new remote file {}", indexPath, name);
+     indexCopier.readFromRemote(true);
+     return remote.openInput(name, context);
+ }
+
+ public Directory getLocal() {
+ return local;
+ }
+
+ private void copy(final CORFileReference reference) {
+ indexCopier.scheduledForCopy();
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ indexCopier.copyDone();
+ copyFilesToLocal(reference, true, true);
+ }
+ });
+ }
+
+ private void prefetchIndexFiles() throws IOException {
+ long start = PERF_LOGGER.start();
+ long totalSize = 0;
+ int copyCount = 0;
+ List<String> copiedFileNames = Lists.newArrayList();
+ for (String name : remote.listAll()) {
+ if (IndexCopier.REMOTE_ONLY.contains(name)) {
+ continue;
+ }
+ CORFileReference fileRef = new CORFileReference(name);
+ files.putIfAbsent(name, fileRef);
+ long fileSize = copyFilesToLocal(fileRef, false, false);
+ if (fileSize > 0) {
+ copyCount++;
+ totalSize += fileSize;
+ copiedFileNames.add(name);
+ }
+ }
+
+ local.sync(copiedFileNames);
+ PERF_LOGGER.end(start, -1, "[{}] Copied {} files totaling {}",
indexPath, copyCount, humanReadableByteCount(totalSize));
+ }
+
+ /**
+  * Ensures that a usable local copy of the referenced file exists. A missing
+  * file is copied from remote; an existing one is only accepted if its length
+  * matches the remote length. IOExceptions are logged and not propagated, and
+  * a partially copied file is deleted again.
+  *
+  * @param reference file to process; marked valid once a usable local copy exists
+  * @param sync whether a freshly copied file should be synced right away
+  * @param logDuration whether the copy duration should be reported via the perf logger
+  * @return size of the file copied by this call; 0 if nothing was copied, i.e.
+  *         the local copy was already present or the copy failed
+  */
+ private long copyFilesToLocal(CORFileReference reference, boolean sync, boolean logDuration) {
+     String name = reference.name;
+     boolean success = false;
+     boolean copyAttempted = false;
+     long fileSize = 0;
+     try {
+         if (!local.fileExists(name)) {
+             long perfStart = -1;
+             if (logDuration) {
+                 perfStart = PERF_LOGGER.start();
+             }
+
+             fileSize = remote.fileLength(name);
+             LocalIndexFile file = new LocalIndexFile(local, name, fileSize, true);
+             long start = indexCopier.startCopy(file);
+             copyAttempted = true;
+
+             remote.copy(local, name, name, IOContext.READ);
+
+             if (sync) {
+                 local.sync(Collections.singleton(name));
+             }
+
+             //Mark valid only after the optional sync went through. A failed
+             //sync leads to the file being deleted below and the reference
+             //must not point readers to a local file which is gone
+             reference.markValid();
+
+             indexCopier.doneCopy(file, start);
+             if (logDuration) {
+                 PERF_LOGGER.end(perfStart, 0,
+                         "[{}] Copied file {} of size {}", indexPath,
+                         name, humanReadableByteCount(fileSize));
+             }
+         } else {
+             long localLength = local.fileLength(name);
+             long remoteLength = remote.fileLength(name);
+
+             //Do a simple consistency check. Ideally Lucene index files are never
+             //updated but still do a check if the copy is consistent
+             if (localLength != remoteLength) {
+                 log.warn("[{}] Found local copy for {} in {} but size of local {} differs from remote {}. " +
+                                 "Content would be read from remote file only",
+                         indexPath, name, local, localLength, remoteLength);
+                 indexCopier.foundInvalidFile();
+             } else {
+                 reference.markValid();
+                 log.trace("[{}] found local copy of file {}",
+                         indexPath, name);
+             }
+         }
+         success = true;
+     } catch (IOException e) {
+         //TODO In case of exception there would not be any other attempt
+         //to download the file. Look into support for retry
+         log.warn("[{}] Error occurred while copying file [{}] from {} to {}", indexPath, name, remote, local, e);
+     } finally {
+         if (copyAttempted && !success){
+             try {
+                 if (local.fileExists(name)) {
+                     local.deleteFile(name);
+                 }
+             } catch (IOException e) {
+                 log.warn("[{}] Error occurred while deleting corrupted file [{}] from [{}]", indexPath, name, local, e);
+             }
+         }
+     }
+     //A failed copy must not be reported as copied, otherwise the caller
+     //would count it and try to sync a file which has just been deleted
+     return success ? fileSize : 0;
+ }
+
+ /**
+ * On close, files which are not present in remote are removed from local.
+ * CopyOnReadDir is opened at different revisions of the index state
+ *
+ * CDir1 - V1
+ * CDir2 - V2
+ *
+ * It's possible that two different IndexSearchers are opened on the same local
+ * directory but pinned to different revisions. So while removing, it must
+ * be ensured that any currently opened IndexSearcher does not get affected.
+ * The way IndexSearchers get created in IndexTracker ensures that a new searcher
+ * pinned to the newer revision gets opened first and then the existing ones are closed.
+ */
+ @Override
+ public void close() throws IOException {
+ //Always remove old index file on close as it ensures that
+ //no other IndexSearcher are opened with previous revision of Index
due to
+ //way IndexTracker closes IndexNode. At max there would be only two
IndexNode
+ //opened pinned to different revision of same Lucene index
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try{
+ removeDeletedFiles();
+ } catch (IOException e) {
+ log.warn(
+ "[{}] Error occurred while removing deleted files
from Local {}, Remote {}",
+ indexPath, local, remote, e);
+ }
+
+ try {
+ //This would also remove old index files if current
+ //directory was based on newerRevision as local would
+ //be of type DeleteOldDirOnClose
+ local.close();
+ remote.close();
+ } catch (IOException e) {
+ log.warn(
+ "[{}] Error occurred while closing directory ",
+ indexPath, e);
+ }
+ }
+ });
+ }
+
+ @Override
+ public String toString() {
+ return String.format("[COR] Local %s, Remote %s", local, remote);
+ }
+
+ private void removeDeletedFiles() throws IOException {
+ //Files present in dest but not present in source have to be deleted
+ Set<String> filesToBeDeleted = Sets.difference(
+ ImmutableSet.copyOf(localFileNames),
+ ImmutableSet.copyOf(remote.listAll())
+ );
+
+ Set<String> failedToDelete = Sets.newHashSet();
+
+ for (String fileName : filesToBeDeleted) {
+ boolean deleted = indexCopier.deleteFile(local, fileName, true);
+ if (!deleted){
+ failedToDelete.add(fileName);
+ }
+ }
+
+ filesToBeDeleted = new HashSet<String>(filesToBeDeleted);
+ filesToBeDeleted.removeAll(failedToDelete);
+ if(!filesToBeDeleted.isEmpty()) {
+ log.debug(
+ "[{}] Following files have been removed from Lucene index
directory {}",
+ indexPath, filesToBeDeleted);
+ }
+ }
+
+ private class CORFileReference {
+ final String name;
+ private volatile boolean valid;
+
+ private CORFileReference(String name) {
+ this.name = name;
+ }
+
+ boolean isLocalValid(){
+ return valid;
+ }
+
+ IndexInput openLocalInput( IOContext context) throws IOException {
+ indexCopier.readFromLocal(true);
+ return local.openInput(name, context);
+ }
+
+ void markValid(){
+ this.valid = true;
+ localFileNames.add(name);
+ }
+ }
+}
Propchange:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnReadDirectory.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnWriteDirectory.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnWriteDirectory.java?rev=1768450&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnWriteDirectory.java
(added)
+++
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnWriteDirectory.java
Mon Nov 7 08:46:13 2016
@@ -0,0 +1,520 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene.directory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.commons.IOUtils;
+import org.apache.jackrabbit.oak.commons.concurrent.NotifyingFutureTask;
+import org.apache.jackrabbit.oak.plugins.index.lucene.IndexCopier;
+import
org.apache.jackrabbit.oak.plugins.index.lucene.IndexCopierClosedException;
+import org.apache.jackrabbit.oak.util.PerfLogger;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FilterDirectory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.collect.Maps.newConcurrentMap;
+import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
+
+public class CopyOnWriteDirectory extends FilterDirectory {
+ private static final Logger log =
LoggerFactory.getLogger(CopyOnWriteDirectory.class);
+ private static final PerfLogger PERF_LOGGER = new
PerfLogger(LoggerFactory.getLogger(log.getName() + ".perf"));
+ private IndexCopier indexCopier;
+ /**
+ * Signal for the background thread to stop processing changes.
+ */
+ private final Callable<Void> STOP = new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ return null;
+ }
+ };
+ private final Directory remote;
+ private final Directory local;
+ private final Executor executor;
+ private final ConcurrentMap<String, COWFileReference> fileMap =
newConcurrentMap();
+ private final Set<String> deletedFilesLocal = Sets.newConcurrentHashSet();
+ private final Set<String> skippedFiles = Sets.newConcurrentHashSet();
+
+ private final BlockingQueue<Callable<Void>> queue = new
LinkedBlockingQueue<Callable<Void>>();
+ private final AtomicReference<Throwable> errorInCopy = new
AtomicReference<Throwable>();
+ private final CountDownLatch copyDone = new CountDownLatch(1);
+ private final boolean reindexMode;
+ private final String indexPathForLogging;
+ private final Set<String> sharedWorkingSet;
+
+ /**
+ * Current background task
+ */
+ private volatile NotifyingFutureTask currentTask =
NotifyingFutureTask.completed();
+
+ /**
+ * Completion handler: set the current task to the next task and schedules
that one
+ * on the background thread.
+ */
+ private final Runnable completionHandler = new Runnable() {
+ Callable<Void> task = new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ try {
+ Callable<Void> task = queue.poll();
+ if (task != null && task != STOP) {
+ if (errorInCopy.get() != null) {
+ log.trace("[COW][{}] Skipping task {} as some
exception occurred in previous run",
+ indexPathForLogging, task);
+ } else {
+ task.call();
+ }
+ currentTask.onComplete(completionHandler);
+ }
+
+ //Signal that all tasks completed
+ if (task == STOP){
+ copyDone.countDown();
+ }
+ } catch (Throwable t) {
+ errorInCopy.set(t);
+ log.debug("[COW][{}] Error occurred while copying files.
Further processing would " +
+ "be skipped", indexPathForLogging, t);
+ currentTask.onComplete(completionHandler);
+ }
+ return null;
+ }
+ };
+
+ @Override
+ public void run() {
+ currentTask = new NotifyingFutureTask(task);
+ try {
+ executor.execute(currentTask);
+ } catch (RejectedExecutionException e){
+ checkIfClosed(false);
+ throw e;
+ }
+ }
+ };
+
+ public CopyOnWriteDirectory(IndexCopier indexCopier, Directory remote,
Directory local, boolean reindexMode,
+ String indexPathForLogging, Set<String>
sharedWorkingSet, Executor executor) throws
+ IOException {
+ super(local);
+ this.indexCopier = indexCopier;
+ this.remote = remote;
+ this.local = local;
+ this.executor = executor;
+ this.indexPathForLogging = indexPathForLogging;
+ this.reindexMode = reindexMode;
+ this.sharedWorkingSet = sharedWorkingSet;
+ initialize();
+ }
+
+ @Override
+ public String[] listAll() throws IOException {
+ return Iterables.toArray(fileMap.keySet(), String.class);
+ }
+
+ @Override
+ public boolean fileExists(String name) throws IOException {
+ return fileMap.containsKey(name);
+ }
+
+ @Override
+ public void deleteFile(String name) throws IOException {
+ log.trace("[COW][{}] Deleted file {}", indexPathForLogging, name);
+ COWFileReference ref = fileMap.remove(name);
+ if (ref != null) {
+ ref.delete();
+ }
+ }
+
+ @Override
+ public long fileLength(String name) throws IOException {
+ COWFileReference ref = fileMap.get(name);
+ if (ref == null) {
+ throw new FileNotFoundException(name);
+ }
+ return ref.fileLength();
+ }
+
+ @Override
+ public IndexOutput createOutput(String name, IOContext context) throws
IOException {
+ COWFileReference ref = fileMap.remove(name);
+ if (ref != null) {
+ ref.delete();
+ }
+ ref = new COWLocalFileReference(name);
+ fileMap.put(name, ref);
+ sharedWorkingSet.add(name);
+ return ref.createOutput(context);
+ }
+
+ @Override
+ public void sync(Collection<String> names) throws IOException {
+ for (String name : names){
+ COWFileReference file = fileMap.get(name);
+ if (file != null){
+ file.sync();
+ }
+ }
+ }
+
+ @Override
+ public IndexInput openInput(String name, IOContext context) throws
IOException {
+ COWFileReference ref = fileMap.get(name);
+ if (ref == null) {
+ throw new FileNotFoundException(name);
+ }
+ return ref.openInput(context);
+ }
+
+ @Override
+ public void close() throws IOException {
+ int pendingCopies = queue.size();
+ addTask(STOP);
+
+ //Wait for all pending copy task to finish
+ try {
+ long start = PERF_LOGGER.start();
+
+ //Loop untill queue finished or IndexCopier
+ //found to be closed. Doing it with timeout to
+ //prevent any bug causing the thread to wait indefinitely
+ while (!copyDone.await(10, TimeUnit.SECONDS)) {
+ if (indexCopier.isClosed()) {
+ throw new IndexCopierClosedException("IndexCopier found to
be closed " +
+ "while processing copy task for" +
remote.toString());
+ }
+ }
+ PERF_LOGGER.end(start, -1, "[COW][{}] Completed pending copying
task {}", indexPathForLogging, pendingCopies);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException(e);
+ }
+
+ Throwable t = errorInCopy.get();
+ if (t != null){
+ throw new IOException("Error occurred while copying files for " +
indexPathForLogging, t);
+ }
+
+ //Sanity check
+ checkArgument(queue.isEmpty(), "Copy queue still " +
+ "has pending task left [%d]. %s", queue.size(), queue);
+
+ long skippedFilesSize = getSkippedFilesSize();
+
+ for (String fileName : deletedFilesLocal){
+ deleteLocalFile(fileName);
+ }
+
+ indexCopier.skippedUpload(skippedFilesSize);
+
+ String msg = "[COW][{}] CopyOnWrite stats : Skipped copying {} files
with total size {}";
+ if ((reindexMode && skippedFilesSize > 0) || skippedFilesSize > 10 *
FileUtils.ONE_MB){
+ log.info(msg, indexPathForLogging, skippedFiles.size(),
humanReadableByteCount(skippedFilesSize));
+ } else {
+ log.debug(msg,indexPathForLogging, skippedFiles.size(),
humanReadableByteCount(skippedFilesSize));
+ }
+
+ if (log.isTraceEnabled()){
+ log.trace("[COW][{}] File listing - Upon completion {}",
indexPathForLogging, Arrays.toString(remote.listAll()));
+ }
+
+ local.close();
+ remote.close();
+ sharedWorkingSet.clear();
+ }
+
+ @Override
+ public String toString() {
+ return String.format("[COW][%s] Local %s, Remote %s",
indexPathForLogging, local, remote);
+ }
+
+ /**
+  * Sums up, on a best effort basis, the local size of all files whose upload
+  * to remote was skipped. Files which are missing or whose length cannot be
+  * read do not contribute to the total.
+  *
+  * @return total size in bytes of the skipped files which could be measured
+  */
+ private long getSkippedFilesSize() {
+     long size = 0;
+     for (String name : skippedFiles){
+         //Shared helper returns -1 when the file is absent or its length cannot be read
+         long length = DirectoryUtils.getFileLength(local, name);
+         if (length > 0) {
+             size += length;
+         }
+     }
+     return size;
+ }
+
+ private void deleteLocalFile(String fileName) {
+ indexCopier.deleteFile(local, fileName, false);
+ }
+
+ private void initialize() throws IOException {
+ for (String name : remote.listAll()) {
+ fileMap.put(name, new COWRemoteFileReference(name));
+ }
+
+ if (log.isTraceEnabled()){
+ log.trace("[COW][{}] File listing - At start {}",
indexPathForLogging, Arrays.toString(remote.listAll()));
+ }
+ }
+
+ private void addCopyTask(final String name){
+ indexCopier.scheduledForCopy();
+ addTask(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ indexCopier.copyDone();
+ if (deletedFilesLocal.contains(name)){
+ skippedFiles.add(name);
+ log.trace("[COW][{}] Skip copying of deleted file {}",
indexPathForLogging, name);
+ return null;
+ }
+ long fileSize = local.fileLength(name);
+ LocalIndexFile file = new LocalIndexFile(local, name,
fileSize, false);
+ long perfStart = PERF_LOGGER.start();
+ long start = indexCopier.startCopy(file);
+
+ local.copy(remote, name, name, IOContext.DEFAULT);
+
+ indexCopier.doneCopy(file, start);
+ PERF_LOGGER.end(perfStart, 0, "[COW][{}] Copied to remote {}
-- size: {}",
+ indexPathForLogging, name,
IOUtils.humanReadableByteCount(fileSize));
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return "Copy: " + name;
+ }
+ });
+ }
+
+ private void addDeleteTask(final String name){
+ addTask(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ if (!skippedFiles.contains(name)) {
+ log.trace("[COW][{}] Marking as deleted {}",
indexPathForLogging, name);
+ remote.deleteFile(name);
+ }
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return "Delete : " + name;
+ }
+ });
+ }
+
+ private void addTask(Callable<Void> task){
+ checkIfClosed(true);
+ queue.add(task);
+ currentTask.onComplete(completionHandler);
+ }
+
+ /**
+  * Checks whether the owning IndexCopier has been closed. If so, the failure
+  * is recorded in {@code errorInCopy} and {@code copyDone} is released so
+  * that a thread waiting in {@code close()} does not block any further.
+  *
+  * @param throwException whether the IndexCopierClosedException should also
+  *                       be thrown to the caller
+  */
+ private void checkIfClosed(boolean throwException) {
+     if (indexCopier.isClosed()) {
+         IndexCopierClosedException e = new IndexCopierClosedException("IndexCopier found to be closed " +
+                 "while processing " + remote.toString());
+         errorInCopy.set(e);
+         copyDone.countDown();
+
+         if (throwException) {
+             throw e;
+         }
+     }
+ }
+
+ private abstract class COWFileReference {
+ protected final String name;
+
+ public COWFileReference(String name) {
+ this.name = name;
+ }
+
+ public abstract long fileLength() throws IOException;
+
+ public abstract IndexInput openInput(IOContext context) throws
IOException;
+
+ public abstract IndexOutput createOutput(IOContext context) throws
IOException;
+
+ public abstract void delete() throws IOException;
+
+ public void sync() throws IOException {
+
+ }
+ }
+
+ private class COWRemoteFileReference extends COWFileReference {
+ private boolean validLocalCopyPresent;
+ private final long length;
+
+ public COWRemoteFileReference(String name) throws IOException {
+ super(name);
+ this.length = remote.fileLength(name);
+ }
+
+ @Override
+ public long fileLength() throws IOException {
+ return length;
+ }
+
+ @Override
+ public IndexInput openInput(IOContext context) throws IOException {
+ checkIfLocalValid();
+ if (validLocalCopyPresent &&
!IndexCopier.REMOTE_ONLY.contains(name)) {
+ indexCopier.readFromLocal(false);
+ return local.openInput(name, context);
+ }
+ indexCopier.readFromRemote(false);
+ return remote.openInput(name, context);
+ }
+
+ @Override
+ public IndexOutput createOutput(IOContext context) throws IOException {
+ throw new UnsupportedOperationException("Cannot create output for
existing remote file " + name);
+ }
+
+ @Override
+ public void delete() throws IOException {
+ //Remote file should not be deleted locally as it might be
+ //in use by existing opened IndexSearcher. It would anyway
+ //get deleted by CopyOnRead later
+ //For now just record that these need to be deleted to avoid
+ //potential concurrent access of the NodeBuilder
+ addDeleteTask(name);
+ }
+
+ private void checkIfLocalValid() throws IOException {
+ validLocalCopyPresent = local.fileExists(name)
+ && local.fileLength(name) == remote.fileLength(name);
+ }
+ }
+
+ private class COWLocalFileReference extends COWFileReference {
+ public COWLocalFileReference(String name) {
+ super(name);
+ }
+
+ @Override
+ public long fileLength() throws IOException {
+ return local.fileLength(name);
+ }
+
+ @Override
+ public IndexInput openInput(IOContext context) throws IOException {
+ return local.openInput(name, context);
+ }
+
+ @Override
+ public IndexOutput createOutput(IOContext context) throws IOException {
+ log.debug("[COW][{}] Creating output {}", indexPathForLogging,
name);
+ return new
COWLocalFileReference.CopyOnCloseIndexOutput(local.createOutput(name, context));
+ }
+
+ @Override
+ public void delete() throws IOException {
+ addDeleteTask(name);
+ deletedFilesLocal.add(name);
+ }
+
+ @Override
+ public void sync() throws IOException {
+ local.sync(Collections.singleton(name));
+ }
+
+ /**
+ * Implementation note - As we are decorating existing implementation
+ * we would need to ensure that we also override methods (non abstract)
+ * which might be implemented in say FSIndexInput like setLength
+ */
+ private class CopyOnCloseIndexOutput extends IndexOutput {
+ private final IndexOutput delegate;
+
+ public CopyOnCloseIndexOutput(IndexOutput delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ delegate.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ delegate.close();
+ //Schedule this file to be copied in background
+ addCopyTask(name);
+ }
+
+ @Override
+ public long getFilePointer() {
+ return delegate.getFilePointer();
+ }
+
+ @Override
+ public void seek(long pos) throws IOException {
+ delegate.seek(pos);
+ }
+
+ @Override
+ public long length() throws IOException {
+ return delegate.length();
+ }
+
+ @Override
+ public void writeByte(byte b) throws IOException {
+ delegate.writeByte(b);
+ }
+
+ @Override
+ public void writeBytes(byte[] b, int offset, int length) throws
IOException {
+ delegate.writeBytes(b, offset, length);
+ }
+
+ @Override
+ public void setLength(long length) throws IOException {
+ delegate.setLength(length);
+ }
+ }
+ }
+}
Propchange:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnWriteDirectory.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/DirectoryUtils.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/DirectoryUtils.java?rev=1768450&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/DirectoryUtils.java
(added)
+++
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/DirectoryUtils.java
Mon Nov 7 08:46:13 2016
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene.directory;
+
+import org.apache.lucene.store.Directory;
+
+public class DirectoryUtils {
+ /**
+ * Get the file length in best effort basis. Any exception raised by the
+ * directory while checking for or measuring the file is swallowed.
+ *
+ * @param dir directory to look the file up in
+ * @param fileName name of the file whose length is requested
+ * @return actual fileLength. -1 if cannot determine
+ */
+ public static long getFileLength(Directory dir, String fileName){
+ try{
+ //Check for file presence otherwise internally it results in
+ //an exception to be created
+ if (dir.fileExists(fileName)) {
+ return dir.fileLength(fileName);
+ }
+ } catch (Exception ignore){
+ //Deliberately ignored: this is a best effort lookup and -1 is returned below
+ }
+ return -1;
+ }
+}
Propchange:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/DirectoryUtils.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/LocalIndexFile.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/LocalIndexFile.java?rev=1768450&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/LocalIndexFile.java
(added)
+++
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/LocalIndexFile.java
Mon Nov 7 08:46:13 2016
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene.directory;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.jackrabbit.oak.plugins.index.lucene.IndexCopier;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.FilterDirectory;
+
+import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
+
+/**
+ * Describes a single index file by the local file system directory it lives in
+ * (when one can be resolved) and its name. Also records the expected size, the
+ * time this descriptor was created and how many delete attempts were counted.
+ *
+ * <p>Identity ({@link #equals(Object)} / {@link #hashCode()}) is based on the
+ * resolved directory and the file name only.
+ */
+public final class LocalIndexFile {
+    /** Local file system directory, or null if it could not be resolved. */
+    final File dir;
+    final String name;
+    /** Expected size in bytes; -1 when the length could not be determined. */
+    final long size;
+    // NOTE(review): presumably marks files copied from the remote directory -
+    // confirm against callers
+    final boolean copyFromRemote;
+    // NOTE(review): volatile gives visibility only; the ++ in
+    // incrementAttemptToDelete() is not atomic if called concurrently
+    private volatile int deleteAttemptCount;
+    final long creationTime = System.currentTimeMillis();
+
+    /**
+     * @param dir directory holding the file; only its file system location
+     *            (if any) is retained
+     * @param fileName name of the file within the directory
+     * @param size expected size of the file in bytes
+     * @param copyFromRemote flag returned as is by {@link #isCopyFromRemote()}
+     */
+    public LocalIndexFile(Directory dir, String fileName,
+                          long size, boolean copyFromRemote){
+        this.copyFromRemote = copyFromRemote;
+        this.dir = getFSDir(dir);
+        this.name = fileName;
+        this.size = size;
+    }
+
+    /**
+     * Creates a descriptor whose size is read from the directory (-1 if it
+     * cannot be determined) and whose copyFromRemote flag is true.
+     */
+    public LocalIndexFile(Directory dir, String fileName){
+        this(dir, fileName, DirectoryUtils.getFileLength(dir, fileName), true);
+    }
+
+    /**
+     * @return absolute path of the file when a local directory is known,
+     *         otherwise just the file name
+     */
+    public String getKey(){
+        if (dir != null){
+            return new File(dir, name).getAbsolutePath();
+        }
+        return name;
+    }
+
+    public boolean isCopyFromRemote() {
+        return copyFromRemote;
+    }
+
+    /**
+     * @return the size passed at construction time
+     */
+    public long getSize() {
+        return size;
+    }
+
+    /** Records one more delete attempt. */
+    public void incrementAttemptToDelete(){
+        deleteAttemptCount++;
+    }
+
+    public int getDeleteAttemptCount() {
+        return deleteAttemptCount;
+    }
+
+    /**
+     * @return summary with name, expected size, number of delete attempts and
+     *         seconds elapsed since this descriptor was created
+     */
+    public String deleteLog(){
+        return String.format("%s (%s, %d attempts, %d s)", name,
+                humanReadableByteCount(size), deleteAttemptCount, timeTaken());
+    }
+
+    /**
+     * @return summary with name, current on-disk size, percentage of the
+     *         expected size reached, expected size and seconds elapsed since
+     *         this descriptor was created
+     */
+    public String copyLog(){
+        return String.format("%s (%s, %1.1f%%, %s, %d s)", name,
+                humanReadableByteCount(actualSize()),
+                copyProgress(),
+                humanReadableByteCount(size), timeTaken());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        LocalIndexFile localIndexFile = (LocalIndexFile) o;
+
+        if (dir != null ? !dir.equals(localIndexFile.dir) : localIndexFile.dir != null)
+            return false;
+        return name.equals(localIndexFile.name);
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = dir != null ? dir.hashCode() : 0;
+        result = 31 * result + name.hashCode();
+        return result;
+    }
+
+    /** Seconds elapsed since this descriptor was created. */
+    private long timeTaken(){
+        return TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - creationTime);
+    }
+
+    /**
+     * Percentage of the expected size currently present on disk. Returns 0
+     * when the expected size is unknown (-1) or zero.
+     */
+    private float copyProgress(){
+        //Dividing by a non positive size would yield a negative, NaN or
+        //infinite percentage in the log output
+        if (size <= 0) {
+            return 0;
+        }
+        return actualSize() * 1.0f / size * 100;
+    }
+
+    /** Current length of the file on disk; 0 when no local directory is known. */
+    private long actualSize(){
+        return dir != null ? new File(dir, name).length() : 0;
+    }
+
+    /**
+     * Resolves the file system location backing the given directory.
+     *
+     * @return the directory of the underlying {@link FSDirectory}, unwrapping
+     *         one level of {@link FilterDirectory} if present; null otherwise
+     */
+    static File getFSDir(Directory dir) {
+        if (dir instanceof FilterDirectory){
+            dir = ((FilterDirectory) dir).getDelegate();
+        }
+
+        if (dir instanceof FSDirectory){
+            return ((FSDirectory) dir).getDirectory();
+        }
+
+        return null;
+    }
+}
Propchange:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/LocalIndexFile.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java?rev=1768450&r1=1768449&r2=1768450&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
(original)
+++
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
Mon Nov 7 08:46:13 2016
@@ -52,6 +52,7 @@ import com.google.common.util.concurrent
import org.apache.commons.io.FileUtils;
import org.apache.jackrabbit.oak.commons.IOUtils;
import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
+import org.apache.jackrabbit.oak.plugins.index.lucene.directory.LocalIndexFile;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.lucene.store.Directory;
@@ -521,7 +522,7 @@ public class IndexCopierTest {
w2.close();
assertEquals(1, c1.getFailedToDeleteFiles().size());
- IndexCopier.LocalIndexFile testFile =
c1.getFailedToDeleteFiles().values().iterator().next();
+ LocalIndexFile testFile =
c1.getFailedToDeleteFiles().values().iterator().next();
assertEquals(1, testFile.getDeleteAttemptCount());
assertEquals(IOUtils.humanReadableByteCount(t1.length),
c1.getGarbageSize());
Modified:
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LucenePropertyIndexTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LucenePropertyIndexTest.java?rev=1768450&r1=1768449&r2=1768450&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LucenePropertyIndexTest.java
(original)
+++
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LucenePropertyIndexTest.java
Mon Nov 7 08:46:13 2016
@@ -107,6 +107,7 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.index.fulltext.ExtractedText;
import
org.apache.jackrabbit.oak.plugins.index.fulltext.ExtractedText.ExtractionResult;
import
org.apache.jackrabbit.oak.plugins.index.fulltext.PreExtractedTextProvider;
+import
org.apache.jackrabbit.oak.plugins.index.lucene.directory.CopyOnReadDirectory;
import org.apache.jackrabbit.oak.plugins.index.nodetype.NodeTypeIndexProvider;
import
org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider;
import org.apache.jackrabbit.oak.plugins.memory.ArrayBasedBlob;
@@ -196,7 +197,7 @@ public class LucenePropertyIndexTest ext
}
private String getFSDirPath(Directory dir){
- if (dir instanceof IndexCopier.CopyOnReadDirectory){
+ if (dir instanceof CopyOnReadDirectory){
dir = ((CopyOnReadDirectory) dir).getLocal();
}