keith-turner commented on code in PR #6027: URL: https://github.com/apache/accumulo/pull/6027#discussion_r2632285611
########## core/src/main/java/org/apache/accumulo/core/clientImpl/TabletInformationCollector.java: ########## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.clientImpl; + +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.DIR; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LAST; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOGS; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SUSPEND; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.List; +import java.util.Set; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import org.apache.accumulo.core.client.admin.TabletInformation; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.PartialKey; +import 
org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.metadata.TabletState; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; +import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Suppliers; + +/** + * Utility to read tablet information for one or more ranges. + */ +public class TabletInformationCollector { + + private static final Logger log = LoggerFactory.getLogger(TabletInformationCollector.class); + + private TabletInformationCollector() {} + + /** + * Fetch tablet information for the provided ranges. Ranges will be merged. + */ + public static Stream<TabletInformation> getTabletInformation(ClientContext context, + TableId tableId, List<Range> ranges, EnumSet<TabletInformation.Field> fields) { + var mergedRanges = Range.mergeOverlapping(ranges); + + EnumSet<ColumnType> columns = columnsForFields(fields); + + Set<TServerInstance> liveTserverSet = TabletMetadata.getLiveTServers(context); + + Supplier<Duration> currentTime = Suppliers.memoize(() -> { + try { + return context.instanceOperations().getManagerTime(); + } catch (Exception e) { + throw new IllegalStateException(e); + } + }); + + List<Stream<TabletInformation>> tabletStreams = new ArrayList<>(); + // TODO replace the per-range builds below with a single multirange (maybe new TabletsMetadata + // logic) + for (Range range : mergedRanges) { + + Text startRow = range.getStartKey() == null ? null : range.getStartKey().getRow(); + Text endRow = range.getEndKey() == null ? 
null : range.getEndKey().getRow(); + boolean startInclusive = range.getStartKey() == null || range.isStartKeyInclusive(); + + var tm = context.getAmple().readTablets().forTable(tableId) + .overlapping(startRow, startInclusive, endRow).fetch(columns.toArray(ColumnType[]::new)) + .checkConsistency().build(); + + // we need to stop the stream when we have passed the end bound + Predicate<TabletMetadata> tabletMetadataPredicate = tme -> tme.getPrevEndRow() == null + || !range.afterEndKey(new Key(tme.getPrevEndRow()).followingKey(PartialKey.ROW)); + + Stream<TabletInformation> stream = tm.stream().onClose(tm::close).peek(tme -> { + if (startRow != null && tme.getEndRow() != null + && tme.getEndRow().compareTo(startRow) < 0) { + log.debug("tablet {} is before scan start range: {}", tme.getExtent(), startRow); + throw new RuntimeException("Bug in ample or this code."); + } + }).takeWhile(tabletMetadataPredicate).map(tme -> new TabletInformationImpl(tme, + () -> TabletState.compute(tme, liveTserverSet).toString(), currentTime)); + tabletStreams.add(stream); + } + + return tabletStreams.stream().reduce(Stream::concat).orElseGet(Stream::empty); Review Comment: When the returned stream is closed will it close all the streams in `tabletStreams`? ########## core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java: ########## @@ -1090,6 +1090,20 @@ default Stream<TabletInformation> getTabletInformation(final String tableName, f throw new UnsupportedOperationException(); } + /** + * @param ranges the ranges of tablets to scan. Ranges can overlap and an attempt will be made to + * merge this list + * @param fields can optionally narrow the data retrieved per tablet, which can speed up streaming + * over tablets. If this list is empty then all fields are fetched. + * @return a stream of tablet information for tablets that fall in the specified ranges. The + * stream may be backed by a scanner, so it's best to close the stream. 
+ * @since 4.0.0 + */ + default Stream<TabletInformation> getTabletInformation(final String tableName, Review Comment: We could drop the other getTabletInformation method from the API; it's a new 4.0 API, so we do not have to keep it for any reason. It would be good to eventually make this API take a `List<RowRange>` instead of `Range`. We can only really use the row part of the range. ########## core/src/main/java/org/apache/accumulo/core/spi/balancer/BalancerEnvironment.java: ########## @@ -86,4 +89,10 @@ List<TabletStatistics> listOnlineTabletsForTable(TabletServerId tabletServerId, * none is configured. */ String tableContext(TableId tableId); + + /** + * Retrieve tablet information for the provided list of ranges. + */ Review Comment: This javadoc should have a `@since` tag. A reminder to close the stream in the javadoc would be nice too. ########## test/src/main/java/org/apache/accumulo/test/TableOperationsIT.java: ########## @@ -1037,6 +1038,59 @@ public void testGetTabletInformation() throws Exception { }); } + var unboundedStartRange = new Range(null, new Text("4")); + try (var tablets = accumuloClient.tableOperations().getTabletInformation(tableName, + unboundedStartRange, TabletInformation.Field.LOCATION)) { + var endRows = tablets.map(TabletInformation::getTabletId).map(TabletId::getEndRow) + .map(er -> er == null ? 
"null" : er.toString()).toList(); + assertEquals(List.of("6", "7", "8", "null"), endRows); + } + + var exclusiveStartRange = new Range(new Text("4"), false, new Text("6"), true); + try (var tablets = accumuloClient.tableOperations().getTabletInformation(tableName, + exclusiveStartRange, TabletInformation.Field.LOCATION)) { + var endRows = tablets.map(TabletInformation::getTabletId).map(TabletId::getEndRow) + .map(Text::toString).toList(); + assertEquals(List.of("5", "6"), endRows); + } + + var inclusiveStartRange = new Range(new Text("4"), true, new Text("6"), true); + try (var tablets = accumuloClient.tableOperations().getTabletInformation(tableName, + inclusiveStartRange, TabletInformation.Field.LOCATION)) { + var endRows = tablets.map(TabletInformation::getTabletId).map(TabletId::getEndRow) + .map(Text::toString).toList(); + assertEquals(List.of("4", "5", "6"), endRows); + } + + var fileFieldMissingRange = List.of(new Range(new Text("2"), new Text("4"))); + try (var tablets = accumuloClient.tableOperations().getTabletInformation(tableName, + fileFieldMissingRange, TabletInformation.Field.LOCATION)) { + tablets.forEach(ti -> assertThrows(IllegalStateException.class, ti::getNumFiles)); + } + + var overlappingRange = new Range(new Text("2"), new Text("4")); + var overlappingRange2 = new Range(new Text("3"), new Text("5")); + var disjointRange = new Range(new Text("7"), new Text("8")); + try (var tablets = accumuloClient.tableOperations().getTabletInformation(tableName, + List.of(overlappingRange, overlappingRange2, disjointRange), + TabletInformation.Field.LOCATION)) { + var tabletIds = tablets.map(TabletInformation::getTabletId).toList(); + var endRows = tabletIds.stream().map(TabletId::getEndRow).map(Text::toString).toList(); + assertEquals(6, tabletIds.size()); + assertEquals(tabletIds.size(), new HashSet<>(tabletIds).size()); + assertEquals(endRows, new ArrayList<>(new LinkedHashSet<>(endRows))); + assertEquals(Set.of("2", "3", "4", "5", "7", "8"), new 
HashSet<>(endRows)); + assertEquals(List.of("2", "3", "4", "5", "7", "8"), endRows); Review Comment: Why is the assertEquals for the set needed? Seems like the List case covers the expected data and ensures there are no duplicates. It seems like the list case is testing whether the ranges were merged correctly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
