Author: catholicon
Date: Wed Jun 14 12:15:49 2017
New Revision: 1798670

URL: http://svn.apache.org/viewvc?rev=1798670&view=rev
Log:
OAK-6314: ActiveDeletedBlobCollectorTest.multiThreadedCommits is failing 
intermittently for a few users

There were 2 issues that we fixed here:
* The blocking queue sometimes doesn't hand all items to the "consumer" 
immediately (we tried iterating a few times and the items still didn't show up)
** For this, the test now keeps deleting "MARKER*" blobs and purging until 
one of them is purged (giving up after a timeout)
* Data appended to deletedBlobFile doesn't immediately show up when the file
is read during purge
** For this, we changed the implementation slightly so that purge does not 
delete the last file, which might have been in use for writing when purge was 
called. That file is now read/processed/deleted in the next run.

Modified:
    
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorFactory.java
    
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorTest.java

Modified: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorFactory.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorFactory.java?rev=1798670&r1=1798669&r2=1798670&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorFactory.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorFactory.java
 Wed Jun 14 12:15:49 2017
@@ -155,13 +155,14 @@ public class ActiveDeletedBlobCollectorF
         /**
          * Purges blobs form blob-store which were tracked earlier to deleted.
          * @param before only purge blobs which were deleted before this 
timestamps
-         * @param blobStore
+         * @param blobStore used to purge blobs/chunks
          */
         public void purgeBlobsDeleted(long before, @Nonnull 
GarbageCollectableBlobStore blobStore) {
             long numBlobsDeleted = 0;
             long numChunksDeleted = 0;
 
             long lastCheckedBlobTimestamp = readLastCheckedBlobTimestamp();
+            String currInUseFileName = deletedBlobsFileWriter.inUseFileName;
             deletedBlobsFileWriter.releaseInUseFile();
             for (File deletedBlobListFile : FileUtils.listFiles(rootDirectory, 
blobFileNameFilter, null)) {
                 if 
(deletedBlobListFile.getName().equals(deletedBlobsFileWriter.inUseFileName)) {
@@ -219,7 +220,10 @@ public class ActiveDeletedBlobCollectorF
                         LOG.warn("Couldn't read deleted blob list file - " + 
deletedBlobListFile, ioe);
                     }
 
-                    if (!deletedBlobListFile.delete()) {
+                    // OAK-6314 revealed that blobs appended might not be 
immediately available. So, we'd skip
+                    // the file that was being processed when purge started - 
next cycle would re-process and
+                    // delete
+                    if 
(!deletedBlobListFile.getName().equals(currInUseFileName) && 
!deletedBlobListFile.delete()) {
                         LOG.warn("File {} couldn't be deleted while all blobs 
listed in it have been purged", deletedBlobListFile);
                     }
                 } else {
@@ -232,6 +236,10 @@ public class ActiveDeletedBlobCollectorF
 
         private long readLastCheckedBlobTimestamp() {
             File blobCollectorInfoFile = new File(rootDirectory, 
"collection-info.txt");
+            if (!blobCollectorInfoFile.exists()) {
+                LOG.debug("Couldn't read last checked blob timestamp (file not 
found). Would do a bit more scan");
+                return -1;
+            }
             InputStream is = null;
             Properties p;
             try {
@@ -239,7 +247,8 @@ public class ActiveDeletedBlobCollectorF
                 p = new Properties();
                 p.load(is);
             } catch (IOException e) {
-                LOG.info("Couldn't read last checked blob timestamp... would 
do a bit more scan");
+                LOG.warn("Couldn't read last checked blob timestamp from {} 
... would do a bit more scan",
+                        blobCollectorInfoFile, e);
                 return -1;
             } finally {
                 org.apache.commons.io.IOUtils.closeQuietly(is);
@@ -292,16 +301,23 @@ public class ActiveDeletedBlobCollectorF
         }
 
         private void addDeletedBlobs(Collection<BlobIdInfoStruct> 
deletedBlobs) {
+            int addedForFlush = 0;
             for (BlobIdInfoStruct info : deletedBlobs) {
                 try {
                     if (!this.deletedBlobs.offer(info, 1, TimeUnit.SECONDS)) {
                         LOG.warn("Timed out while offer-ing {} into queue.", 
info);
                     }
+                    if (LOG.isDebugEnabled()) {
+                        addedForFlush++;
+                    }
                 } catch (InterruptedException e) {
                     LOG.warn("Interrupted while adding " + info, e);
                 }
             }
-            LOG.debug("Added {} to be flushed", deletedBlobs.size());
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Added {} (out of {} tried) to be flushed. QSize: 
{}",
+                        addedForFlush, deletedBlobs.size(), 
this.deletedBlobs.size());
+            }
             deletedBlobsFileWriter.scheduleFileFlushIfNeeded();
         }
 
@@ -312,9 +328,7 @@ public class ActiveDeletedBlobCollectorF
 
             private synchronized void flushDeletedBlobs() {
                 List<BlobIdInfoStruct> localDeletedBlobs = new LinkedList<>();
-                while (deletedBlobs.peek() != null) {
-                    localDeletedBlobs.add(deletedBlobs.poll());
-                }
+                deletedBlobs.drainTo(localDeletedBlobs);
                 if (localDeletedBlobs.size() > 0) {
                     File outFile = new File(rootDirectory, getBlobFileName());
                     try {
@@ -324,6 +338,9 @@ public class ActiveDeletedBlobCollectorF
                     } catch (IOException e) {
                         LOG.error("Couldn't write out to " + outFile, e);
                     }
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Flushed {} blobs to {}", 
localDeletedBlobs.size(), outFile.getName());
+                    }
                 }
             }
 

Modified: 
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorTest.java?rev=1798670&r1=1798669&r2=1798670&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorTest.java
 Wed Jun 14 12:15:49 2017
@@ -27,7 +27,6 @@ import org.apache.jackrabbit.oak.spi.blo
 import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
 import org.apache.jackrabbit.oak.stats.Clock;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -43,6 +42,7 @@ import java.nio.file.Path;
 import java.nio.file.attribute.PosixFilePermission;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
@@ -55,7 +55,9 @@ import static org.apache.jackrabbit.oak.
 import static 
org.apache.jackrabbit.oak.plugins.index.IndexCommitCallback.IndexProgress.COMMIT_SUCCEDED;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assume.assumeTrue;
 
 public class ActiveDeletedBlobCollectorTest {
@@ -166,13 +168,12 @@ public class ActiveDeletedBlobCollectorT
         verifyBlobsDeleted("blobId1", "blobId2", "blobId3");
     }
 
-    @Ignore("OAK-6314")
     @Test
     public void multiThreadedCommits() throws Exception {
-        clock = Clock.SIMPLE;
         ExecutorService executorService = Executors.newFixedThreadPool(3);
-        adbc = ActiveDeletedBlobCollectorFactory.newInstance(
-                new File(blobCollectionRoot.getRoot(), "b"), executorService);
+        File rootDirectory = new File(blobCollectionRoot.getRoot(), "b");
+        FileUtils.forceMkdir(rootDirectory);
+        adbc = new ActiveDeletedBlobCollectorImpl(clock, rootDirectory, 
executorService);
 
         int numThreads = 4;
         int numBlobsPerThread = 500;
@@ -214,10 +215,8 @@ public class ActiveDeletedBlobCollectorT
             t.join();
         }
 
-        // Push one more commit to flush out any remaining ones
-        adbc.getBlobDeletionCallback().commitProgress(COMMIT_SUCCEDED);
-
-        executorService.awaitTermination(100, TimeUnit.MILLISECONDS);
+        boolean timeout = executorService.awaitTermination(100, 
TimeUnit.MILLISECONDS);
+        assertFalse(timeout);
 
         List<String> deletedChunks = new 
ArrayList<>(numThreads*numBlobsPerThread*2);
         for (int threadNum = 0; threadNum < numThreads; threadNum++) {
@@ -227,7 +226,36 @@ public class ActiveDeletedBlobCollectorT
             }
         }
 
-        adbc.purgeBlobsDeleted(clock.getTimeIncreasing(), blobStore);
+        // Blocking queue doesn't supply all the items immediately.
+        // So, we'd push "MARKER*" blob ids and purge until some marker blob
+        // gets purged. BUT, we'd time-out this activity in 3 seconds
+        long until = Clock.SIMPLE.getTime() + TimeUnit.SECONDS.toMillis(3);
+        List<String> markerChunks = Lists.newArrayList();
+        int i = 0;
+        while (Clock.SIMPLE.getTime() < until) {
+            // Push commit with a marker blob-id and wait for it to be purged
+            BlobDeletionCallback bdc = adbc.getBlobDeletionCallback();
+            String markerBlobId = "MARKER-" + (i++);
+            bdc.deleted(markerBlobId, Lists.newArrayList(markerBlobId));
+            bdc.commitProgress(COMMIT_SUCCEDED);
+
+            Iterators.addAll(markerChunks, 
blobStore.resolveChunks(markerBlobId));
+            clock.waitUntil(clock.getTime() + TimeUnit.SECONDS.toMillis(5));
+            adbc.purgeBlobsDeleted(clock.getTimeIncreasing(), blobStore);
+
+            if (blobStore.markerChunkDeleted) {
+                break;
+            }
+        }
+
+        assertTrue("Timed out while waiting for marker chunk to be purged", 
blobStore.markerChunkDeleted);
+
+        // don't care how many marker blobs are purged
+        blobStore.deletedChunkIds.removeAll(markerChunks);
+
+        HashSet<String> list = new HashSet<>(deletedChunks);
+        list.removeAll(blobStore.deletedChunkIds);
+        assertTrue("size: " + list.size() + "; list: " + list.toString(), 
list.isEmpty());
 
         assertThat(blobStore.deletedChunkIds, 
containsInAnyOrder(deletedChunks.toArray()));
     }
@@ -271,6 +299,7 @@ public class ActiveDeletedBlobCollectorT
 
     class ChunkDeletionTrackingBlobStore implements 
GarbageCollectableBlobStore {
         List<String> deletedChunkIds = Lists.newArrayList();
+        volatile boolean markerChunkDeleted = false;
 
         @Override
         public String writeBlob(InputStream in) throws IOException {
@@ -350,15 +379,28 @@ public class ActiveDeletedBlobCollectorT
         @Override
         public boolean deleteChunks(List<String> chunkIds, long 
maxLastModifiedTime) throws Exception {
             deletedChunkIds.addAll(chunkIds);
+            setMarkerChunkDeletedFlag(chunkIds);
             return true;
         }
 
         @Override
         public long countDeleteChunks(List<String> chunkIds, long 
maxLastModifiedTime) throws Exception {
             deletedChunkIds.addAll(chunkIds);
+            setMarkerChunkDeletedFlag(chunkIds);
             return chunkIds.size();
         }
 
+        private void setMarkerChunkDeletedFlag(List<String> deletedChunkIds) {
+            if (!markerChunkDeleted) {
+                for (String chunkId : deletedChunkIds) {
+                    if (chunkId.startsWith("MARKER")) {
+                        markerChunkDeleted = true;
+                        break;
+                    }
+                }
+            }
+        }
+
         @Override
         public Iterator<String> resolveChunks(String blobId) throws 
IOException {
             return Iterators.forArray(blobId + "-1", blobId + "-2");


Reply via email to