ctubbsii commented on code in PR #4790:
URL: https://github.com/apache/accumulo/pull/4790#discussion_r1717946477
##########
server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java:
##########
@@ -86,34 +89,32 @@ public class GarbageCollectWriteAheadLogs {
this.fs = fs;
this.useTrash = useTrash;
this.liveServers = liveServers;
- this.walMarker = new WalStateManager(context);
- this.store = () -> Iterators.concat(
- TabletStateStore.getStoreForLevel(DataLevel.ROOT, context).iterator(),
- TabletStateStore.getStoreForLevel(DataLevel.METADATA,
context).iterator(),
- TabletStateStore.getStoreForLevel(DataLevel.USER, context).iterator());
+ this.walMarker = createWalStateManager(context);
+ this.hasCollected = new AtomicBoolean(false);
}
- /**
- * Creates a new GC WAL object. Meant for testing -- allows mocked objects.
- *
- * @param context the collection server's context
- * @param fs volume manager to use
- * @param useTrash true to move files to trash rather than delete them
- * @param liveTServerSet a started LiveTServerSet instance
- */
@VisibleForTesting
- GarbageCollectWriteAheadLogs(ServerContext context, VolumeManager fs,
boolean useTrash,
- LiveTServerSet liveTServerSet, WalStateManager walMarker,
- Iterable<TabletLocationState> store) {
- this.context = context;
- this.fs = fs;
- this.useTrash = useTrash;
- this.liveServers = liveTServerSet;
- this.walMarker = walMarker;
- this.store = store;
+ WalStateManager createWalStateManager(ServerContext context) {
+ return new WalStateManager(context);
+ }
+
+ @VisibleForTesting
+ Stream<TabletLocationState> createStore() {
+ Stream<TabletLocationState> rootStream =
+ TabletStateStore.getStoreForLevel(DataLevel.ROOT, context).stream();
+ Stream<TabletLocationState> metadataStream =
+ TabletStateStore.getStoreForLevel(DataLevel.METADATA,
context).stream();
+ Stream<TabletLocationState> userStream =
+ TabletStateStore.getStoreForLevel(DataLevel.USER, context).stream();
+ return Streams.concat(rootStream, metadataStream, userStream).onClose(()
-> {
+ rootStream.close();
+ metadataStream.close();
+ userStream.close();
Review Comment:
If any of these throws a RuntimeException, you probably still want to try
closing the others. I don't know a clean way to express that... maybe something like:
```suggestion
try {
rootStream.close();
} finally {
try {
metadataStream.close();
} finally {
userStream.close();
}
}
```
##########
server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java:
##########
@@ -52,6 +61,22 @@ public interface TabletStateStore extends
Iterable<TabletLocationState> {
@Override
ClosableIterator<TabletLocationState> iterator();
+ /**
+ * Create a stream of TabletLocationState that automatically closes the
underlying iterator.
+ */
+ default Stream<TabletLocationState> stream() {
+ ClosableIterator<TabletLocationState> iterator = this.iterator();
+ return StreamSupport
+ .stream(Spliterators.spliteratorUnknownSize(iterator,
Spliterator.ORDERED), false)
+ .onClose(() -> {
+ try {
+ iterator.close();
+ } catch (IOException e) {
+ log.warn("Error closing iterator", e);
Review Comment:
Probably best to rethrow it as an unchecked exception rather than only logging it:
```suggestion
throw new UncheckedIOException(e);
```
##########
server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java:
##########
@@ -86,34 +89,32 @@ public class GarbageCollectWriteAheadLogs {
this.fs = fs;
this.useTrash = useTrash;
this.liveServers = liveServers;
- this.walMarker = new WalStateManager(context);
- this.store = () -> Iterators.concat(
- TabletStateStore.getStoreForLevel(DataLevel.ROOT, context).iterator(),
- TabletStateStore.getStoreForLevel(DataLevel.METADATA,
context).iterator(),
- TabletStateStore.getStoreForLevel(DataLevel.USER, context).iterator());
+ this.walMarker = createWalStateManager(context);
+ this.hasCollected = new AtomicBoolean(false);
Review Comment:
You can also initialize the AtomicBoolean inline in its field declaration
rather than in the constructor.
##########
server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java:
##########
@@ -109,19 +110,29 @@ public void testRemoveUnusedLog() throws Exception {
marker.removeWalMarker(server1, id);
EasyMock.expectLastCall().once();
EasyMock.replay(context, fs, marker, tserverSet);
- GarbageCollectWriteAheadLogs gc = new
GarbageCollectWriteAheadLogs(context, fs, false,
- tserverSet, marker, tabletOnServer1List) {
- @Override
- @Deprecated
- protected int removeReplicationEntries(Map<UUID,TServerInstance>
candidates) {
- return 0;
- }
-
- @Override
- protected Map<UUID,Path> getSortedWALogs() {
- return Collections.emptyMap();
- }
- };
+ GarbageCollectWriteAheadLogs gc =
+ new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false) {
Review Comment:
```suggestion
var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet,
false) {
```
##########
server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java:
##########
@@ -30,12 +35,16 @@
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.server.ServerContext;
import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Interface for storing information about tablet assignments. There are three
implementations:
*/
public interface TabletStateStore extends Iterable<TabletLocationState> {
+ Logger log = LoggerFactory.getLogger(TabletStateStore.class);
Review Comment:
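If this is kept... (note: fields declared in a Java interface are implicitly
`public static final`, and `private` is not a legal modifier on an interface
field, so the suggestion below will not compile inside the interface as-is; the
logger would need to live in an implementing class or a private nested holder)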
```suggestion
private static final Logger log =
LoggerFactory.getLogger(TabletStateStore.class);
```
##########
server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java:
##########
@@ -216,7 +217,7 @@ public void collect(GCStatus status) {
} finally {
span5.end();
}
-
+ hasCollected.set(true);
Review Comment:
You could also wrap this in an `if` on `compareAndSet`:
```java
if (hasCollected.compareAndSet(false, true)) {
// stuff in here can only be executed once (so long as it never flips
back to false)
// could use the return value in a Precondition or other check if you
really want an error message instead
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]