This is an automated email from the ASF dual-hosted git repository.

jmark99 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 485114a  Remove continue point from Garbage Collector (#2214)
485114a is described below

commit 485114a4760028e70d2eb4ed23b795ed386fbaa1
Author: Mark Owens <jmar...@apache.org>
AuthorDate: Thu Jul 29 08:07:56 2021 -0400

    Remove continue point from Garbage Collector (#2214)
    
    Updated Garbage Collection code to no longer use a continue point when 
processing deletion candidates. The GC now uses a single iterator that lasts 
for the lifetime of a GC cycle.
    
    The GarbageCollectionTest was updated to work with this change, as was the 
GC integration test.
    
    Closes #1351
---
 .../accumulo/core/metadata/schema/Ample.java       |   2 +-
 .../accumulo/server/metadata/ServerAmpleImpl.java  |  14 +--
 .../accumulo/server/util/ListVolumesUsed.java      |   2 +-
 .../accumulo/gc/GarbageCollectionAlgorithm.java    |  41 +++---
 .../accumulo/gc/GarbageCollectionEnvironment.java  |  25 ++--
 .../apache/accumulo/gc/SimpleGarbageCollector.java |  22 ++--
 .../apache/accumulo/gc/GarbageCollectionTest.java  | 139 ++++++++++++++++++++-
 .../test/functional/GarbageCollectorIT.java        |   3 +
 8 files changed, 185 insertions(+), 63 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java 
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
index fd5634e..18bc12a 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
@@ -198,7 +198,7 @@ public interface Ample {
     throw new UnsupportedOperationException();
   }
 
-  default Iterator<String> getGcCandidates(DataLevel level, String 
continuePoint) {
+  default Iterator<String> getGcCandidates(DataLevel level) {
     throw new UnsupportedOperationException();
   }
 
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
 
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
index 1f7eefe..5ef7237 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
@@ -32,9 +32,7 @@ import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.PartialKey;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -163,7 +161,7 @@ public class ServerAmpleImpl extends AmpleImpl implements 
Ample {
   }
 
   @Override
-  public Iterator<String> getGcCandidates(DataLevel level, String 
continuePoint) {
+  public Iterator<String> getGcCandidates(DataLevel level) {
     if (level == DataLevel.ROOT) {
       var zooReader = new ZooReader(context.getZooKeepers(), 
context.getZooKeepersSessionTimeOut());
       byte[] json;
@@ -173,19 +171,9 @@ public class ServerAmpleImpl extends AmpleImpl implements 
Ample {
         throw new RuntimeException(e);
       }
       Stream<String> candidates = 
RootGcCandidates.fromJson(json).stream().sorted();
-
-      if (continuePoint != null && !continuePoint.isEmpty()) {
-        candidates = candidates.dropWhile(candidate -> 
candidate.compareTo(continuePoint) <= 0);
-      }
-
       return candidates.iterator();
     } else if (level == DataLevel.METADATA || level == DataLevel.USER) {
       Range range = DeletesSection.getRange();
-      if (continuePoint != null && !continuePoint.isEmpty()) {
-        String continueRow = DeletesSection.encodeRow(continuePoint);
-        range = new Range(new Key(continueRow).followingKey(PartialKey.ROW), 
true,
-            range.getEndKey(), range.isEndKeyInclusive());
-      }
 
       Scanner scanner;
       try {
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
 
b/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
index 3eb8c80..212822f 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
@@ -78,7 +78,7 @@ public class ListVolumesUsed {
         + " deletes section (volume replacement occurs at deletion time)");
     volumes.clear();
 
-    Iterator<String> delPaths = context.getAmple().getGcCandidates(level, "");
+    Iterator<String> delPaths = context.getAmple().getGcCandidates(level);
     while (delPaths.hasNext()) {
       volumes.add(getTableURI(delPaths.next()));
     }
diff --git 
a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java
 
b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java
index 595af24..643f46c 100644
--- 
a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java
+++ 
b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java
@@ -267,10 +267,10 @@ public class GarbageCollectionAlgorithm {
 
   }
 
-  private boolean getCandidates(GarbageCollectionEnvironment gce, String 
lastCandidate,
-      List<String> candidates) throws TableNotFoundException {
+  private Iterator<String> getCandidates(GarbageCollectionEnvironment gce)
+      throws TableNotFoundException, IOException {
     try (TraceScope candidatesSpan = Trace.startSpan("getCandidates")) {
-      return gce.getCandidates(lastCandidate, candidates);
+      return gce.getCandidates();
     }
   }
 
@@ -292,28 +292,29 @@ public class GarbageCollectionAlgorithm {
 
   public void collect(GarbageCollectionEnvironment gce) throws 
TableNotFoundException, IOException {
 
-    String lastCandidate = "";
+    Iterator<String> candidatesIter = getCandidates(gce);
 
-    boolean outOfMemory = true;
-    while (outOfMemory) {
-      List<String> candidates = new ArrayList<>();
-
-      outOfMemory = getCandidates(gce, lastCandidate, candidates);
+    while (candidatesIter.hasNext()) {
+      List<String> batchOfCandidates = 
gce.readCandidatesThatFitInMemory(candidatesIter);
+      deleteBatch(gce, batchOfCandidates);
+    }
+  }
 
-      if (candidates.isEmpty())
-        break;
-      else
-        lastCandidate = candidates.get(candidates.size() - 1);
+  /**
+   * Given a sub-list of possible deletion candidates, process and remove 
valid deletion candidates.
+   */
+  private void deleteBatch(GarbageCollectionEnvironment gce, List<String> 
currentBatch)
+      throws TableNotFoundException, IOException {
 
-      long origSize = candidates.size();
-      gce.incrementCandidatesStat(origSize);
+    long origSize = currentBatch.size();
+    gce.incrementCandidatesStat(origSize);
 
-      SortedMap<String,String> candidateMap = makeRelative(candidates);
+    SortedMap<String,String> candidateMap = makeRelative(currentBatch);
 
-      confirmDeletesTrace(gce, candidateMap);
-      gce.incrementInUseStat(origSize - candidateMap.size());
+    confirmDeletesTrace(gce, candidateMap);
+    gce.incrementInUseStat(origSize - candidateMap.size());
 
-      deleteConfirmed(gce, candidateMap);
-    }
+    deleteConfirmed(gce, candidateMap);
   }
+
 }
diff --git 
a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionEnvironment.java
 
b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionEnvironment.java
index ae8ffbb..add7c7f 100644
--- 
a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionEnvironment.java
+++ 
b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionEnvironment.java
@@ -40,19 +40,22 @@ import 
org.apache.accumulo.server.replication.proto.Replication.Status;
 public interface GarbageCollectionEnvironment {
 
   /**
-   * Return a list of paths to files and dirs which are candidates for 
deletion from a given table,
-   * {@link RootTable#NAME} or {@link MetadataTable#NAME}
+   * Return an iterator which points to a list of paths to files and dirs 
which are candidates for
+   * deletion from a given table, {@link RootTable#NAME} or {@link 
MetadataTable#NAME}
+   *
+   * @return an iterator referencing a List containing deletion candidates
+   */
+  Iterator<String> getCandidates() throws TableNotFoundException;
+
+  /**
+   * Given an iterator to a deletion candidate list, return a sub-list of 
candidates which fit
+   * within provided memory constraints.
    *
-   * @param continuePoint
-   *          A row to resume from if a previous invocation was stopped due to 
finding an extremely
-   *          large number of candidates to remove which would have exceeded 
memory limitations
-   * @param candidates
-   *          A collection of candidates files for deletion, may not be the 
complete collection of
-   *          files for deletion at this point in time
-   * @return true if the results are short due to insufficient memory, 
otherwise false
+   * @param candidatesIter
+   *          iterator referencing a List of possible deletion candidates
+   * @return a List of possible deletion candidates
    */
-  boolean getCandidates(String continuePoint, List<String> candidates)
-      throws TableNotFoundException;
+  List<String> readCandidatesThatFitInMemory(Iterator<String> candidatesIter);
 
   /**
    * Fetch a list of paths for all bulk loads in progress (blip) from a given 
table,
diff --git 
a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java 
b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 2223294..10bed91 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -209,29 +209,29 @@ public class SimpleGarbageCollector extends 
AbstractServer implements Iface {
     }
 
     @Override
-    public boolean getCandidates(String continuePoint, List<String> result)
-        throws TableNotFoundException {
+    public Iterator<String> getCandidates() throws TableNotFoundException {
+      return getContext().getAmple().getGcCandidates(level);
+    }
 
-      Iterator<String> candidates = 
getContext().getAmple().getGcCandidates(level, continuePoint);
+    @Override
+    public List<String> readCandidatesThatFitInMemory(Iterator<String> 
candidates) {
       long candidateLength = 0;
       // Converting the bytes to approximate number of characters for batch 
size.
       long candidateBatchSize = getCandidateBatchSize() / 2;
 
-      result.clear();
+      List<String> candidatesBatch = new ArrayList<>();
 
       while (candidates.hasNext()) {
         String candidate = candidates.next();
         candidateLength += candidate.length();
-        result.add(candidate);
+        candidatesBatch.add(candidate);
         if (candidateLength > candidateBatchSize) {
-          log.info(
-              "Candidate batch of size {} has exceeded the"
-                  + " threshold. Attempting to delete what has been gathered 
so far.",
-              candidateLength);
-          return true;
+          log.info("Candidate batch of size {} has exceeded the threshold. 
Attempting to delete "
+              + "what has been gathered so far.", candidateLength);
+          return candidatesBatch;
         }
       }
-      return false;
+      return candidatesBatch;
     }
 
     @Override
diff --git 
a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectionTest.java 
b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectionTest.java
index cd557ae..c6736ed 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectionTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectionTest.java
@@ -34,12 +34,18 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.stream.Stream;
 
+import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.server.replication.StatusUtil;
 import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class GarbageCollectionTest {
+
+  private static final Logger log = 
LoggerFactory.getLogger(GarbageCollectionTest.class);
+
   static class TestGCE implements GarbageCollectionEnvironment {
     TreeSet<String> candidates = new TreeSet<>();
     ArrayList<String> blips = new ArrayList<>();
@@ -51,13 +57,17 @@ public class GarbageCollectionTest {
     TreeMap<String,Status> filesToReplicate = new TreeMap<>();
 
     @Override
-    public boolean getCandidates(String continuePoint, List<String> ret) {
-      Iterator<String> iter = candidates.tailSet(continuePoint, 
false).iterator();
-      while (iter.hasNext() && ret.size() < 3) {
-        ret.add(iter.next());
-      }
+    public Iterator<String> getCandidates() throws TableNotFoundException {
+      return List.copyOf(candidates).iterator();
+    }
 
-      return ret.size() == 3;
+    @Override
+    public List<String> readCandidatesThatFitInMemory(Iterator<String> 
candidatesIter) {
+      List<String> candidatesBatch = new ArrayList<>();
+      while (candidatesIter.hasNext() && candidatesBatch.size() < 3) {
+        candidatesBatch.add(candidatesIter.next());
+      }
+      return candidatesBatch;
     }
 
     @Override
@@ -123,6 +133,28 @@ public class GarbageCollectionTest {
     assertEquals(0, gce.deletes.size());
   }
 
+  // This test was created to help track down a 
ConcurrentModificationException error that was
+  // occurring with the unit tests once the GC was refactored to use a single 
iterator for the
+  // collect process. This was a minimal test case that would cause the 
exception to occur.
+  @Test
+  public void minimalDelete() throws Exception {
+    TestGCE gce = new TestGCE();
+
+    gce.candidates.add("hdfs://foo:6000/accumulo/tables/4/t0/F000.rf");
+    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/4/t0/F001.rf");
+    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/5/t0/F005.rf");
+    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/6/t0/F006.rf");
+
+    gce.addFileReference("4", null, 
"hdfs://foo.com:6000/accumulo/tables/4/t0/F000.rf");
+    gce.addFileReference("4", null, 
"hdfs://foo.com:6000/accumulo/tables/4/t0/F001.rf");
+    gce.addFileReference("6", null, 
"hdfs://foo.com:6000/accumulo/tables/6/t0/F006.rf");
+
+    GarbageCollectionAlgorithm gca = new GarbageCollectionAlgorithm();
+    gca.collect(gce);
+
+    assertRemoved(gce, "hdfs://foo.com:6000/accumulo/tables/5/t0/F005.rf");
+  }
+
   @Test
   public void testBasic() throws Exception {
     TestGCE gce = new TestGCE();
@@ -166,6 +198,101 @@ public class GarbageCollectionTest {
 
   }
 
+  /*
+   * Additional test with more candidates. Also, not a multiple of 3 as the 
test above. Since the
+   * unit tests always return 3 candidates in a batch, some edge cases could 
be missed if that was
+   * always the case.
+   */
+  @Test
+  public void testBasic2() throws Exception {
+    TestGCE gce = new TestGCE();
+
+    gce.candidates.add("hdfs://foo:6000/accumulo/tables/4/t0/F000.rf");
+    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/4/t0/F001.rf");
+    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/5/t0/F005.rf");
+
+    gce.candidates.add("hdfs://foo:6000/accumulo/tables/5/t0/F000.rf");
+    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/5/t0/F001.rf");
+    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/6/t1/F005.rf");
+
+    gce.candidates.add("hdfs://foo:6000/accumulo/tables/6/t0/F000.rf");
+    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/6/t0/F001.rf");
+    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/7/t0/F005.rf");
+
+    gce.candidates.add("hdfs://foo:6000/accumulo/tables/7/t0/F000.rf");
+    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/7/t0/F001.rf");
+    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/8/t0/F005.rf");
+
+    gce.candidates.add("hdfs://foo:6000/accumulo/tables/8/t0/F000.rf");
+    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/8/t0/F001.rf");
+    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/9/t0/F005.rf");
+
+    gce.candidates.add("hdfs://foo:6000/accumulo/tables/9/t0/F000.rf");
+    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/9/t0/F001.rf");
+    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/10/t0/F005.rf");
+
+    gce.candidates.add("hdfs://foo:6000/accumulo/tables/10/t0/F000.rf");
+    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/10/t0/F001.rf");
+    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/11/t0/F005.rf");
+
+    gce.candidates.add("hdfs://foo:6000/accumulo/tables/11/t0/F000.rf");
+    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/11/t0/F001.rf");
+
+    gce.addFileReference("4", null, 
"hdfs://foo.com:6000/accumulo/tables/4/t0/F000.rf");
+    gce.addFileReference("4", null, 
"hdfs://foo:6000/accumulo/tables/4/t0/F001.rf");
+    gce.addFileReference("4", null, 
"hdfs://foo.com:6000/accumulo/tables/4/t0//F002.rf");
+    gce.addFileReference("5", null, 
"hdfs://foo.com:6000/accumulo/tables/5/t0/F005.rf");
+
+    GarbageCollectionAlgorithm gca = new GarbageCollectionAlgorithm();
+
+    gca.collect(gce);
+    // items to be removed from candidates
+    String[] toBeRemoved = {"hdfs://foo.com:6000/accumulo/tables/5/t0/F001.rf",
+        "hdfs://foo:6000/accumulo/tables/5/t0/F000.rf",
+        "hdfs://foo.com:6000/accumulo/tables/6/t1/F005.rf",
+        "hdfs://foo:6000/accumulo/tables/6/t0/F000.rf",
+        "hdfs://foo.com:6000/accumulo/tables/6/t0/F001.rf",
+        "hdfs://foo.com:6000/accumulo/tables/7/t0/F005.rf",
+        "hdfs://foo:6000/accumulo/tables/7/t0/F000.rf",
+        "hdfs://foo.com:6000/accumulo/tables/7/t0/F001.rf",
+        "hdfs://foo.com:6000/accumulo/tables/8/t0/F005.rf",
+        "hdfs://foo:6000/accumulo/tables/8/t0/F000.rf",
+        "hdfs://foo.com:6000/accumulo/tables/8/t0/F001.rf",
+        "hdfs://foo.com:6000/accumulo/tables/9/t0/F005.rf",
+        "hdfs://foo:6000/accumulo/tables/9/t0/F000.rf",
+        "hdfs://foo.com:6000/accumulo/tables/9/t0/F001.rf",
+        "hdfs://foo.com:6000/accumulo/tables/10/t0/F005.rf",
+        "hdfs://foo:6000/accumulo/tables/10/t0/F000.rf",
+        "hdfs://foo.com:6000/accumulo/tables/10/t0/F001.rf",
+        "hdfs://foo.com:6000/accumulo/tables/11/t0/F005.rf",
+        "hdfs://foo:6000/accumulo/tables/11/t0/F000.rf",
+        "hdfs://foo.com:6000/accumulo/tables/11/t0/F001.rf"};
+    assertRemoved(gce, toBeRemoved);
+
+    // Remove the reference to this flush file, run the GC which should not 
trim it from the
+    // candidates, and assert that it's gone
+    gce.removeFileReference("4", null, 
"hdfs://foo.com:6000/accumulo/tables/4/t0/F000.rf");
+    gca.collect(gce);
+    assertRemoved(gce, "hdfs://foo:6000/accumulo/tables/4/t0/F000.rf");
+
+    // Removing a reference to a file that wasn't in the candidates should do 
nothing
+    gce.removeFileReference("4", null, 
"hdfs://foo.com:6000/accumulo/tables/4/t0/F002.rf");
+    gca.collect(gce);
+    assertRemoved(gce);
+
+    // Remove the reference to a file in the candidates should cause it to be 
removed
+    gce.removeFileReference("4", null, 
"hdfs://foo:6000/accumulo/tables/4/t0/F001.rf");
+    gca.collect(gce);
+    assertRemoved(gce, "hdfs://foo.com:6000/accumulo/tables/4/t0/F001.rf");
+
+    // Adding more candidates which do not have references should be removed
+    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/4/t0/F003.rf");
+    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/4/t0/F004.rf");
+    gca.collect(gce);
+    assertRemoved(gce, "hdfs://foo.com:6000/accumulo/tables/4/t0/F003.rf",
+        "hdfs://foo.com:6000/accumulo/tables/4/t0/F004.rf");
+  }
+
   @Test
   public void testRelative() throws Exception {
     TestGCE gce = new TestGCE();
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
index f0ac9c1..6452f20 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
@@ -90,6 +90,9 @@ public class GarbageCollectorIT extends ConfigurableMacBase {
     cfg.setProperty(Property.GC_PORT, "0");
     cfg.setProperty(Property.TSERV_MAXMEM, "5K");
     cfg.setProperty(Property.TSERV_MAJC_DELAY, "1");
+    // reduce the batch size significantly in order to cause the integration 
tests to have
+    // to process many batches of deletion candidates.
+    cfg.setProperty(Property.GC_CANDIDATE_BATCH_SIZE, "256K");
 
     // use raw local file system so walogs sync and flush will work
     hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());

Reply via email to