Author: amitj
Date: Mon Sep 24 11:04:18 2018
New Revision: 1841829
URL: http://svn.apache.org/viewvc?rev=1841829&view=rev
Log:
OAK-7777: Fix getStats in GC to also account for duplicate reference entries
- Handle duplicate reference entries for a particular repository id
Modified:
jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/SharedBlobStoreGCTest.java
Modified:
jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java?rev=1841829&r1=1841828&r2=1841829&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
(original)
+++
jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
Mon Sep 24 11:04:18 2018
@@ -51,6 +51,9 @@ import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.StandardSystemProperty;
import com.google.common.base.Stopwatch;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -242,55 +245,63 @@ public class MarkSweepGarbageCollector i
// Get all the references available
List<DataRecord> refFiles =
((SharedDataStore)
blobStore).getAllMetadataRecords(SharedStoreRecordType.REFERENCES.getType());
- Map<String, DataRecord> references = Maps.uniqueIndex(refFiles,
new Function<DataRecord, String>() {
- @Override
- public String apply(DataRecord input) {
- return
SharedStoreRecordType.REFERENCES.getIdFromName(input.getIdentifier().toString());
- }
- });
-
+ ImmutableListMultimap<String, DataRecord> references =
+ FluentIterable.from(refFiles).index(new Function<DataRecord,
String>() {
+ @Override public String apply(DataRecord input) {
+ return
SharedStoreRecordType.REFERENCES.getIdFromName(input.getIdentifier().toString());
+
+ }
+ });
+
// Get all the markers available
List<DataRecord> markerFiles =
((SharedDataStore)
blobStore).getAllMetadataRecords(SharedStoreRecordType.MARKED_START_MARKER.getType());
Map<String, DataRecord> markers = Maps.uniqueIndex(markerFiles,
new Function<DataRecord, String>() {
@Override
public String apply(DataRecord input) {
- return
SharedStoreRecordType.MARKED_START_MARKER.getIdFromName(input.getIdentifier().toString());
+ return
input.getIdentifier().toString().substring(SharedStoreRecordType.MARKED_START_MARKER.getType().length()
+ 1);
}
});
-
+
// Get all the repositories registered
List<DataRecord> repoFiles =
((SharedDataStore)
blobStore).getAllMetadataRecords(SharedStoreRecordType.REPOSITORY.getType());
-
+
for (DataRecord repoRec : repoFiles) {
- String id =
SharedStoreRecordType.REFERENCES.getIdFromName(repoRec.getIdentifier().toString());
+ String id =
SharedStoreRecordType.REPOSITORY.getIdFromName(repoRec.getIdentifier().toString());
+
GarbageCollectionRepoStats stat = new
GarbageCollectionRepoStats();
+ stats.add(stat);
+
stat.setRepositoryId(id);
if (id != null && id.equals(repoId)) {
stat.setLocal(true);
}
if (references.containsKey(id)) {
- DataRecord refRec = references.get(id);
- stat.setEndTime(refRec.getLastModified());
- stat.setLength(refRec.getLength());
-
- if (markers.containsKey(id)) {
- stat.setStartTime(markers.get(id).getLastModified());
- }
-
- LineNumberReader reader = null;
- try {
- reader = new LineNumberReader(new
InputStreamReader(refRec.getStream()));
- while (reader.readLine() != null) {
+ ImmutableList<DataRecord> refRecs = references.get(id);
+ for(DataRecord refRec : refRecs) {
+ String uniqueSessionId =
refRec.getIdentifier().toString()
+
.substring(SharedStoreRecordType.REFERENCES.getType().length() + 1);
+
+ stat.setEndTime(refRec.getLastModified());
+ stat.setLength(refRec.getLength());
+
+ if (markers.containsKey(uniqueSessionId)) {
+
stat.setStartTime(markers.get(uniqueSessionId).getLastModified());
+ }
+
+ LineNumberReader reader = null;
+ try {
+ reader = new LineNumberReader(new
InputStreamReader(refRec.getStream()));
+ while (reader.readLine() != null) {
+ }
+ stat.setNumLines(reader.getLineNumber());
+ } finally {
+ Closeables.close(reader, true);
}
- stat.setNumLines(reader.getLineNumber());
- } finally {
- Closeables.close(reader, true);
}
}
- stats.add(stat);
}
}
return stats;
@@ -367,14 +378,16 @@ public class MarkSweepGarbageCollector i
protected void mark(GarbageCollectorFileState fs) throws IOException,
DataStoreException {
LOG.debug("Starting mark phase of the garbage collector");
+ String uniqueSuffix = UUID.randomUUID().toString();
+
// Create a time marker in the data store if applicable
- GarbageCollectionType.get(blobStore).addMarkedStartMarker(blobStore,
repoId);
+ GarbageCollectionType.get(blobStore).addMarkedStartMarker(blobStore,
repoId, uniqueSuffix);
// Mark all used references
iterateNodeTree(fs, false);
// Move the marked references file to the data store meta area if
applicable
- GarbageCollectionType.get(blobStore).addMarked(blobStore, fs, repoId);
+ GarbageCollectionType.get(blobStore).addMarked(blobStore, fs, repoId,
uniqueSuffix);
LOG.debug("Ending mark phase of the garbage collector");
}
@@ -809,12 +822,13 @@ public class MarkSweepGarbageCollector i
* @param blobStore the blob store
* @param fs the fs
* @param repoId the repo id
+ * @param uniqueSuffix the unique session suffix
* @throws DataStoreException the data store exception
* @throws IOException Signals that an I/O exception has occurred.
*/
@Override
- void addMarked(GarbageCollectableBlobStore blobStore,
GarbageCollectorFileState fs,
- String repoId) throws DataStoreException, IOException {
+ void addMarked(GarbageCollectableBlobStore blobStore,
GarbageCollectorFileState fs, String repoId,
+ String uniqueSuffix) throws DataStoreException, IOException {
boolean exists = ((SharedDataStore) blobStore)
.metadataRecordExists(SharedStoreRecordType.REFERENCES.getNameFromId(repoId));
if (exists) {
@@ -823,15 +837,16 @@ public class MarkSweepGarbageCollector i
}
((SharedDataStore)
blobStore).addMetadataRecord(fs.getMarkedRefs(),
SharedStoreRecordType.REFERENCES
- .getNameFromIdPrefix(repoId,
UUID.randomUUID().toString()));
+ .getNameFromIdPrefix(repoId, uniqueSuffix));
}
@Override
- public void addMarkedStartMarker(GarbageCollectableBlobStore
blobStore, String repoId) {
+ public void addMarkedStartMarker(GarbageCollectableBlobStore
blobStore, String repoId,
+ String uniqueSuffix) {
try {
((SharedDataStore) blobStore).addMetadataRecord(new
ByteArrayInputStream(new byte[0]),
SharedStoreRecordType.MARKED_START_MARKER
- .getNameFromIdPrefix(repoId,
UUID.randomUUID().toString()));
+ .getNameFromIdPrefix(repoId, uniqueSuffix));
} catch (DataStoreException e) {
LOG.debug("Error creating marked time marker for repo :
{}", repoId);
}
@@ -841,8 +856,8 @@ public class MarkSweepGarbageCollector i
void removeAllMarkedReferences(GarbageCollectableBlobStore blobStore)
{}
- void addMarked(GarbageCollectableBlobStore blobStore,
GarbageCollectorFileState fs,
- String repoId) throws DataStoreException, IOException {}
+ void addMarked(GarbageCollectableBlobStore blobStore,
GarbageCollectorFileState fs, String repoId,
+ String uniqueSuffix) throws DataStoreException, IOException {}
long mergeAllMarkedReferences(GarbageCollectableBlobStore blobStore,
GarbageCollectorFileState fs)
@@ -861,7 +876,7 @@ public class MarkSweepGarbageCollector i
return DEFAULT;
}
- public void addMarkedStartMarker(GarbageCollectableBlobStore
blobStore, String repoId) {}
+ public void addMarkedStartMarker(GarbageCollectableBlobStore
blobStore, String repoId, String uniqueSuffix) {}
}
/**
Modified:
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/SharedBlobStoreGCTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/SharedBlobStoreGCTest.java?rev=1841829&r1=1841828&r2=1841829&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/SharedBlobStoreGCTest.java
(original)
+++
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/SharedBlobStoreGCTest.java
Mon Sep 24 11:04:18 2018
@@ -250,6 +250,66 @@ public class SharedBlobStoreGCTest {
assertTrue(existing.containsAll(cluster3.getInitBlobs()));
}
+ @Test
+ public void testGCStatsOnCloned() throws Exception {
+ log.debug("Running testGCStatsOnCloned()");
+ BlobStore blobeStore3 = getBlobStore(rootFolder);
+ DocumentNodeStore ds3 = new DocumentMK.Builder()
+ .setAsyncDelay(0)
+ .setDocumentStore(new MemoryDocumentStore())
+ .setBlobStore(blobeStore3)
+ .clock(clock)
+ .getNodeStore();
+ NodeBuilder a = ds3.getRoot().builder();
+ a.child(":clusterConfig").setProperty(":clusterId", cluster2.repoId);
+ Cluster cluster3 = new Cluster(ds3, cluster2.repoId, 120);
+ cluster3.init();
+
+ Set<String> actualRepoIds = Sets.newHashSet();
+ actualRepoIds.add(cluster1.repoId);
+ actualRepoIds.add(cluster2.repoId);
+
+ log.debug("Initialized {}", cluster3);
+
+ Set<String> observedRepoIds = Sets.newHashSet();
+ List<GarbageCollectionRepoStats> statsList = cluster1.gc.getStats();
+ for (GarbageCollectionRepoStats stat : statsList) {
+ assertEquals(0, stat.getNumLines());
+ observedRepoIds.add(stat.getRepositoryId());
+ if (stat.getRepositoryId().equals(cluster1.repoId)) {
+ assertTrue(stat.isLocal());
+ }
+ }
+
+ assertTrue(Sets.difference(actualRepoIds, observedRepoIds).isEmpty());
+
+ // Only run the mark phase on all the nodes to get the stats
+ cluster1.gc.collectGarbage(true);
+ cluster2.gc.collectGarbage(true);
+ cluster3.gc.collectGarbage(true);
+
+
+ Set<Integer> actualNumBlobs = Sets.newHashSet();
+ actualNumBlobs.add(cluster1.initBlobs.size());
+ actualNumBlobs.add(cluster2.initBlobs.size());
+ actualNumBlobs.add(cluster3.initBlobs.size());
+
+ statsList = cluster1.gc.getStats();
+ Set<Integer> observedNumBlobs = Sets.newHashSet();
+ observedRepoIds = Sets.newHashSet();
+ for (GarbageCollectionRepoStats stat : statsList) {
+ observedNumBlobs.add(stat.getNumLines());
+ observedRepoIds.add(stat.getRepositoryId());
+ assertTrue(stat.getStartTime() <= stat.getEndTime());
+ if (stat.getRepositoryId().equals(cluster1.repoId)) {
+ assertTrue(stat.isLocal());
+ }
+ }
+
+ assertTrue(Sets.difference(actualNumBlobs,
observedNumBlobs).isEmpty());
+ assertTrue(Sets.difference(actualRepoIds, observedRepoIds).isEmpty());
+ }
+
@After
public void tearDown() throws Exception {
DataStoreUtils.time = -1;