ctubbsii commented on code in PR #4790:
URL: https://github.com/apache/accumulo/pull/4790#discussion_r1717946477


##########
server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java:
##########
@@ -86,34 +89,32 @@ public class GarbageCollectWriteAheadLogs {
     this.fs = fs;
     this.useTrash = useTrash;
     this.liveServers = liveServers;
-    this.walMarker = new WalStateManager(context);
-    this.store = () -> Iterators.concat(
-        TabletStateStore.getStoreForLevel(DataLevel.ROOT, context).iterator(),
-        TabletStateStore.getStoreForLevel(DataLevel.METADATA, 
context).iterator(),
-        TabletStateStore.getStoreForLevel(DataLevel.USER, context).iterator());
+    this.walMarker = createWalStateManager(context);
+    this.hasCollected = new AtomicBoolean(false);
   }
 
-  /**
-   * Creates a new GC WAL object. Meant for testing -- allows mocked objects.
-   *
-   * @param context the collection server's context
-   * @param fs volume manager to use
-   * @param useTrash true to move files to trash rather than delete them
-   * @param liveTServerSet a started LiveTServerSet instance
-   */
   @VisibleForTesting
-  GarbageCollectWriteAheadLogs(ServerContext context, VolumeManager fs, 
boolean useTrash,
-      LiveTServerSet liveTServerSet, WalStateManager walMarker,
-      Iterable<TabletLocationState> store) {
-    this.context = context;
-    this.fs = fs;
-    this.useTrash = useTrash;
-    this.liveServers = liveTServerSet;
-    this.walMarker = walMarker;
-    this.store = store;
+  WalStateManager createWalStateManager(ServerContext context) {
+    return new WalStateManager(context);
+  }
+
+  @VisibleForTesting
+  Stream<TabletLocationState> createStore() {
+    Stream<TabletLocationState> rootStream =
+        TabletStateStore.getStoreForLevel(DataLevel.ROOT, context).stream();
+    Stream<TabletLocationState> metadataStream =
+        TabletStateStore.getStoreForLevel(DataLevel.METADATA, 
context).stream();
+    Stream<TabletLocationState> userStream =
+        TabletStateStore.getStoreForLevel(DataLevel.USER, context).stream();
+    return Streams.concat(rootStream, metadataStream, userStream).onClose(() 
-> {
+      rootStream.close();
+      metadataStream.close();
+      userStream.close();

Review Comment:
   If these throw a RTE, you probably want to continue trying to close the 
others. I don't know a clean way to express that... maybe something like:
   
   ```suggestion
         try {
           rootStream.close();
         } finally {
           try {
             metadataStream.close();
           } finally {
             userStream.close();
           }
         }
   ```



##########
server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java:
##########
@@ -52,6 +61,22 @@ public interface TabletStateStore extends 
Iterable<TabletLocationState> {
   @Override
   ClosableIterator<TabletLocationState> iterator();
 
+  /**
+   * Create a stream of TabletLocationState that automatically closes the 
underlying iterator.
+   */
+  default Stream<TabletLocationState> stream() {
+    ClosableIterator<TabletLocationState> iterator = this.iterator();
+    return StreamSupport
+        .stream(Spliterators.spliteratorUnknownSize(iterator, 
Spliterator.ORDERED), false)
+        .onClose(() -> {
+          try {
+            iterator.close();
+          } catch (IOException e) {
+            log.warn("Error closing iterator", e);

Review Comment:
   Probably best to let it out as an unchecked exception
   
   ```suggestion
               throw new UncheckedIOException(e);
   ```



##########
server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java:
##########
@@ -86,34 +89,32 @@ public class GarbageCollectWriteAheadLogs {
     this.fs = fs;
     this.useTrash = useTrash;
     this.liveServers = liveServers;
-    this.walMarker = new WalStateManager(context);
-    this.store = () -> Iterators.concat(
-        TabletStateStore.getStoreForLevel(DataLevel.ROOT, context).iterator(),
-        TabletStateStore.getStoreForLevel(DataLevel.METADATA, 
context).iterator(),
-        TabletStateStore.getStoreForLevel(DataLevel.USER, context).iterator());
+    this.walMarker = createWalStateManager(context);
+    this.hasCollected = new AtomicBoolean(false);

Review Comment:
   You can also inline this initialization of the atomic boolean into the 
declaration rather than in the constructor.



##########
server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java:
##########
@@ -109,19 +110,29 @@ public void testRemoveUnusedLog() throws Exception {
     marker.removeWalMarker(server1, id);
     EasyMock.expectLastCall().once();
     EasyMock.replay(context, fs, marker, tserverSet);
-    GarbageCollectWriteAheadLogs gc = new 
GarbageCollectWriteAheadLogs(context, fs, false,
-        tserverSet, marker, tabletOnServer1List) {
-      @Override
-      @Deprecated
-      protected int removeReplicationEntries(Map<UUID,TServerInstance> 
candidates) {
-        return 0;
-      }
-
-      @Override
-      protected Map<UUID,Path> getSortedWALogs() {
-        return Collections.emptyMap();
-      }
-    };
+    GarbageCollectWriteAheadLogs gc =
+        new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false) {

Review Comment:
   ```suggestion
       var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, 
false) {
   ```



##########
server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java:
##########
@@ -30,12 +35,16 @@
 import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Interface for storing information about tablet assignments. There are three 
implementations:
  */
 public interface TabletStateStore extends Iterable<TabletLocationState> {
 
+  Logger log = LoggerFactory.getLogger(TabletStateStore.class);

Review Comment:
   If this is kept...
   
   ```suggestion
     private static final Logger log = 
LoggerFactory.getLogger(TabletStateStore.class);
   ```



##########
server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java:
##########
@@ -216,7 +217,7 @@ public void collect(GCStatus status) {
       } finally {
         span5.end();
       }
-
+      hasCollected.set(true);

Review Comment:
   You could also wrap this in an:
   
   ```java
     if (hasCollected.compareAndSet(false, true)) {
       // stuff in here can only be executed once (so long as it never flips 
back to false)
       // could use the return value in a Precondition or other check if you 
really want an error message instead
     }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to