This is an automated email from the ASF dual-hosted git repository. domgarguilo pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new af3d4f1c5f Add filtering to Ample (#3968) af3d4f1c5f is described below commit af3d4f1c5fff5e8074498b0469c2e3177d3c79cc Author: Dom G <domgargu...@apache.org> AuthorDate: Fri Jan 5 10:41:07 2024 -0500 Add filtering to Ample (#3968) --------- Co-authored-by: Keith Turner <ktur...@apache.org> --- .../core/iterators/user/HasCurrentFilter.java | 44 +++++ .../user/HasExternalCompactionsFilter.java | 45 +++++ .../core/iterators/user/HasWalsFilter.java | 45 +++++ .../core/iterators/user/TabletMetadataFilter.java | 43 ++++ .../core/metadata/schema/TabletsMetadata.java | 53 ++++- .../accumulo/gc/GarbageCollectWriteAheadLogs.java | 7 +- .../coordinator/CompactionCoordinator.java | 7 +- .../coordinator/DeadCompactionDetector.java | 6 +- .../monitor/rest/tables/TablesResource.java | 15 +- .../test/functional/AmpleConditionalWriterIT.java | 218 +++++++++++++++++++++ .../test/functional/TestTabletMetadataFilter.java | 54 +++++ 11 files changed, 520 insertions(+), 17 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/user/HasCurrentFilter.java b/core/src/main/java/org/apache/accumulo/core/iterators/user/HasCurrentFilter.java new file mode 100644 index 0000000000..ca58922306 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/iterators/user/HasCurrentFilter.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.iterators.user; + +import java.util.Set; +import java.util.function.Predicate; + +import org.apache.accumulo.core.metadata.schema.TabletMetadata; + +import com.google.common.collect.Sets; + +public class HasCurrentFilter extends TabletMetadataFilter { + + public static final Set<TabletMetadata.ColumnType> COLUMNS = + Sets.immutableEnumSet(TabletMetadata.ColumnType.LOCATION); + + private final static Predicate<TabletMetadata> HAS_CURRENT = TabletMetadata::hasCurrent; + + @Override + public Set<TabletMetadata.ColumnType> getColumns() { + return COLUMNS; + } + + @Override + protected Predicate<TabletMetadata> acceptTablet() { + return HAS_CURRENT; + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/user/HasExternalCompactionsFilter.java b/core/src/main/java/org/apache/accumulo/core/iterators/user/HasExternalCompactionsFilter.java new file mode 100644 index 0000000000..fd3b5ae5d6 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/iterators/user/HasExternalCompactionsFilter.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.iterators.user; + +import java.util.Set; +import java.util.function.Predicate; + +import org.apache.accumulo.core.metadata.schema.TabletMetadata; + +import com.google.common.collect.Sets; + +public class HasExternalCompactionsFilter extends TabletMetadataFilter { + + public static final Set<TabletMetadata.ColumnType> COLUMNS = + Sets.immutableEnumSet(TabletMetadata.ColumnType.ECOMP); + + private final static Predicate<TabletMetadata> HAS_EXT_COMPS = + tabletMetadata -> !tabletMetadata.getExternalCompactions().isEmpty(); + + @Override + public Set<TabletMetadata.ColumnType> getColumns() { + return COLUMNS; + } + + @Override + protected Predicate<TabletMetadata> acceptTablet() { + return HAS_EXT_COMPS; + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/user/HasWalsFilter.java b/core/src/main/java/org/apache/accumulo/core/iterators/user/HasWalsFilter.java new file mode 100644 index 0000000000..0563a7c3dc --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/iterators/user/HasWalsFilter.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.iterators.user; + +import java.util.Set; +import java.util.function.Predicate; + +import org.apache.accumulo.core.metadata.schema.TabletMetadata; + +import com.google.common.collect.Sets; + +public class HasWalsFilter extends TabletMetadataFilter { + + private static final Set<TabletMetadata.ColumnType> COLUMNS = + Sets.immutableEnumSet(TabletMetadata.ColumnType.LOGS); + + private final static Predicate<TabletMetadata> HAS_WALS = + tabletMetadata -> !tabletMetadata.getLogs().isEmpty(); + + @Override + public Set<TabletMetadata.ColumnType> getColumns() { + return COLUMNS; + } + + @Override + protected Predicate<TabletMetadata> acceptTablet() { + return HAS_WALS; + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/user/TabletMetadataFilter.java b/core/src/main/java/org/apache/accumulo/core/iterators/user/TabletMetadataFilter.java new file mode 100644 index 0000000000..13163aebea --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/iterators/user/TabletMetadataFilter.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.iterators.user; + +import java.util.EnumSet; +import java.util.Set; +import java.util.function.Predicate; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.IteratorAdapter; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; + +public abstract class TabletMetadataFilter extends RowFilter { + + @Override + public boolean acceptRow(SortedKeyValueIterator<Key,Value> rowIterator) { + TabletMetadata tm = TabletMetadata.convertRow(new IteratorAdapter(rowIterator), + EnumSet.copyOf(getColumns()), true, false); + return acceptTablet().test(tm); + } + + public abstract Set<TabletMetadata.ColumnType> getColumns(); + + protected abstract Predicate<TabletMetadata> acceptTablet(); +} diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java index 4e28e46e9d..46b077e626 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java @@ -63,6 +63,7 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooReader; +import org.apache.accumulo.core.iterators.user.TabletMetadataFilter; import org.apache.accumulo.core.iterators.user.WholeRowIterator; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; @@ -114,6 +115,7 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable private TableId tableId; private ReadConsistency readConsistency = ReadConsistency.IMMEDIATE; private final AccumuloClient _client; + private final List<TabletMetadataFilter> tabletMetadataFilters = new ArrayList<>(); Builder(AccumuloClient client) { this._client = client; @@ -128,6 +130,22 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable return buildExtents(_client); } + if (!tabletMetadataFilters.isEmpty()) { + checkState(!checkConsistency, "Can not check tablet consistency and filter tablets"); + if (!fetchedCols.isEmpty()) { + for (var filter : tabletMetadataFilters) { + // This defends against the case where the columns needed by the filter were not + // fetched. For example, the following code only fetches the file column and then + // configures the WAL filter which also needs the column for write ahead logs. + // ample.readTablets().forLevel(DataLevel.USER).fetch(ColumnType.FILES).filter(new + // HasWalsFilter()).build(); + checkState(fetchedCols.containsAll(filter.getColumns()), + "%s needs cols %s however only %s were fetched", filter.getClass().getSimpleName(), + filter.getColumns(), fetchedCols); + } + } + } + checkState((level == null) != (table == null), "scanTable() cannot be used in conjunction with forLevel(), forTable() or forTablet() %s %s", level, table); @@ -168,7 +186,16 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable scanner.setRanges(ranges); configureColumns(scanner); - IteratorSetting iterSetting = new IteratorSetting(100, WholeRowIterator.class); + int iteratorPriority = 100; + + for (TabletMetadataFilter tmf : tabletMetadataFilters) { + IteratorSetting iterSetting = new IteratorSetting(iteratorPriority, tmf.getClass()); + scanner.addScanIterator(iterSetting); + iteratorPriority++; + } + + IteratorSetting iterSetting = + new IteratorSetting(iteratorPriority, WholeRowIterator.class); scanner.addScanIterator(iterSetting); Iterable<TabletMetadata> tmi = () -> Iterators.transform(scanner.iterator(), entry -> { @@ -239,6 +266,15 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable configureColumns(scanner); Range range1 = scanner.getRange(); + if (!tabletMetadataFilters.isEmpty()) { + int iteratorPriority = 100; + for (TabletMetadataFilter tmf : tabletMetadataFilters) { + iteratorPriority++; + IteratorSetting iterSetting = new IteratorSetting(iteratorPriority, tmf.getClass()); + scanner.addScanIterator(iterSetting); + } + } + Function<Range,Iterator<TabletMetadata>> iterFactory = r -> { synchronized (scanner) { scanner.setRange(r); @@ -445,6 +481,12 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable return this; } + @Override + public Options filter(TabletMetadataFilter filter) { + this.tabletMetadataFilters.add(filter); + return this; + } + @Override public Options readConsistency(ReadConsistency readConsistency) { this.readConsistency = Objects.requireNonNull(readConsistency); @@ -475,6 +517,15 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable * {@link ReadConsistency#IMMEDIATE} */ Options readConsistency(ReadConsistency readConsistency); + + /** + * Adds a filter to be applied while fetching the data. Filters are applied in the order they + * are added. This method can be called multiple times to chain multiple filters together. The + * first filter added has the highest priority and each subsequent filter is applied with a + * sequentially lower priority. If columns needed by a filter are not fetched then a runtime + * exception is thrown. + */ + Options filter(TabletMetadataFilter filter); } public interface RangeOptions extends Options { diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java index 7e55a5274c..3c93187239 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java @@ -37,6 +37,7 @@ import java.util.UUID; import org.apache.accumulo.core.gc.thrift.GCStatus; import org.apache.accumulo.core.gc.thrift.GcCycleStats; +import org.apache.accumulo.core.iterators.user.HasWalsFilter; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.TabletState; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; @@ -83,11 +84,11 @@ public class GarbageCollectWriteAheadLogs { this.liveServers = liveServers; this.walMarker = new WalStateManager(context); this.store = () -> Iterators.concat( - context.getAmple().readTablets().forLevel(DataLevel.ROOT) + context.getAmple().readTablets().forLevel(DataLevel.ROOT).filter(new HasWalsFilter()) .fetch(LOCATION, LAST, LOGS, PREV_ROW, SUSPEND).checkConsistency().build().iterator(), - context.getAmple().readTablets().forLevel(DataLevel.METADATA) + context.getAmple().readTablets().forLevel(DataLevel.METADATA).filter(new HasWalsFilter()) .fetch(LOCATION, LAST, LOGS, PREV_ROW, SUSPEND).checkConsistency().build().iterator(), - context.getAmple().readTablets().forLevel(DataLevel.USER) + context.getAmple().readTablets().forLevel(DataLevel.USER).filter(new HasWalsFilter()) .fetch(LOCATION, LAST, LOGS, PREV_ROW, SUSPEND).checkConsistency().build().iterator()); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index c03cb3241f..faea953da6 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -78,6 +78,7 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; import org.apache.accumulo.core.fate.FateTxId; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.core.iterators.user.HasExternalCompactionsFilter; import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil; import org.apache.accumulo.core.metadata.AbstractTabletFile; import org.apache.accumulo.core.metadata.CompactableFileImpl; @@ -1226,9 +1227,9 @@ public class CompactionCoordinator } protected Set<ExternalCompactionId> readExternalCompactionIds() { - return this.ctx.getAmple().readTablets().forLevel(Ample.DataLevel.USER).fetch(ECOMP).build() - .stream().flatMap(tm -> tm.getExternalCompactions().keySet().stream()) - .collect(Collectors.toSet()); + return this.ctx.getAmple().readTablets().forLevel(Ample.DataLevel.USER) + .filter(new HasExternalCompactionsFilter()).fetch(ECOMP).build().stream() + .flatMap(tm -> tm.getExternalCompactions().keySet().stream()).collect(Collectors.toSet()); } /** diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java index 2c653158ad..479be63ae7 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java @@ -32,6 +32,7 @@ import java.util.stream.Collectors; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.iterators.user.HasExternalCompactionsFilter; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; @@ -75,10 +76,11 @@ public class DeadCompactionDetector { log.debug("Starting to look for dead compactions"); Map<ExternalCompactionId,KeyExtent> tabletCompactions = new HashMap<>(); - + // // find what external compactions tablets think are running context.getAmple().readTablets().forLevel(DataLevel.USER) - .fetch(ColumnType.ECOMP, ColumnType.PREV_ROW).build().forEach(tm -> { + .filter(new HasExternalCompactionsFilter()).fetch(ColumnType.ECOMP, ColumnType.PREV_ROW) + .build().forEach(tm -> { tm.getExternalCompactions().keySet().forEach(ecid -> { tabletCompactions.put(ecid, tm.getExtent()); }); diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/tables/TablesResource.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/tables/TablesResource.java index ccc5eb7ca5..89879cfae5 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/tables/TablesResource.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/tables/TablesResource.java @@ -37,6 +37,7 @@ import jakarta.ws.rs.Produces; import jakarta.ws.rs.core.MediaType; import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.iterators.user.HasCurrentFilter; import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.manager.thrift.ManagerMonitorInfo; import org.apache.accumulo.core.manager.thrift.TableInfo; @@ -143,16 +144,14 @@ public class TablesResource { locs.add(rootTabletLocation); } else { var level = Ample.DataLevel.of(tableId); - try (TabletsMetadata tablets = - monitor.getContext().getAmple().readTablets().forLevel(level).build()) { + try (TabletsMetadata tablets = monitor.getContext().getAmple().readTablets().forLevel(level) + .filter(new HasCurrentFilter()).build()) { for (TabletMetadata tm : tablets) { - if (tm.hasCurrent()) { - try { - locs.add(tm.getLocation().getHostPort()); - } catch (Exception ex) { - return tabletServers; - } + try { + locs.add(tm.getLocation().getHostPort()); + } catch (Exception ex) { + return tabletServers; } } } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java index ebc86f97c8..53e34bd203 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java @@ -36,6 +36,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.ArrayList; @@ -66,6 +67,9 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.FateTxId; +import org.apache.accumulo.core.iterators.user.HasCurrentFilter; +import org.apache.accumulo.core.iterators.user.HasWalsFilter; +import org.apache.accumulo.core.iterators.user.TabletMetadataFilter; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.StoredTabletFile; @@ -81,14 +85,17 @@ import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType; import org.apache.accumulo.core.metadata.schema.TabletMetadataBuilder; import org.apache.accumulo.core.metadata.schema.TabletOperationId; import org.apache.accumulo.core.metadata.schema.TabletOperationType; +import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.security.TablePermission; import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.metadata.AsyncConditionalTabletsMutatorImpl; import org.apache.accumulo.server.metadata.ConditionalTabletsMutatorImpl; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import com.google.common.collect.Sets; @@ -151,6 +158,13 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness { assertEquals(Location.future(ts1), context.getAmple().readTablet(e1).getLocation()); + try (TabletsMetadata tablets = + context.getAmple().readTablets().forTable(tid).filter(new HasCurrentFilter()).build()) { + List<KeyExtent> actual = + tablets.stream().map(TabletMetadata::getExtent).collect(Collectors.toList()); + assertEquals(List.of(), actual); + } + ctmi = new ConditionalTabletsMutatorImpl(context); ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.future(ts1)) .putLocation(Location.current(ts1)).deleteLocation(Location.future(ts1)) @@ -160,6 +174,13 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness { assertEquals(Location.current(ts1), context.getAmple().readTablet(e1).getLocation()); + try (TabletsMetadata tablets = + context.getAmple().readTablets().forTable(tid).filter(new HasCurrentFilter()).build()) { + List<KeyExtent> actual = + tablets.stream().map(TabletMetadata::getExtent).collect(Collectors.toList()); + assertEquals(List.of(e1), actual); + } + ctmi = new ConditionalTabletsMutatorImpl(context); ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.future(ts1)) .putLocation(Location.current(ts1)).deleteLocation(Location.future(ts1)) @@ -851,6 +872,203 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness { } } + @Nested + public class TestFilter { + + /** + * @param filters set of filters to apply to the readTablets operation + * @param expectedTablets set of tablets expected to be returned with the filters applied + */ + private void testFilterApplied(ServerContext context, Set<TabletMetadataFilter> filters, + Set<KeyExtent> expectedTablets, String message) { + // test with just the needed columns fetched and then with all columns fetched. both should + // yield the same result + addFiltersFetchAndAssert(context, filters, true, expectedTablets, message); + addFiltersFetchAndAssert(context, filters, false, expectedTablets, message); + } + + private void addFiltersFetchAndAssert(ServerContext context, Set<TabletMetadataFilter> filters, + boolean shouldFetchNeededCols, Set<KeyExtent> expectedTablets, String message) { + TabletsMetadata.TableRangeOptions options = context.getAmple().readTablets().forTable(tid); + // add the filter(s) to the operation before building + for (TabletMetadataFilter filter : filters) { + options.filter(filter); + // test fetching just the needed columns + if (shouldFetchNeededCols) { + for (TabletMetadata.ColumnType columnType : filter.getColumns()) { + options.fetch(columnType); + } + } + } + if (shouldFetchNeededCols) { + // if some columns were fetched, also need to fetch PREV_ROW in order to use getExtent() + options.fetch(PREV_ROW); + } + try (TabletsMetadata tablets = options.build()) { + Set<KeyExtent> actual = + tablets.stream().map(TabletMetadata::getExtent).collect(Collectors.toSet()); + assertEquals(expectedTablets, actual, + message + (shouldFetchNeededCols + ? ". Only needed columns were fetched in the readTablets operation." + : ". All columns were fetched in the readTablets operation.")); + } + } + + @Test + public void multipleFilters() { + ServerContext context = cluster.getServerContext(); + ConditionalTabletsMutatorImpl ctmi; + + // make sure we read all tablets on table initially with no filters + testFilterApplied(context, Set.of(), Set.of(e1, e2, e3, e4), + "Initially, all tablets should be present"); + + String server = "server1+8555"; + + String walFilePath = java.nio.file.Path.of(server, UUID.randomUUID().toString()).toString(); + LogEntry wal = LogEntry.fromPath(walFilePath); + + // add wal compact and flush ID to these tablets + final Set<KeyExtent> tabletsWithWalCompactFlush = Set.of(e1, e2, e3); + for (KeyExtent ke : tabletsWithWalCompactFlush) { + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(ke).requireAbsentOperation().putCompacted(34L) + .putFlushId(TestTabletMetadataFilter.VALID_FLUSH_ID).putWal(wal) + .submit(tabletMetadata -> false); + var results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(ke).getStatus()); + } + // check that applying a combination of filters returns only tablets that meet the criteria + testFilterApplied(context, Set.of(new TestTabletMetadataFilter(), new HasWalsFilter()), + tabletsWithWalCompactFlush, "Combination of filters did not return the expected tablets"); + + TServerInstance serverInstance = new TServerInstance(server, 1L); + + // on a subset of the tablets, put a location + final Set<KeyExtent> tabletsWithLocation = Set.of(e2, e3, e4); + for (KeyExtent ke : tabletsWithLocation) { + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(ke).requireAbsentOperation().requireAbsentLocation() + .putLocation(Location.current(serverInstance)).submit(tabletMetadata -> false); + var results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(ke).getStatus()); + assertEquals(Location.current(serverInstance), + context.getAmple().readTablet(ke).getLocation(), + "Did not see expected location after adding it"); + } + + // test that the new subset is returned with all 3 filters applied + Set<KeyExtent> expected = Sets.intersection(tabletsWithWalCompactFlush, tabletsWithLocation); + assertFalse(expected.isEmpty()); + testFilterApplied(context, + Set.of(new HasCurrentFilter(), new HasWalsFilter(), new TestTabletMetadataFilter()), + expected, "Combination of filters did not return the expected tablets"); + } + + @Test + public void testCompactedAndFlushIdFilter() { + ServerContext context = cluster.getServerContext(); + ConditionalTabletsMutatorImpl ctmi = new ConditionalTabletsMutatorImpl(context); + Set<TabletMetadataFilter> filter = Set.of(new TestTabletMetadataFilter()); + + // make sure we read all tablets on table initially with no filters + testFilterApplied(context, Set.of(), Set.of(e1, e2, e3, e4), + "Initially, all tablets should be present"); + + // Set compacted on e2 but with no flush ID + ctmi.mutateTablet(e2).requireAbsentOperation().putCompacted(34L) + .submit(tabletMetadata -> false); + var results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e2).getStatus()); + testFilterApplied(context, filter, Set.of(), + "Compacted but no flush ID should return no tablets"); + + // Set incorrect flush ID on e2 + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e2).requireAbsentOperation().putFlushId(45L) + .submit(tabletMetadata -> false); + results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e2).getStatus()); + testFilterApplied(context, filter, Set.of(), + "Compacted with incorrect flush ID should return no tablets"); + + // Set correct flush ID on e2 + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e2).requireAbsentOperation() + .putFlushId(TestTabletMetadataFilter.VALID_FLUSH_ID).submit(tabletMetadata -> false); + results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e2).getStatus()); + testFilterApplied(context, filter, Set.of(e2), + "Compacted with correct flush ID should return e2"); + + // Set compacted and correct flush ID on e3 + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e3).requireAbsentOperation().putCompacted(987L) + .putFlushId(TestTabletMetadataFilter.VALID_FLUSH_ID).submit(tabletMetadata -> false); + results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e3).getStatus()); + testFilterApplied(context, filter, Set.of(e2, e3), + "Compacted with correct flush ID should return e2 and e3"); + } + + @Test + public void walFilter() { + ServerContext context = cluster.getServerContext(); + ConditionalTabletsMutatorImpl ctmi = new ConditionalTabletsMutatorImpl(context); + Set<TabletMetadataFilter> filter = Set.of(new HasWalsFilter()); + + // make sure we read all tablets on table initially with no filters + testFilterApplied(context, Set.of(), Set.of(e1, e2, e3, e4), + "Initially, all tablets should be present"); + + // add a wal to e2 + String walFilePath = + java.nio.file.Path.of("tserver+8080", UUID.randomUUID().toString()).toString(); + LogEntry wal = LogEntry.fromPath(walFilePath); + ctmi.mutateTablet(e2).requireAbsentOperation().putWal(wal).submit(tabletMetadata -> false); + var results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e2).getStatus()); + + // test that the filter works + testFilterApplied(context, filter, Set.of(e2), "Only tablets with wals should be returned"); + + // add wal to tablet e4 + ctmi = new ConditionalTabletsMutatorImpl(context); + walFilePath = java.nio.file.Path.of("tserver+8080", UUID.randomUUID().toString()).toString(); + wal = LogEntry.fromPath(walFilePath); + ctmi.mutateTablet(e4).requireAbsentOperation().putWal(wal).submit(tabletMetadata -> false); + results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e4).getStatus()); + + // now, when using the wal filter, should see both e2 and e4 + testFilterApplied(context, filter, Set.of(e2, e4), + "Only tablets with wals should be returned"); + + // remove the wal from e4 + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e4).requireAbsentOperation().deleteWal(wal).submit(tabletMetadata -> false); + results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e4).getStatus()); + + // test that now only the tablet with a wal is returned when using filter() + testFilterApplied(context, filter, Set.of(e2), "Only tablets with wals should be returned"); + } + + @Test + public void partialFetch() { + ServerContext context = cluster.getServerContext(); + TestTabletMetadataFilter filter = new TestTabletMetadataFilter(); + // if we only fetch some columns needed by the filter, we should get an exception + TabletsMetadata.Options options = + context.getAmple().readTablets().forTable(tid).fetch(FLUSH_ID).filter(filter); + var ise = assertThrows(IllegalStateException.class, options::build); + String expectedMsg = String.format("%s needs cols %s however only %s were fetched", + TestTabletMetadataFilter.class.getSimpleName(), filter.getColumns(), Set.of(FLUSH_ID)); + assertTrue(ise.getMessage().contains(expectedMsg)); + } + + } + @Test public void testFlushId() { try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { diff --git a/test/src/main/java/org/apache/accumulo/test/functional/TestTabletMetadataFilter.java b/test/src/main/java/org/apache/accumulo/test/functional/TestTabletMetadataFilter.java new file mode 100644 index 0000000000..1847f7d941 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/functional/TestTabletMetadataFilter.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.functional; + +import java.util.OptionalLong; +import java.util.Set; +import java.util.function.Predicate; + +import org.apache.accumulo.core.iterators.user.TabletMetadataFilter; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; + +import com.google.common.collect.Sets; + +/** + * A filter constructed to test filtering in ample. This filter only allows tablets with compacted + * and with a flush ID. + */ +public class TestTabletMetadataFilter extends TabletMetadataFilter { + + public static final long VALID_FLUSH_ID = 44L; + + public static final Set<TabletMetadata.ColumnType> COLUMNS = Sets + .immutableEnumSet(TabletMetadata.ColumnType.COMPACTED, TabletMetadata.ColumnType.FLUSH_ID); + + private final static Predicate<TabletMetadata> TEST = + tabletMetadata -> !tabletMetadata.getCompacted().isEmpty() + && tabletMetadata.getFlushId().equals(OptionalLong.of(VALID_FLUSH_ID)); + + @Override + public Set<TabletMetadata.ColumnType> getColumns() { + return COLUMNS; + } + + @Override + protected Predicate<TabletMetadata> acceptTablet() { + return TEST; + } +}