This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new af3d4f1c5f Add filtering to Ample (#3968)
af3d4f1c5f is described below

commit af3d4f1c5fff5e8074498b0469c2e3177d3c79cc
Author: Dom G <domgargu...@apache.org>
AuthorDate: Fri Jan 5 10:41:07 2024 -0500

    Add filtering to Ample (#3968)
    
    
    ---------
    
    Co-authored-by: Keith Turner <ktur...@apache.org>
---
 .../core/iterators/user/HasCurrentFilter.java      |  44 +++++
 .../user/HasExternalCompactionsFilter.java         |  45 +++++
 .../core/iterators/user/HasWalsFilter.java         |  45 +++++
 .../core/iterators/user/TabletMetadataFilter.java  |  43 ++++
 .../core/metadata/schema/TabletsMetadata.java      |  53 ++++-
 .../accumulo/gc/GarbageCollectWriteAheadLogs.java  |   7 +-
 .../coordinator/CompactionCoordinator.java         |   7 +-
 .../coordinator/DeadCompactionDetector.java        |   6 +-
 .../monitor/rest/tables/TablesResource.java        |  15 +-
 .../test/functional/AmpleConditionalWriterIT.java  | 218 +++++++++++++++++++++
 .../test/functional/TestTabletMetadataFilter.java  |  54 +++++
 11 files changed, 520 insertions(+), 17 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/iterators/user/HasCurrentFilter.java
 
b/core/src/main/java/org/apache/accumulo/core/iterators/user/HasCurrentFilter.java
new file mode 100644
index 0000000000..ca58922306
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/iterators/user/HasCurrentFilter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+import java.util.Set;
+import java.util.function.Predicate;
+
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+
+import com.google.common.collect.Sets;
+
+public class HasCurrentFilter extends TabletMetadataFilter {
+
+  public static final Set<TabletMetadata.ColumnType> COLUMNS =
+      Sets.immutableEnumSet(TabletMetadata.ColumnType.LOCATION);
+
+  private final static Predicate<TabletMetadata> HAS_CURRENT = 
TabletMetadata::hasCurrent;
+
+  @Override
+  public Set<TabletMetadata.ColumnType> getColumns() {
+    return COLUMNS;
+  }
+
+  @Override
+  protected Predicate<TabletMetadata> acceptTablet() {
+    return HAS_CURRENT;
+  }
+}
diff --git 
a/core/src/main/java/org/apache/accumulo/core/iterators/user/HasExternalCompactionsFilter.java
 
b/core/src/main/java/org/apache/accumulo/core/iterators/user/HasExternalCompactionsFilter.java
new file mode 100644
index 0000000000..fd3b5ae5d6
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/iterators/user/HasExternalCompactionsFilter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+import java.util.Set;
+import java.util.function.Predicate;
+
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+
+import com.google.common.collect.Sets;
+
+public class HasExternalCompactionsFilter extends TabletMetadataFilter {
+
+  public static final Set<TabletMetadata.ColumnType> COLUMNS =
+      Sets.immutableEnumSet(TabletMetadata.ColumnType.ECOMP);
+
+  private final static Predicate<TabletMetadata> HAS_EXT_COMPS =
+      tabletMetadata -> !tabletMetadata.getExternalCompactions().isEmpty();
+
+  @Override
+  public Set<TabletMetadata.ColumnType> getColumns() {
+    return COLUMNS;
+  }
+
+  @Override
+  protected Predicate<TabletMetadata> acceptTablet() {
+    return HAS_EXT_COMPS;
+  }
+}
diff --git 
a/core/src/main/java/org/apache/accumulo/core/iterators/user/HasWalsFilter.java 
b/core/src/main/java/org/apache/accumulo/core/iterators/user/HasWalsFilter.java
new file mode 100644
index 0000000000..0563a7c3dc
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/iterators/user/HasWalsFilter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+import java.util.Set;
+import java.util.function.Predicate;
+
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+
+import com.google.common.collect.Sets;
+
+public class HasWalsFilter extends TabletMetadataFilter {
+
+  private static final Set<TabletMetadata.ColumnType> COLUMNS =
+      Sets.immutableEnumSet(TabletMetadata.ColumnType.LOGS);
+
+  private final static Predicate<TabletMetadata> HAS_WALS =
+      tabletMetadata -> !tabletMetadata.getLogs().isEmpty();
+
+  @Override
+  public Set<TabletMetadata.ColumnType> getColumns() {
+    return COLUMNS;
+  }
+
+  @Override
+  protected Predicate<TabletMetadata> acceptTablet() {
+    return HAS_WALS;
+  }
+}
diff --git 
a/core/src/main/java/org/apache/accumulo/core/iterators/user/TabletMetadataFilter.java
 
b/core/src/main/java/org/apache/accumulo/core/iterators/user/TabletMetadataFilter.java
new file mode 100644
index 0000000000..13163aebea
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/iterators/user/TabletMetadataFilter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.function.Predicate;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorAdapter;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+
+public abstract class TabletMetadataFilter extends RowFilter {
+
+  @Override
+  public boolean acceptRow(SortedKeyValueIterator<Key,Value> rowIterator) {
+    TabletMetadata tm = TabletMetadata.convertRow(new 
IteratorAdapter(rowIterator),
+        EnumSet.copyOf(getColumns()), true, false);
+    return acceptTablet().test(tm);
+  }
+
+  public abstract Set<TabletMetadata.ColumnType> getColumns();
+
+  protected abstract Predicate<TabletMetadata> acceptTablet();
+}
diff --git 
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
 
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
index 4e28e46e9d..46b077e626 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
@@ -63,6 +63,7 @@ import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.fate.zookeeper.ZooCache;
 import org.apache.accumulo.core.fate.zookeeper.ZooReader;
+import org.apache.accumulo.core.iterators.user.TabletMetadataFilter;
 import org.apache.accumulo.core.iterators.user.WholeRowIterator;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
@@ -114,6 +115,7 @@ public class TabletsMetadata implements 
Iterable<TabletMetadata>, AutoCloseable
     private TableId tableId;
     private ReadConsistency readConsistency = ReadConsistency.IMMEDIATE;
     private final AccumuloClient _client;
+    private final List<TabletMetadataFilter> tabletMetadataFilters = new 
ArrayList<>();
 
     Builder(AccumuloClient client) {
       this._client = client;
@@ -128,6 +130,22 @@ public class TabletsMetadata implements 
Iterable<TabletMetadata>, AutoCloseable
         return buildExtents(_client);
       }
 
+      if (!tabletMetadataFilters.isEmpty()) {
+        checkState(!checkConsistency, "Can not check tablet consistency and 
filter tablets");
+        if (!fetchedCols.isEmpty()) {
+          for (var filter : tabletMetadataFilters) {
+            // This defends against the case where the columns needed by the 
filter were not
+            // fetched. For example, the following code only fetches the file 
column and then
+            // configures the WAL filter which also needs the column for write 
ahead logs.
+            // 
ample.readTablets().forLevel(DataLevel.USER).fetch(ColumnType.FILES).filter(new
+            // HasWalsFilter()).build();
+            checkState(fetchedCols.containsAll(filter.getColumns()),
+                "%s needs cols %s however only %s were fetched", 
filter.getClass().getSimpleName(),
+                filter.getColumns(), fetchedCols);
+          }
+        }
+      }
+
       checkState((level == null) != (table == null),
           "scanTable() cannot be used in conjunction with forLevel(), 
forTable() or forTablet() %s %s",
           level, table);
@@ -168,7 +186,16 @@ public class TabletsMetadata implements 
Iterable<TabletMetadata>, AutoCloseable
             scanner.setRanges(ranges);
 
             configureColumns(scanner);
-            IteratorSetting iterSetting = new IteratorSetting(100, 
WholeRowIterator.class);
+            int iteratorPriority = 100;
+
+            for (TabletMetadataFilter tmf : tabletMetadataFilters) {
+              IteratorSetting iterSetting = new 
IteratorSetting(iteratorPriority, tmf.getClass());
+              scanner.addScanIterator(iterSetting);
+              iteratorPriority++;
+            }
+
+            IteratorSetting iterSetting =
+                new IteratorSetting(iteratorPriority, WholeRowIterator.class);
             scanner.addScanIterator(iterSetting);
 
             Iterable<TabletMetadata> tmi = () -> 
Iterators.transform(scanner.iterator(), entry -> {
@@ -239,6 +266,15 @@ public class TabletsMetadata implements 
Iterable<TabletMetadata>, AutoCloseable
         configureColumns(scanner);
         Range range1 = scanner.getRange();
 
+        if (!tabletMetadataFilters.isEmpty()) {
+          int iteratorPriority = 100;
+          for (TabletMetadataFilter tmf : tabletMetadataFilters) {
+            iteratorPriority++;
+            IteratorSetting iterSetting = new 
IteratorSetting(iteratorPriority, tmf.getClass());
+            scanner.addScanIterator(iterSetting);
+          }
+        }
+
         Function<Range,Iterator<TabletMetadata>> iterFactory = r -> {
           synchronized (scanner) {
             scanner.setRange(r);
@@ -445,6 +481,12 @@ public class TabletsMetadata implements 
Iterable<TabletMetadata>, AutoCloseable
       return this;
     }
 
+    @Override
+    public Options filter(TabletMetadataFilter filter) {
+      this.tabletMetadataFilters.add(filter);
+      return this;
+    }
+
     @Override
     public Options readConsistency(ReadConsistency readConsistency) {
       this.readConsistency = Objects.requireNonNull(readConsistency);
@@ -475,6 +517,15 @@ public class TabletsMetadata implements 
Iterable<TabletMetadata>, AutoCloseable
      * {@link ReadConsistency#IMMEDIATE}
      */
     Options readConsistency(ReadConsistency readConsistency);
+
+    /**
+     * Adds a filter to be applied while fetching the data. Filters are 
applied in the order they
+     * are added. This method can be called multiple times to chain multiple 
filters together. The
+     * first filter added has the highest priority and each subsequent filter 
is applied with a
+     * sequentially lower priority. If columns needed by a filter are not 
fetched then a runtime
+     * exception is thrown.
+     */
+    Options filter(TabletMetadataFilter filter);
   }
 
   public interface RangeOptions extends Options {
diff --git 
a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
 
b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index 7e55a5274c..3c93187239 100644
--- 
a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ 
b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -37,6 +37,7 @@ import java.util.UUID;
 
 import org.apache.accumulo.core.gc.thrift.GCStatus;
 import org.apache.accumulo.core.gc.thrift.GcCycleStats;
+import org.apache.accumulo.core.iterators.user.HasWalsFilter;
 import org.apache.accumulo.core.metadata.TServerInstance;
 import org.apache.accumulo.core.metadata.TabletState;
 import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
@@ -83,11 +84,11 @@ public class GarbageCollectWriteAheadLogs {
     this.liveServers = liveServers;
     this.walMarker = new WalStateManager(context);
     this.store = () -> Iterators.concat(
-        context.getAmple().readTablets().forLevel(DataLevel.ROOT)
+        context.getAmple().readTablets().forLevel(DataLevel.ROOT).filter(new 
HasWalsFilter())
             .fetch(LOCATION, LAST, LOGS, PREV_ROW, 
SUSPEND).checkConsistency().build().iterator(),
-        context.getAmple().readTablets().forLevel(DataLevel.METADATA)
+        
context.getAmple().readTablets().forLevel(DataLevel.METADATA).filter(new 
HasWalsFilter())
             .fetch(LOCATION, LAST, LOGS, PREV_ROW, 
SUSPEND).checkConsistency().build().iterator(),
-        context.getAmple().readTablets().forLevel(DataLevel.USER)
+        context.getAmple().readTablets().forLevel(DataLevel.USER).filter(new 
HasWalsFilter())
             .fetch(LOCATION, LAST, LOGS, PREV_ROW, 
SUSPEND).checkConsistency().build().iterator());
   }
 
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index c03cb3241f..faea953da6 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -78,6 +78,7 @@ import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
 import org.apache.accumulo.core.fate.FateTxId;
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.core.iterators.user.HasExternalCompactionsFilter;
 import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil;
 import org.apache.accumulo.core.metadata.AbstractTabletFile;
 import org.apache.accumulo.core.metadata.CompactableFileImpl;
@@ -1226,9 +1227,9 @@ public class CompactionCoordinator
   }
 
   protected Set<ExternalCompactionId> readExternalCompactionIds() {
-    return 
this.ctx.getAmple().readTablets().forLevel(Ample.DataLevel.USER).fetch(ECOMP).build()
-        .stream().flatMap(tm -> tm.getExternalCompactions().keySet().stream())
-        .collect(Collectors.toSet());
+    return this.ctx.getAmple().readTablets().forLevel(Ample.DataLevel.USER)
+        .filter(new 
HasExternalCompactionsFilter()).fetch(ECOMP).build().stream()
+        .flatMap(tm -> 
tm.getExternalCompactions().keySet().stream()).collect(Collectors.toSet());
   }
 
   /**
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
index 2c653158ad..479be63ae7 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
@@ -32,6 +32,7 @@ import java.util.stream.Collectors;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.iterators.user.HasExternalCompactionsFilter;
 import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
@@ -75,10 +76,11 @@ public class DeadCompactionDetector {
     log.debug("Starting to look for dead compactions");
 
     Map<ExternalCompactionId,KeyExtent> tabletCompactions = new HashMap<>();
-
+    //
     // find what external compactions tablets think are running
     context.getAmple().readTablets().forLevel(DataLevel.USER)
-        .fetch(ColumnType.ECOMP, ColumnType.PREV_ROW).build().forEach(tm -> {
+        .filter(new HasExternalCompactionsFilter()).fetch(ColumnType.ECOMP, 
ColumnType.PREV_ROW)
+        .build().forEach(tm -> {
           tm.getExternalCompactions().keySet().forEach(ecid -> {
             tabletCompactions.put(ecid, tm.getExtent());
           });
diff --git 
a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/tables/TablesResource.java
 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/tables/TablesResource.java
index ccc5eb7ca5..89879cfae5 100644
--- 
a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/tables/TablesResource.java
+++ 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/tables/TablesResource.java
@@ -37,6 +37,7 @@ import jakarta.ws.rs.Produces;
 import jakarta.ws.rs.core.MediaType;
 
 import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.iterators.user.HasCurrentFilter;
 import org.apache.accumulo.core.manager.state.tables.TableState;
 import org.apache.accumulo.core.manager.thrift.ManagerMonitorInfo;
 import org.apache.accumulo.core.manager.thrift.TableInfo;
@@ -143,16 +144,14 @@ public class TablesResource {
       locs.add(rootTabletLocation);
     } else {
       var level = Ample.DataLevel.of(tableId);
-      try (TabletsMetadata tablets =
-          
monitor.getContext().getAmple().readTablets().forLevel(level).build()) {
+      try (TabletsMetadata tablets = 
monitor.getContext().getAmple().readTablets().forLevel(level)
+          .filter(new HasCurrentFilter()).build()) {
 
         for (TabletMetadata tm : tablets) {
-          if (tm.hasCurrent()) {
-            try {
-              locs.add(tm.getLocation().getHostPort());
-            } catch (Exception ex) {
-              return tabletServers;
-            }
+          try {
+            locs.add(tm.getLocation().getHostPort());
+          } catch (Exception ex) {
+            return tabletServers;
           }
         }
       }
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
index ebc86f97c8..53e34bd203 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
@@ -36,6 +36,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.ArrayList;
@@ -66,6 +67,9 @@ import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.fate.FateTxId;
+import org.apache.accumulo.core.iterators.user.HasCurrentFilter;
+import org.apache.accumulo.core.iterators.user.HasWalsFilter;
+import org.apache.accumulo.core.iterators.user.TabletMetadataFilter;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.metadata.StoredTabletFile;
@@ -81,14 +85,17 @@ import 
org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
 import org.apache.accumulo.core.metadata.schema.TabletMetadataBuilder;
 import org.apache.accumulo.core.metadata.schema.TabletOperationId;
 import org.apache.accumulo.core.metadata.schema.TabletOperationType;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.metadata.AsyncConditionalTabletsMutatorImpl;
 import org.apache.accumulo.server.metadata.ConditionalTabletsMutatorImpl;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 
 import com.google.common.collect.Sets;
@@ -151,6 +158,13 @@ public class AmpleConditionalWriterIT extends 
AccumuloClusterHarness {
 
       assertEquals(Location.future(ts1), 
context.getAmple().readTablet(e1).getLocation());
 
+      try (TabletsMetadata tablets =
+          context.getAmple().readTablets().forTable(tid).filter(new 
HasCurrentFilter()).build()) {
+        List<KeyExtent> actual =
+            
tablets.stream().map(TabletMetadata::getExtent).collect(Collectors.toList());
+        assertEquals(List.of(), actual);
+      }
+
       ctmi = new ConditionalTabletsMutatorImpl(context);
       
ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.future(ts1))
           
.putLocation(Location.current(ts1)).deleteLocation(Location.future(ts1))
@@ -160,6 +174,13 @@ public class AmpleConditionalWriterIT extends 
AccumuloClusterHarness {
 
       assertEquals(Location.current(ts1), 
context.getAmple().readTablet(e1).getLocation());
 
+      try (TabletsMetadata tablets =
+          context.getAmple().readTablets().forTable(tid).filter(new 
HasCurrentFilter()).build()) {
+        List<KeyExtent> actual =
+            
tablets.stream().map(TabletMetadata::getExtent).collect(Collectors.toList());
+        assertEquals(List.of(e1), actual);
+      }
+
       ctmi = new ConditionalTabletsMutatorImpl(context);
       
ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.future(ts1))
           
.putLocation(Location.current(ts1)).deleteLocation(Location.future(ts1))
@@ -851,6 +872,203 @@ public class AmpleConditionalWriterIT extends 
AccumuloClusterHarness {
     }
   }
 
+  @Nested
+  public class TestFilter {
+
+    /**
+     * @param filters set of filters to apply to the readTablets operation
+     * @param expectedTablets set of tablets expected to be returned with the 
filters applied
+     */
+    private void testFilterApplied(ServerContext context, 
Set<TabletMetadataFilter> filters,
+        Set<KeyExtent> expectedTablets, String message) {
+      // test with just the needed columns fetched and then with all columns 
fetched. both should
+      // yield the same result
+      addFiltersFetchAndAssert(context, filters, true, expectedTablets, 
message);
+      addFiltersFetchAndAssert(context, filters, false, expectedTablets, 
message);
+    }
+
+    private void addFiltersFetchAndAssert(ServerContext context, 
Set<TabletMetadataFilter> filters,
+        boolean shouldFetchNeededCols, Set<KeyExtent> expectedTablets, String 
message) {
+      TabletsMetadata.TableRangeOptions options = 
context.getAmple().readTablets().forTable(tid);
+      // add the filter(s) to the operation before building
+      for (TabletMetadataFilter filter : filters) {
+        options.filter(filter);
+        // test fetching just the needed columns
+        if (shouldFetchNeededCols) {
+          for (TabletMetadata.ColumnType columnType : filter.getColumns()) {
+            options.fetch(columnType);
+          }
+        }
+      }
+      if (shouldFetchNeededCols) {
+        // if some columns were fetched, also need to fetch PREV_ROW in order 
to use getExtent()
+        options.fetch(PREV_ROW);
+      }
+      try (TabletsMetadata tablets = options.build()) {
+        Set<KeyExtent> actual =
+            
tablets.stream().map(TabletMetadata::getExtent).collect(Collectors.toSet());
+        assertEquals(expectedTablets, actual,
+            message + (shouldFetchNeededCols
+                ? ". Only needed columns were fetched in the readTablets 
operation."
+                : ". All columns were fetched in the readTablets operation."));
+      }
+    }
+
+    @Test
+    public void multipleFilters() {
+      ServerContext context = cluster.getServerContext();
+      ConditionalTabletsMutatorImpl ctmi;
+
+      // make sure we read all tablets on table initially with no filters
+      testFilterApplied(context, Set.of(), Set.of(e1, e2, e3, e4),
+          "Initially, all tablets should be present");
+
+      String server = "server1+8555";
+
+      String walFilePath = java.nio.file.Path.of(server, 
UUID.randomUUID().toString()).toString();
+      LogEntry wal = LogEntry.fromPath(walFilePath);
+
+      // add wal compact and flush ID to these tablets
+      final Set<KeyExtent> tabletsWithWalCompactFlush = Set.of(e1, e2, e3);
+      for (KeyExtent ke : tabletsWithWalCompactFlush) {
+        ctmi = new ConditionalTabletsMutatorImpl(context);
+        ctmi.mutateTablet(ke).requireAbsentOperation().putCompacted(34L)
+            .putFlushId(TestTabletMetadataFilter.VALID_FLUSH_ID).putWal(wal)
+            .submit(tabletMetadata -> false);
+        var results = ctmi.process();
+        assertEquals(Status.ACCEPTED, results.get(ke).getStatus());
+      }
+      // check that applying a combination of filters returns only tablets 
that meet the criteria
+      testFilterApplied(context, Set.of(new TestTabletMetadataFilter(), new 
HasWalsFilter()),
+          tabletsWithWalCompactFlush, "Combination of filters did not return 
the expected tablets");
+
+      TServerInstance serverInstance = new TServerInstance(server, 1L);
+
+      // on a subset of the tablets, put a location
+      final Set<KeyExtent> tabletsWithLocation = Set.of(e2, e3, e4);
+      for (KeyExtent ke : tabletsWithLocation) {
+        ctmi = new ConditionalTabletsMutatorImpl(context);
+        ctmi.mutateTablet(ke).requireAbsentOperation().requireAbsentLocation()
+            
.putLocation(Location.current(serverInstance)).submit(tabletMetadata -> false);
+        var results = ctmi.process();
+        assertEquals(Status.ACCEPTED, results.get(ke).getStatus());
+        assertEquals(Location.current(serverInstance),
+            context.getAmple().readTablet(ke).getLocation(),
+            "Did not see expected location after adding it");
+      }
+
+      // test that the new subset is returned with all 3 filters applied
+      Set<KeyExtent> expected = Sets.intersection(tabletsWithWalCompactFlush, 
tabletsWithLocation);
+      assertFalse(expected.isEmpty());
+      testFilterApplied(context,
+          Set.of(new HasCurrentFilter(), new HasWalsFilter(), new 
TestTabletMetadataFilter()),
+          expected, "Combination of filters did not return the expected 
tablets");
+    }
+
+    @Test
+    public void testCompactedAndFlushIdFilter() {
+      ServerContext context = cluster.getServerContext();
+      ConditionalTabletsMutatorImpl ctmi = new 
ConditionalTabletsMutatorImpl(context);
+      Set<TabletMetadataFilter> filter = Set.of(new 
TestTabletMetadataFilter());
+
+      // make sure we read all tablets on table initially with no filters
+      testFilterApplied(context, Set.of(), Set.of(e1, e2, e3, e4),
+          "Initially, all tablets should be present");
+
+      // Set compacted on e2 but with no flush ID
+      ctmi.mutateTablet(e2).requireAbsentOperation().putCompacted(34L)
+          .submit(tabletMetadata -> false);
+      var results = ctmi.process();
+      assertEquals(Status.ACCEPTED, results.get(e2).getStatus());
+      testFilterApplied(context, filter, Set.of(),
+          "Compacted but no flush ID should return no tablets");
+
+      // Set incorrect flush ID on e2
+      ctmi = new ConditionalTabletsMutatorImpl(context);
+      ctmi.mutateTablet(e2).requireAbsentOperation().putFlushId(45L)
+          .submit(tabletMetadata -> false);
+      results = ctmi.process();
+      assertEquals(Status.ACCEPTED, results.get(e2).getStatus());
+      testFilterApplied(context, filter, Set.of(),
+          "Compacted with incorrect flush ID should return no tablets");
+
+      // Set correct flush ID on e2
+      ctmi = new ConditionalTabletsMutatorImpl(context);
+      ctmi.mutateTablet(e2).requireAbsentOperation()
+          
.putFlushId(TestTabletMetadataFilter.VALID_FLUSH_ID).submit(tabletMetadata -> 
false);
+      results = ctmi.process();
+      assertEquals(Status.ACCEPTED, results.get(e2).getStatus());
+      testFilterApplied(context, filter, Set.of(e2),
+          "Compacted with correct flush ID should return e2");
+
+      // Set compacted and correct flush ID on e3
+      ctmi = new ConditionalTabletsMutatorImpl(context);
+      ctmi.mutateTablet(e3).requireAbsentOperation().putCompacted(987L)
+          
.putFlushId(TestTabletMetadataFilter.VALID_FLUSH_ID).submit(tabletMetadata -> 
false);
+      results = ctmi.process();
+      assertEquals(Status.ACCEPTED, results.get(e3).getStatus());
+      testFilterApplied(context, filter, Set.of(e2, e3),
+          "Compacted with correct flush ID should return e2 and e3");
+    }
+
+    @Test
+    public void walFilter() {
+      ServerContext context = cluster.getServerContext();
+      ConditionalTabletsMutatorImpl ctmi = new 
ConditionalTabletsMutatorImpl(context);
+      Set<TabletMetadataFilter> filter = Set.of(new HasWalsFilter());
+
+      // make sure we read all tablets on table initially with no filters
+      testFilterApplied(context, Set.of(), Set.of(e1, e2, e3, e4),
+          "Initially, all tablets should be present");
+
+      // add a wal to e2
+      String walFilePath =
+          java.nio.file.Path.of("tserver+8080", 
UUID.randomUUID().toString()).toString();
+      LogEntry wal = LogEntry.fromPath(walFilePath);
+      
ctmi.mutateTablet(e2).requireAbsentOperation().putWal(wal).submit(tabletMetadata
 -> false);
+      var results = ctmi.process();
+      assertEquals(Status.ACCEPTED, results.get(e2).getStatus());
+
+      // test that the filter works
+      testFilterApplied(context, filter, Set.of(e2), "Only tablets with wals 
should be returned");
+
+      // add wal to tablet e4
+      ctmi = new ConditionalTabletsMutatorImpl(context);
+      walFilePath = java.nio.file.Path.of("tserver+8080", 
UUID.randomUUID().toString()).toString();
+      wal = LogEntry.fromPath(walFilePath);
+      
ctmi.mutateTablet(e4).requireAbsentOperation().putWal(wal).submit(tabletMetadata
 -> false);
+      results = ctmi.process();
+      assertEquals(Status.ACCEPTED, results.get(e4).getStatus());
+
+      // now, when using the wal filter, should see both e2 and e4
+      testFilterApplied(context, filter, Set.of(e2, e4),
+          "Only tablets with wals should be returned");
+
+      // remove the wal from e4
+      ctmi = new ConditionalTabletsMutatorImpl(context);
+      
ctmi.mutateTablet(e4).requireAbsentOperation().deleteWal(wal).submit(tabletMetadata
 -> false);
+      results = ctmi.process();
+      assertEquals(Status.ACCEPTED, results.get(e4).getStatus());
+
+      // test that now only the tablet with a wal is returned when using 
filter()
+      testFilterApplied(context, filter, Set.of(e2), "Only tablets with wals 
should be returned");
+    }
+
+    @Test
+    public void partialFetch() {
+      ServerContext context = cluster.getServerContext();
+      TestTabletMetadataFilter filter = new TestTabletMetadataFilter();
+      // if we only fetch some columns needed by the filter, we should get an 
exception
+      TabletsMetadata.Options options =
+          
context.getAmple().readTablets().forTable(tid).fetch(FLUSH_ID).filter(filter);
+      var ise = assertThrows(IllegalStateException.class, options::build);
+      String expectedMsg = String.format("%s needs cols %s however only %s 
were fetched",
+          TestTabletMetadataFilter.class.getSimpleName(), filter.getColumns(), 
Set.of(FLUSH_ID));
+      assertTrue(ise.getMessage().contains(expectedMsg));
+    }
+
+  }
+
   @Test
   public void testFlushId() {
     try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/TestTabletMetadataFilter.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/TestTabletMetadataFilter.java
new file mode 100644
index 0000000000..1847f7d941
--- /dev/null
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/TestTabletMetadataFilter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.function.Predicate;
+
+import org.apache.accumulo.core.iterators.user.TabletMetadataFilter;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+
+import com.google.common.collect.Sets;
+
+/**
+ * A filter constructed to test filtering in ample. This filter only allows 
tablets with compacted
+ * and with a flush ID.
+ */
+public class TestTabletMetadataFilter extends TabletMetadataFilter {
+
+  public static final long VALID_FLUSH_ID = 44L;
+
+  public static final Set<TabletMetadata.ColumnType> COLUMNS = Sets
+      .immutableEnumSet(TabletMetadata.ColumnType.COMPACTED, 
TabletMetadata.ColumnType.FLUSH_ID);
+
+  private final static Predicate<TabletMetadata> TEST =
+      tabletMetadata -> !tabletMetadata.getCompacted().isEmpty()
+          && 
tabletMetadata.getFlushId().equals(OptionalLong.of(VALID_FLUSH_ID));
+
+  @Override
+  public Set<TabletMetadata.ColumnType> getColumns() {
+    return COLUMNS;
+  }
+
+  @Override
+  protected Predicate<TabletMetadata> acceptTablet() {
+    return TEST;
+  }
+}


Reply via email to