Author: catholicon
Date: Tue Jul 24 11:59:40 2018
New Revision: 1836548

URL: http://svn.apache.org/viewvc?rev=1836548&view=rev
Log:
OAK-7246: Improve cleanup of locally copied index files

Implement what was discussed except for s/creation time/modified time/

Update IndexCopierTest to use FSDirectory for CoR and CoW, and also add
synthetic, delayed updates of the last modified timestamp to simulate reality
without really sleeping (this test would still pass with the old impl... except
for constants, of course)

Added IndexCopierCleanupTest to test new logic.

Added:
    
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierCleanupTest.java
   (with props)
Modified:
    
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
    
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnReadDirectory.java
    
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnWriteDirectory.java
    
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/LocalIndexFile.java
    
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java

Modified: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java?rev=1836548&r1=1836547&r2=1836548&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
 Tue Jul 24 11:59:40 2018
@@ -70,7 +70,7 @@ public class IndexCopier implements Copy
     private static final int MAX_FAILURE_ENTRIES = 10000;
     private static final String WORK_DIR_NAME = "indexWriterDir";
 
-    private final Logger log = LoggerFactory.getLogger(getClass());
+    private static final Logger log = 
LoggerFactory.getLogger(IndexCopier.class);
     private final Executor executor;
     private final File indexWorkDir;
 
@@ -95,7 +95,6 @@ public class IndexCopier implements Copy
     private final AtomicLong uploadTime = new AtomicLong();
 
 
-    private final Map<String, Set<String>> sharedWorkingSetMap = newHashMap();
     private final Map<String, String> indexPathVersionMapping = 
newConcurrentMap();
     private final ConcurrentMap<String, LocalIndexFile> failedToDeleteFiles = 
newConcurrentMap();
     private final Set<LocalIndexFile> copyInProgressFiles = 
Collections.newSetFromMap(new ConcurrentHashMap<LocalIndexFile, Boolean>());
@@ -174,18 +173,6 @@ public class IndexCopier implements Copy
         return indexRootDirectory.getIndexDir(definition, indexPath, dirName);
     }
 
-    public void addIndexFileBeingWritten(String indexPath, String name) {
-        getSharedWorkingSet(indexPath).add(name);
-    }
-
-    public void clearIndexFilesBeingWritten(String indexPath) {
-        getSharedWorkingSet(indexPath).clear();
-    }
-
-    public Set<String> getIndexFilesBeingWritten(String indexPath) {
-        return getSharedWorkingSet(indexPath);
-    }
-
     Map<String, LocalIndexFile> getFailedToDeleteFiles() {
         return Collections.unmodifiableMap(failedToDeleteFiles);
     }
@@ -216,26 +203,6 @@ public class IndexCopier implements Copy
         }
     }
 
