Author: amitj
Date: Mon Sep 24 11:04:18 2018
New Revision: 1841829

URL: http://svn.apache.org/viewvc?rev=1841829&view=rev
Log:
OAK-7777: Fix getStats in GC to also account for duplicate entries of references

- Handling duplicate entries for a particular id

Modified:
    
jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
    
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/SharedBlobStoreGCTest.java

Modified: 
jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java?rev=1841829&r1=1841828&r2=1841829&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
 Mon Sep 24 11:04:18 2018
@@ -51,6 +51,9 @@ import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.StandardSystemProperty;
 import com.google.common.base.Stopwatch;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableListMultimap;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -242,55 +245,63 @@ public class MarkSweepGarbageCollector i
             // Get all the references available
             List<DataRecord> refFiles =
                 ((SharedDataStore) 
blobStore).getAllMetadataRecords(SharedStoreRecordType.REFERENCES.getType());
-            Map<String, DataRecord> references = Maps.uniqueIndex(refFiles, 
new Function<DataRecord, String>() {
-                @Override 
-                public String apply(DataRecord input) {
-                    return 
SharedStoreRecordType.REFERENCES.getIdFromName(input.getIdentifier().toString());
-                }
-            });
-    
+            ImmutableListMultimap<String, DataRecord> references =
+                FluentIterable.from(refFiles).index(new Function<DataRecord, 
String>() {
+                    @Override public String apply(DataRecord input) {
+                        return 
SharedStoreRecordType.REFERENCES.getIdFromName(input.getIdentifier().toString());
+
+                    }
+                });
+
             // Get all the markers available
             List<DataRecord> markerFiles =
                 ((SharedDataStore) 
blobStore).getAllMetadataRecords(SharedStoreRecordType.MARKED_START_MARKER.getType());
             Map<String, DataRecord> markers = Maps.uniqueIndex(markerFiles, 
new Function<DataRecord, String>() {
                 @Override
                 public String apply(DataRecord input) {
-                    return 
SharedStoreRecordType.MARKED_START_MARKER.getIdFromName(input.getIdentifier().toString());
+                    return 
input.getIdentifier().toString().substring(SharedStoreRecordType.MARKED_START_MARKER.getType().length()
 + 1);
                 }
             });
