keith-turner commented on code in PR #6027:
URL: https://github.com/apache/accumulo/pull/6027#discussion_r2632285611


##########
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletInformationCollector.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.clientImpl;
+
+import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.DIR;
+import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
+import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LAST;
+import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
+import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOGS;
+import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SUSPEND;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
+
+import org.apache.accumulo.core.client.admin.TabletInformation;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.metadata.TServerInstance;
+import org.apache.accumulo.core.metadata.TabletState;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Suppliers;
+
+/**
+ * Utility to read tablet information for one or more ranges.
+ */
+public class TabletInformationCollector {
+
+  private static final Logger log = 
LoggerFactory.getLogger(TabletInformationCollector.class);
+
+  private TabletInformationCollector() {}
+
+  /**
+   * Fetch tablet information for the provided ranges. Ranges will be merged.
+   */
+  public static Stream<TabletInformation> getTabletInformation(ClientContext 
context,
+      TableId tableId, List<Range> ranges, EnumSet<TabletInformation.Field> 
fields) {
+    var mergedRanges = Range.mergeOverlapping(ranges);
+
+    EnumSet<ColumnType> columns = columnsForFields(fields);
+
+    Set<TServerInstance> liveTserverSet = 
TabletMetadata.getLiveTServers(context);
+
+    Supplier<Duration> currentTime = Suppliers.memoize(() -> {
+      try {
+        return context.instanceOperations().getManagerTime();
+      } catch (Exception e) {
+        throw new IllegalStateException(e);
+      }
+    });
+
+    List<Stream<TabletInformation>> tabletStreams = new ArrayList<>();
+    // TODO replace the per-range builds below with a single multirange (maybe 
new TabletsMetadata
+    // logic)
+    for (Range range : mergedRanges) {
+
+      Text startRow = range.getStartKey() == null ? null : 
range.getStartKey().getRow();
+      Text endRow = range.getEndKey() == null ? null : 
range.getEndKey().getRow();
+      boolean startInclusive = range.getStartKey() == null || 
range.isStartKeyInclusive();
+
+      var tm = context.getAmple().readTablets().forTable(tableId)
+          .overlapping(startRow, startInclusive, 
endRow).fetch(columns.toArray(ColumnType[]::new))
+          .checkConsistency().build();
+
+      // we need to stop the stream when we have passed the end bound
+      Predicate<TabletMetadata> tabletMetadataPredicate = tme -> 
tme.getPrevEndRow() == null
+          || !range.afterEndKey(new 
Key(tme.getPrevEndRow()).followingKey(PartialKey.ROW));
+
+      Stream<TabletInformation> stream = 
tm.stream().onClose(tm::close).peek(tme -> {
+        if (startRow != null && tme.getEndRow() != null
+            && tme.getEndRow().compareTo(startRow) < 0) {
+          log.debug("tablet {} is before scan start range: {}", 
tme.getExtent(), startRow);
+          throw new RuntimeException("Bug in ample or this code.");
+        }
+      }).takeWhile(tabletMetadataPredicate).map(tme -> new 
TabletInformationImpl(tme,
+          () -> TabletState.compute(tme, liveTserverSet).toString(), 
currentTime));
+      tabletStreams.add(stream);
+    }
+
+    return 
tabletStreams.stream().reduce(Stream::concat).orElseGet(Stream::empty);

Review Comment:
   When the returned stream is closed, will it also close all of the streams in 
`tabletStreams`?



##########
core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java:
##########
@@ -1090,6 +1090,20 @@ default Stream<TabletInformation> 
getTabletInformation(final String tableName, f
     throw new UnsupportedOperationException();
   }
 
+  /**
+   * @param ranges the ranges of tablets to scan. Ranges can overlap and an 
attempt will be made to
+   *        merge this list
+   * @param fields can optionally narrow the data retrieved per tablet, which 
can speed up streaming
+   *        over tablets. If this list is empty then all fields are fetched.
+   * @return a stream of tablet information for tablets that fall in the 
specified ranges. The
+   *         stream may be backed by a scanner, so it's best to close the 
stream.
+   * @since 4.0.0
+   */
+  default Stream<TabletInformation> getTabletInformation(final String 
tableName,

Review Comment:
   We could drop the other getTabletInformation method from the API; it's a new 
4.0 API, so we do not have to keep it for any reason.
   
   It would be good to eventually make this API take a `List<RowRange>` instead of 
`Range`. We can only really use the row part of the range.



##########
core/src/main/java/org/apache/accumulo/core/spi/balancer/BalancerEnvironment.java:
##########
@@ -86,4 +89,10 @@ List<TabletStatistics> 
listOnlineTabletsForTable(TabletServerId tabletServerId,
    * none is configured.
    */
   String tableContext(TableId tableId);
+
+  /**
+   * Retrieve tablet information for the provided list of ranges.
+   */

Review Comment:
   This javadoc should have a `@since` tag. A reminder in the javadoc to close the 
stream would be nice too.



##########
test/src/main/java/org/apache/accumulo/test/TableOperationsIT.java:
##########
@@ -1037,6 +1038,59 @@ public void testGetTabletInformation() throws Exception {
         });
       }
 
+      var unboundedStartRange = new Range(null, new Text("4"));
+      try (var tablets = 
accumuloClient.tableOperations().getTabletInformation(tableName,
+          unboundedStartRange, TabletInformation.Field.LOCATION)) {
+        var endRows = 
tablets.map(TabletInformation::getTabletId).map(TabletId::getEndRow)
+            .map(er -> er == null ? "null" : er.toString()).toList();
+        assertEquals(List.of("1", "2", "3", "4"), endRows);
+      }
+
+      var unboundedEndRange = new Range(new Text("6"), true, null, true);
+      try (var tablets = 
accumuloClient.tableOperations().getTabletInformation(tableName,
+          unboundedEndRange, TabletInformation.Field.LOCATION)) {
+        var endRows = 
tablets.map(TabletInformation::getTabletId).map(TabletId::getEndRow)
+            .map(er -> er == null ? "null" : er.toString()).toList();
+        assertEquals(List.of("6", "7", "8", "null"), endRows);
+      }
+
+      var exclusiveStartRange = new Range(new Text("4"), false, new Text("6"), 
true);
+      try (var tablets = 
accumuloClient.tableOperations().getTabletInformation(tableName,
+          exclusiveStartRange, TabletInformation.Field.LOCATION)) {
+        var endRows = 
tablets.map(TabletInformation::getTabletId).map(TabletId::getEndRow)
+            .map(Text::toString).toList();
+        assertEquals(List.of("5", "6"), endRows);
+      }
+
+      var inclusiveStartRange = new Range(new Text("4"), true, new Text("6"), 
true);
+      try (var tablets = 
accumuloClient.tableOperations().getTabletInformation(tableName,
+          inclusiveStartRange, TabletInformation.Field.LOCATION)) {
+        var endRows = 
tablets.map(TabletInformation::getTabletId).map(TabletId::getEndRow)
+            .map(Text::toString).toList();
+        assertEquals(List.of("4", "5", "6"), endRows);
+      }
+
+      var fileFieldMissingRange = List.of(new Range(new Text("2"), new 
Text("4")));
+      try (var tablets = 
accumuloClient.tableOperations().getTabletInformation(tableName,
+          fileFieldMissingRange, TabletInformation.Field.LOCATION)) {
+        tablets.forEach(ti -> assertThrows(IllegalStateException.class, 
ti::getNumFiles));
+      }
+
+      var overlappingRange = new Range(new Text("2"), new Text("4"));
+      var overlappingRange2 = new Range(new Text("3"), new Text("5"));
+      var disjointRange = new Range(new Text("7"), new Text("8"));
+      try (var tablets = 
accumuloClient.tableOperations().getTabletInformation(tableName,
+          List.of(overlappingRange, overlappingRange2, disjointRange),
+          TabletInformation.Field.LOCATION)) {
+        var tabletIds = tablets.map(TabletInformation::getTabletId).toList();
+        var endRows = 
tabletIds.stream().map(TabletId::getEndRow).map(Text::toString).toList();
+        assertEquals(6, tabletIds.size());
+        assertEquals(tabletIds.size(), new HashSet<>(tabletIds).size());
+        assertEquals(endRows, new ArrayList<>(new LinkedHashSet<>(endRows)));
+        assertEquals(Set.of("2", "3", "4", "5", "7", "8"), new 
HashSet<>(endRows));
+        assertEquals(List.of("2", "3", "4", "5", "7", "8"), endRows);

Review Comment:
   Why is the assertEquals for the set needed? It seems like the List case covers 
the expected data and ensures there are no dupes. It seems like the list case 
is testing whether the ranges were merged correctly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to