Author: amitj
Date: Thu Dec 13 06:34:07 2018
New Revision: 1848822
URL: http://svn.apache.org/viewvc?rev=1848822&view=rev
Log:
OAK-7951: Datastore GC stats not updated with failure when "Not all
repositories have marked references available"
Based on patch from Wim Symons
- Bubble up the exception when the sweep cannot run because not all
repositories have marked references available
- Mark the failure in the stats so that it can be queried
Added:
jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreMarkSweepGarbageCollectorTest.java
(with props)
Modified:
jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/BlobGCTest.java
Modified:
jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java?rev=1848822&r1=1848821&r2=1848822&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
(original)
+++
jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
Thu Dec 13 06:34:07 2018
@@ -85,10 +85,10 @@ import org.slf4j.LoggerFactory;
/**
* Mark and sweep garbage collector.
- *
+ *
* Uses the file system to store internal state while in process to account
for huge data.
* This class is not thread safe.
- *
+ *
*/
public class MarkSweepGarbageCollector implements BlobGarbageCollector {
@@ -97,7 +97,7 @@ public class MarkSweepGarbageCollector i
public static final String TEMP_DIR =
StandardSystemProperty.JAVA_IO_TMPDIR.value();
public static final int DEFAULT_BATCH_COUNT = 1024;
-
+
public static final String DELIM = ",";
private static final Function<String, String> transformer = new
Function<String, String>() {
@@ -234,7 +234,7 @@ public class MarkSweepGarbageCollector i
/**
* Returns the stats related to GC for all repos
- *
+ *
* @return a list of GarbageCollectionRepoStats objects
* @throws Exception
*/
@@ -348,16 +348,18 @@ public class MarkSweepGarbageCollector i
long deleteCount;
try {
deleteCount = sweep(fs, markStart, forceBlobRetrieve);
+
+ long maxTime = getMaxModifiedTime(markStart) > 0 ?
getMaxModifiedTime(markStart) : markStart;
+ LOG.info("Blob garbage collection completed in {} ({} ms).
Number of blobs deleted [{}] with max modification time of [{}]",
+ sw.toString(), sw.elapsed(TimeUnit.MILLISECONDS),
deleteCount, timestampToString(maxTime));
+
threw = false;
+ } catch (NotAllRepositoryMarkedException rm) {
+ statsCollector.finishFailure();
} finally {
sw.stop();
statsCollector.updateSweepDuration(sw.elapsed(TimeUnit.MILLISECONDS) -
markFinish, TimeUnit.MILLISECONDS);
}
-
- long maxTime = getMaxModifiedTime(markStart) > 0 ?
getMaxModifiedTime(markStart) : markStart;
-
- LOG.info("Blob garbage collection completed in {} ({} ms).
Number of blobs deleted [{}] with max modification time of [{}]",
- sw.toString(), sw.elapsed(TimeUnit.MILLISECONDS),
deleteCount, timestampToString(maxTime));
}
} catch (Exception e) {
statsCollector.finishFailure();
@@ -394,7 +396,7 @@ public class MarkSweepGarbageCollector i
/**
* Difference phase where the GC candidates are identified.
- *
+ *
* @param fs the garbage collector file state
* @throws IOException
* Signals that an I/O exception has occurred.
@@ -453,14 +455,10 @@ public class MarkSweepGarbageCollector i
long earliestRefAvailTime;
// Merge all the blob references available from all the reference
files in the data store meta store
// Only go ahead if merge succeeded
- try {
- earliestRefAvailTime =
-
GarbageCollectionType.get(blobStore).mergeAllMarkedReferences(blobStore, fs);
- LOG.debug("Earliest reference available for timestamp [{}]",
earliestRefAvailTime);
- earliestRefAvailTime = (earliestRefAvailTime < markStart ?
earliestRefAvailTime : markStart);
- } catch (Exception e) {
- return 0;
- }
+ earliestRefAvailTime =
+
GarbageCollectionType.get(blobStore).mergeAllMarkedReferences(blobStore, fs);
+ LOG.debug("Earliest reference available for timestamp [{}]",
earliestRefAvailTime);
+ earliestRefAvailTime = (earliestRefAvailTime < markStart ?
earliestRefAvailTime : markStart);
// Find all blob references after iterating over the whole repository
(new BlobIdRetriever(fs, forceBlobRetrieve)).call();
@@ -469,7 +467,7 @@ public class MarkSweepGarbageCollector i
difference(fs);
long count = 0;
long deleted = 0;
-
+
long maxModifiedTime = getMaxModifiedTime(earliestRefAvailTime);
LOG.debug("Starting sweep phase of the garbage collector");
LOG.debug("Sweeping blobs with modified time > than the configured max
deleted time ({}). ",
@@ -511,7 +509,7 @@ public class MarkSweepGarbageCollector i
BlobCollectionType.get(blobStore).handleRemoves(blobStore,
fs.getGarbage(), fs.getMarkedRefs());
if(count != deleted) {
- LOG.warn("Deleted only [{}] blobs entries from the [{}] candidates
identified. This may happen if blob "
+ LOG.warn("Deleted only [{}] blobs entries from the [{}] candidates
identified. This may happen if blob "
+ "modified time is > "
+ "than the max deleted time ({})", deleted, count,
timestampToString(maxModifiedTime));
@@ -643,10 +641,10 @@ public class MarkSweepGarbageCollector i
closeQuietly(writer);
}
}
-
+
/**
* Checks for the DataStore consistency and reports the number of missing
blobs still referenced.
- *
+ *
* @return the missing blobs
* @throws Exception
*/
@@ -661,12 +659,12 @@ public class MarkSweepGarbageCollector i
try {
LOG.info("Starting blob consistency check");
-
+
// Find all blobs available in the blob store
ListenableFutureTask<Integer> blobIdRetriever =
ListenableFutureTask.create(new BlobIdRetriever(fs,
true));
executor.execute(blobIdRetriever);
-
+
// Mark all used blob references
iterateNodeTree(fs, true);
consistencyStatsCollector.updateMarkDuration(sw.elapsed(TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS);
@@ -678,7 +676,7 @@ public class MarkSweepGarbageCollector i
threw = false;
throw e;
}
-
+
LOG.trace("Starting difference phase of the consistency check");
FileLineDifferenceIterator iter = new FileLineDifferenceIterator(
fs.getAvailableRefs(),
@@ -721,7 +719,7 @@ public class MarkSweepGarbageCollector i
this.fs = fs;
this.forceRetrieve = forceBlobRetrieve;
}
-
+
@Override
public Integer call() throws Exception {
if (!forceRetrieve) {
@@ -756,7 +754,7 @@ public class MarkSweepGarbageCollector i
SHARED {
/**
* Remove the maked references and the marked markers from the
blob store root. Default NOOP.
- *
+ *
* @param blobStore the blobStore instance
*/
@Override
@@ -767,7 +765,7 @@ public class MarkSweepGarbageCollector i
/**
* Merge all marked references available from all repositories and
return the earliest time of the references.
- *
+ *
* @param blobStore the blob store
* @param fs the fs
* @return the long the earliest time of the available references
@@ -798,7 +796,7 @@ public class MarkSweepGarbageCollector i
}
merge(files, fs.getMarkedRefs());
-
+
// Get the timestamp to indicate the earliest mark phase
start
List<DataRecord> markerFiles =
((SharedDataStore) blobStore).getAllMetadataRecords(
@@ -812,13 +810,13 @@ public class MarkSweepGarbageCollector i
return (earliestMarker < earliestRef ? earliestMarker :
earliestRef);
} else {
LOG.error("Not all repositories have marked references
available : {}", unAvailRepos);
- throw new IOException("Not all repositories have marked
references available");
+ throw new NotAllRepositoryMarkedException("Not all
repositories have marked references available");
}
}
/**
* Adds the marked references to the blob store root. Default NOOP
- *
+ *
* @param blobStore the blob store
* @param fs the fs
* @param repoId the repo id
@@ -839,7 +837,7 @@ public class MarkSweepGarbageCollector i
((SharedDataStore)
blobStore).addMetadataRecord(fs.getMarkedRefs(),
SharedStoreRecordType.REFERENCES
.getNameFromIdPrefix(repoId, uniqueSuffix));
}
-
+
@Override
public void addMarkedStartMarker(GarbageCollectableBlobStore
blobStore, String repoId,
String uniqueSuffix) {
@@ -875,7 +873,7 @@ public class MarkSweepGarbageCollector i
}
return DEFAULT;
}
-
+
public void addMarkedStartMarker(GarbageCollectableBlobStore
blobStore, String repoId, String uniqueSuffix) {}
}
@@ -1151,4 +1149,15 @@ public class MarkSweepGarbageCollector i
return totalSizeDeletedCounter.getCount();
}
}
+
+
+ /**
+ * Marker {@link IOException} signalling that the sweep phase cannot run because
+ * not all repositories have marked references available, i.e. some of them have
+ * not finished their mark phase. Thrown by the SHARED
+ * {@code mergeAllMarkedReferences} implementation. It is caught around the
+ * {@code sweep(...)} call, which records the run as a failure in the GC stats
+ * ({@code statsCollector.finishFailure()}) instead of reporting a successful sweep.
+ */
+ static class NotAllRepositoryMarkedException extends IOException {
+ /**
+ * @param message description of the failure; the SHARED merge passes
+ *                "Not all repositories have marked references available"
+ */
+ public NotAllRepositoryMarkedException(String message) {
+ super(message);
+ }
+ }
}
Modified:
jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/BlobGCTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/BlobGCTest.java?rev=1848822&r1=1848821&r2=1848822&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/BlobGCTest.java
(original)
+++
jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/BlobGCTest.java
Thu Dec 13 06:34:07 2018
@@ -235,7 +235,7 @@ public class BlobGCTest {
Set<String> existingAfterGC = executeGarbageCollection(secondCluster,
secondCluster.getCollector(0), false);
assertEquals(totalAdded, existingAfterGC);
- assertStats(secondCluster.statsProvider, 1, 0, 0, 0, NAME);
+ assertStats(secondCluster.statsProvider, 1, 1, 0, 0, NAME);
}
@Test
Added:
jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreMarkSweepGarbageCollectorTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreMarkSweepGarbageCollectorTest.java?rev=1848822&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreMarkSweepGarbageCollectorTest.java
(added)
+++
jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreMarkSweepGarbageCollectorTest.java
Thu Dec 13 06:34:07 2018
@@ -0,0 +1,125 @@
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean;
+import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
+import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
+import org.apache.jackrabbit.oak.spi.whiteboard.Tracker;
+import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
+import org.apache.jackrabbit.oak.stats.DefaultStatisticsProvider;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static
org.apache.jackrabbit.oak.plugins.blob.SharedDataStore.Type.SHARED;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests {@code MarkSweepGarbageCollector.markAndSweep} against a mocked shared data
+ * store. It checks that the operation stats record a failure when not all repositories
+ * have marked references available (OAK-7951), and no failure when they all do.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class SharedDataStoreMarkSweepGarbageCollectorTest {
+
+ @Mock
+ private MockGarbageCollectableSharedDataStore blobStore;
+
+ @Mock
+ private BlobReferenceRetriever marker;
+
+ @Mock
+ private Whiteboard whiteboard;
+
+ @Mock
+ private Tracker<CheckpointMBean> tracker;
+
+ // NOTE(review): this ExpectedException rule is not referenced by any test below;
+ // failures are asserted through the operation stats instead.
+ @Rule
+ public ExpectedException exception = ExpectedException.none();
+
+ private MarkSweepGarbageCollector collector;
+
+ @Mock
+ private CheckpointMBean checkpointMBean;
+
+ private ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor();
+
+ /**
+ * Stubs the whiteboard to track a single mocked CheckpointMBean, makes the blob
+ * store report type SHARED, and builds the collector for repository id "repo".
+ * The collector's statistics go to a DefaultStatisticsProvider backed by the test
+ * executor. The numeric arguments 1 and 0L are presumably the batch count and the
+ * max last-modified interval; confirm against the constructor.
+ */
+ @Before
+ public void setUp() throws IOException {
+ when(whiteboard.track(CheckpointMBean.class)).thenReturn(tracker);
+ when(tracker.getServices()).thenReturn(ImmutableList.of(checkpointMBean));
+
+ when(blobStore.getType()).thenReturn(SHARED);
+
+ collector = new MarkSweepGarbageCollector(
+ marker,
+ blobStore,
+ executor,
+ MarkSweepGarbageCollector.TEMP_DIR,
+ 1,
+ 0L,
+ "repo",
+ whiteboard,
+ new DefaultStatisticsProvider(executor)
+ );
+ }
+
+ /** Shuts down the executor shared by the collector and the statistics provider. */
+ @After
+ public void tear() {
+ new ExecutorCloser(executor).close();
+ }
+
+ // Only REPO1 has a references record while REPO2 is the registered repository, so
+ // (presumably) REPO2 is reported as lacking marked references and the shared merge
+ // throws NotAllRepositoryMarkedException. The sweep must then delete nothing and
+ // the stats must show exactly one failure.
+ @Test
+ public void
markAndSweepShouldFailIfNotAllRepositoriesHaveMarkedReferencesAvailable()
throws Exception {
+ setupSharedDataRecords("REPO1", "REPO2");
+
+ collector.markAndSweep(false, true);
+
+ assertThat(collector.getOperationStats().numDeleted(), is(0L));
+ assertThat(collector.getOperationStats().getFailureCount(), is(1L));
+ }
+
+ // The references and repository records both belong to REPO1, so the merge should
+ // succeed. getAllChunkIds(0L) is stubbed to an empty iterator, so no blob is found
+ // and none is deleted. No failure may be recorded.
+ @Test
+ public void markAndSweepShouldSucceedWhenAllRepositoriesAreAvailable()
throws Exception {
+ setupSharedDataRecords("REPO1", "REPO1");
+
when(blobStore.getAllChunkIds(0L)).thenReturn(ImmutableList.<String>of().iterator());
+
+ collector.markAndSweep(false, true);
+
+ assertThat(collector.getOperationStats().numDeleted(), is(0L));
+ assertThat(collector.getOperationStats().getFailureCount(), is(0L));
+ }
+
+ /**
+ * Stubs the shared data store's metadata records with:
+ * - one REFERENCES record whose identifier is "references-" + refRepoId, with
+ *   empty content and last-modified time 10;
+ * - the same REFERENCES record returned as the MARKED_START_MARKER record;
+ * - one REPOSITORY record whose identifier is "repository-" + repoRepoId.
+ * Presumably the repository id is derived from the identifier suffix, so passing
+ * different ids leaves the registered repository without marked references.
+ *
+ * @param refRepoId  repository id used in the references record identifier
+ * @param repoRepoId repository id used in the repository record identifier
+ * @throws DataStoreException declared by the stubbed DataRecord#getStream call
+ */
+ private void setupSharedDataRecords(final String refRepoId, final String
repoRepoId) throws DataStoreException {
+ DataRecord refDataRecord = mock(DataRecord.class);
+ when(refDataRecord.getIdentifier()).thenReturn(new
DataIdentifier("references-" + refRepoId));
+ when(refDataRecord.getStream()).thenReturn(new ByteArrayInputStream(new
byte[0]));
+ when(refDataRecord.getLastModified()).thenReturn(10L);
+
+ DataRecord repoDataRecord = mock(DataRecord.class);
+ when(repoDataRecord.getIdentifier()).thenReturn(new
DataIdentifier("repository-" + repoRepoId));
+
+ List<DataRecord> refs = ImmutableList.of(refDataRecord);
+ List<DataRecord> repos = ImmutableList.of(repoDataRecord);
+
+ // The references record doubles as the marked-start marker record.
+
when(blobStore.getAllMetadataRecords(SharedDataStoreUtils.SharedStoreRecordType.REFERENCES.getType())).thenReturn(refs);
+
when(blobStore.getAllMetadataRecords(SharedDataStoreUtils.SharedStoreRecordType.REPOSITORY.getType())).thenReturn(repos);
+
when(blobStore.getAllMetadataRecords(SharedDataStoreUtils.SharedStoreRecordType.MARKED_START_MARKER.getType())).thenReturn(refs);
+ }
+
+ /**
+ * Combines both interfaces so that a single Mockito mock can be passed to the
+ * collector as a GarbageCollectableBlobStore and also be cast to SharedDataStore
+ * by the SHARED garbage-collection code paths.
+ */
+ private interface MockGarbageCollectableSharedDataStore extends
GarbageCollectableBlobStore, SharedDataStore {
+ }
+}
Propchange:
jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreMarkSweepGarbageCollectorTest.java
------------------------------------------------------------------------------
svn:eol-style = native