This is an automated email from the ASF dual-hosted git repository. domgarguilo pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new f26c702a87 Clean up usage of TabletsMetadata (#3980) f26c702a87 is described below commit f26c702a87df996b6ccf21c3cef2bb5fd5e751e6 Author: Dom G <domgargu...@apache.org> AuthorDate: Wed Jan 10 16:16:36 2024 -0500 Clean up usage of TabletsMetadata (#3980) * Clean up usage of TabletsMetadata. Close resources and refactoring improvements. --- .../core/clientImpl/TableOperationsImpl.java | 62 ++++++++-------- .../clientImpl/bulk/ConcurrentKeyExtentCache.java | 17 ++--- .../org/apache/accumulo/core/summary/Gatherer.java | 22 +++--- .../java/org/apache/accumulo/core/util/Merge.java | 62 ++++++++-------- .../org/apache/accumulo/core/util/MergeTest.java | 31 +++++++- .../manager/balancer/BalancerEnvironmentImpl.java | 12 ++-- .../accumulo/server/metadata/ServerAmpleImpl.java | 2 +- .../main/java/org/apache/accumulo/gc/GCRun.java | 4 +- .../accumulo/gc/GarbageCollectionAlgorithm.java | 84 +++++++++++----------- .../manager/tableOps/bulkVer2/LoadFiles.java | 23 +++--- .../manager/tableOps/bulkVer2/PrepBulkImport.java | 42 ++++++++--- .../manager/tableOps/compact/CompactionDriver.java | 21 +++--- .../tableOps/bulkVer2/PrepBulkImportTest.java | 31 +++++--- .../org/apache/accumulo/tserver/ScanServer.java | 10 ++- .../java/org/apache/accumulo/test/AmpleIT.java | 16 +++-- .../java/org/apache/accumulo/test/GCRunIT.java | 13 ++-- .../accumulo/test/ScanServerMetadataEntriesIT.java | 6 +- .../test/compaction/ExternalCompaction4_IT.java | 29 +++++--- .../test/compaction/ExternalCompaction_1_IT.java | 26 ++++--- .../test/compaction/ExternalCompaction_2_IT.java | 8 +-- .../accumulo/test/functional/CompactionIT.java | 29 +++++--- .../test/functional/GarbageCollectorTrashBase.java | 17 ++--- .../accumulo/test/functional/MetadataIT.java | 15 ++-- 23 files changed, 338 insertions(+), 244 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java index e9e784b23e..3b7bf91612 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java @@ -666,9 +666,9 @@ public class TableOperationsImpl extends TableOperationsHelper { TableId tableId = context.getTableId(tableName); while (true) { - try { - return context.getAmple().readTablets().forTable(tableId).fetch(PREV_ROW).checkConsistency() - .build().stream().map(tm -> tm.getExtent().endRow()).filter(Objects::nonNull) + try (TabletsMetadata tabletsMetadata = context.getAmple().readTablets().forTable(tableId) + .fetch(PREV_ROW).checkConsistency().build()) { + return tabletsMetadata.stream().map(tm -> tm.getExtent().endRow()).filter(Objects::nonNull) .collect(Collectors.toList()); } catch (TabletDeletedException tde) { // see if the table was deleted @@ -1320,9 +1320,6 @@ public class TableOperationsImpl extends TableOperationsHelper { range = new Range(startRow, lastRow); } - TabletsMetadata tablets = TabletsMetadata.builder(context).scanMetadataTable() - .overRange(range).fetch(LOCATION, PREV_ROW).build(); - KeyExtent lastExtent = null; int total = 0; @@ -1331,34 +1328,38 @@ public class TableOperationsImpl extends TableOperationsHelper { Text continueRow = null; MapCounter<String> serverCounts = new MapCounter<>(); - for (TabletMetadata tablet : tablets) { - total++; - Location loc = tablet.getLocation(); + try (TabletsMetadata tablets = TabletsMetadata.builder(context).scanMetadataTable() + .overRange(range).fetch(LOCATION, PREV_ROW).build()) { + + for (TabletMetadata tablet : tablets) { + total++; + Location loc = tablet.getLocation(); - if ((expectedState == TableState.ONLINE - && (loc == null || loc.getType() == LocationType.FUTURE)) - || (expectedState == TableState.OFFLINE && loc != null)) { - if (continueRow == null) { - continueRow = tablet.getExtent().toMetaRow(); + if ((expectedState == TableState.ONLINE + && (loc == null || loc.getType() == LocationType.FUTURE)) + || (expectedState == TableState.OFFLINE && loc != null)) { + if (continueRow == null) { + continueRow = tablet.getExtent().toMetaRow(); + } + waitFor++; + lastRow = tablet.getExtent().toMetaRow(); + + if (loc != null) { + serverCounts.increment(loc.getHostPortSession(), 1); + } } - waitFor++; - lastRow = tablet.getExtent().toMetaRow(); - if (loc != null) { - serverCounts.increment(loc.getHostPortSession(), 1); + if (!tablet.getExtent().tableId().equals(tableId)) { + throw new AccumuloException( + "Saw unexpected table Id " + tableId + " " + tablet.getExtent()); } - } - if (!tablet.getExtent().tableId().equals(tableId)) { - throw new AccumuloException( - "Saw unexpected table Id " + tableId + " " + tablet.getExtent()); - } + if (lastExtent != null && !tablet.getExtent().isPreviousExtent(lastExtent)) { + holes++; + } - if (lastExtent != null && !tablet.getExtent().isPreviousExtent(lastExtent)) { - holes++; + lastExtent = tablet.getExtent(); } - - lastExtent = tablet.getExtent(); } if (continueRow != null) { @@ -2059,8 +2060,11 @@ public class TableOperationsImpl extends TableOperationsHelper { @Override public TimeType getTimeType(final String tableName) throws TableNotFoundException { TableId tableId = context.getTableId(tableName); - Optional<TabletMetadata> tabletMetadata = context.getAmple().readTablets().forTable(tableId) - .fetch(TabletMetadata.ColumnType.TIME).checkConsistency().build().stream().findFirst(); + Optional<TabletMetadata> tabletMetadata; + try (TabletsMetadata tabletsMetadata = context.getAmple().readTablets().forTable(tableId) + .fetch(TabletMetadata.ColumnType.TIME).checkConsistency().build()) { + tabletMetadata = tabletsMetadata.stream().findFirst(); + } TabletMetadata timeData = tabletMetadata.orElseThrow(() -> new IllegalStateException("Failed to retrieve TimeType")); return timeData.getTime().getType(); diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/ConcurrentKeyExtentCache.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/ConcurrentKeyExtentCache.java index f17639a474..acd5924ff9 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/ConcurrentKeyExtentCache.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/ConcurrentKeyExtentCache.java @@ -23,7 +23,6 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map.Entry; import java.util.Objects; @@ -88,8 +87,9 @@ class ConcurrentKeyExtentCache implements KeyExtentCache { @VisibleForTesting protected Stream<KeyExtent> lookupExtents(Text row) { - return TabletsMetadata.builder(ctx).forTable(tableId).overlapping(row, true, null) - .checkConsistency().fetch(PREV_ROW).build().stream().limit(100) + TabletsMetadata tabletsMetadata = TabletsMetadata.builder(ctx).forTable(tableId) + .overlapping(row, true, null).checkConsistency().fetch(PREV_ROW).build(); + return tabletsMetadata.stream().onClose(tabletsMetadata::close).limit(100) .map(TabletMetadata::getExtent); } @@ -129,15 +129,8 @@ class ConcurrentKeyExtentCache implements KeyExtentCache { for (Text lookupRow : lookupRows) { if (getFromCache(lookupRow) == null) { while (true) { - try { - Iterator<KeyExtent> iter = lookupExtents(lookupRow).iterator(); - while (iter.hasNext()) { - KeyExtent ke2 = iter.next(); - if (inCache(ke2)) { - break; - } - updateCache(ke2); - } + try (Stream<KeyExtent> keyExtentStream = lookupExtents(lookupRow)) { + keyExtentStream.takeWhile(ke2 -> !inCache(ke2)).forEach(this::updateCache); break; } catch (TabletDeletedException tde) { // tablets were merged away in the table, start over and try again diff --git a/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java b/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java index 2bbcdbbabd..e9ee45d763 100644 --- a/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java +++ b/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java @@ -166,17 +166,17 @@ public class Gatherer { private Map<String,Map<StoredTabletFile,List<TRowRange>>> getFilesGroupedByLocation(Predicate<StoredTabletFile> fileSelector) { - Iterable<TabletMetadata> tmi = TabletsMetadata.builder(ctx).forTable(tableId) - .overlapping(startRow, endRow).fetch(FILES, LOCATION, LAST, PREV_ROW).build(); - // get a subset of files Map<StoredTabletFile,List<TabletMetadata>> files = new HashMap<>(); - for (TabletMetadata tm : tmi) { - for (StoredTabletFile file : tm.getFiles()) { - if (fileSelector.test(file)) { - // TODO push this filtering to server side and possibly use batch scanner - files.computeIfAbsent(file, s -> new ArrayList<>()).add(tm); + try (TabletsMetadata tmi = TabletsMetadata.builder(ctx).forTable(tableId) + .overlapping(startRow, endRow).fetch(FILES, LOCATION, LAST, PREV_ROW).build()) { + for (TabletMetadata tm : tmi) { + for (StoredTabletFile file : tm.getFiles()) { + if (fileSelector.test(file)) { + // TODO push this filtering to server side and possibly use batch scanner + files.computeIfAbsent(file, s -> new ArrayList<>()).add(tm); + } } } } @@ -447,8 +447,10 @@ public class Gatherer { private int countFiles() { // TODO use a batch scanner + iterator to parallelize counting files - return TabletsMetadata.builder(ctx).forTable(tableId).overlapping(startRow, endRow) - .fetch(FILES, PREV_ROW).build().stream().mapToInt(tm -> tm.getFiles().size()).sum(); + try (TabletsMetadata tabletsMetadata = TabletsMetadata.builder(ctx).forTable(tableId) + .overlapping(startRow, endRow).fetch(FILES, PREV_ROW).build()) { + return tabletsMetadata.stream().mapToInt(tm -> tm.getFiles().size()).sum(); + } } private class GatherRequest implements Supplier<SummaryCollection> { diff --git a/core/src/main/java/org/apache/accumulo/core/util/Merge.java b/core/src/main/java/org/apache/accumulo/core/util/Merge.java index 41f5a67943..2acfd6457a 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/Merge.java +++ b/core/src/main/java/org/apache/accumulo/core/util/Merge.java @@ -149,19 +149,43 @@ public class Merge { } List<Size> sizes = new ArrayList<>(); long totalSize = 0; - // Merge any until you get larger than the goal size, and then merge one less tablet - Iterator<Size> sizeIterator = getSizeIterator(client, table, start, end); - while (sizeIterator.hasNext()) { - Size next = sizeIterator.next(); - totalSize += next.size; - sizes.add(next); - if (totalSize > goalSize) { - totalSize = mergeMany(client, table, sizes, goalSize, force, false); + + TableId tableId; + ClientContext context = (ClientContext) client; + try { + tableId = context.getTableId(table); + } catch (Exception e) { + throw new MergeException(e); + } + + try (TabletsMetadata tablets = TabletsMetadata.builder(context).scanMetadataTable() + .overRange(new KeyExtent(tableId, end, start).toMetaRange()).fetch(FILES, PREV_ROW) + .build()) { + + Iterator<Size> sizeIterator = tablets.stream().map(tm -> { + long size = tm.getFilesMap().values().stream().mapToLong(DataFileValue::getSize).sum(); + return new Size(tm.getExtent(), size); + }).iterator(); + + while (sizeIterator.hasNext()) { + Size next = sizeIterator.next(); + totalSize += next.size; + sizes.add(next); + + if (totalSize > goalSize) { + mergeMany(client, table, sizes, goalSize, force, false); + sizes.clear(); + sizes.add(next); + totalSize = next.size; + } } } + + // merge one less tablet if (sizes.size() > 1) { mergeMany(client, table, sizes, goalSize, force, true); } + } catch (Exception ex) { throw new MergeException(ex); } @@ -239,26 +263,4 @@ public class Merge { } } - protected Iterator<Size> getSizeIterator(AccumuloClient client, String tablename, Text start, - Text end) throws MergeException { - // open up metadata, walk through the tablets. - - TableId tableId; - TabletsMetadata tablets; - try { - ClientContext context = (ClientContext) client; - tableId = context.getTableId(tablename); - tablets = TabletsMetadata.builder(context).scanMetadataTable() - .overRange(new KeyExtent(tableId, end, start).toMetaRange()).fetch(FILES, PREV_ROW) - .build(); - } catch (Exception e) { - throw new MergeException(e); - } - - return tablets.stream().map(tm -> { - long size = tm.getFilesMap().values().stream().mapToLong(DataFileValue::getSize).sum(); - return new Size(tm.getExtent(), size); - }).iterator(); - } - } diff --git a/core/src/test/java/org/apache/accumulo/core/util/MergeTest.java b/core/src/test/java/org/apache/accumulo/core/util/MergeTest.java index 6d253fe0ae..a37a402665 100644 --- a/core/src/test/java/org/apache/accumulo/core/util/MergeTest.java +++ b/core/src/test/java/org/apache/accumulo/core/util/MergeTest.java @@ -28,6 +28,7 @@ import java.util.List; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.util.Merge.Size; import org.apache.hadoop.io.Text; import org.junit.jupiter.api.Test; @@ -57,8 +58,34 @@ public class MergeTest { protected void message(String format, Object... args) {} @Override - protected Iterator<Size> getSizeIterator(AccumuloClient client, String tablename, - final Text start, final Text end) throws MergeException { + public void mergomatic(AccumuloClient client, String table, Text start, Text end, long goalSize, + boolean force) throws MergeException { + if (table.equals(MetadataTable.NAME)) { + throw new IllegalArgumentException("cannot merge tablets on the metadata table"); + } + + List<Size> sizes = new ArrayList<>(); + long totalSize = 0; + + Iterator<Size> sizeIterator = getSizeIterator(start, end); + + while (sizeIterator.hasNext()) { + Size next = sizeIterator.next(); + totalSize += next.size; + sizes.add(next); + if (totalSize > goalSize) { + mergeMany(client, table, sizes, goalSize, force, false); + sizes.clear(); + sizes.add(next); + totalSize = next.size; + } + } + if (sizes.size() > 1) { + mergeMany(client, table, sizes, goalSize, force, true); + } + } + + protected Iterator<Size> getSizeIterator(final Text start, final Text end) { final Iterator<Size> impl = tablets.iterator(); return new Iterator<>() { Size next = skip(); diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/balancer/BalancerEnvironmentImpl.java b/server/base/src/main/java/org/apache/accumulo/server/manager/balancer/BalancerEnvironmentImpl.java index a2a1a1b1ba..3018ae1910 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/balancer/BalancerEnvironmentImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/balancer/BalancerEnvironmentImpl.java @@ -75,11 +75,13 @@ public class BalancerEnvironmentImpl extends ServiceEnvironmentImpl implements B @Override public Map<TabletId,TabletServerId> listTabletLocations(TableId tableId) { Map<TabletId,TabletServerId> tablets = new LinkedHashMap<>(); - for (var tm : TabletsMetadata.builder(getContext()).forTable(tableId).fetch(LOCATION, PREV_ROW) - .build()) { - tablets.put(new TabletIdImpl(tm.getExtent()), - TabletServerIdImpl.fromThrift(Optional.ofNullable(tm.getLocation()) - .map(TabletMetadata.Location::getServerInstance).orElse(null))); + try (TabletsMetadata tabletsMetadata = + TabletsMetadata.builder(getContext()).forTable(tableId).fetch(LOCATION, PREV_ROW).build()) { + for (var tm : tabletsMetadata) { + tablets.put(new TabletIdImpl(tm.getExtent()), + TabletServerIdImpl.fromThrift(Optional.ofNullable(tm.getLocation()) + .map(TabletMetadata.Location::getServerInstance).orElse(null))); + } } return tablets; } diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java index 7aa6ecb6e1..fd6b55727e 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java @@ -364,7 +364,7 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample { Scanner scanner = context.createScanner(DataLevel.USER.metaTable(), Authorizations.EMPTY); scanner.setRange(ScanServerFileReferenceSection.getRange()); int pLen = ScanServerFileReferenceSection.getRowPrefix().length(); - return StreamSupport.stream(scanner.spliterator(), false) + return scanner.stream().onClose(scanner::close) .map(e -> new ScanServerRefTabletFile(e.getKey().getRowData().toString().substring(pLen), e.getKey().getColumnFamily(), e.getKey().getColumnQualifier())); } catch (TableNotFoundException e) { diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java b/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java index db09b7ac7a..798bb6e535 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java @@ -175,9 +175,9 @@ public class GCRun implements GarbageCollectionEnvironment { if (level == Ample.DataLevel.ROOT) { tabletStream = Stream.of(context.getAmple().readTablet(RootTable.EXTENT, DIR, FILES, SCANS)); } else { - var tabletsMetadata = TabletsMetadata.builder(context).scanTable(level.metaTable()) + TabletsMetadata tm = TabletsMetadata.builder(context).scanTable(level.metaTable()) .checkConsistency().fetch(DIR, FILES, SCANS).build(); - tabletStream = tabletsMetadata.stream(); + tabletStream = tm.stream().onClose(tm::close); } // there is a lot going on in this "one line" so see below for more info diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java index 48360b3ef3..ab9bff706e 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java @@ -143,55 +143,57 @@ public class GarbageCollectionAlgorithm { List<GcCandidate> candidateEntriesToBeDeleted = new ArrayList<>(); Set<TableId> tableIdsBefore = gce.getCandidateTableIDs(); Set<TableId> tableIdsSeen = new HashSet<>(); - Iterator<Reference> iter = gce.getReferences().iterator(); - while (iter.hasNext()) { - Reference ref = iter.next(); - tableIdsSeen.add(ref.getTableId()); + try (Stream<Reference> references = gce.getReferences()) { + references.forEach(ref -> { + tableIdsSeen.add(ref.getTableId()); - if (ref.isDirectory()) { - var dirReference = (ReferenceDirectory) ref; - ServerColumnFamily.validateDirCol(dirReference.getTabletDir()); + if (ref.isDirectory()) { + var dirReference = (ReferenceDirectory) ref; + ServerColumnFamily.validateDirCol(dirReference.getTabletDir()); - String dir = "/" + dirReference.tableId + "/" + dirReference.getTabletDir(); + String dir = "/" + dirReference.tableId + "/" + dirReference.getTabletDir(); - dir = makeRelative(dir, 2); + dir = makeRelative(dir, 2); - GcCandidate gcTemp = candidateMap.remove(dir); - if (gcTemp != null) { - log.debug("Directory Candidate was still in use by dir ref: {}", dir); - // Do not add dir candidates to candidateEntriesToBeDeleted as they are only created once. - } - } else { - String reference = ref.getMetadataPath(); - if (reference.startsWith("/")) { - log.debug("Candidate {} has a relative path, prepend tableId {}", reference, - ref.getTableId()); - reference = "/" + ref.getTableId() + ref.getMetadataPath(); - } else if (!reference.contains(":") && !reference.startsWith("../")) { - throw new RuntimeException("Bad file reference " + reference); - } + GcCandidate gcTemp = candidateMap.remove(dir); + if (gcTemp != null) { + log.debug("Directory Candidate was still in use by dir ref: {}", dir); + // Do not add dir candidates to candidateEntriesToBeDeleted as they are only created + // once. + } + } else { + String reference = ref.getMetadataPath(); + if (reference.startsWith("/")) { + log.debug("Candidate {} has a relative path, prepend tableId {}", reference, + ref.getTableId()); + reference = "/" + ref.getTableId() + ref.getMetadataPath(); + } else if (!reference.contains(":") && !reference.startsWith("../")) { + throw new RuntimeException("Bad file reference " + reference); + } - String relativePath = makeRelative(reference, 3); - - // WARNING: This line is EXTREMELY IMPORTANT. - // You MUST REMOVE candidates that are still in use - GcCandidate gcTemp = candidateMap.remove(relativePath); - if (gcTemp != null) { - log.debug("File Candidate was still in use: {}", relativePath); - // Prevent deletion of candidates that are still in use by scans, because they won't be - // recreated once the scan is finished. - if (!ref.isScan()) { - candidateEntriesToBeDeleted.add(gcTemp); + String relativePath = makeRelative(reference, 3); + + // WARNING: This line is EXTREMELY IMPORTANT. + // You MUST REMOVE candidates that are still in use + GcCandidate gcTemp = candidateMap.remove(relativePath); + if (gcTemp != null) { + log.debug("File Candidate was still in use: {}", relativePath); + // Prevent deletion of candidates that are still in use by scans, because they won't be + // recreated once the scan is finished. + if (!ref.isScan()) { + candidateEntriesToBeDeleted.add(gcTemp); + } } - } - String dir = relativePath.substring(0, relativePath.lastIndexOf('/')); - GcCandidate gcT = candidateMap.remove(dir); - if (gcT != null) { - log.debug("Directory Candidate was still in use by file ref: {}", relativePath); - // Do not add dir candidates to candidateEntriesToBeDeleted as they are only created once. + String dir = relativePath.substring(0, relativePath.lastIndexOf('/')); + GcCandidate gcT = candidateMap.remove(dir); + if (gcT != null) { + log.debug("Directory Candidate was still in use by file ref: {}", relativePath); + // Do not add dir candidates to candidateEntriesToBeDeleted as they are only created + // once. + } } - } + }); } Set<TableId> tableIdsAfter = gce.getCandidateTableIDs(); ensureAllTablesChecked(Collections.unmodifiableSet(tableIdsBefore), diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java index a5234819dc..02924116ec 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java @@ -328,24 +328,25 @@ class LoadFiles extends ManagerRepo { Text startRow = loadMapEntry.getKey().prevEndRow(); - Iterator<TabletMetadata> tabletIter = - TabletsMetadata.builder(manager.getContext()).forTable(tableId).overlapping(startRow, null) - .checkConsistency().fetch(PREV_ROW, LOCATION, LOADED).build().iterator(); - Loader loader; if (bulkInfo.tableState == TableState.ONLINE) { loader = new OnlineLoader(); } else { loader = new OfflineLoader(); } - + long t1; loader.start(bulkDir, manager, tid, bulkInfo.setTime); - - long t1 = System.currentTimeMillis(); - while (lmi.hasNext()) { - loadMapEntry = lmi.next(); - List<TabletMetadata> tablets = findOverlappingTablets(loadMapEntry.getKey(), tabletIter); - loader.load(tablets, loadMapEntry.getValue()); + try (TabletsMetadata tabletsMetadata = + TabletsMetadata.builder(manager.getContext()).forTable(tableId).overlapping(startRow, null) + .checkConsistency().fetch(PREV_ROW, LOCATION, LOADED).build()) { + + t1 = System.currentTimeMillis(); + while (lmi.hasNext()) { + loadMapEntry = lmi.next(); + List<TabletMetadata> tablets = + findOverlappingTablets(loadMapEntry.getKey(), tabletsMetadata.iterator()); + loader.load(tablets, loadMapEntry.getValue()); + } } long sleepTime = loader.finish(); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java index 2b8788f887..e66796c54d 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java @@ -22,6 +22,7 @@ import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterrup import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -102,7 +103,7 @@ public class PrepBulkImport extends ManagerRepo { } @VisibleForTesting - interface TabletIterFactory { + interface TabletIterFactory extends AutoCloseable { Iterator<KeyExtent> newTabletIter(Text startRow); } @@ -194,6 +195,33 @@ public class PrepBulkImport extends ManagerRepo { return new KeyExtent(firstTablet.tableId(), lastTablet.endRow(), firstTablet.prevEndRow()); } + private static class TabletIterFactoryImpl implements TabletIterFactory { + private final List<AutoCloseable> resourcesToClose = new ArrayList<>(); + private final Manager manager; + private final BulkInfo bulkInfo; + + public TabletIterFactoryImpl(Manager manager, BulkInfo bulkInfo) { + this.manager = manager; + this.bulkInfo = bulkInfo; + } + + @Override + public Iterator<KeyExtent> newTabletIter(Text startRow) { + TabletsMetadata tabletsMetadata = + TabletsMetadata.builder(manager.getContext()).forTable(bulkInfo.tableId) + .overlapping(startRow, null).checkConsistency().fetch(PREV_ROW).build(); + resourcesToClose.add(tabletsMetadata); + return tabletsMetadata.stream().map(TabletMetadata::getExtent).iterator(); + } + + @Override + public void close() throws Exception { + for (AutoCloseable resource : resourcesToClose) { + resource.close(); + } + } + } + private KeyExtent checkForMerge(final long tid, final Manager manager) throws Exception { VolumeManager fs = manager.getVolumeManager(); @@ -202,14 +230,10 @@ public class PrepBulkImport extends ManagerRepo { int maxTablets = manager.getContext().getTableConfiguration(bulkInfo.tableId) .getCount(Property.TABLE_BULK_MAX_TABLETS); - try (LoadMappingIterator lmi = - BulkSerialize.readLoadMapping(bulkDir.toString(), bulkInfo.tableId, fs::open)) { - - TabletIterFactory tabletIterFactory = - startRow -> TabletsMetadata.builder(manager.getContext()).forTable(bulkInfo.tableId) - .overlapping(startRow, null).checkConsistency().fetch(PREV_ROW).build().stream() - .map(TabletMetadata::getExtent).iterator(); - + try ( + LoadMappingIterator lmi = + BulkSerialize.readLoadMapping(bulkDir.toString(), bulkInfo.tableId, fs::open); + TabletIterFactory tabletIterFactory = new TabletIterFactoryImpl(manager, bulkInfo)) { return validateLoadMapping(bulkInfo.tableId.canonical(), lmi, tabletIterFactory, maxTablets, tid); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java index b17ce8b63c..d005faf04e 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java @@ -104,18 +104,19 @@ class CompactionDriver extends ManagerRepo { int tabletsToWaitFor = 0; int tabletCount = 0; - TabletsMetadata tablets = TabletsMetadata.builder(manager.getContext()).forTable(tableId) - .overlapping(startRow, endRow).fetch(LOCATION, PREV_ROW, COMPACT_ID).build(); - - for (TabletMetadata tablet : tablets) { - if (tablet.getCompactId().orElse(-1) < compactId) { - tabletsToWaitFor++; - if (tablet.hasCurrent()) { - serversToFlush.increment(tablet.getLocation().getServerInstance(), 1); + try (TabletsMetadata tablets = TabletsMetadata.builder(manager.getContext()).forTable(tableId) + .overlapping(startRow, endRow).fetch(LOCATION, PREV_ROW, COMPACT_ID).build()) { + + for (TabletMetadata tablet : tablets) { + if (tablet.getCompactId().orElse(-1) < compactId) { + tabletsToWaitFor++; + if (tablet.hasCurrent()) { + serversToFlush.increment(tablet.getLocation().getServerInstance(), 1); + } } - } - tabletCount++; + tabletCount++; + } } long scanTime = System.currentTimeMillis() - t1; diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImportTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImportTest.java index 62bbbd1402..dff909896b 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImportTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImportTest.java @@ -31,6 +31,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -96,21 +97,29 @@ public class PrepBulkImportTest { public void runTest(Map<KeyExtent,String> loadRanges, List<KeyExtent> tabletRanges, int maxTablets) throws Exception { - TabletIterFactory tabletIterFactory = startRow -> { - int start = -1; - - if (startRow == null) { - start = 0; - } else { - for (int i = 0; i < tabletRanges.size(); i++) { - if (tabletRanges.get(i).contains(startRow)) { - start = i; - break; + TabletIterFactory tabletIterFactory = new TabletIterFactory() { + @Override + public Iterator<KeyExtent> newTabletIter(Text startRow) { + int start = -1; + + if (startRow == null) { + start = 0; + } else { + for (int i = 0; i < tabletRanges.size(); i++) { + if (tabletRanges.get(i).contains(startRow)) { + start = i; + break; + } } } + + return tabletRanges.subList(start, tabletRanges.size()).iterator(); } - return tabletRanges.subList(start, tabletRanges.size()).iterator(); + @Override + public void close() { + // nothing to close + } }; var sortedExtents = loadRanges.keySet().stream().sorted().collect(Collectors.toList()); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index fe227959f7..00ffa10391 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -77,6 +77,7 @@ import org.apache.accumulo.core.metadata.ScanServerRefTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.metrics.MetricsUtil; import org.apache.accumulo.core.securityImpl.thrift.TCredentials; import org.apache.accumulo.core.tabletscan.thrift.ActiveScan; @@ -152,10 +153,15 @@ public class ScanServer extends AbstractServer @Override public Map<? extends KeyExtent,? extends TabletMetadata> loadAll(Set<? extends KeyExtent> keys) { + Map<KeyExtent,TabletMetadata> tms; long t1 = System.currentTimeMillis(); @SuppressWarnings("unchecked") - var tms = ample.readTablets().forTablets((Collection<KeyExtent>) keys, Optional.empty()) - .build().stream().collect(Collectors.toMap(tm -> tm.getExtent(), tm -> tm)); + Collection<KeyExtent> extents = (Collection<KeyExtent>) keys; + try (TabletsMetadata tabletsMetadata = + ample.readTablets().forTablets(extents, Optional.empty()).build()) { + tms = tabletsMetadata.stream().onClose(tabletsMetadata::close) + .collect(Collectors.toMap(TabletMetadata::getExtent, tm -> tm)); + } long t2 = System.currentTimeMillis(); LOG.trace("Read metadata for {} tablets in {} ms", keys.size(), t2 - t1); return tms; diff --git a/test/src/main/java/org/apache/accumulo/test/AmpleIT.java b/test/src/main/java/org/apache/accumulo/test/AmpleIT.java index fb78b7fad8..e89a1fd4b4 100644 --- a/test/src/main/java/org/apache/accumulo/test/AmpleIT.java +++ b/test/src/main/java/org/apache/accumulo/test/AmpleIT.java @@ -36,6 +36,7 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.hadoop.io.Text; import org.junit.jupiter.api.Test; @@ -46,14 +47,19 @@ public class AmpleIT extends AccumuloClusterHarness { private void runFetchTest(Ample ample, List<KeyExtent> extentsToFetch, Set<KeyExtent> expected, Set<KeyExtent> expectMissing) { + Set<KeyExtent> extentsSeen; // always run a test without a consumer for not seen tablets as this takes a different code path - var extentsSeen = ample.readTablets().forTablets(extentsToFetch, Optional.empty()).build() - .stream().map(TabletMetadata::getExtent).collect(toSet()); - assertEquals(expected, extentsSeen); + try (TabletsMetadata tm = + ample.readTablets().forTablets(extentsToFetch, Optional.empty()).build()) { + extentsSeen = tm.stream().map(TabletMetadata::getExtent).collect(toSet()); + assertEquals(expected, extentsSeen); + } HashSet<KeyExtent> extentsNotSeen = new HashSet<>(); - extentsSeen = ample.readTablets().forTablets(extentsToFetch, Optional.of(extentsNotSeen::add)) - .build().stream().map(TabletMetadata::getExtent).collect(toSet()); + try (TabletsMetadata tm = + ample.readTablets().forTablets(extentsToFetch, Optional.of(extentsNotSeen::add)).build()) { + extentsSeen = tm.stream().map(TabletMetadata::getExtent).collect(toSet()); + } assertEquals(expected, extentsSeen); assertEquals(expectMissing, extentsNotSeen); } diff --git a/test/src/main/java/org/apache/accumulo/test/GCRunIT.java b/test/src/main/java/org/apache/accumulo/test/GCRunIT.java index c57f4be4dc..a48b0a595e 100644 --- a/test/src/main/java/org/apache/accumulo/test/GCRunIT.java +++ b/test/src/main/java/org/apache/accumulo/test/GCRunIT.java @@ -27,7 +27,7 @@ import java.time.Duration; import java.util.List; import java.util.TreeSet; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; @@ -155,16 +155,11 @@ public class GCRunIT extends SharedMiniClusterBase { public void forceMissingPrevRowTest() {} private void scanReferences(GCRun userGC) { - final AtomicInteger counter = new AtomicInteger(0); // loop through the user table references - the row deleted above should violate dir present. - var userTableIter = userGC.getReferences().iterator(); - while (userTableIter.hasNext()) { - Reference ref = userTableIter.next(); - counter.incrementAndGet(); - log.trace("user ref: {}", ref); + try (Stream<Reference> references = userGC.getReferences()) { + long count = references.peek(ref -> log.trace("user ref: {}", ref)).count(); + assertTrue(count > 0); } - - assertTrue(counter.get() > 0); } private void fillMetadataEntries(final String table1, final String clone1) throws Exception { diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java index de7413a898..68b3bff0c7 100644 --- a/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java @@ -251,8 +251,10 @@ public class ScanServerMetadataEntriesIT extends SharedMiniClusterBase { assertEquals(fileCount, metadataScanFileRefs.size()); assertEquals(fileCount, ctx.getAmple().getScanServerFileReferences().count()); - - List<Reference> refs = gc.getReferences().collect(Collectors.toList()); + List<Reference> refs; + try (Stream<Reference> references = gc.getReferences()) { + refs = references.collect(Collectors.toList()); + } assertTrue(refs.size() > fileCount * 2); List<Reference> tableRefs = refs.stream().filter(r -> r.getTableId().equals(tid) && !r.isDirectory()) diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction4_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction4_IT.java index 12b0eb5286..b52e7a41c7 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction4_IT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction4_IT.java @@ -71,9 +71,11 @@ public class ExternalCompaction4_IT extends AccumuloClusterHarness { ReadWriteIT.verify(client, 50, 1, 1, 0, table1); Ample ample = ((ClientContext) client).getAmple(); - TabletsMetadata tms = ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build(); - TabletMetadata tm = tms.iterator().next(); - assertEquals(50, tm.getFiles().size()); + try ( + TabletsMetadata tms = ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build()) { + TabletMetadata tm = tms.iterator().next(); + assertEquals(50, tm.getFiles().size()); + } IteratorSetting setting = new IteratorSetting(50, "ageoff", AgeOffFilter.class); setting.addOption("ttl", "0"); @@ -87,8 +89,9 @@ public class ExternalCompaction4_IT extends AccumuloClusterHarness { client.tableOperations().attachIterator(table1, setting2, EnumSet.of(IteratorScope.majc)); client.tableOperations().compact(table1, new CompactionConfig().setWait(true)); - assertThrows(NoSuchElementException.class, () -> ample.readTablets().forTable(tid) - .fetch(ColumnType.FILES).build().iterator().next()); + try (TabletsMetadata tm = ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build()) { + assertThrows(NoSuchElementException.class, () -> tm.iterator().next()); + } assertEquals(0, client.createScanner(table1).stream().count()); } finally { getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR); @@ -110,18 +113,22 @@ public class ExternalCompaction4_IT extends AccumuloClusterHarness { ReadWriteIT.ingest(client, 1000, 1, 1, 0, "colf", table1, 1); Ample ample = ((ClientContext) client).getAmple(); - TabletsMetadata tms = ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build(); - TabletMetadata tm = tms.iterator().next(); - assertEquals(1000, tm.getFiles().size()); + try ( + TabletsMetadata tms = ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build()) { + TabletMetadata tm = tms.iterator().next(); + assertEquals(1000, tm.getFiles().size()); + } IteratorSetting setting = new IteratorSetting(50, "error", ErrorThrowingIterator.class); setting.addOption(ErrorThrowingIterator.TIMES, "3"); client.tableOperations().attachIterator(table1, setting, EnumSet.of(IteratorScope.majc)); client.tableOperations().compact(table1, new CompactionConfig().setWait(true)); - tms = ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build(); - tm = tms.iterator().next(); - assertEquals(1, tm.getFiles().size()); + try ( + TabletsMetadata tms = ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build()) { + TabletMetadata tm = tms.iterator().next(); + assertEquals(1, tm.getFiles().size()); + } ReadWriteIT.verify(client, 1000, 1, 1, 0, table1); diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java index 35630437ab..1713c4f496 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.test.compaction; +import static com.google.common.collect.MoreCollectors.onlyElement; import static org.apache.accumulo.minicluster.ServerType.TABLET_SERVER; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.MAX_DATA; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE1; @@ -409,20 +410,17 @@ public class ExternalCompaction_1_IT extends SharedMiniClusterBase { } LOG.info("Validating metadata table contents."); - TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets().forTable(tid) - .fetch(ColumnType.ECOMP).build(); - List<TabletMetadata> md = new ArrayList<>(); - tm.forEach(t -> md.add(t)); - assertEquals(1, md.size()); - TabletMetadata m = md.get(0); - Map<ExternalCompactionId,ExternalCompactionMetadata> em = m.getExternalCompactions(); - assertEquals(1, em.size()); - List<ExternalCompactionFinalState> finished = new ArrayList<>(); - getFinalStatesForTable(getCluster(), tid).forEach(f -> finished.add(f)); - assertEquals(1, finished.size()); - assertEquals(em.entrySet().iterator().next().getKey(), - finished.get(0).getExternalCompactionId()); - tm.close(); + try (TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets() + .forTable(tid).fetch(ColumnType.ECOMP).build()) { + TabletMetadata m = tm.stream().collect(onlyElement()); + Map<ExternalCompactionId,ExternalCompactionMetadata> em = m.getExternalCompactions(); + assertEquals(1, em.size()); + List<ExternalCompactionFinalState> finished = new ArrayList<>(); + getFinalStatesForTable(getCluster(), tid).forEach(f -> finished.add(f)); + assertEquals(1, finished.size()); + assertEquals(em.entrySet().iterator().next().getKey(), + finished.get(0).getExternalCompactionId()); + } // Force a flush on the metadata table before killing our tserver client.tableOperations().flush(MetadataTable.NAME); diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java index 9b0a94efae..3b902a5eab 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java @@ -336,10 +336,10 @@ public class ExternalCompaction_2_IT extends SharedMiniClusterBase { confirmCompactionCompleted(getCluster().getServerContext(), ecids, TCompactionState.CANCELLED); - TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets().forTable(tid) - .fetch(ColumnType.ECOMP).build(); - assertEquals(0, tm.stream().count()); - tm.close(); + try (TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets() + .forTable(tid).fetch(ColumnType.ECOMP).build()) { + assertEquals(0, tm.stream().count()); + } } } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java index f29227b79e..a5e3ea4c1b 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java @@ -377,18 +377,22 @@ public class CompactionIT extends AccumuloClusterHarness { ReadWriteIT.ingest(client, MAX_DATA, 1, 1, 0, "colf", table1, 1); Ample ample = ((ClientContext) client).getAmple(); - TabletsMetadata tms = ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build(); - TabletMetadata tm = tms.iterator().next(); - assertEquals(1000, tm.getFiles().size()); + try (TabletsMetadata tms = + ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build();) { + TabletMetadata tm = tms.iterator().next(); + assertEquals(1000, tm.getFiles().size()); + } IteratorSetting setting = new IteratorSetting(50, "error", ErrorThrowingIterator.class); setting.addOption(ErrorThrowingIterator.TIMES, "3"); client.tableOperations().attachIterator(table1, setting, EnumSet.of(IteratorScope.majc)); client.tableOperations().compact(table1, new CompactionConfig().setWait(true)); - tms = ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build(); - tm = tms.iterator().next(); - assertEquals(1, tm.getFiles().size()); + try ( + TabletsMetadata tms = ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build()) { + TabletMetadata tm = tms.iterator().next(); + assertEquals(1, tm.getFiles().size()); + } ReadWriteIT.verify(client, MAX_DATA, 1, 1, 0, table1); @@ -408,9 +412,11 @@ public class CompactionIT extends AccumuloClusterHarness { ReadWriteIT.verify(client, 50, 1, 1, 0, table1); Ample ample = ((ClientContext) client).getAmple(); - TabletsMetadata tms = ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build(); - TabletMetadata tm = tms.iterator().next(); - assertEquals(50, tm.getFiles().size()); + try ( + TabletsMetadata tms = ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build()) { + TabletMetadata tm = tms.iterator().next(); + assertEquals(50, tm.getFiles().size()); + } IteratorSetting setting = new IteratorSetting(50, "ageoff", AgeOffFilter.class); setting.addOption("ttl", "0"); @@ -424,8 +430,9 @@ public class CompactionIT extends AccumuloClusterHarness { client.tableOperations().attachIterator(table1, setting2, EnumSet.of(IteratorScope.majc)); client.tableOperations().compact(table1, new CompactionConfig().setWait(true)); - assertThrows(NoSuchElementException.class, () -> ample.readTablets().forTable(tid) - .fetch(ColumnType.FILES).build().iterator().next()); + try (TabletsMetadata tm = ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build()) { + assertThrows(NoSuchElementException.class, () -> tm.iterator().next()); + } assertEquals(0, client.createScanner(table1).stream().count()); } } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorTrashBase.java b/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorTrashBase.java index f1411a04fc..8904dadd67 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorTrashBase.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorTrashBase.java @@ -26,6 +26,7 @@ import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; +import java.util.stream.Collectors; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.data.TableId; @@ -51,14 +52,14 @@ public class GarbageCollectorTrashBase extends ConfigurableMacBase { protected ArrayList<StoredTabletFile> getFilesForTable(ServerContext ctx, AccumuloClient client, String tableName) { String tid = client.tableOperations().tableIdMap().get(tableName); - TabletsMetadata tms = - ctx.getAmple().readTablets().forTable(TableId.of(tid)).fetch(ColumnType.FILES).build(); - ArrayList<StoredTabletFile> files = new ArrayList<>(); - tms.forEach(tm -> { - files.addAll(tm.getFiles()); - }); - LOG.debug("Tablet files: {}", files); - return files; + try (TabletsMetadata tms = + ctx.getAmple().readTablets().forTable(TableId.of(tid)).fetch(ColumnType.FILES).build()) { + ArrayList<StoredTabletFile> files = + tms.stream().flatMap(tabletMetadata -> tabletMetadata.getFiles().stream()) + .collect(Collectors.toCollection(ArrayList::new)); + LOG.debug("Tablet files: {}", files); + return files; + } } protected ArrayList<StoredTabletFile> loadData(ServerContext ctx, AccumuloClient client, diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MetadataIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MetadataIT.java index 0571dbce7c..14ff0f90b1 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/MetadataIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/MetadataIT.java @@ -34,6 +34,7 @@ import java.util.Objects; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; +import java.util.stream.Collectors; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; @@ -174,12 +175,16 @@ public class MetadataIT extends AccumuloClusterHarness { Text startRow = new Text("a"); Text endRow = new Text("z"); - // Call up Ample from the client context using table "t" and build - TabletsMetadata tablets = cc.getAmple().readTablets().forTable(TableId.of("1")) - .overlapping(startRow, endRow).fetch(FILES, LOCATION, LAST, PREV_ROW).build(); + TabletMetadata tabletMetadata0; + TabletMetadata tabletMetadata1; - TabletMetadata tabletMetadata0 = tablets.stream().findFirst().orElseThrow(); - TabletMetadata tabletMetadata1 = tablets.stream().skip(1).findFirst().orElseThrow(); + // Call up Ample from the client context using table "t" and build + try (TabletsMetadata tm = cc.getAmple().readTablets().forTable(TableId.of("1")) + .overlapping(startRow, endRow).fetch(FILES, LOCATION, LAST, PREV_ROW).build()) { + var tablets = tm.stream().limit(2).collect(Collectors.toList()); + tabletMetadata0 = tablets.get(0); + tabletMetadata1 = tablets.get(1); + } String infoTabletId0 = tabletMetadata0.getTableId().toString(); String infoExtent0 = tabletMetadata0.getExtent().toString();