Author: catholicon
Date: Wed Jun 14 12:15:49 2017
New Revision: 1798670
URL: http://svn.apache.org/viewvc?rev=1798670&view=rev
Log:
OAK-6314: ActiveDeletedBlobCollectorTest.multiThreadedCommits is failing
intermittently for a few users
There were 2 issues that we fixed here:
* The blocking queue sometimes doesn't hand all items to the "consumer"
immediately (we tried iterating a few times and the items still didn't show up)
** For this, we now keep deleting some "MARKER*" blobs and wait until one of
them is purged
* Data appended to deletedBlobFile doesn't immediately show up when the file
is read during purging
** For this, we change the implementation slightly so that purging does not
delete the last file, which might still have been in use for writing when purge
was called. That file is now read/processed/deleted in the next run.
Modified:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorFactory.java
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorTest.java
Modified:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorFactory.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorFactory.java?rev=1798670&r1=1798669&r2=1798670&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorFactory.java
(original)
+++
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorFactory.java
Wed Jun 14 12:15:49 2017
@@ -155,13 +155,14 @@ public class ActiveDeletedBlobCollectorF
/**
* Purges blobs form blob-store which were tracked earlier to deleted.
* @param before only purge blobs which were deleted before this
timestamps
- * @param blobStore
+ * @param blobStore used to purge blobs/chunks
*/
public void purgeBlobsDeleted(long before, @Nonnull
GarbageCollectableBlobStore blobStore) {
long numBlobsDeleted = 0;
long numChunksDeleted = 0;
long lastCheckedBlobTimestamp = readLastCheckedBlobTimestamp();
+ String currInUseFileName = deletedBlobsFileWriter.inUseFileName;
deletedBlobsFileWriter.releaseInUseFile();
for (File deletedBlobListFile : FileUtils.listFiles(rootDirectory,
blobFileNameFilter, null)) {
if
(deletedBlobListFile.getName().equals(deletedBlobsFileWriter.inUseFileName)) {
@@ -219,7 +220,10 @@ public class ActiveDeletedBlobCollectorF
LOG.warn("Couldn't read deleted blob list file - " +
deletedBlobListFile, ioe);
}
- if (!deletedBlobListFile.delete()) {
+ // OAK-6314 revealed that blobs appended might not be
immediately available. So, we'd skip
+ // the file that was being processed when purge started -
next cycle would re-process and
+ // delete
+ if
(!deletedBlobListFile.getName().equals(currInUseFileName) &&
!deletedBlobListFile.delete()) {
LOG.warn("File {} couldn't be deleted while all blobs
listed in it have been purged", deletedBlobListFile);
}
} else {
@@ -232,6 +236,10 @@ public class ActiveDeletedBlobCollectorF
private long readLastCheckedBlobTimestamp() {
File blobCollectorInfoFile = new File(rootDirectory,
"collection-info.txt");
+ if (!blobCollectorInfoFile.exists()) {
+ LOG.debug("Couldn't read last checked blob timestamp (file not
found). Would do a bit more scan");
+ return -1;
+ }
InputStream is = null;
Properties p;
try {
@@ -239,7 +247,8 @@ public class ActiveDeletedBlobCollectorF
p = new Properties();
p.load(is);
} catch (IOException e) {
- LOG.info("Couldn't read last checked blob timestamp... would
do a bit more scan");
+ LOG.warn("Couldn't read last checked blob timestamp from {}
... would do a bit more scan",
+ blobCollectorInfoFile, e);
return -1;
} finally {
org.apache.commons.io.IOUtils.closeQuietly(is);
@@ -292,16 +301,23 @@ public class ActiveDeletedBlobCollectorF
}
private void addDeletedBlobs(Collection<BlobIdInfoStruct>
deletedBlobs) {
+ int addedForFlush = 0;
for (BlobIdInfoStruct info : deletedBlobs) {
try {
if (!this.deletedBlobs.offer(info, 1, TimeUnit.SECONDS)) {
LOG.warn("Timed out while offer-ing {} into queue.",
info);
}
+ if (LOG.isDebugEnabled()) {
+ addedForFlush++;
+ }
} catch (InterruptedException e) {
LOG.warn("Interrupted while adding " + info, e);
}
}
- LOG.debug("Added {} to be flushed", deletedBlobs.size());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Added {} (out of {} tried) to be flushed. QSize:
{}",
+ addedForFlush, deletedBlobs.size(),
this.deletedBlobs.size());
+ }
deletedBlobsFileWriter.scheduleFileFlushIfNeeded();
}
@@ -312,9 +328,7 @@ public class ActiveDeletedBlobCollectorF
private synchronized void flushDeletedBlobs() {
List<BlobIdInfoStruct> localDeletedBlobs = new LinkedList<>();
- while (deletedBlobs.peek() != null) {
- localDeletedBlobs.add(deletedBlobs.poll());
- }
+ deletedBlobs.drainTo(localDeletedBlobs);
if (localDeletedBlobs.size() > 0) {
File outFile = new File(rootDirectory, getBlobFileName());
try {
@@ -324,6 +338,9 @@ public class ActiveDeletedBlobCollectorF
} catch (IOException e) {
LOG.error("Couldn't write out to " + outFile, e);
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Flushed {} blobs to {}",
localDeletedBlobs.size(), outFile.getName());
+ }
}
}
Modified:
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorTest.java?rev=1798670&r1=1798669&r2=1798670&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorTest.java
(original)
+++
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorTest.java
Wed Jun 14 12:15:49 2017
@@ -27,7 +27,6 @@ import org.apache.jackrabbit.oak.spi.blo
import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
import org.apache.jackrabbit.oak.stats.Clock;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -43,6 +42,7 @@ import java.nio.file.Path;
import java.nio.file.attribute.PosixFilePermission;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
@@ -55,7 +55,9 @@ import static org.apache.jackrabbit.oak.
import static
org.apache.jackrabbit.oak.plugins.index.IndexCommitCallback.IndexProgress.COMMIT_SUCCEDED;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
public class ActiveDeletedBlobCollectorTest {
@@ -166,13 +168,12 @@ public class ActiveDeletedBlobCollectorT
verifyBlobsDeleted("blobId1", "blobId2", "blobId3");
}
- @Ignore("OAK-6314")
@Test
public void multiThreadedCommits() throws Exception {
- clock = Clock.SIMPLE;
ExecutorService executorService = Executors.newFixedThreadPool(3);
- adbc = ActiveDeletedBlobCollectorFactory.newInstance(
- new File(blobCollectionRoot.getRoot(), "b"), executorService);
+ File rootDirectory = new File(blobCollectionRoot.getRoot(), "b");
+ FileUtils.forceMkdir(rootDirectory);
+ adbc = new ActiveDeletedBlobCollectorImpl(clock, rootDirectory,
executorService);
int numThreads = 4;
int numBlobsPerThread = 500;
@@ -214,10 +215,8 @@ public class ActiveDeletedBlobCollectorT
t.join();
}
- // Push one more commit to flush out any remaining ones
- adbc.getBlobDeletionCallback().commitProgress(COMMIT_SUCCEDED);
-
- executorService.awaitTermination(100, TimeUnit.MILLISECONDS);
+ boolean timeout = executorService.awaitTermination(100,
TimeUnit.MILLISECONDS);
+ assertFalse(timeout);
List<String> deletedChunks = new
ArrayList<>(numThreads*numBlobsPerThread*2);
for (int threadNum = 0; threadNum < numThreads; threadNum++) {
@@ -227,7 +226,36 @@ public class ActiveDeletedBlobCollectorT
}
}
- adbc.purgeBlobsDeleted(clock.getTimeIncreasing(), blobStore);
+ // Blocking queue doesn't supply all the items immediately.
+ // So, we'd push "MARKER*" blob ids and purge until some marker blob
+ // gets purged. BUT, we'd time-out this activity in 3 seconds
+ long until = Clock.SIMPLE.getTime() + TimeUnit.SECONDS.toMillis(3);
+ List<String> markerChunks = Lists.newArrayList();
+ int i = 0;
+ while (Clock.SIMPLE.getTime() < until) {
+ // Push commit with a marker blob-id and wait for it to be purged
+ BlobDeletionCallback bdc = adbc.getBlobDeletionCallback();
+ String markerBlobId = "MARKER-" + (i++);
+ bdc.deleted(markerBlobId, Lists.newArrayList(markerBlobId));
+ bdc.commitProgress(COMMIT_SUCCEDED);
+
+ Iterators.addAll(markerChunks,
blobStore.resolveChunks(markerBlobId));
+ clock.waitUntil(clock.getTime() + TimeUnit.SECONDS.toMillis(5));
+ adbc.purgeBlobsDeleted(clock.getTimeIncreasing(), blobStore);
+
+ if (blobStore.markerChunkDeleted) {
+ break;
+ }
+ }
+
+ assertTrue("Timed out while waiting for marker chunk to be purged",
blobStore.markerChunkDeleted);
+
+ // don't care how many marker blobs are purged
+ blobStore.deletedChunkIds.removeAll(markerChunks);
+
+ HashSet<String> list = new HashSet<>(deletedChunks);
+ list.removeAll(blobStore.deletedChunkIds);
+ assertTrue("size: " + list.size() + "; list: " + list.toString(),
list.isEmpty());
assertThat(blobStore.deletedChunkIds,
containsInAnyOrder(deletedChunks.toArray()));
}
@@ -271,6 +299,7 @@ public class ActiveDeletedBlobCollectorT
class ChunkDeletionTrackingBlobStore implements
GarbageCollectableBlobStore {
List<String> deletedChunkIds = Lists.newArrayList();
+ volatile boolean markerChunkDeleted = false;
@Override
public String writeBlob(InputStream in) throws IOException {
@@ -350,15 +379,28 @@ public class ActiveDeletedBlobCollectorT
@Override
public boolean deleteChunks(List<String> chunkIds, long
maxLastModifiedTime) throws Exception {
deletedChunkIds.addAll(chunkIds);
+ setMarkerChunkDeletedFlag(chunkIds);
return true;
}
@Override
public long countDeleteChunks(List<String> chunkIds, long
maxLastModifiedTime) throws Exception {
deletedChunkIds.addAll(chunkIds);
+ setMarkerChunkDeletedFlag(chunkIds);
return chunkIds.size();
}
+ private void setMarkerChunkDeletedFlag(List<String> deletedChunkIds) {
+ if (!markerChunkDeleted) {
+ for (String chunkId : deletedChunkIds) {
+ if (chunkId.startsWith("MARKER")) {
+ markerChunkDeleted = true;
+ break;
+ }
+ }
+ }
+ }
+
@Override
public Iterator<String> resolveChunks(String blobId) throws
IOException {
return Iterators.forArray(blobId + "-1", blobId + "-2");