This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new a1f1fb85b2 Removes external compaction final state metadata (#3565) a1f1fb85b2 is described below commit a1f1fb85b2e8dd715b04f2676ad83ac035815de8 Author: Keith Turner <ktur...@apache.org> AuthorDate: Wed Jul 5 15:03:32 2023 -0400 Removes external compaction final state metadata (#3565) Removes metadata related to how the external compaction commit process used to work. fixes #3465 --- .../accumulo/core/metadata/schema/Ample.java | 14 --- .../schema/ExternalCompactionFinalState.java | 139 --------------------- .../core/metadata/schema/MetadataSchema.java | 13 -- .../ClusterServerConfiguration.java | 2 +- .../accumulo/server/metadata/ServerAmpleImpl.java | 52 -------- .../coordinator/DeadCompactionDetector.java | 2 +- .../accumulo/manager/upgrade/Upgrader11to12.java | 70 ++++++++++- .../accumulo/tserver/tablet/CompactableImpl.java | 8 +- .../test/compaction/ExternalCompactionTServer.java | 47 ------- .../compaction/ExternalCompactionTestUtils.java | 27 ++-- .../test/compaction/ExternalCompaction_1_IT.java | 136 +++----------------- .../test/compaction/ExternalCompaction_2_IT.java | 95 -------------- 12 files changed, 111 insertions(+), 494 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java index 78a047dabe..082296446a 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java @@ -214,20 +214,6 @@ public interface Ample { throw new UnsupportedOperationException(); } - default void - putExternalCompactionFinalStates(Collection<ExternalCompactionFinalState> finalStates) { - throw new UnsupportedOperationException(); - } - - default Stream<ExternalCompactionFinalState> getExternalCompactionFinalStates() { - throw new UnsupportedOperationException(); - } - - default void - deleteExternalCompactionFinalStates(Collection<ExternalCompactionId> statusesToDelete) { - throw new UnsupportedOperationException(); - } - /** * Return an encoded delete marker Mutation to delete the specified TabletFile path. A * ReferenceFile is used for the parameter because the Garbage Collector is optimized to store a diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionFinalState.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionFinalState.java deleted file mode 100644 index 907982f2ed..0000000000 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionFinalState.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.core.metadata.schema; - -import static org.apache.accumulo.core.util.LazySingletons.GSON; - -import java.util.Base64; - -import org.apache.accumulo.core.data.TableId; -import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.util.TextUtil; -import org.apache.hadoop.io.Text; - -import com.google.common.base.Preconditions; - -// ELASTICITY_TODO remove this class, remove it from ample, add upgrade code to remove it from metadata table -public class ExternalCompactionFinalState { - - public enum FinalState { - FINISHED, FAILED - } - - private final ExternalCompactionId ecid; - private final KeyExtent extent; - private final FinalState state; - private final long fileSize; - private final long fileEntries; - - public ExternalCompactionFinalState(ExternalCompactionId ecid, KeyExtent extent, FinalState state, - long fileSize, long fileEntries) { - this.ecid = ecid; - this.extent = extent; - this.state = state; - this.fileSize = fileSize; - this.fileEntries = fileEntries; - } - - public ExternalCompactionId getExternalCompactionId() { - return ecid; - } - - public FinalState getFinalState() { - return state; - } - - public KeyExtent getExtent() { - return extent; - } - - public long getFileSize() { - Preconditions.checkState(state == FinalState.FINISHED); - return fileSize; - } - - public long getEntries() { - Preconditions.checkState(state == FinalState.FINISHED); - return fileEntries; - } - - // This class is used to serialize and deserialize this class using GSon. Any changes to this - // class must consider persisted data. - private static class Extent { - - final String tableId; - final String er; - final String per; - - Extent(KeyExtent extent) { - this.tableId = extent.tableId().canonical(); - if (extent.endRow() != null) { - er = Base64.getEncoder().encodeToString(TextUtil.getBytes(extent.endRow())); - } else { - er = null; - } - - if (extent.prevEndRow() != null) { - per = Base64.getEncoder().encodeToString(TextUtil.getBytes(extent.prevEndRow())); - } else { - per = null; - } - } - - private Text decode(String s) { - if (s == null) { - return null; - } - return new Text(Base64.getDecoder().decode(s)); - } - - KeyExtent toKeyExtent() { - return new KeyExtent(TableId.of(tableId), decode(er), decode(per)); - } - } - - // This class is used to serialize and deserialize this class using GSon. Any changes to this - // class must consider persisted data. - private static class JsonData { - Extent extent; - String state; - long fileSize; - long entries; - } - - public String toJson() { - JsonData jd = new JsonData(); - jd.state = state.name(); - jd.fileSize = fileSize; - jd.entries = fileEntries; - jd.extent = new Extent(extent); - return GSON.get().toJson(jd); - } - - public static ExternalCompactionFinalState fromJson(ExternalCompactionId ecid, String json) { - JsonData jd = GSON.get().fromJson(json, JsonData.class); - return new ExternalCompactionFinalState(ecid, jd.extent.toKeyExtent(), - FinalState.valueOf(jd.state), jd.fileSize, jd.entries); - } - - @Override - public String toString() { - return toJson(); - } -} diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java index c42c088b6a..92725fabc0 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java @@ -470,19 +470,6 @@ public class MetadataSchema { } - public static class ExternalCompactionSection { - private static final Section section = - new Section(RESERVED_PREFIX + "ecomp", true, RESERVED_PREFIX + "ecomq", false); - - public static Range getRange() { - return section.getRange(); - } - - public static String getRowPrefix() { - return section.getRowPrefix(); - } - } - public static class ScanServerFileReferenceSection { private static final Section section = new Section(RESERVED_PREFIX + "sserv", true, RESERVED_PREFIX + "sserx", false); diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/ClusterServerConfiguration.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/ClusterServerConfiguration.java index 05e37b5584..3a5d93e34a 100644 --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/ClusterServerConfiguration.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/ClusterServerConfiguration.java @@ -96,7 +96,7 @@ public class ClusterServerConfiguration { while (iter.hasNext()) { String resourceGroup = iter.next(); if (!resourceGroup.equals(ServiceLockData.ServiceDescriptor.DEFAULT_GROUP_NAME)) { - compactors.remove(resourceGroup); + iter.remove(); } } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java index 25f4ff12eb..dd9ea397c4 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java @@ -50,12 +50,9 @@ import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.ValidationUtil; import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.AmpleImpl; -import org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState; -import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.metadata.schema.MetadataSchema.BlipSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.DeletesSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.DeletesSection.SkewedKeyValue; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.ExternalCompactionSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.ScanServerFileReferenceSection; import org.apache.accumulo.core.metadata.schema.TabletMutatorBase; import org.apache.accumulo.core.security.Authorizations; @@ -246,55 +243,6 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample { return delFlag; } - @Override - public void - putExternalCompactionFinalStates(Collection<ExternalCompactionFinalState> finalStates) { - try (BatchWriter writer = context.createBatchWriter(DataLevel.USER.metaTable())) { - String prefix = ExternalCompactionSection.getRowPrefix(); - for (ExternalCompactionFinalState finalState : finalStates) { - Mutation m = new Mutation(prefix + finalState.getExternalCompactionId().canonical()); - m.put("", "", finalState.toJson()); - writer.addMutation(m); - } - } catch (MutationsRejectedException | TableNotFoundException e) { - throw new IllegalStateException(e); - } - } - - @Override - public Stream<ExternalCompactionFinalState> getExternalCompactionFinalStates() { - Scanner scanner; - try { - scanner = context.createScanner(DataLevel.USER.metaTable(), Authorizations.EMPTY); - } catch (TableNotFoundException e) { - throw new IllegalStateException(e); - } - - scanner.setRange(ExternalCompactionSection.getRange()); - int pLen = ExternalCompactionSection.getRowPrefix().length(); - return scanner.stream() - .map(e -> ExternalCompactionFinalState.fromJson( - ExternalCompactionId.of(e.getKey().getRowData().toString().substring(pLen)), - e.getValue().toString())); - } - - @Override - public void - deleteExternalCompactionFinalStates(Collection<ExternalCompactionId> statusesToDelete) { - try (BatchWriter writer = context.createBatchWriter(DataLevel.USER.metaTable())) { - String prefix = ExternalCompactionSection.getRowPrefix(); - for (ExternalCompactionId ecid : statusesToDelete) { - Mutation m = new Mutation(prefix + ecid.canonical()); - m.putDelete(EMPTY_TEXT, EMPTY_TEXT); - writer.addMutation(m); - } - log.debug("Deleted external compaction final state entries for external compactions: {}", - statusesToDelete); - } catch (MutationsRejectedException | TableNotFoundException e) { - throw new IllegalStateException(e); - } - } - @Override public void putScanServerFileReferences(Collection<ScanServerRefTabletFile> scanRefs) { try (BatchWriter writer = context.createBatchWriter(DataLevel.USER.metaTable())) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java index b8bd5b4562..98540e1b93 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java @@ -98,7 +98,7 @@ public class DeadCompactionDetector { running.forEach((ecid) -> { if (tabletCompactions.remove(ecid) != null) { - log.debug("Removed compaction {} running on a compactor", ecid); + log.debug("Ignoring compaction {} that is running on a compactor", ecid); } if (this.deadCompactions.remove(ecid) != null) { log.debug("Removed {} from the dead compaction map, it's running on a compactor", ecid); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java index 1d5a56804d..f1d11712d0 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java @@ -18,18 +18,30 @@ */ package org.apache.accumulo.manager.upgrade; +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.RESERVED_PREFIX; + +import java.util.Map; + import org.apache.accumulo.core.client.admin.TabletHostingGoal; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metadata.schema.Ample.TabletsMutator; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; +import org.apache.accumulo.core.schema.Section; import org.apache.accumulo.server.ServerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + public class Upgrader11to12 implements Upgrader { private static final Logger LOG = LoggerFactory.getLogger(Upgrader11to12.class); @@ -50,6 +62,33 @@ public class Upgrader11to12 implements Upgrader { public void upgradeMetadata(ServerContext context) { LOG.info("setting hosting goal on user tables"); addHostingGoalToUserTables(context); + deleteExternalCompactionFinalStates(context); + deleteExternalCompactions(context); + } + + private void deleteExternalCompactionFinalStates(ServerContext context) { + // This metadata was only written for user tablets as part of the compaction commit process. + // Compactions are committed in a completely different way now, so delete these entries. Its + // possible some completed compactions may need to be redone, but processing these entries would + // not be easy to test so its better for correctness to delete them and redo the work. + try (var scanner = context.createScanner(MetadataTable.NAME); + var writer = context.createBatchWriter(MetadataTable.NAME)) { + var section = new Section(RESERVED_PREFIX + "ecomp", true, RESERVED_PREFIX + "ecomq", false); + scanner.setRange(section.getRange()); + + for (Map.Entry<Key,Value> entry : scanner) { + var key = entry.getKey(); + var row = key.getRow(); + Preconditions.checkState(row.toString().startsWith(section.getRowPrefix())); + Mutation m = new Mutation(row); + Preconditions.checkState(key.getColumnVisibilityData().length() == 0, + "Expected empty visibility, saw %s ", key.getColumnVisibilityData()); + m.putDelete(key.getColumnFamily(), key.getColumnQualifier()); + writer.addMutation(m); + } + } catch (Exception e) { + throw new IllegalStateException(e); + } } private void addHostingGoalToSystemTable(ServerContext context, TableId tableId) { @@ -57,7 +96,8 @@ public class Upgrader11to12 implements Upgrader { TabletsMetadata tm = context.getAmple().readTablets().forTable(tableId).fetch(ColumnType.PREV_ROW).build(); TabletsMutator mut = context.getAmple().mutateTablets()) { - tm.forEach(t -> mut.mutateTablet(t.getExtent()).putHostingGoal(TabletHostingGoal.ALWAYS)); + tm.forEach( + t -> mut.mutateTablet(t.getExtent()).putHostingGoal(TabletHostingGoal.ALWAYS).mutate()); } } @@ -74,8 +114,34 @@ public class Upgrader11to12 implements Upgrader { TabletsMetadata tm = context.getAmple().readTablets().forLevel(DataLevel.USER) .fetch(ColumnType.PREV_ROW).build(); TabletsMutator mut = context.getAmple().mutateTablets()) { - tm.forEach(t -> mut.mutateTablet(t.getExtent()).putHostingGoal(TabletHostingGoal.ONDEMAND)); + tm.forEach( + t -> mut.mutateTablet(t.getExtent()).putHostingGoal(TabletHostingGoal.ONDEMAND).mutate()); } } + private void deleteExternalCompactions(ServerContext context) { + // External compactions were only written for user tablets in 3.x and earlier, so only need to + // process the metadata table. The metadata related to an external compaction has changed so + // delete any that exists. Not using Ample in case there are problems deserializing the old + // external compaction metadata. + try (var scanner = context.createScanner(MetadataTable.NAME); + var writer = context.createBatchWriter(MetadataTable.NAME)) { + scanner.setRange(TabletsSection.getRange()); + scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME); + + for (Map.Entry<Key,Value> entry : scanner) { + var key = entry.getKey(); + Mutation m = new Mutation(key.getRow()); + Preconditions.checkState(key.getColumnFamily().equals(ExternalCompactionColumnFamily.NAME), + "Expected family %s, saw %s ", ExternalCompactionColumnFamily.NAME, + key.getColumnVisibilityData()); + Preconditions.checkState(key.getColumnVisibilityData().length() == 0, + "Expected empty visibility, saw %s ", key.getColumnVisibilityData()); + m.putDelete(key.getColumnFamily(), key.getColumnQualifier()); + writer.addMutation(m); + } + } catch (Exception e) { + throw new IllegalStateException(e); + } + } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java index ec3ed5d71c..aafea306d1 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java @@ -1410,7 +1410,9 @@ public class CompactableImpl implements Compactable { extCompactionId); } - tablet.getContext().getAmple().deleteExternalCompactionFinalStates(List.of(extCompactionId)); + throw new UnsupportedOperationException( + "This code no longer functions properly and needs to be removed"); + // tablet.getContext().getAmple().deleteExternalCompactionFinalStates(List.of(extCompactionId)); } finally { synchronized (this) { Preconditions.checkState(externalCompactionsCommitting.remove(extCompactionId)); @@ -1445,7 +1447,9 @@ public class CompactableImpl implements Compactable { log.debug("Ignoring request to fail external compaction that is unknown {}", ecid); } - tablet.getContext().getAmple().deleteExternalCompactionFinalStates(List.of(ecid)); + throw new UnsupportedOperationException( + "This code no longer functions properly and needs to be removed"); + // tablet.getContext().getAmple().deleteExternalCompactionFinalStates(List.of(ecid)); } finally { synchronized (this) { Preconditions.checkState(externalCompactionsCommitting.remove(ecid)); diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTServer.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTServer.java deleted file mode 100644 index 63fa29e156..0000000000 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTServer.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.test.compaction; - -import org.apache.accumulo.core.cli.ConfigOpts; -import org.apache.accumulo.server.zookeeper.TransactionWatcher; -import org.apache.accumulo.tserver.TabletClientHandler; -import org.apache.accumulo.tserver.TabletServer; -import org.apache.accumulo.tserver.WriteTracker; - -public class ExternalCompactionTServer extends TabletServer { - - ExternalCompactionTServer(ConfigOpts opts, String[] args) { - super(opts, args); - } - - @Override - protected TabletClientHandler newTabletClientHandler(TransactionWatcher watcher, - WriteTracker writeTracker) { - return new NonCommittingExternalCompactionTabletClientHandler(this, watcher, writeTracker); - } - - public static void main(String[] args) throws Exception { - try ( - ExternalCompactionTServer tserver = new ExternalCompactionTServer(new ConfigOpts(), args)) { - tserver.runServer(); - } - - } - -} diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java index c0563c9187..26ad0c0f79 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java @@ -20,6 +20,7 @@ package org.apache.accumulo.test.compaction; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.ArrayList; import java.util.Collections; @@ -31,9 +32,8 @@ import java.util.Optional; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; -import java.util.stream.Stream; +import java.util.stream.Collectors; -import org.apache.accumulo.cluster.AccumuloCluster; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; @@ -54,7 +54,6 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; @@ -68,6 +67,7 @@ import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.test.compaction.ExternalCompaction_1_IT.TestFilter; +import org.apache.accumulo.test.util.Wait; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.io.Text; @@ -93,12 +93,6 @@ public class ExternalCompactionTestUtils { return String.format("r:%04d", r); } - public static Stream<ExternalCompactionFinalState> getFinalStatesForTable(AccumuloCluster cluster, - TableId tid) { - return cluster.getServerContext().getAmple().getExternalCompactionFinalStates() - .filter(state -> state.getExtent().tableId().equals(tid)); - } - public static void compact(final AccumuloClient client, String table1, int modulus, String expectedQueue, boolean wait) throws AccumuloSecurityException, TableNotFoundException, AccumuloException { @@ -298,6 +292,21 @@ public class ExternalCompactionTestUtils { return ecids; } + public static void waitForRunningCompactions(ServerContext ctx, TableId tid, + Set<ExternalCompactionId> idsToWaitFor) throws Exception { + + assertTrue(Wait.waitFor(() -> { + Set<ExternalCompactionId> seen; + try (TabletsMetadata tm = + ctx.getAmple().readTablets().forTable(tid).fetch(ColumnType.ECOMP).build()) { + seen = tm.stream().flatMap(t -> t.getExternalCompactions().keySet().stream()) + .collect(Collectors.toSet()); + } + + return Collections.disjoint(seen, idsToWaitFor); + })); + } + public static int confirmCompactionRunning(ServerContext ctx, Set<ExternalCompactionId> ecids) throws Exception { int matches = 0; diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java index bda4afdab7..3721960303 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java @@ -18,7 +18,6 @@ */ package org.apache.accumulo.test.compaction; -import static org.apache.accumulo.minicluster.ServerType.TABLET_SERVER; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.MAX_DATA; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE1; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE2; @@ -26,21 +25,17 @@ import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QU import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE4; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE5; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE6; -import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE7; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE8; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable; -import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.getFinalStatesForTable; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.row; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.verify; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.writeData; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.EnumSet; import java.util.List; @@ -48,9 +43,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.SortedSet; import java.util.TreeSet; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import java.util.stream.Stream; import org.apache.accumulo.compactor.ExtCEnv.CompactorIterEnv; import org.apache.accumulo.core.client.Accumulo; @@ -74,20 +67,10 @@ import org.apache.accumulo.core.iterators.Filter; import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; -import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState; -import org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState.FinalState; -import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; -import org.apache.accumulo.core.metadata.schema.ExternalCompactionMetadata; -import org.apache.accumulo.core.metadata.schema.TabletMetadata; -import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; -import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher; -import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.harness.MiniClusterConfigurationCallback; import org.apache.accumulo.harness.SharedMiniClusterBase; import org.apache.accumulo.minicluster.ServerType; -import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; @@ -202,23 +185,15 @@ public class ExternalCompaction_1_IT extends SharedMiniClusterBase { String table1 = this.getUniqueNames(1)[0]; try (AccumuloClient client = Accumulo.newClient().from(getCluster().getClientProperties()).build()) { - // Stop the TabletServer so that it does not commit the compaction - getCluster().getProcesses().get(TABLET_SERVER).forEach(p -> { - try { - getCluster().killProcess(TABLET_SERVER, p); - } catch (Exception e) { - fail("Failed to shutdown tablet server"); - } - }); - // Start our TServer that will not commit the compaction - // ELASTICITY_TODO this will likely no longer work now that compactions do not run in the - // tserver - getCluster().getClusterControl().start(TABLET_SERVER, null, 1, - ExternalCompactionTServer.class); - getCluster().getClusterControl().start(ServerType.TABLET_SERVER); createTable(client, table1, "cs3", 2); writeData(client, table1); + verify(client, table1, 1); + + // ELASTICITY_TODO the compactors started by mini inspecting the config were interfering with + // starting the ExternalDoNothingCompactor, so killed all compactors. This is not the best way + // to handle this. + getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR); getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(QUEUE3, 1); getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1, @@ -228,19 +203,22 @@ public class ExternalCompaction_1_IT extends SharedMiniClusterBase { TableId tid = getCluster().getServerContext().getTableId(table1); // Wait for the compaction to start by waiting for 1 external compaction column - ExternalCompactionTestUtils + var ecids = ExternalCompactionTestUtils .waitForCompactionStartAndReturnEcids(getCluster().getServerContext(), tid); + assertFalse(ecids.isEmpty()); + // Kill the compactor getCluster().getClusterControl().stop(ServerType.COMPACTOR); - // DeadCompactionDetector in the CompactionCoordinator should fail the compaction. - long count = 0; - while (count == 0) { - count = getFinalStatesForTable(getCluster(), tid) - .filter(state -> state.getFinalState().equals(FinalState.FAILED)).count(); - UtilWaitThread.sleep(250); - } + // DeadCompactionDetector in the CompactionCoordinator should fail the compaction and delete + // it from the tablet. + ExternalCompactionTestUtils.waitForRunningCompactions(getCluster().getServerContext(), tid, + ecids); + + // If the compaction actually ran it would have filtered data, so lets make sure all the data + // written is there. This check provides evidence the compaction did not run. + verify(client, table1, 1); // We need to cancel the compaction or delete the table here because we initiate a user // compaction above in the test. Even though the external compaction was cancelled @@ -374,86 +352,6 @@ public class ExternalCompaction_1_IT extends SharedMiniClusterBase { } } - @Test - public void testExternalCompactionDeadTServer() throws Exception { - // Shut down the normal TServers - getCluster().getProcesses().get(TABLET_SERVER).forEach(p -> { - try { - getCluster().killProcess(TABLET_SERVER, p); - } catch (Exception e) { - fail("Failed to shutdown tablet server"); - } - }); - // Start our TServer that will not commit the compaction - ProcessInfo tserverProcess = getCluster().exec(ExternalCompactionTServer.class); - - final String table3 = this.getUniqueNames(1)[0]; - - try (final AccumuloClient client = - Accumulo.newClient().from(getCluster().getClientProperties()).build()) { - createTable(client, table3, "cs7"); - writeData(client, table3); - getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(QUEUE7, 1); - getCluster().getClusterControl().start(ServerType.COMPACTOR); - compact(client, table3, 2, QUEUE7, false); - - // ExternalCompactionTServer will not commit the compaction. Wait for the - // metadata table entries to show up. - LOG.info("Waiting for external compaction to complete."); - TableId tid = getCluster().getServerContext().getTableId(table3); - Stream<ExternalCompactionFinalState> fs = getFinalStatesForTable(getCluster(), tid); - while (fs.findAny().isEmpty()) { - LOG.info("Waiting for compaction completed marker to appear"); - UtilWaitThread.sleep(250); - fs = getFinalStatesForTable(getCluster(), tid); - } - - LOG.info("Validating metadata table contents."); - TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets().forTable(tid) - .fetch(ColumnType.ECOMP).build(); - List<TabletMetadata> md = new ArrayList<>(); - tm.forEach(t -> md.add(t)); - assertEquals(1, md.size()); - TabletMetadata m = md.get(0); - Map<ExternalCompactionId,ExternalCompactionMetadata> em = m.getExternalCompactions(); - assertEquals(1, em.size()); - List<ExternalCompactionFinalState> finished = new ArrayList<>(); - getFinalStatesForTable(getCluster(), tid).forEach(f -> finished.add(f)); - assertEquals(1, finished.size()); - assertEquals(em.entrySet().iterator().next().getKey(), - finished.get(0).getExternalCompactionId()); - tm.close(); - - // Force a flush on the metadata table before killing our tserver - client.tableOperations().flush(MetadataTable.NAME); - - // Stop our TabletServer. Need to perform a normal shutdown so that the WAL is closed - // normally. - LOG.info("Stopping our tablet server"); - getCluster().stopProcessWithTimeout(tserverProcess.getProcess(), 30, TimeUnit.SECONDS); - getCluster().getClusterControl().stop(ServerType.TABLET_SERVER); - - // Start a TabletServer to commit the compaction. - LOG.info("Starting normal tablet server"); - getCluster().getClusterControl().start(ServerType.TABLET_SERVER); - - // Wait for the compaction to be committed. - LOG.info("Waiting for compaction completed marker to disappear"); - Stream<ExternalCompactionFinalState> fs2 = getFinalStatesForTable(getCluster(), tid); - while (fs2.findAny().isPresent()) { - LOG.info("Waiting for compaction completed marker to disappear"); - UtilWaitThread.sleep(500); - fs2 = getFinalStatesForTable(getCluster(), tid); - } - verify(client, table3, 2); - - // We need to cancel the compaction or delete the table here because we initiate a user - // compaction above in the test. Even though the external compaction was cancelled - // because we split the table, FaTE will continue to queue up a compaction - client.tableOperations().cancelCompaction(table3); - } - } - public static class FSelector implements CompactionSelector { @Override diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java index f946f1d901..969c164fd5 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java @@ -27,32 +27,26 @@ import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.co import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.confirmCompactionCompleted; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.confirmCompactionRunning; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable; -import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.getFinalStatesForTable; -import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.getRunningCompactions; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.row; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.waitForCompactionStartAndReturnEcids; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.writeData; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; import java.util.Collections; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.compaction.thrift.TCompactionState; -import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; -import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.harness.MiniClusterConfigurationCallback; import org.apache.accumulo.harness.SharedMiniClusterBase; import org.apache.accumulo.minicluster.ServerType; @@ -140,95 +134,6 @@ public class ExternalCompaction_2_IT extends SharedMiniClusterBase { } } - @Test - public void testExternalCompactionsSucceedsRunWithTableOffline() throws Exception { - - getCluster().getClusterControl().stop(ServerType.COMPACTOR); - - String table1 = this.getUniqueNames(1)[0]; - try (AccumuloClient client = - Accumulo.newClient().from(getCluster().getClientProperties()).build()) { - - createTable(client, table1, "cs2"); - // set compaction ratio to 1 so that majc occurs naturally, not user compaction - // user compaction blocks merge - client.tableOperations().setProperty(table1, Property.TABLE_MAJC_RATIO.toString(), "1.0"); - // cause multiple rfiles to be created - writeData(client, table1); - writeData(client, table1); - writeData(client, table1); - writeData(client, table1); - - TableId tid = getCluster().getServerContext().getTableId(table1); - // Confirm that no final state is in the metadata table - assertEquals(0, getFinalStatesForTable(getCluster(), tid).count()); - - // Offline the table when the compaction starts - final AtomicBoolean succeededInTakingOffline = new AtomicBoolean(false); - Thread t = new Thread(() -> { - try (AccumuloClient client2 = - Accumulo.newClient().from(getCluster().getClientProperties()).build()) { - TExternalCompactionList metrics2 = getRunningCompactions(getCluster().getServerContext()); - while (metrics2.getCompactions() == null) { - metrics2 = getRunningCompactions(getCluster().getServerContext()); - if (metrics2.getCompactions() == null) { - UtilWaitThread.sleep(50); - } - } - LOG.info("Taking table offline"); - client2.tableOperations().offline(table1, false); - succeededInTakingOffline.set(true); - } catch (Exception e) { - LOG.error("Error: ", e); - } - }); - t.start(); - - // Start the compactor - getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(QUEUE1, 1); - getCluster().getClusterControl().start(ServerType.COMPACTOR); - - // Wait for the compaction to start by waiting for 1 external compaction column - Set<ExternalCompactionId> ecids = ExternalCompactionTestUtils - .waitForCompactionStartAndReturnEcids(getCluster().getServerContext(), tid); - - // Confirm that this ECID shows up in RUNNING set - int matches = ExternalCompactionTestUtils - .confirmCompactionRunning(getCluster().getServerContext(), ecids); - assertTrue(matches > 0); - - t.join(); - if (!succeededInTakingOffline.get()) { - fail("Failed to offline table"); - } - - confirmCompactionCompleted(getCluster().getServerContext(), ecids, - TCompactionState.SUCCEEDED); - - // Confirm that final state is in the metadata table - assertEquals(1, getFinalStatesForTable(getCluster(), tid).count()); - - // Online the table - client.tableOperations().online(table1); - - // wait for compaction to be committed by tserver or test timeout - long finalStateCount = getFinalStatesForTable(getCluster(), tid).count(); - while (finalStateCount > 0) { - finalStateCount = getFinalStatesForTable(getCluster(), tid).count(); - if (finalStateCount > 0) { - UtilWaitThread.sleep(50); - } - } - - // We need to cancel the compaction or delete the table here because we initiate a user - // compaction above in the test. Even though the external compaction was cancelled - // because we split the table, FaTE will continue to queue up a compaction - client.tableOperations().delete(table1); - - getCluster().getClusterControl().stop(ServerType.COMPACTOR); - } - } - @Test public void testUserCompactionCancellation() throws Exception {