-            
+
             // Get all the repositories registered
             List<DataRecord> repoFiles =
                 ((SharedDataStore) 
blobStore).getAllMetadataRecords(SharedStoreRecordType.REPOSITORY.getType());
-    
+
             for (DataRecord repoRec : repoFiles) {
-                String id = 
SharedStoreRecordType.REFERENCES.getIdFromName(repoRec.getIdentifier().toString());
+                String id = 
SharedStoreRecordType.REPOSITORY.getIdFromName(repoRec.getIdentifier().toString());
+
                 GarbageCollectionRepoStats stat = new 
GarbageCollectionRepoStats();
+                stats.add(stat);
+
                 stat.setRepositoryId(id);
                 if (id != null && id.equals(repoId)) {
                     stat.setLocal(true);
                 }
 
                 if (references.containsKey(id)) {
-                    DataRecord refRec = references.get(id);
-                    stat.setEndTime(refRec.getLastModified());
-                    stat.setLength(refRec.getLength());
-                    
-                    if (markers.containsKey(id)) {
-                        stat.setStartTime(markers.get(id).getLastModified());
-                    }
-                    
-                    LineNumberReader reader = null;
-                    try {
-                        reader = new LineNumberReader(new 
InputStreamReader(refRec.getStream()));
-                        while (reader.readLine() != null) {
+                    ImmutableList<DataRecord> refRecs = references.get(id);
+                    for(DataRecord refRec : refRecs) {
+                        String uniqueSessionId = 
refRec.getIdentifier().toString()
+                            
.substring(SharedStoreRecordType.REFERENCES.getType().length() + 1);
+
+                        stat.setEndTime(refRec.getLastModified());
+                        stat.setLength(refRec.getLength());
+
+                        if (markers.containsKey(uniqueSessionId)) {
+                            
stat.setStartTime(markers.get(uniqueSessionId).getLastModified());
+                        }
+
+                        LineNumberReader reader = null;
+                        try {
+                            reader = new LineNumberReader(new 
InputStreamReader(refRec.getStream()));
+                            while (reader.readLine() != null) {
+                            }
+                            stat.setNumLines(reader.getLineNumber());
+                        } finally {
+                            Closeables.close(reader, true);
                         }
-                        stat.setNumLines(reader.getLineNumber());
-                    } finally {
-                        Closeables.close(reader, true);
                     }
                 }
-                stats.add(stat);
             }
         }
         return stats;
@@ -367,14 +378,16 @@ public class MarkSweepGarbageCollector i
     protected void mark(GarbageCollectorFileState fs) throws IOException, 
DataStoreException {
         LOG.debug("Starting mark phase of the garbage collector");
 
+        String uniqueSuffix = UUID.randomUUID().toString();
+
         // Create a time marker in the data store if applicable
-        GarbageCollectionType.get(blobStore).addMarkedStartMarker(blobStore, 
repoId);
+        GarbageCollectionType.get(blobStore).addMarkedStartMarker(blobStore, 
repoId, uniqueSuffix);
 
         // Mark all used references
         iterateNodeTree(fs, false);
 
         // Move the marked references file to the data store meta area if 
applicable
-        GarbageCollectionType.get(blobStore).addMarked(blobStore, fs, repoId);
+        GarbageCollectionType.get(blobStore).addMarked(blobStore, fs, repoId, 
uniqueSuffix);
 
         LOG.debug("Ending mark phase of the garbage collector");
     }
@@ -809,12 +822,13 @@ public class MarkSweepGarbageCollector i
              * @param blobStore the blob store
              * @param fs the fs
              * @param repoId the repo id
+             * @param uniqueSuffix the unique session suffix
              * @throws DataStoreException the data store exception
              * @throws IOException Signals that an I/O exception has occurred.
              */
             @Override
-            void addMarked(GarbageCollectableBlobStore blobStore, 
GarbageCollectorFileState fs,
-                    String repoId) throws DataStoreException, IOException {
+            void addMarked(GarbageCollectableBlobStore blobStore, 
GarbageCollectorFileState fs, String repoId,
+                String uniqueSuffix) throws DataStoreException, IOException {
                 boolean exists = ((SharedDataStore) blobStore)
                     
.metadataRecordExists(SharedStoreRecordType.REFERENCES.getNameFromId(repoId));
                 if (exists) {
@@ -823,15 +837,16 @@ public class MarkSweepGarbageCollector i
                 }
 
                 ((SharedDataStore) 
blobStore).addMetadataRecord(fs.getMarkedRefs(), 
SharedStoreRecordType.REFERENCES
-                    .getNameFromIdPrefix(repoId, 
UUID.randomUUID().toString()));
+                    .getNameFromIdPrefix(repoId, uniqueSuffix));
             }
             
             @Override
-            public void addMarkedStartMarker(GarbageCollectableBlobStore 
blobStore, String repoId) {
+            public void addMarkedStartMarker(GarbageCollectableBlobStore 
blobStore, String repoId,
+                String uniqueSuffix) {
                 try {
                     ((SharedDataStore) blobStore).addMetadataRecord(new 
ByteArrayInputStream(new byte[0]),
                         SharedStoreRecordType.MARKED_START_MARKER
-                            .getNameFromIdPrefix(repoId, 
UUID.randomUUID().toString()));
+                            .getNameFromIdPrefix(repoId, uniqueSuffix));
                 } catch (DataStoreException e) {
                     LOG.debug("Error creating marked time marker for repo : 
{}", repoId);
                 }
@@ -841,8 +856,8 @@ public class MarkSweepGarbageCollector i
 
         void removeAllMarkedReferences(GarbageCollectableBlobStore blobStore) 
{}
 
-        void addMarked(GarbageCollectableBlobStore blobStore, 
GarbageCollectorFileState fs,
-                String repoId) throws DataStoreException, IOException {}
+        void addMarked(GarbageCollectableBlobStore blobStore, 
GarbageCollectorFileState fs, String repoId,
+            String uniqueSuffix) throws DataStoreException, IOException {}
 
         long mergeAllMarkedReferences(GarbageCollectableBlobStore blobStore,
                 GarbageCollectorFileState fs)
@@ -861,7 +876,7 @@ public class MarkSweepGarbageCollector i
             return DEFAULT;
         }
     
-        public void addMarkedStartMarker(GarbageCollectableBlobStore 
blobStore, String repoId) {}
+        public void addMarkedStartMarker(GarbageCollectableBlobStore 
blobStore, String repoId, String uniqueSuffix) {}
     }
 
     /**

Modified: 
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/SharedBlobStoreGCTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/SharedBlobStoreGCTest.java?rev=1841829&r1=1841828&r2=1841829&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/SharedBlobStoreGCTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/SharedBlobStoreGCTest.java
 Mon Sep 24 11:04:18 2018
@@ -250,6 +250,66 @@ public class SharedBlobStoreGCTest {
         assertTrue(existing.containsAll(cluster3.getInitBlobs()));
     }
 
+    @Test
+    public void testGCStatsOnCloned() throws Exception {
+        log.debug("Running testGCStatsOnCloned()");
+        BlobStore blobeStore3 = getBlobStore(rootFolder);
+        DocumentNodeStore ds3 = new DocumentMK.Builder()
+            .setAsyncDelay(0)
+            .setDocumentStore(new MemoryDocumentStore())
+            .setBlobStore(blobeStore3)
+            .clock(clock)
+            .getNodeStore();
+        NodeBuilder a = ds3.getRoot().builder();
+        a.child(":clusterConfig").setProperty(":clusterId", cluster2.repoId);
+        Cluster cluster3 = new Cluster(ds3, cluster2.repoId, 120);
+        cluster3.init();
+
+        Set<String> actualRepoIds = Sets.newHashSet();
+        actualRepoIds.add(cluster1.repoId);
+        actualRepoIds.add(cluster2.repoId);
+
+        log.debug("Initialized {}", cluster3);
+
+        Set<String> observedRepoIds = Sets.newHashSet();
+        List<GarbageCollectionRepoStats> statsList = cluster1.gc.getStats();
+        for (GarbageCollectionRepoStats stat : statsList) {
+            assertEquals(0, stat.getNumLines());
+            observedRepoIds.add(stat.getRepositoryId());
+            if (stat.getRepositoryId().equals(cluster1.repoId)) {
+                assertTrue(stat.isLocal());
+            }
+        }
+
+        assertTrue(Sets.difference(actualRepoIds, observedRepoIds).isEmpty());
+
+        // Only run the mark phase on all the nodes to get the stats
+        cluster1.gc.collectGarbage(true);
+        cluster2.gc.collectGarbage(true);
+        cluster3.gc.collectGarbage(true);
+
+
+        Set<Integer> actualNumBlobs = Sets.newHashSet();
+        actualNumBlobs.add(cluster1.initBlobs.size());
+        actualNumBlobs.add(cluster2.initBlobs.size());
+        actualNumBlobs.add(cluster3.initBlobs.size());
+
+        statsList = cluster1.gc.getStats();
+        Set<Integer> observedNumBlobs = Sets.newHashSet();
+        observedRepoIds = Sets.newHashSet();
+        for (GarbageCollectionRepoStats stat : statsList) {
+            observedNumBlobs.add(stat.getNumLines());
+            observedRepoIds.add(stat.getRepositoryId());
+            assertTrue(stat.getStartTime() <= stat.getEndTime());
+            if (stat.getRepositoryId().equals(cluster1.repoId)) {
+                assertTrue(stat.isLocal());
+            }
+        }
+
+        assertTrue(Sets.difference(actualNumBlobs, 
observedNumBlobs).isEmpty());
+        assertTrue(Sets.difference(actualRepoIds, observedRepoIds).isEmpty());
+    }
+
     @After
     public void tearDown() throws Exception {
         DataStoreUtils.time = -1;


Reply via email to