keith-turner commented on code in PR #3968:
URL: https://github.com/apache/accumulo/pull/3968#discussion_r1441010667


##########
server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java:
##########
@@ -1215,9 +1216,9 @@ private void recordCompletion(ExternalCompactionId ecid) {
   }
 
   protected Set<ExternalCompactionId> readExternalCompactionIds() {
-    return this.ctx.getAmple().readTablets().forLevel(Ample.DataLevel.USER).fetch(ECOMP).build()
-        .stream().flatMap(tm -> tm.getExternalCompactions().keySet().stream())
-        .collect(Collectors.toSet());
+    return this.ctx.getAmple().readTablets().forLevel(Ample.DataLevel.USER)
+        .filter(new HasExternalCompactionsFilter()).fetch(ECOMP).build().stream()

Review Comment:
   When only the ECOMP column is fetched, the HasExternalCompactionsFilter
should not be needed, because only tablets that have that column present will
be retrieved.  There is another place where HasExternalCompactionsFilter is
used that fetches the PREV_ROW and ECOMP columns; in that case the filter is
useful because otherwise every tablet would be returned by the scan.
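
To illustrate the point above, here is a minimal sketch in plain Java. It is a toy model, not Accumulo code: `FetchModel`, `scan`, and the string column names are made up for illustration. It assumes, as the comment states, that a tablet with none of the fetched columns produces no entries and so never appears in the scan.

```java
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Toy model only: a tablet is an id plus the set of column names it has.
final class FetchModel {

  // Returns the ids of tablets that show up in a scan restricted to `fetched` columns.
  // A tablet with none of the fetched columns contributes no entries, so it is absent.
  static Set<String> scan(Map<String, Set<String>> tablets, Set<String> fetched) {
    return tablets.entrySet().stream()
        .filter(e -> e.getValue().stream().anyMatch(fetched::contains))
        .map(Map.Entry::getKey)
        .collect(Collectors.toSet());
  }

  public static void main(String[] args) {
    Map<String, Set<String>> tablets = Map.of(
        "t1", Set.of("PREV_ROW"),
        "t2", Set.of("PREV_ROW", "ECOMP"));
    // Fetching only ECOMP: t1 has no such column, so only t2 is seen.
    System.out.println(scan(tablets, Set.of("ECOMP")));
    // Fetching PREV_ROW too: every tablet has it, so both are seen and a filter would help.
    System.out.println(scan(tablets, Set.of("PREV_ROW", "ECOMP")));
  }
}
```

In this model the filter only adds value in the second call, where PREV_ROW pulls in every tablet.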



##########
server/monitor/src/main/java/org/apache/accumulo/monitor/rest/tables/TablesResource.java:
##########
@@ -143,16 +144,14 @@ public TabletServers getParticipatingTabletServers(@PathParam("tableId") @NotNul
       locs.add(rootTabletLocation);
     } else {
       var level = Ample.DataLevel.of(tableId);
-      try (TabletsMetadata tablets =
-          monitor.getContext().getAmple().readTablets().forLevel(level).build()) {
+      try (TabletsMetadata tablets = monitor.getContext().getAmple().readTablets().forLevel(level)

Review Comment:
   This is neat.



##########
core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java:
##########
@@ -475,6 +517,14 @@ public interface Options {
      * {@link ReadConsistency#IMMEDIATE}
      */
     Options readConsistency(ReadConsistency readConsistency);
+
+    /**
+     * Adds a filter to be applied while fetching the data. Filters are applied in the order they
+     * are added. This method can be called multiple times to chain multiple filters together. The
+     * first filter added has the highest priority and each subsequent filter is applied with a
+     * sequentially lower priority.

Review Comment:
   ```suggestion
        * sequentially lower priority. If columns needed by a filter are not fetched then a runtime exception is thrown.
   ```
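
For readers of the suggested Javadoc sentence, a minimal sketch of the kind of check it describes. This is not the PR's implementation: `FilterColumnCheck`, `Column`, and `validate` are hypothetical names. It assumes an empty fetch set means "all columns" and uses `IllegalStateException`, the type the `partialFetch` test in this PR asserts on.

```java
import java.util.EnumSet;
import java.util.Set;

final class FilterColumnCheck {

  // Hypothetical stand-in for TabletMetadata.ColumnType.
  enum Column { PREV_ROW, ECOMP, FLUSH_ID, COMPACTED }

  // Throws if some columns were fetched explicitly but they do not cover what the filter needs.
  // An empty `fetched` set models "nothing fetched explicitly", treated here as all columns.
  static void validate(Set<Column> fetched, Set<Column> neededByFilter) {
    if (!fetched.isEmpty() && !fetched.containsAll(neededByFilter)) {
      throw new IllegalStateException(
          "Filter needs " + neededByFilter + " but only " + fetched + " were fetched");
    }
  }

  public static void main(String[] args) {
    Set<Column> needed = EnumSet.of(Column.FLUSH_ID, Column.COMPACTED);
    validate(EnumSet.noneOf(Column.class), needed); // nothing fetched: passes
    validate(EnumSet.of(Column.FLUSH_ID, Column.COMPACTED, Column.PREV_ROW), needed); // passes
    try {
      validate(EnumSet.of(Column.FLUSH_ID), needed); // partial fetch: rejected
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```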



##########
test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java:
##########
@@ -851,6 +872,188 @@ public void testTime() {
     }
   }
 
+  @Nested
+  public class TestFilter {
+
+    /**
+     * @param filters set of filters to apply to the readTablets operation
+     * @param expectedTablets set of tablets expected to be returned with the filters applied
+     */
+    private void testFilterApplied(ServerContext context, Set<TabletMetadataFilter> filters,
+        Set<KeyExtent> expectedTablets, String message) {
+      TabletsMetadata.TableRangeOptions options = context.getAmple().readTablets().forTable(tid);
+      // add the filter(s) to the operation before building
+      for (TabletMetadataFilter filter : filters) {
+        options.filter(filter);
+        // fetch the columns that the filter needs to evaluate. not necessary but makes the call
+        // more refined.
+        for (TabletMetadata.ColumnType columnType : filter.getColumns()) {
+          options.fetch(columnType);

Review Comment:
   It would be good to run the test twice: once fetching columns as the code
currently does, and once with nothing fetched, which should fetch all columns
and should still work.
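
A rough sketch of the "run it twice" idea, as a self-contained toy rather than a change to `AmpleConditionalWriterIT`. `RunTwiceSketch`, `read`, and the string column names are hypothetical. The model assumes a filter passes a tablet only when all of the filter's columns are visible, and that an empty fetch set means every column is visible.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class RunTwiceSketch {

  // Toy tablets: id -> columns present (hypothetical data).
  static final Map<String, Set<String>> TABLETS = Map.of(
      "e1", Set.of("PREV_ROW"),
      "e2", Set.of("PREV_ROW", "WAL"),
      "e3", Set.of("PREV_ROW", "LOCATION"));

  // Returns ids of tablets that have every column the filter needs.
  // With fetchFilterColumns, only the filter's columns plus PREV_ROW are visible;
  // otherwise nothing is fetched explicitly and all columns are visible.
  static Set<String> read(Set<String> filterColumns, boolean fetchFilterColumns) {
    Set<String> fetched = new TreeSet<>();
    if (fetchFilterColumns) {
      fetched.addAll(filterColumns);
      fetched.add("PREV_ROW");
    }
    Set<String> result = new TreeSet<>();
    for (Map.Entry<String, Set<String>> e : TABLETS.entrySet()) {
      Set<String> visible = new TreeSet<>(e.getValue());
      if (!fetched.isEmpty()) {
        visible.retainAll(fetched);
      }
      if (visible.containsAll(filterColumns)) {
        result.add(e.getKey());
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // Same expectation checked in both modes.
    for (boolean fetchFilterColumns : new boolean[] {true, false}) {
      Set<String> actual = read(Set.of("WAL"), fetchFilterColumns);
      if (!actual.equals(Set.of("e2"))) {
        throw new AssertionError("unexpected tablets with fetch=" + fetchFilterColumns);
      }
    }
    System.out.println("both modes agree");
  }
}
```

In the real test, the equivalent would presumably be a boolean parameter on `testFilterApplied` that skips the `options.fetch(columnType)` loop.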



##########
test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java:
##########
@@ -851,6 +872,188 @@ public void testTime() {
     }
   }
 
+  @Nested
+  public class TestFilter {
+
+    /**
+     * @param filters set of filters to apply to the readTablets operation
+     * @param expectedTablets set of tablets expected to be returned with the filters applied
+     */
+    private void testFilterApplied(ServerContext context, Set<TabletMetadataFilter> filters,
+        Set<KeyExtent> expectedTablets, String message) {
+      TabletsMetadata.TableRangeOptions options = context.getAmple().readTablets().forTable(tid);
+      // add the filter(s) to the operation before building
+      for (TabletMetadataFilter filter : filters) {
+        options.filter(filter);
+        // fetch the columns that the filter needs to evaluate. not necessary but makes the call
+        // more refined.
+        for (TabletMetadata.ColumnType columnType : filter.getColumns()) {
+          options.fetch(columnType);
+        }
+      }
+      // need to fetch PREV_ROW in order to use getExtent()
+      try (TabletsMetadata tablets = options.fetch(PREV_ROW).build()) {
+        Set<KeyExtent> actual =
+            tablets.stream().map(TabletMetadata::getExtent).collect(Collectors.toSet());
+        assertEquals(expectedTablets, actual, message);
+      }
+    }
+
+    @Test
+    public void multipleFilters() {
+      ServerContext context = cluster.getServerContext();
+      ConditionalTabletsMutatorImpl ctmi;
+
+      // make sure we read all tablets on table initially with no filters
+      testFilterApplied(context, Set.of(), Set.of(e1, e2, e3, e4),
+          "Initially, all tablets should be present");
+
+      String server = "server1+8555";
+
+      String walFilePath = java.nio.file.Path.of(server, UUID.randomUUID().toString()).toString();
+      LogEntry wal = LogEntry.fromPath(walFilePath);
+
+      // add wal compact and flush ID to these tablets
+      final Set<KeyExtent> tabletsWithWalCompactFlush = Set.of(e1, e2, e3);
+      for (KeyExtent ke : tabletsWithWalCompactFlush) {
+        ctmi = new ConditionalTabletsMutatorImpl(context);
+        ctmi.mutateTablet(ke).requireAbsentOperation().putCompacted(34L)
+            .putFlushId(TestTabletMetadataFilter.VALID_FLUSH_ID).putWal(wal)
+            .submit(tabletMetadata -> false);
+        var results = ctmi.process();
+        assertEquals(Status.ACCEPTED, results.get(ke).getStatus());
+      }
+      // check that applying a combination of filters returns only tablets that meet the criteria
+      testFilterApplied(context, Set.of(new TestTabletMetadataFilter(), new HasWalsFilter()),
+          tabletsWithWalCompactFlush, "Combination of filters did not return the expected tablets");
+
+      TServerInstance serverInstance = new TServerInstance(server, 1L);
+
+      // on a subset of the tablets, put a location
+      final Set<KeyExtent> tabletsWithLocation = Set.of(e2, e3, e4);
+      for (KeyExtent ke : tabletsWithLocation) {
+        ctmi = new ConditionalTabletsMutatorImpl(context);
+        ctmi.mutateTablet(ke).requireAbsentOperation().requireAbsentLocation()
+            .putLocation(Location.current(serverInstance)).submit(tabletMetadata -> false);
+        var results = ctmi.process();
+        assertEquals(Status.ACCEPTED, results.get(ke).getStatus());
+        assertEquals(Location.current(serverInstance),
+            context.getAmple().readTablet(ke).getLocation(),
+            "Did not see expected location after adding it");
+      }
+
+      // test that the new subset is returned with all 3 filters applied
+      Set<KeyExtent> expected = Sets.intersection(tabletsWithWalCompactFlush, tabletsWithLocation);
+      assertFalse(expected.isEmpty());
+      testFilterApplied(context,
+          Set.of(new HasCurrentFilter(), new HasWalsFilter(), new TestTabletMetadataFilter()),
+          expected, "Combination of filters did not return the expected tablets");
+    }
+
+    @Test
+    public void testCompactedAndFlushIdFilter() {
+      ServerContext context = cluster.getServerContext();
+      ConditionalTabletsMutatorImpl ctmi = new ConditionalTabletsMutatorImpl(context);
+      Set<TabletMetadataFilter> filter = Set.of(new TestTabletMetadataFilter());
+
+      // make sure we read all tablets on table initially with no filters
+      testFilterApplied(context, Set.of(), Set.of(e1, e2, e3, e4),
+          "Initially, all tablets should be present");
+
+      // Set compacted on e2 but with no flush ID
+      ctmi.mutateTablet(e2).requireAbsentOperation().putCompacted(34L)
+          .submit(tabletMetadata -> false);
+      var results = ctmi.process();
+      assertEquals(Status.ACCEPTED, results.get(e2).getStatus());
+      testFilterApplied(context, filter, Set.of(),
+          "Compacted but no flush ID should return no tablets");
+
+      // Set incorrect flush ID on e2
+      ctmi = new ConditionalTabletsMutatorImpl(context);
+      ctmi.mutateTablet(e2).requireAbsentOperation().putFlushId(45L)
+          .submit(tabletMetadata -> false);
+      results = ctmi.process();
+      assertEquals(Status.ACCEPTED, results.get(e2).getStatus());
+      testFilterApplied(context, filter, Set.of(),
+          "Compacted with incorrect flush ID should return no tablets");
+
+      // Set correct flush ID on e2
+      ctmi = new ConditionalTabletsMutatorImpl(context);
+      ctmi.mutateTablet(e2).requireAbsentOperation()
+          .putFlushId(TestTabletMetadataFilter.VALID_FLUSH_ID).submit(tabletMetadata -> false);
+      results = ctmi.process();
+      assertEquals(Status.ACCEPTED, results.get(e2).getStatus());
+      testFilterApplied(context, filter, Set.of(e2),
+          "Compacted with correct flush ID should return e2");
+
+      // Set compacted and correct flush ID on e3
+      ctmi = new ConditionalTabletsMutatorImpl(context);
+      ctmi.mutateTablet(e3).requireAbsentOperation().putCompacted(987L)
+          .putFlushId(TestTabletMetadataFilter.VALID_FLUSH_ID).submit(tabletMetadata -> false);
+      results = ctmi.process();
+      assertEquals(Status.ACCEPTED, results.get(e3).getStatus());
+      testFilterApplied(context, filter, Set.of(e2, e3),
+          "Compacted with correct flush ID should return e2 and e3");
+    }
+
+    @Test
+    public void walFilter() {
+      ServerContext context = cluster.getServerContext();
+      ConditionalTabletsMutatorImpl ctmi = new ConditionalTabletsMutatorImpl(context);
+      Set<TabletMetadataFilter> filter = Set.of(new HasWalsFilter());
+
+      // make sure we read all tablets on table initially with no filters
+      testFilterApplied(context, Set.of(), Set.of(e1, e2, e3, e4),
+          "Initially, all tablets should be present");
+
+      // add a wal to e2
+      String walFilePath =
+          java.nio.file.Path.of("tserver+8080", UUID.randomUUID().toString()).toString();
+      LogEntry wal = LogEntry.fromPath(walFilePath);
+      ctmi.mutateTablet(e2).requireAbsentOperation().putWal(wal).submit(tabletMetadata -> false);
+      var results = ctmi.process();
+      assertEquals(Status.ACCEPTED, results.get(e2).getStatus());
+
+      // test that the filter works
+      testFilterApplied(context, filter, Set.of(e2), "Only tablets with wals should be returned");
+
+      // add wal to tablet e4
+      ctmi = new ConditionalTabletsMutatorImpl(context);
+      walFilePath = java.nio.file.Path.of("tserver+8080", UUID.randomUUID().toString()).toString();
+      wal = LogEntry.fromPath(walFilePath);
+      ctmi.mutateTablet(e4).requireAbsentOperation().putWal(wal).submit(tabletMetadata -> false);
+      results = ctmi.process();
+      assertEquals(Status.ACCEPTED, results.get(e4).getStatus());
+
+      // now, when using the wal filter, should see both e2 and e4
+      testFilterApplied(context, filter, Set.of(e2, e4),
+          "Only tablets with wals should be returned");
+
+      // remove the wal from e4
+      ctmi = new ConditionalTabletsMutatorImpl(context);
+      ctmi.mutateTablet(e4).requireAbsentOperation().deleteWal(wal).submit(tabletMetadata -> false);
+      results = ctmi.process();
+      assertEquals(Status.ACCEPTED, results.get(e4).getStatus());
+
+      // test that now only the tablet with a wal is returned when using filter()
+      testFilterApplied(context, filter, Set.of(e2), "Only tablets with wals should be returned");
+    }
+
+    @Test
+    public void partialFetch() {
+      ServerContext context = cluster.getServerContext();
+      TestTabletMetadataFilter filter = new TestTabletMetadataFilter();
+      // if we only fetch some columns needed by the filter, we should get an exception
+      TabletsMetadata.Options options =
+          context.getAmple().readTablets().forTable(tid).fetch(FLUSH_ID).filter(filter);
+      var ise = assertThrows(IllegalStateException.class, options::build);

Review Comment:
   It's good to test this error case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

