This is an automated email from the ASF dual-hosted git repository. jmark99 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 485114a Remove continue point from Garbage Collector (#2214) 485114a is described below commit 485114a4760028e70d2eb4ed23b795ed386fbaa1 Author: Mark Owens <jmar...@apache.org> AuthorDate: Thu Jul 29 08:07:56 2021 -0400 Remove continue point from Garbage Collector (#2214) Updated Garbage Collection code to no longer use a continue point when processing deletion candidates. The GC now uses an iterator that lasts during the lifetime of a GC cycle. The GarbageCollectionTest was updated to work with the update, as was the GC integration test. Closes #1351 --- .../accumulo/core/metadata/schema/Ample.java | 2 +- .../accumulo/server/metadata/ServerAmpleImpl.java | 14 +-- .../accumulo/server/util/ListVolumesUsed.java | 2 +- .../accumulo/gc/GarbageCollectionAlgorithm.java | 41 +++--- .../accumulo/gc/GarbageCollectionEnvironment.java | 25 ++-- .../apache/accumulo/gc/SimpleGarbageCollector.java | 22 ++-- .../apache/accumulo/gc/GarbageCollectionTest.java | 139 ++++++++++++++++++++- .../test/functional/GarbageCollectorIT.java | 3 + 8 files changed, 185 insertions(+), 63 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java index fd5634e..18bc12a 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java @@ -198,7 +198,7 @@ public interface Ample { throw new UnsupportedOperationException(); } - default Iterator<String> getGcCandidates(DataLevel level, String continuePoint) { + default Iterator<String> getGcCandidates(DataLevel level) { throw new UnsupportedOperationException(); } diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java index 1f7eefe..5ef7237 100644 --- 
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java @@ -32,9 +32,7 @@ import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.PartialKey; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; @@ -163,7 +161,7 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample { } @Override - public Iterator<String> getGcCandidates(DataLevel level, String continuePoint) { + public Iterator<String> getGcCandidates(DataLevel level) { if (level == DataLevel.ROOT) { var zooReader = new ZooReader(context.getZooKeepers(), context.getZooKeepersSessionTimeOut()); byte[] json; @@ -173,19 +171,9 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample { throw new RuntimeException(e); } Stream<String> candidates = RootGcCandidates.fromJson(json).stream().sorted(); - - if (continuePoint != null && !continuePoint.isEmpty()) { - candidates = candidates.dropWhile(candidate -> candidate.compareTo(continuePoint) <= 0); - } - return candidates.iterator(); } else if (level == DataLevel.METADATA || level == DataLevel.USER) { Range range = DeletesSection.getRange(); - if (continuePoint != null && !continuePoint.isEmpty()) { - String continueRow = DeletesSection.encodeRow(continuePoint); - range = new Range(new Key(continueRow).followingKey(PartialKey.ROW), true, - range.getEndKey(), range.isEndKeyInclusive()); - } Scanner scanner; try { diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java 
b/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java index 3eb8c80..212822f 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java @@ -78,7 +78,7 @@ public class ListVolumesUsed { + " deletes section (volume replacement occurs at deletion time)"); volumes.clear(); - Iterator<String> delPaths = context.getAmple().getGcCandidates(level, ""); + Iterator<String> delPaths = context.getAmple().getGcCandidates(level); while (delPaths.hasNext()) { volumes.add(getTableURI(delPaths.next())); } diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java index 595af24..643f46c 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java @@ -267,10 +267,10 @@ public class GarbageCollectionAlgorithm { } - private boolean getCandidates(GarbageCollectionEnvironment gce, String lastCandidate, - List<String> candidates) throws TableNotFoundException { + private Iterator<String> getCandidates(GarbageCollectionEnvironment gce) + throws TableNotFoundException, IOException { try (TraceScope candidatesSpan = Trace.startSpan("getCandidates")) { - return gce.getCandidates(lastCandidate, candidates); + return gce.getCandidates(); } } @@ -292,28 +292,29 @@ public class GarbageCollectionAlgorithm { public void collect(GarbageCollectionEnvironment gce) throws TableNotFoundException, IOException { - String lastCandidate = ""; + Iterator<String> candidatesIter = getCandidates(gce); - boolean outOfMemory = true; - while (outOfMemory) { - List<String> candidates = new ArrayList<>(); - - outOfMemory = getCandidates(gce, lastCandidate, candidates); + while (candidatesIter.hasNext()) { + List<String> batchOfCandidates = 
gce.readCandidatesThatFitInMemory(candidatesIter); + deleteBatch(gce, batchOfCandidates); + } + } - if (candidates.isEmpty()) - break; - else - lastCandidate = candidates.get(candidates.size() - 1); + /** + * Given a sub-list of possible deletion candidates, process and remove valid deletion candidates. + */ + private void deleteBatch(GarbageCollectionEnvironment gce, List<String> currentBatch) + throws TableNotFoundException, IOException { - long origSize = candidates.size(); - gce.incrementCandidatesStat(origSize); + long origSize = currentBatch.size(); + gce.incrementCandidatesStat(origSize); - SortedMap<String,String> candidateMap = makeRelative(candidates); + SortedMap<String,String> candidateMap = makeRelative(currentBatch); - confirmDeletesTrace(gce, candidateMap); - gce.incrementInUseStat(origSize - candidateMap.size()); + confirmDeletesTrace(gce, candidateMap); + gce.incrementInUseStat(origSize - candidateMap.size()); - deleteConfirmed(gce, candidateMap); - } + deleteConfirmed(gce, candidateMap); } + } diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionEnvironment.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionEnvironment.java index ae8ffbb..add7c7f 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionEnvironment.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionEnvironment.java @@ -40,19 +40,22 @@ import org.apache.accumulo.server.replication.proto.Replication.Status; public interface GarbageCollectionEnvironment { /** - * Return a list of paths to files and dirs which are candidates for deletion from a given table, - * {@link RootTable#NAME} or {@link MetadataTable#NAME} + * Return an iterator which points to a list of paths to files and dirs which are candidates for + * deletion from a given table, {@link RootTable#NAME} or {@link MetadataTable#NAME} + * + * @return an iterator referencing a List containing deletion candidates + */ + Iterator<String> 
getCandidates() throws TableNotFoundException; + + /** + * Given an iterator to a deletion candidate list, return a sub-list of candidates which fit + * within provided memory constraints. * - * @param continuePoint - * A row to resume from if a previous invocation was stopped due to finding an extremely - * large number of candidates to remove which would have exceeded memory limitations - * @param candidates - * A collection of candidates files for deletion, may not be the complete collection of - * files for deletion at this point in time - * @return true if the results are short due to insufficient memory, otherwise false + * @param candidatesIter + * iterator referencing a List of possible deletion candidates + * @return a List of possible deletion candidates */ - boolean getCandidates(String continuePoint, List<String> candidates) - throws TableNotFoundException; + List<String> readCandidatesThatFitInMemory(Iterator<String> candidatesIter); /** * Fetch a list of paths for all bulk loads in progress (blip) from a given table, diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index 2223294..10bed91 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@ -209,29 +209,29 @@ public class SimpleGarbageCollector extends AbstractServer implements Iface { } @Override - public boolean getCandidates(String continuePoint, List<String> result) - throws TableNotFoundException { + public Iterator<String> getCandidates() throws TableNotFoundException { + return getContext().getAmple().getGcCandidates(level); + } - Iterator<String> candidates = getContext().getAmple().getGcCandidates(level, continuePoint); + @Override + public List<String> readCandidatesThatFitInMemory(Iterator<String> candidates) { long candidateLength = 0; // Converting the bytes 
to approximate number of characters for batch size. long candidateBatchSize = getCandidateBatchSize() / 2; - result.clear(); + List<String> candidatesBatch = new ArrayList<>(); while (candidates.hasNext()) { String candidate = candidates.next(); candidateLength += candidate.length(); - result.add(candidate); + candidatesBatch.add(candidate); if (candidateLength > candidateBatchSize) { - log.info( - "Candidate batch of size {} has exceeded the" - + " threshold. Attempting to delete what has been gathered so far.", - candidateLength); - return true; + log.info("Candidate batch of size {} has exceeded the threshold. Attempting to delete " + + "what has been gathered so far.", candidateLength); + return candidatesBatch; } } - return false; + return candidatesBatch; } @Override diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectionTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectionTest.java index cd557ae..c6736ed 100644 --- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectionTest.java +++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectionTest.java @@ -34,12 +34,18 @@ import java.util.TreeMap; import java.util.TreeSet; import java.util.stream.Stream; +import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.server.replication.StatusUtil; import org.apache.accumulo.server.replication.proto.Replication.Status; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class GarbageCollectionTest { + + private static final Logger log = LoggerFactory.getLogger(GarbageCollectionTest.class); + static class TestGCE implements GarbageCollectionEnvironment { TreeSet<String> candidates = new TreeSet<>(); ArrayList<String> blips = new ArrayList<>(); @@ -51,13 +57,17 @@ public class GarbageCollectionTest { TreeMap<String,Status> filesToReplicate = new TreeMap<>(); @Override - public boolean 
getCandidates(String continuePoint, List<String> ret) { - Iterator<String> iter = candidates.tailSet(continuePoint, false).iterator(); - while (iter.hasNext() && ret.size() < 3) { - ret.add(iter.next()); - } + public Iterator<String> getCandidates() throws TableNotFoundException { + return List.copyOf(candidates).iterator(); + } - return ret.size() == 3; + @Override + public List<String> readCandidatesThatFitInMemory(Iterator<String> candidatesIter) { + List<String> candidatesBatch = new ArrayList<>(); + while (candidatesIter.hasNext() && candidatesBatch.size() < 3) { + candidatesBatch.add(candidatesIter.next()); + } + return candidatesBatch; } @Override @@ -123,6 +133,28 @@ public class GarbageCollectionTest { assertEquals(0, gce.deletes.size()); } + // This test was created to help track down a ConcurrentModificationException error that was + // occurring with the unit tests once the GC was refactored to use a single iterator for the + // collect process. This was a minimal test case that would cause the exception to occur. 
+ @Test + public void minimalDelete() throws Exception { + TestGCE gce = new TestGCE(); + + gce.candidates.add("hdfs://foo:6000/accumulo/tables/4/t0/F000.rf"); + gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/4/t0/F001.rf"); + gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/5/t0/F005.rf"); + gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/6/t0/F006.rf"); + + gce.addFileReference("4", null, "hdfs://foo.com:6000/accumulo/tables/4/t0/F000.rf"); + gce.addFileReference("4", null, "hdfs://foo.com:6000/accumulo/tables/4/t0/F001.rf"); + gce.addFileReference("6", null, "hdfs://foo.com:6000/accumulo/tables/6/t0/F006.rf"); + + GarbageCollectionAlgorithm gca = new GarbageCollectionAlgorithm(); + gca.collect(gce); + + assertRemoved(gce, "hdfs://foo.com:6000/accumulo/tables/5/t0/F005.rf"); + } + @Test public void testBasic() throws Exception { TestGCE gce = new TestGCE(); @@ -166,6 +198,101 @@ } + /* + * Additional test with more candidates. Also, not a multiple of 3 as in the test above. Since the + * unit tests always return 3 candidates in a batch, some edge cases could be missed if that were + * always the case.
+ */ + @Test + public void testBasic2() throws Exception { + TestGCE gce = new TestGCE(); + + gce.candidates.add("hdfs://foo:6000/accumulo/tables/4/t0/F000.rf"); + gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/4/t0/F001.rf"); + gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/5/t0/F005.rf"); + + gce.candidates.add("hdfs://foo:6000/accumulo/tables/5/t0/F000.rf"); + gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/5/t0/F001.rf"); + gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/6/t1/F005.rf"); + + gce.candidates.add("hdfs://foo:6000/accumulo/tables/6/t0/F000.rf"); + gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/6/t0/F001.rf"); + gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/7/t0/F005.rf"); + + gce.candidates.add("hdfs://foo:6000/accumulo/tables/7/t0/F000.rf"); + gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/7/t0/F001.rf"); + gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/8/t0/F005.rf"); + + gce.candidates.add("hdfs://foo:6000/accumulo/tables/8/t0/F000.rf"); + gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/8/t0/F001.rf"); + gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/9/t0/F005.rf"); + + gce.candidates.add("hdfs://foo:6000/accumulo/tables/9/t0/F000.rf"); + gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/9/t0/F001.rf"); + gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/10/t0/F005.rf"); + + gce.candidates.add("hdfs://foo:6000/accumulo/tables/10/t0/F000.rf"); + gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/10/t0/F001.rf"); + gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/11/t0/F005.rf"); + + gce.candidates.add("hdfs://foo:6000/accumulo/tables/11/t0/F000.rf"); + gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/11/t0/F001.rf"); + + gce.addFileReference("4", null, "hdfs://foo.com:6000/accumulo/tables/4/t0/F000.rf"); + gce.addFileReference("4", null, "hdfs://foo:6000/accumulo/tables/4/t0/F001.rf"); + gce.addFileReference("4", null, 
"hdfs://foo.com:6000/accumulo/tables/4/t0//F002.rf"); + gce.addFileReference("5", null, "hdfs://foo.com:6000/accumulo/tables/5/t0/F005.rf"); + + GarbageCollectionAlgorithm gca = new GarbageCollectionAlgorithm(); + + gca.collect(gce); + // items to be removed from candidates + String[] toBeRemoved = {"hdfs://foo.com:6000/accumulo/tables/5/t0/F001.rf", + "hdfs://foo:6000/accumulo/tables/5/t0/F000.rf", + "hdfs://foo.com:6000/accumulo/tables/6/t1/F005.rf", + "hdfs://foo:6000/accumulo/tables/6/t0/F000.rf", + "hdfs://foo.com:6000/accumulo/tables/6/t0/F001.rf", + "hdfs://foo.com:6000/accumulo/tables/7/t0/F005.rf", + "hdfs://foo:6000/accumulo/tables/7/t0/F000.rf", + "hdfs://foo.com:6000/accumulo/tables/7/t0/F001.rf", + "hdfs://foo.com:6000/accumulo/tables/8/t0/F005.rf", + "hdfs://foo:6000/accumulo/tables/8/t0/F000.rf", + "hdfs://foo.com:6000/accumulo/tables/8/t0/F001.rf", + "hdfs://foo.com:6000/accumulo/tables/9/t0/F005.rf", + "hdfs://foo:6000/accumulo/tables/9/t0/F000.rf", + "hdfs://foo.com:6000/accumulo/tables/9/t0/F001.rf", + "hdfs://foo.com:6000/accumulo/tables/10/t0/F005.rf", + "hdfs://foo:6000/accumulo/tables/10/t0/F000.rf", + "hdfs://foo.com:6000/accumulo/tables/10/t0/F001.rf", + "hdfs://foo.com:6000/accumulo/tables/11/t0/F005.rf", + "hdfs://foo:6000/accumulo/tables/11/t0/F000.rf", + "hdfs://foo.com:6000/accumulo/tables/11/t0/F001.rf"}; + assertRemoved(gce, toBeRemoved); + + // Remove the reference to this flush file, run the GC which should not trim it from the + // candidates, and assert that it's gone + gce.removeFileReference("4", null, "hdfs://foo.com:6000/accumulo/tables/4/t0/F000.rf"); + gca.collect(gce); + assertRemoved(gce, "hdfs://foo:6000/accumulo/tables/4/t0/F000.rf"); + + // Removing a reference to a file that wasn't in the candidates should do nothing + gce.removeFileReference("4", null, "hdfs://foo.com:6000/accumulo/tables/4/t0/F002.rf"); + gca.collect(gce); + assertRemoved(gce); + + // Remove the reference to a file in the candidates should cause it 
to be removed + gce.removeFileReference("4", null, "hdfs://foo:6000/accumulo/tables/4/t0/F001.rf"); + gca.collect(gce); + assertRemoved(gce, "hdfs://foo.com:6000/accumulo/tables/4/t0/F001.rf"); + + // Adding more candidates which do not have references should be removed + gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/4/t0/F003.rf"); + gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/4/t0/F004.rf"); + gca.collect(gce); + assertRemoved(gce, "hdfs://foo.com:6000/accumulo/tables/4/t0/F003.rf", + "hdfs://foo.com:6000/accumulo/tables/4/t0/F004.rf"); + } + @Test public void testRelative() throws Exception { TestGCE gce = new TestGCE(); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java b/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java index f0ac9c1..6452f20 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java @@ -90,6 +90,9 @@ public class GarbageCollectorIT extends ConfigurableMacBase { cfg.setProperty(Property.GC_PORT, "0"); cfg.setProperty(Property.TSERV_MAXMEM, "5K"); cfg.setProperty(Property.TSERV_MAJC_DELAY, "1"); + // reduce the batch size significantly in order to cause the integration tests to have + // to process many batches of deletion candidates. + cfg.setProperty(Property.GC_CANDIDATE_BATCH_SIZE, "256K"); // use raw local file system so walogs sync and flush will work hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());