-    /**
-     * Provide the corresponding shared state to enable COW inform COR
-     * about new files it is creating while indexing. This would allow COR to 
ignore
-     * such files while determining the deletion candidates.
-     *
-     * @param defn index definition for which the directory is being created
-     * @return a set to maintain the state of new files being created by the 
COW Directory
-     */
-    private Set<String> getSharedWorkingSet(String indexPath){
-        Set<String> sharedSet;
-        synchronized (sharedWorkingSetMap){
-            sharedSet = sharedWorkingSetMap.get(indexPath);
-            if (sharedSet == null){
-                sharedSet = Sets.newConcurrentHashSet();
-                sharedWorkingSetMap.put(indexPath, sharedSet);
-            }
-        }
-        return sharedSet;
-    }
-
     private void checkIntegrity(String indexPath, Directory local, Directory 
remote) throws IOException {
         if (validatedIndexPaths.contains(indexPath)){
             return;
@@ -291,6 +258,77 @@ public class IndexCopier implements Copy
         return successFullyDeleted;
     }
 
+    /**
+     * This method would return the latest modification timestamp from the set 
of file {@code names}
+     * on the file system.
+     * The parameter {@code localDir} is expected to be an instance of {@link 
FSDirectory} (or wrapped one in
+     * {@link FilterDirectory}). If this assumption doesn't hold, the method 
would return -1.
+     * Each of the file names is expected to exist in {@code localDir}. If 
this fails the method shall return -1.
+     * In case of any error while computing modified timestamps on the file 
system, the method shall return -1.
+     * @param names file names to evaluate on local FS
+     * @param localDir {@link Directory} implementation to be used to get the 
files
+     * @return latest timestamp or -1 (with logs) in case of any doubt
+     */
+    public static long getNewestLocalFSTimestampFor(Set<String> names, 
Directory localDir) {
+        File localFSDir = LocalIndexFile.getFSDir(localDir);
+
+        if (localFSDir == null) {
+            log.warn("Couldn't get FSDirectory instance for {}.", localDir);
+            return -1;
+        }
+
+        long maxTS = 0L;
+        for (String  name : names) {
+            File f = new File(localFSDir, name);
+
+            if (!f.exists()) {
+                log.warn("File {} doesn't exist in {}", name, localFSDir);
+                return -1;
+            }
+
+            long modTS = f.lastModified();
+            if (modTS == 0L) {
+                log.warn("Couldn't get lastModification timestamp for {} in 
{}", name, localFSDir);
+                return -1;
+            }
+
+            if (modTS > maxTS) {
+                maxTS  = modTS;
+            }
+        }
+
+        return maxTS;
+    }
+
+    /**
+     * @param name file name to evaluate on local FS
+     * @param localDir {@link Directory} implementation to be used to get the 
file
+     * @param millis timestamp to compare file's modified timestamp against
+     * @return {@code true} if file referred to by {@code name} is modified 
before {@code millis}; false otherwise
+     */
+    public static boolean isFileModifiedBefore(String name, Directory 
localDir, long millis) {
+        File localFSDir = LocalIndexFile.getFSDir(localDir);
+
+        if (localFSDir == null) {
+            log.warn("Couldn't get FSDirectory instance for {}.", localDir);
+            return false;
+        }
+
+        File f = new File(localFSDir, name);
+        if (!f.exists()) {
+            log.warn("File {} doesn't exist in {}", name, localFSDir);
+            return false;
+        }
+
+        long modTS = f.lastModified();
+        if (modTS == 0L) {
+            log.warn("Couldn't get lastModification timestamp for {} in {}", 
name, localFSDir);
+            return false;
+        }
+
+        return modTS < millis;
+    }
+
     public long startCopy(LocalIndexFile file) {
         updateMaxInProgress(copyInProgressCount.incrementAndGet());
         copyInProgressSize.addAndGet(file.getSize());

Modified: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnReadDirectory.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnReadDirectory.java?rev=1836548&r1=1836547&r2=1836548&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnReadDirectory.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnReadDirectory.java
 Tue Jul 24 11:59:40 2018
@@ -27,7 +27,9 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
@@ -53,6 +55,10 @@ import static org.apache.jackrabbit.oak.
 public class CopyOnReadDirectory extends FilterDirectory {
     private static final Logger log = 
LoggerFactory.getLogger(CopyOnReadDirectory.class);
     private static final PerfLogger PERF_LOGGER = new 
PerfLogger(LoggerFactory.getLogger(log.getName() + ".perf"));
+
+    public static final String DELETE_MARGIN_MILLIS_NAME = 
"oak.lucene.delete.margin";
+    public final long DELETE_MARGIN_MILLIS = 
Long.getLong(DELETE_MARGIN_MILLIS_NAME, TimeUnit.MINUTES.toMillis(5));
+
     private final IndexCopier indexCopier;
     private final Directory remote;
     private final Directory local;
@@ -61,11 +67,6 @@ public class CopyOnReadDirectory extends
     private final AtomicBoolean closed = new AtomicBoolean();
 
     private final ConcurrentMap<String, CORFileReference> files = 
newConcurrentMap();
-    /**
-     * Set of fileNames bound to current local dir. It is updated with any new 
file
-     * which gets added by this directory
-     */
-    private final Set<String> localFileNames = Sets.newConcurrentHashSet();
 
     public CopyOnReadDirectory(IndexCopier indexCopier, Directory remote, 
Directory local, boolean prefetch,
                                String indexPath, Executor executor) throws 
IOException {
@@ -76,10 +77,6 @@ public class CopyOnReadDirectory extends
         this.local = local;
         this.indexPath = indexPath;
 
-        this.localFileNames.addAll(Arrays.asList(local.listAll()));
-        //Remove files which are being worked upon by COW
-        
this.localFileNames.removeAll(indexCopier.getIndexFilesBeingWritten(indexPath));
-
         if (prefetch) {
             prefetchIndexFiles();
         }
@@ -309,11 +306,27 @@ public class CopyOnReadDirectory extends
     }
 
     private void removeDeletedFiles() throws IOException {
-        //Files present in dest but not present in source have to be deleted
-        Set<String> filesToBeDeleted = Sets.difference(
-                ImmutableSet.copyOf(localFileNames),
-                ImmutableSet.copyOf(remote.listAll())
-        );
+        Set<String> remoteFiles = ImmutableSet.copyOf(remote.listAll());
+
+        long maxTS = IndexCopier.getNewestLocalFSTimestampFor(remoteFiles, 
local);
+        if (maxTS == -1) {
+            log.warn("Couldn't compute safe timestamp to delete files from 
{}", local);
+            return;
+        }
+
+        // subtract DELETE_MARGIN_MILLIS from maxTS for safety (you can never 
be too careful with time)
+        final long deleteBeforeTS = maxTS - DELETE_MARGIN_MILLIS;
+
+        Set<String> filesToBeDeleted =
+                // Files present locally
+                ImmutableSet.copyOf(local.listAll()).stream()
+                // but not in my view
+                .filter(name -> !remoteFiles.contains(name))
+                // and also older than a safe timestamp (deleteBeforeTS)
+                .filter(name -> IndexCopier.isFileModifiedBefore(name, local, 
deleteBeforeTS))
+                // can be deleted
+                .collect(Collectors.toSet())
+        ;
 
         Set<String> failedToDelete = Sets.newHashSet();
 
@@ -352,7 +365,6 @@ public class CopyOnReadDirectory extends
 
         void markValid(){
             this.valid = true;
-            localFileNames.add(name);
         }
     }
 }

Modified: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnWriteDirectory.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnWriteDirectory.java?rev=1836548&r1=1836547&r2=1836548&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnWriteDirectory.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnWriteDirectory.java
 Tue Jul 24 11:59:40 2018
@@ -143,7 +143,6 @@ public class CopyOnWriteDirectory extend
         this.executor = executor;
         this.indexPath = indexPath;
         this.reindexMode = reindexMode;
-        indexCopier.clearIndexFilesBeingWritten(indexPath);
         initialize();
     }
 
@@ -183,7 +182,6 @@ public class CopyOnWriteDirectory extend
         }
         ref = new COWLocalFileReference(name);
         fileMap.put(name, ref);
-        indexCopier.addIndexFileBeingWritten(indexPath, name);
         return ref.createOutput(context);
     }
 
@@ -260,7 +258,6 @@ public class CopyOnWriteDirectory extend
 
         local.close();
         remote.close();
-        indexCopier.clearIndexFilesBeingWritten(indexPath);
     }
 
     @Override

Modified: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/LocalIndexFile.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/LocalIndexFile.java?rev=1836548&r1=1836547&r2=1836548&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/LocalIndexFile.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/LocalIndexFile.java
 Tue Jul 24 11:59:40 2018
@@ -116,7 +116,7 @@ public final class LocalIndexFile {
         return dir != null ? new File(dir, name).length() : 0;
     }
 
-    static File getFSDir(Directory dir) {
+    public static File getFSDir(Directory dir) {
         if (dir instanceof FilterDirectory){
             dir = ((FilterDirectory) dir).getDelegate();
         }

Added: 
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierCleanupTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierCleanupTest.java?rev=1836548&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierCleanupTest.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierCleanupTest.java
 Tue Jul 24 11:59:40 2018
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene;
+
+import com.google.common.collect.Sets;
+import com.google.common.io.Closer;
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FilterDirectory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.RAMDirectory;
+import org.apache.lucene.store.SimpleFSDirectory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Random;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+import static 
com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
+import static org.apache.jackrabbit.oak.InitialContentHelper.INITIAL_CONTENT;
+import static 
org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.INDEX_DATA_CHILD_NAME;
+import static 
org.apache.jackrabbit.oak.plugins.index.lucene.directory.CopyOnReadDirectory.DELETE_MARGIN_MILLIS_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class IndexCopierCleanupTest {
+    private Random rnd = new Random();
+    private static final int maxFileSize = 7896;
+
+    private static final long SAFE_MARGIN_FOR_DELETION = 
TimeUnit.SECONDS.toMillis(5);
+    private static final long MARGIN_BUFFER_FOR_FS_GRANULARITY = 
TimeUnit.SECONDS.toMillis(1);
+
+    private NodeState root = INITIAL_CONTENT;
+
+    private static final Clock CLOCK = new Clock.Virtual();
+    static {
+        try {
+            CLOCK.waitUntil(Clock.SIMPLE.getTime());
+        } catch (InterruptedException e) {
+            // ignored
+        }
+    }
+
+    @Rule
+    public TemporaryFolder temporaryFolder = new TemporaryFolder(new 
File("target"));
+
+    private NodeBuilder builder = root.builder();
+
+    private String indexPath = "/oak:index/test";
+
+    private final Closer closer = Closer.create();
+
+    private IndexDefinition defn = null;
+
+    private CloseSafeRemoteRAMDirectory remote = null;
+
+    private File localFSDir = null;
+
+    private RAMIndexCopier copier = null;
+
+    @Before
+    public void setUp() throws IOException {
+        System.setProperty(DELETE_MARGIN_MILLIS_NAME, 
String.valueOf(SAFE_MARGIN_FOR_DELETION));
+        LuceneIndexEditorContext.configureUniqueId(builder);
+
+        defn = new IndexDefinition(root, builder.getNodeState(), indexPath);
+        remote = new CloseSafeRemoteRAMDirectory(closer);
+
+        localFSDir = temporaryFolder.newFolder();
+
+        copier = new RAMIndexCopier(localFSDir, sameThreadExecutor(), 
temporaryFolder.getRoot(), true);
+
+        // convince copier that local FS dir is ok (avoid integrity check 
doing the cleanup)
+        copier.getCoRDir().close();
+    }
+
+    @After
+    public void tearDown() throws IOException {
+        closer.close();
+        System.clearProperty(DELETE_MARGIN_MILLIS_NAME);
+    }
+
+    @Test
+    public void basicOperationSameNodeIndexing() throws Exception {
+        Directory cow = copier.getCoWDir();
+        writeFile(cow, "a");
+        cow.close();
+
+        Directory cor1 = copier.getCoRDir();
+
+        cow = copier.getCoWDir();
+        cow.deleteFile("a");
+        writeFile(cow, "b");
+        cow.close();
+
+        Directory cor2 = copier.getCoRDir();
+        cor1.close();
+
+        //CoR1 saw "a" and everything else is newer. Nothing should get deleted
+        assertTrue(existsLocally("a"));
+        assertTrue(existsLocally("b"));
+
+        cow = copier.getCoWDir();
+        cow.deleteFile("b");
+        writeFile(cow, "c");
+        cow.close();
+
+        Directory cor3 = copier.getCoRDir();
+        cor2.close();
+        assertFalse(existsLocally("a"));
+        assertTrue(existsLocally("b"));
+        assertTrue(existsLocally("c"));
+
+        cor3.close();
+        assertFalse(existsLocally("a"));
+        assertFalse(existsLocally("b"));
+        assertTrue(existsLocally("c"));
+    }
+
+    @Test
+    public void basicOperationRemoteNodeIndexing() throws Exception {
+        writeFile(remote, "a");
+        remote.close();
+
+        Directory cor1 = copier.getCoRDir();
+
+        remote.deleteFile("a");
+        writeFile(remote, "b");
+        remote.close();
+
+        Directory cor2 = copier.getCoRDir();
+        cor1.close();
+
+        //CoR1 saw "a" and everything else ("b" due to CoR2) is newer. Nothing 
should get deleted
+        assertTrue(existsLocally("a"));
+        assertTrue(existsLocally("b"));
+
+        remote.deleteFile("b");
+        writeFile(remote, "c");
+        remote.close();
+
+        Directory cor3 = copier.getCoRDir();
+        cor2.close();
+        assertFalse(existsLocally("a"));
+        assertTrue(existsLocally("b"));
+        assertTrue(existsLocally("c"));
+
+        cor3.close();
+        assertFalse(existsLocally("a"));
+        assertFalse(existsLocally("b"));
+        assertTrue(existsLocally("c"));
+    }
+
+    @Test
+    public void oak7246Description() throws Exception {
+        // Step 1
+        Directory cow1 = copier.getCoWDir();
+        writeFile(cow1, "a");
+        writeFile(cow1, "b");
+        cow1.close();
+
+        Directory remoteSnapshowCow1 = remote.snapshot();
+
+        // Step 2
+        Directory cow2 = copier.getCoWDir();
+        cow2.deleteFile("a");
+        cow2.deleteFile("b");
+        writeFile(cow2, "c");
+        writeFile(cow2, "d");
+
+        // Step 3
+        Directory cor1 = copier.getCoRDir(remoteSnapshowCow1);
+        // local listing
+        assertEquals(Sets.newHashSet("a", "b", "c", "d"),
+                Sets.newHashSet(new SimpleFSDirectory(localFSDir).listAll()));
+        // reader listing
+        assertEquals(Sets.newHashSet("a", "b"),
+                Sets.newHashSet(cor1.listAll()));
+
+        // Step 4
+        cow2.close();
+
+        Directory remoteSnapshotCow2 = remote.snapshot();
+
+        // Step 5
+        Directory cow3 = copier.getCoWDir();
+        cow3.deleteFile("c");
+        cow3.deleteFile("d");
+        writeFile(cow3, "e");
+        writeFile(cow3, "f");
+
+        // Step 6
+        Directory cor2 = copier.getCoRDir(remoteSnapshotCow2);
+        // local listing
+        assertEquals(Sets.newHashSet("a", "b", "c", "d", "e", "f"),
+                Sets.newHashSet(new SimpleFSDirectory(localFSDir).listAll()));
+        // reader listing
+        assertEquals(Sets.newHashSet("c", "d"),
+                Sets.newHashSet(cor2.listAll()));
+
+        // Step 7
+        cor1.close();
+
+        // nothing should get deleted as CoR1 sees "a", "b" and everything 
else is newer
+        assertEquals(Sets.newHashSet("a", "b", "c", "d", "e", "f"),
+                Sets.newHashSet(new SimpleFSDirectory(localFSDir).listAll()));
+    }
+
+    @Test
+    public void newlyWrittenFileMustNotBeDeletedDueToLateObservation() throws 
Exception {
+        Directory cow1 = copier.getCoWDir();
+        writeFile(cow1, "a");
+        cow1.close();
+
+        Directory snap1 = remote.snapshot();
+
+        Directory cow2 = copier.getCoWDir();
+        writeFile(cow2, "fileX");
+        cow2.close();
+
+        Directory cor1 = copier.getCoRDir(snap1);
+        cor1.close();
+
+        assertTrue(existsLocally("fileX"));
+    }
+
+    @Test
+    public void newlyWrittenFileMustNotBeDeletedDueToLateClose() throws 
Exception {
+        Directory cow1 = copier.getCoWDir();
+        writeFile(cow1, "a");
+        cow1.close();
+
+        Directory cor1 = copier.getCoRDir();
+
+        Directory cow2 = copier.getCoWDir();
+        writeFile(cow2, "fileX");
+        cow2.close();
+
+        cor1.close();
+
+        assertTrue(existsLocally("fileX"));
+    }
+
+    @Test
+    public void failedWritesGetCleanedUp() throws Exception {
+        CloseSafeRemoteRAMDirectory oldRemote = remote.snapshot();
+
+        Directory failedWriter = copier.getCoWDir();
+        writeFile(failedWriter, "a");
+        failedWriter.close();
+        // actually, everything would've worked for 'failedWriter', but we 
restore 'remote' to old state
+        // to fake failed remote update
+        remote = oldRemote;
+
+        assertTrue(existsLocally("a"));
+
+        // Create some files that get sent to remote
+        Directory cow = copier.getCoWDir();
+        writeFile(cow, "b");
+        cow.close();
+
+        // reader that would invoke cleanup according to its view on close
+        Directory cor = copier.getCoRDir();
+        cor.close();
+
+        assertFalse(existsLocally("a"));
+    }
+
+    @Test
+    public void strayFilesGetRemoved() throws Exception {
+        DelayCopyingSimpleFSDirectory strayDir = new 
DelayCopyingSimpleFSDirectory(localFSDir);
+
+        writeFile(strayDir, "oldestStray");
+
+        // add "a" directly to remote
+        writeFile(remote, "a");
+
+        copier.getCoRDir().close();
+
+        // "a" is added to remote and hence local FS gets it when CoR is opened
+        assertFalse(existsLocally("oldestStray"));
+
+        // "b" gets created locally by CoW
+        Directory cow = copier.getCoWDir();
+        writeFile(cow, "b");
+
+        writeFile(strayDir, "oldStray");
+
+        copier.getCoRDir().close();
+
+        // "oldStray" is newer than "b"
+        // hence, doesn't get removed yet
+        assertTrue(existsLocally("oldStray"));
+
+        // "c" gets created locally
+        cow = copier.getCoWDir();
+        writeFile(cow, "c");
+
+        // "newStray" is newer than "c"
+        writeFile(strayDir, "newStray");
+
+        copier.getCoRDir().close();
+
+        assertFalse(existsLocally("oldStray"));
+        assertTrue(existsLocally("newStray"));
+    }
+
+    @Test
+    public void marginIsRespected() throws Exception {
+        writeFile(remote, "a");
+
+        FileUtils.write(new File(localFSDir, "beyond-margin"), 
"beyond-margin-data", (Charset) null);
+        DelayCopyingSimpleFSDirectory.updateLastModified(localFSDir, 
"beyond-margin");
+        // Delay 1 more second to avoid FS time granularity
+        CLOCK.waitUntil(CLOCK.getTime() + SAFE_MARGIN_FOR_DELETION + 
MARGIN_BUFFER_FOR_FS_GRANULARITY);
+
+        FileUtils.write(new File(localFSDir, "within-margin"), 
"within-margin-data", (Charset) null);
+        DelayCopyingSimpleFSDirectory.updateLastModified(localFSDir, 
"within-margin");
+
+        copier.getCoRDir().close();
+
+        assertEquals(Sets.newHashSet("within-margin", "a"),
+                Sets.newHashSet(new SimpleFSDirectory(localFSDir).listAll()));
+    }
+
+    private boolean existsLocally(String fileName) {
+        return new File(localFSDir, fileName).exists();
+    }
+
+    private void writeFile(Directory dir, String name) throws IOException {
+        byte[] data = new byte[(rnd.nextInt(maxFileSize) + 1)];
+        rnd.nextBytes(data);
+
+        IndexOutput o = dir.createOutput(name, IOContext.DEFAULT);
+        o.writeBytes(data, data.length);
+        o.close();
+
+        DelayCopyingSimpleFSDirectory.updateLastModified(dir, name);
+    }
+
+    private class RAMIndexCopier extends IndexCopier {
+        final File baseFSDir;
+
+        RAMIndexCopier(File baseFSDir, Executor executor, File indexRootDir,
+                       boolean prefetchEnabled) throws IOException {
+            super(executor, indexRootDir, prefetchEnabled);
+            this.baseFSDir = baseFSDir;
+        }
+
+        @Override
+        protected Directory createLocalDirForIndexReader(String indexPath, 
IndexDefinition definition, String dirName) throws IOException {
+            return new DelayCopyingSimpleFSDirectory(baseFSDir);
+        }
+
+        @Override
+        protected Directory createLocalDirForIndexWriter(IndexDefinition 
definition, String dirName) throws IOException {
+            return new DelayCopyingSimpleFSDirectory(baseFSDir);
+        }
+
+        Directory getCoRDir() throws IOException {
+            return getCoRDir(remote.snapshot());
+        }
+
+        Directory getCoRDir(Directory remoteSnapshot) throws IOException {
+            return wrapForRead(indexPath, defn, remoteSnapshot, 
INDEX_DATA_CHILD_NAME);
+        }
+
+        Directory getCoWDir() throws IOException {
+            return wrapForWrite(defn, remote, false, INDEX_DATA_CHILD_NAME);
+        }
+    }
+
+    private static class DelayCopyingSimpleFSDirectory extends 
SimpleFSDirectory {
+        DelayCopyingSimpleFSDirectory(File dir) throws IOException {
+            super(dir);
+        }
+
+        static void updateLastModified(Directory dir, String name) throws 
IOException {
+            DelayCopyingSimpleFSDirectory d = null;
+            if (dir instanceof DelayCopyingSimpleFSDirectory) {
+                d = (DelayCopyingSimpleFSDirectory)dir;
+            } else if (dir instanceof FilterDirectory) {
+                Directory delegate = ((FilterDirectory)dir).getDelegate();
+                if (delegate instanceof DelayCopyingSimpleFSDirectory) {
+                    d = (DelayCopyingSimpleFSDirectory)delegate;
+                }
+            }
+
+            if (d != null) {
+                d.updateLastModified(name);
+            }
+        }
+
+        void updateLastModified(String name) throws IOException {
+            try {
+                updateLastModified(directory, name);
+
+                CLOCK.waitUntil(CLOCK.getTime() + SAFE_MARGIN_FOR_DELETION + 
MARGIN_BUFFER_FOR_FS_GRANULARITY);
+            } catch (InterruptedException ie) {
+                // ignored
+            }
+        }
+
+        static void updateLastModified(File fsDirectory, String name) throws 
IOException {
+            // Update file timestamp manually to mimic last updated time 
updates without sleeping
+            File f = new File(fsDirectory, name);
+            if (!f.setLastModified(CLOCK.getTime())) {
+                throw new IOException("Failed to update last modified for " + 
name);
+            }
+        }
+    }
+
+    private static class CloseSafeRemoteRAMDirectory extends RAMDirectory {
+        private final Closer closer;
+
+        CloseSafeRemoteRAMDirectory(Closer closer) {
+            super();
+            this.closer = closer;
+            closer.register(this::close0);
+        }
+
+        CloseSafeRemoteRAMDirectory(CloseSafeRemoteRAMDirectory that) throws 
IOException {
+            super(that, IOContext.READ);
+            this.closer = that.closer;
+            closer.register(this::close0);
+        }
+
+        @Override
+        public void close() {
+        }
+
+        @Override
+        public void copy(Directory to, String src, String dest, IOContext 
context) throws IOException {
+            super.copy(to, src, dest, context);
+
+            if (to instanceof DelayCopyingSimpleFSDirectory) {
+                ((DelayCopyingSimpleFSDirectory)to).updateLastModified(dest);
+            }
+        }
+
+        CloseSafeRemoteRAMDirectory snapshot() throws IOException {
+            return new CloseSafeRemoteRAMDirectory(this);
+        }
+
+        private void close0() {
+            super.close();
+        }
+    }
+}

Propchange: 
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierCleanupTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java?rev=1836548&r1=1836547&r2=1836548&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
 Tue Jul 24 11:59:40 2018
@@ -44,6 +44,7 @@ import javax.management.openmbean.Tabula
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import com.google.common.io.Closer;
 import com.google.common.util.concurrent.ForwardingListeningExecutorService;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -54,12 +55,15 @@ import org.apache.jackrabbit.oak.commons
 import org.apache.jackrabbit.oak.plugins.index.lucene.directory.LocalIndexFile;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.stats.Clock;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FilterDirectory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.store.RAMDirectory;
+import org.apache.lucene.store.SimpleFSDirectory;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -70,6 +74,7 @@ import static com.google.common.collect.
 import static 
com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
 import static 
org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.INDEX_DATA_CHILD_NAME;
 import static org.apache.jackrabbit.oak.InitialContentHelper.INITIAL_CONTENT;
+import static 
org.apache.jackrabbit.oak.plugins.index.lucene.directory.CopyOnReadDirectory.DELETE_MARGIN_MILLIS_NAME;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -79,7 +84,6 @@ import static org.junit.Assert.assertFal
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.junit.Assume.assumeTrue;
 
 public class IndexCopierTest {
     private Random rnd = new Random();
@@ -87,6 +91,8 @@ public class IndexCopierTest {
 
     private NodeState root = INITIAL_CONTENT;
 
+    private static final Clock CLOCK = new Clock.Virtual();
+
     @Rule
     public TemporaryFolder temporaryFolder = new TemporaryFolder(new 
File("target"));
 
@@ -94,9 +100,26 @@ public class IndexCopierTest {
 
     private String indexPath = "/oak:index/test";
 
+    private final Closer closer = Closer.create();
+    static {
+        try {
+            CLOCK.waitUntil(Clock.SIMPLE.getTime());
+        } catch (InterruptedException e) {
+            // ignored
+        }
+    }
+
     @Before
     public void setUp(){
+        System.setProperty(DELETE_MARGIN_MILLIS_NAME, 
String.valueOf(TimeUnit.SECONDS.toMillis(1)));
         LuceneIndexEditorContext.configureUniqueId(builder);
+        DelayCopyingSimpleFSDirectory.temporaryFolder = temporaryFolder;
+    }
+
+    @After
+    public void tearDown() throws IOException {
+        closer.close();
+        DelayCopyingSimpleFSDirectory.temporaryFolder = null;
     }
 
     @Test
@@ -277,7 +300,7 @@ public class IndexCopierTest {
 
         IndexCopier c1 = new RAMIndexCopier(baseDir, executor, getWorkDir());
 
-        TestRAMDirectory remote = new TestRAMDirectory();
+        FileTrackingDirectory remote = new FileTrackingDirectory();
         Directory wrapped = c1.wrapForRead("/foo", defn, remote, 
INDEX_DATA_CHILD_NAME);
 
         byte[] t1 = writeFile(remote , "t1");
@@ -329,7 +352,7 @@ public class IndexCopierTest {
 
         final CountDownLatch copyProceed = new CountDownLatch(1);
         final CountDownLatch copyRequestArrived = new CountDownLatch(1);
-        TestRAMDirectory remote = new TestRAMDirectory(){
+        FileTrackingDirectory remote = new FileTrackingDirectory(){
             @Override
             public void copy(Directory to, String src, String dest, IOContext 
context) throws IOException {
                 copyRequestArrived.countDown();
@@ -384,7 +407,7 @@ public class IndexCopierTest {
         IndexDefinition defn = new IndexDefinition(root, 
builder.getNodeState(), "/foo");
         IndexCopier c1 = new RAMIndexCopier(baseDir, sameThreadExecutor(), 
getWorkDir());
 
-        TestRAMDirectory remote = new TestRAMDirectory();
+        FileTrackingDirectory remote = new FileTrackingDirectory();
         Directory wrapped = c1.wrapForRead("/foo", defn, remote, 
INDEX_DATA_CHILD_NAME);
 
         byte[] t1 = writeFile(remote, "t1");
@@ -442,18 +465,14 @@ public class IndexCopierTest {
     }
 
     @Test
-    public void deletesOnClose() throws Exception{
-        //Use a close safe dir. In actual case the FSDir would
-        //be opened on same file system hence it can retain memory
-        //but RAMDirectory does not retain memory hence we simulate
-        //that by not closing the RAMDir and reuse it
+    public void deletesOnClose() throws Exception {
         Directory baseDir = new CloseSafeDir();
 
 
         IndexDefinition defn = new IndexDefinition(root, 
builder.getNodeState(), "/foo");
         IndexCopier c1 = new RAMIndexCopier(baseDir, sameThreadExecutor(), 
getWorkDir());
 
-        Directory r1 = new RAMDirectory();
+        Directory r1 = new DelayCopyingSimpleFSDirectory();
 
         byte[] t1 = writeFile(r1, "t1");
         byte[] t2 = writeFile(r1 , "t2");
@@ -496,7 +515,7 @@ public class IndexCopierTest {
         IndexDefinition defn = new IndexDefinition(root, 
builder.getNodeState(), "/foo");
         IndexCopier c1 = new RAMIndexCopier(baseDir, sameThreadExecutor(), 
getWorkDir());
 
-        Directory r1 = new RAMDirectory();
+        Directory r1 = new DelayCopyingSimpleFSDirectory();
 
         byte[] t1 = writeFile(r1, "t1");
         byte[] t2 = writeFile(r1 , "t2");
@@ -974,10 +993,15 @@ public class IndexCopierTest {
 
         Directory remote = new CloseSafeDir();
         byte[] f1 = writeFile(remote, "f1");
+        byte[] f1a = writeFile(remote, "f1a");
 
         Directory cor1 = copier.wrapForRead(indexPath, defn, remote, 
INDEX_DATA_CHILD_NAME);
         readAndAssert(cor1, "f1", f1);
+        readAndAssert(cor1, "f1a", f1a);
+        //Ensure that the deletion task submitted to the executor gets processed 
immediately
+        executor.enableImmediateExecution();
         cor1.close();
+        executor.enableDelayedExecution();
 
         final CountDownLatch pauseCopyLatch = new CountDownLatch(1);
         Directory remote2 = new FilterDirectory(remote) {
@@ -999,8 +1023,13 @@ public class IndexCopierTest {
         //Before copy is done to remote lets delete f1 from remote and
         //open a COR and close it such that it triggers delete of f1
         remote.deleteFile("f1");
+        writeFile(remote, "f1b");
         Directory cor2 = copier.wrapForRead(indexPath, defn, remote, 
INDEX_DATA_CHILD_NAME);
 
+        // Since we're simulating a hypothetically stuck cow1 and are 
running the next CoW cycle on 'remote', let's
+        // update the timestamp on the file that cow1 created to maintain some 
temporal sanity
+        updateLastModified(cow1, "f2");
+
         //Ensure that deletion task submitted to executor get processed 
immediately
         executor.enableImmediateExecution();
         cor2.close();
@@ -1062,9 +1091,16 @@ public class IndexCopierTest {
         IndexOutput o = dir.createOutput(name, IOContext.DEFAULT);
         o.writeBytes(data, data.length);
         o.close();
+
+        updateLastModified(dir, name);
+
         return data;
     }
 
+    private static void updateLastModified(Directory dir, String name) throws 
IOException {
+        DelayCopyingSimpleFSDirectory.updateLastModified(dir, name);
+    }
+
     private byte[] randomBytes(int size) {
         byte[] data = new byte[size];
         rnd.nextBytes(data);
@@ -1113,9 +1149,57 @@ public class IndexCopierTest {
         }
     }
 
-    private static class TestRAMDirectory extends RAMDirectory {
+    private static class DelayCopyingSimpleFSDirectory extends 
SimpleFSDirectory {
+        private static TemporaryFolder temporaryFolder;
+
+        public DelayCopyingSimpleFSDirectory() throws IOException {
+            super(temporaryFolder.newFolder());
+        }
+
+        public static void updateLastModified(Directory dir, String name) {
+            DelayCopyingSimpleFSDirectory d = null;
+            if (dir instanceof DelayCopyingSimpleFSDirectory) {
+                d = (DelayCopyingSimpleFSDirectory)dir;
+            } else if (dir instanceof FilterDirectory) {
+                Directory delegate = ((FilterDirectory)dir).getDelegate();
+                if (delegate instanceof DelayCopyingSimpleFSDirectory) {
+                    d = (DelayCopyingSimpleFSDirectory)delegate;
+                }
+            }
+
+            if (d != null) {
+                d.updateLastModified(name);
+            }
+        }
+
+        private void updateLastModified(String name) {
+            try {
+                // Update the file's timestamp manually to mimic last-modified time 
changes without sleeping
+                CLOCK.waitUntil(CLOCK.getTime() + 
TimeUnit.SECONDS.toMillis(2));
+
+                File f = new File(directory, name);
+                f.setLastModified(CLOCK.getTimeIncreasing());
+            } catch (InterruptedException ie) {
+                // ignored
+            }
+        }
+
+        @Override
+        public void copy(Directory to, String src, String dest, IOContext 
context) throws IOException {
+            super.copy(to, src, dest, context);
+
+            if (to instanceof DelayCopyingSimpleFSDirectory) {
+                ((DelayCopyingSimpleFSDirectory)to).updateLastModified(dest);
+            }
+        }
+    }
+
+    private class FileTrackingDirectory extends DelayCopyingSimpleFSDirectory {
         final List<String> openedFiles = newArrayList();
 
+        public FileTrackingDirectory() throws IOException {
+        }
+
         @Override
         public IndexInput openInput(String name, IOContext context) throws 
IOException {
             openedFiles.add(name);
@@ -1127,10 +1211,17 @@ public class IndexCopierTest {
         }
     }
 
-    private static class CloseSafeDir extends RAMDirectory {
+    private class CloseSafeDir extends DelayCopyingSimpleFSDirectory {
+        public CloseSafeDir() throws IOException {
+        }
+
         @Override
         public void close() {
+            closer.register(this::close0);
+        }
 
+        private void close0() {
+            super.close();
         }
     }
 


Reply via email to