This is an automated email from the ASF dual-hosted git repository. domgarguilo pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 12fe963 Create tool to validate Compaction configuration (#2345) 12fe963 is described below commit 12fe9633f6cdcac7a964d8fdfcec8c615aaf1629 Author: Dom G <47725857+domgargu...@users.noreply.github.com> AuthorDate: Mon Jan 3 09:21:30 2022 -0500 Create tool to validate Compaction configuration (#2345) * Create CLI tool that validates a compaction configuration within a file * Extract CompactionManager.Config into its own class * Extract CompactionService.CpInitParams into its own class --- .../spi/compaction/DefaultCompactionPlanner.java | 11 +- .../compaction/CompactionPlannerInitParams.java | 98 +++++++++++ .../util/compaction/CompactionServicesConfig.java | 189 +++++++++++++++++++++ .../server/conf/CheckCompactionConfig.java | 162 ++++++++++++++++++ .../server/conf/CheckCompactionConfigTest.java | 154 +++++++++++++++++ .../tserver/compactions/CompactionManager.java | 174 +++---------------- .../tserver/compactions/CompactionService.java | 70 +------- .../accumulo/tserver/tablet/CompactableImpl.java | 5 +- .../apache/accumulo/test/start/KeywordStartIT.java | 8 +- 9 files changed, 645 insertions(+), 226 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java b/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java index a74ab37..2c86f1f 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java @@ -113,7 +113,7 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; public class DefaultCompactionPlanner implements CompactionPlanner { - private static Logger log = LoggerFactory.getLogger(DefaultCompactionPlanner.class); + private static final Logger log = LoggerFactory.getLogger(DefaultCompactionPlanner.class); public static class ExecutorConfig { String type; @@ -170,17 +170,16 @@ public class DefaultCompactionPlanner implements CompactionPlanner { case "internal": Preconditions.checkArgument(null == executorConfig.queue, "'queue' should not be specified for internal compactions"); - Objects.requireNonNull(executorConfig.numThreads, + int numThreads = Objects.requireNonNull(executorConfig.numThreads, "'numThreads' must be specified for internal type"); - ceid = params.getExecutorManager().createExecutor(executorConfig.name, - executorConfig.numThreads); + ceid = params.getExecutorManager().createExecutor(executorConfig.name, numThreads); break; case "external": Preconditions.checkArgument(null == executorConfig.numThreads, "'numThreads' should not be specified for external compactions"); - Objects.requireNonNull(executorConfig.queue, + String queue = Objects.requireNonNull(executorConfig.queue, "'queue' must be specified for external type"); - ceid = params.getExecutorManager().getExternalExecutor(executorConfig.queue); + ceid = params.getExecutorManager().getExternalExecutor(queue); break; default: throw new IllegalArgumentException("type must be 'internal' or 'external'"); diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlannerInitParams.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlannerInitParams.java new file mode 100644 index 0000000..61bf50e --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlannerInitParams.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.util.compaction; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.spi.common.ServiceEnvironment; +import org.apache.accumulo.core.spi.compaction.CompactionExecutorId; +import org.apache.accumulo.core.spi.compaction.CompactionPlanner; +import org.apache.accumulo.core.spi.compaction.CompactionServiceId; +import org.apache.accumulo.core.spi.compaction.ExecutorManager; + +import com.google.common.base.Preconditions; + +public class CompactionPlannerInitParams implements CompactionPlanner.InitParameters { + + private final Map<String,String> plannerOpts; + private final Map<CompactionExecutorId,Integer> requestedExecutors; + private final Set<CompactionExecutorId> requestedExternalExecutors; + private final ServiceEnvironment senv; + private final CompactionServiceId serviceId; + + public CompactionPlannerInitParams(CompactionServiceId serviceId, Map<String,String> plannerOpts, + ServiceEnvironment senv) { + this.serviceId = serviceId; + this.plannerOpts = plannerOpts; + this.requestedExecutors = new HashMap<>(); + this.requestedExternalExecutors = new HashSet<>(); + this.senv = senv; + } + + @Override + public ServiceEnvironment getServiceEnvironment() { + return senv; + } + + @Override + public Map<String,String> getOptions() { + return plannerOpts; + } + + @Override + public String getFullyQualifiedOption(String key) { + return Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + serviceId + ".opts." + key; + } + + @Override + public ExecutorManager getExecutorManager() { + return new ExecutorManager() { + @Override + public CompactionExecutorId createExecutor(String executorName, int threads) { + Preconditions.checkArgument(threads > 0, "Positive number of threads required : %s", + threads); + var ceid = CompactionExecutorIdImpl.internalId(serviceId, executorName); + Preconditions.checkState(!getRequestedExecutors().containsKey(ceid)); + getRequestedExecutors().put(ceid, threads); + return ceid; + } + + @Override + public CompactionExecutorId getExternalExecutor(String name) { + var ceid = CompactionExecutorIdImpl.externalId(name); + Preconditions.checkArgument(!getRequestedExternalExecutors().contains(ceid), + "Duplicate external executor for queue " + name); + getRequestedExternalExecutors().add(ceid); + return ceid; + } + }; + } + + public Map<CompactionExecutorId,Integer> getRequestedExecutors() { + return requestedExecutors; + } + + public Set<CompactionExecutorId> getRequestedExternalExecutors() { + return requestedExternalExecutors; + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionServicesConfig.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionServicesConfig.java new file mode 100644 index 0000000..0dfcfe8 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionServicesConfig.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.util.compaction; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.function.Consumer; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.ConfigurationTypeHelper; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.spi.compaction.CompactionServiceId; +import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner; + +import com.google.common.collect.Sets; + +/** + * This class serves to configure compaction services from an {@link AccumuloConfiguration} object. + * + * Specifically, compaction service properties (those prefixed by "tserver.compaction.major + * .service") are used. + */ +public class CompactionServicesConfig { + + private final Map<String,String> planners = new HashMap<>(); + private final Map<String,Long> rateLimits = new HashMap<>(); + private final Map<String,Map<String,String>> options = new HashMap<>(); + long defaultRateLimit; + private final Consumer<String> deprecationWarningConsumer; + + public static final CompactionServiceId DEFAULT_SERVICE = CompactionServiceId.of("default"); + + @SuppressWarnings("removal") + private long getDefaultThroughput(AccumuloConfiguration aconf) { + if (aconf.isPropertySet(Property.TSERV_MAJC_THROUGHPUT, true)) { + return aconf.getAsBytes(Property.TSERV_MAJC_THROUGHPUT); + } + + return ConfigurationTypeHelper + .getMemoryAsBytes(Property.TSERV_COMPACTION_SERVICE_DEFAULT_RATE_LIMIT.getDefaultValue()); + } + + @SuppressWarnings("removal") + private Map<String,String> getConfiguration(AccumuloConfiguration aconf) { + + Map<String,String> configs = + aconf.getAllPropertiesWithPrefix(Property.TSERV_COMPACTION_SERVICE_PREFIX); + + // check if deprecated properties for compaction executor are set + if (aconf.isPropertySet(Property.TSERV_MAJC_MAXCONCURRENT, true)) { + + String defaultServicePrefix = + Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + DEFAULT_SERVICE.canonical() + "."; + + // check if any properties for the default compaction service are set + boolean defaultServicePropsSet = configs.keySet().stream() + .filter(key -> key.startsWith(defaultServicePrefix)).map(Property::getPropertyByKey) + .anyMatch(prop -> prop == null || aconf.isPropertySet(prop, true)); + + if (defaultServicePropsSet) { + + String warning = "The deprecated property " + Property.TSERV_MAJC_MAXCONCURRENT.getKey() + + " was set. Properties with the prefix " + defaultServicePrefix + + " were also set which replace the deprecated properties. The deprecated property " + + "was therefore ignored."; + + deprecationWarningConsumer.accept(warning); + + } else { + String numThreads = aconf.get(Property.TSERV_MAJC_MAXCONCURRENT); + + // Its possible a user has configured the other compaction services, but not the default + // service. In this case want to produce a config with the default service configs + // overridden using deprecated configs. + + HashMap<String,String> configsCopy = new HashMap<>(configs); + + Map<String,String> defaultServiceConfigs = + Map.of(defaultServicePrefix + "planner", DefaultCompactionPlanner.class.getName(), + defaultServicePrefix + "planner.opts.executors", + "[{'name':'deprecated', 'numThreads':" + numThreads + "}]"); + + configsCopy.putAll(defaultServiceConfigs); + + String warning = "The deprecated property " + Property.TSERV_MAJC_MAXCONCURRENT.getKey() + + " was set. Properties with the prefix " + defaultServicePrefix + + " were not set, these should replace the deprecated properties. The old properties " + + "were automatically mapped to the new properties in process creating : " + + defaultServiceConfigs + "."; + + deprecationWarningConsumer.accept(warning); + + configs = Map.copyOf(configsCopy); + } + } + + return configs; + + } + + public CompactionServicesConfig(AccumuloConfiguration aconf, + Consumer<String> deprecationWarningConsumer) { + this.deprecationWarningConsumer = deprecationWarningConsumer; + Map<String,String> configs = getConfiguration(aconf); + + configs.forEach((prop, val) -> { + + var suffix = prop.substring(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey().length()); + String[] tokens = suffix.split("\\."); + if (tokens.length == 4 && tokens[1].equals("planner") && tokens[2].equals("opts")) { + options.computeIfAbsent(tokens[0], k -> new HashMap<>()).put(tokens[3], val); + } else if (tokens.length == 2 && tokens[1].equals("planner")) { + planners.put(tokens[0], val); + } else if (tokens.length == 3 && tokens[1].equals("rate") && tokens[2].equals("limit")) { + var eprop = Property.getPropertyByKey(prop); + if (eprop == null || aconf.isPropertySet(eprop, true) + || !isDeprecatedThroughputSet(aconf)) { + rateLimits.put(tokens[0], ConfigurationTypeHelper.getFixedMemoryAsBytes(val)); + } + } else { + throw new IllegalArgumentException("Malformed compaction service property " + prop); + } + }); + + defaultRateLimit = getDefaultThroughput(aconf); + + var diff = Sets.difference(options.keySet(), planners.keySet()); + + if (!diff.isEmpty()) { + throw new IllegalArgumentException( + "Incomplete compaction service definitions, missing planner class " + diff); + } + + } + + @SuppressWarnings("removal") + private boolean isDeprecatedThroughputSet(AccumuloConfiguration aconf) { + return aconf.isPropertySet(Property.TSERV_MAJC_THROUGHPUT, true); + } + + public long getRateLimit(String serviceName) { + return getRateLimits().getOrDefault(serviceName, defaultRateLimit); + } + + @Override + public boolean equals(Object o) { + if (o instanceof CompactionServicesConfig) { + var oc = (CompactionServicesConfig) o; + return getPlanners().equals(oc.getPlanners()) && getOptions().equals(oc.getOptions()) + && getRateLimits().equals(oc.getRateLimits()); + } + + return false; + } + + @Override + public int hashCode() { + return Objects.hash(getPlanners(), getOptions(), getRateLimits()); + } + + public Map<String,String> getPlanners() { + return planners; + } + + public Map<String,Long> getRateLimits() { + return rateLimits; + } + + public Map<String,Map<String,String>> getOptions() { + return options; + } +} diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/CheckCompactionConfig.java b/server/base/src/main/java/org/apache/accumulo/server/conf/CheckCompactionConfig.java new file mode 100644 index 0000000..2d53180 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/conf/CheckCompactionConfig.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.conf; + +import java.io.FileNotFoundException; +import java.nio.file.Path; +import java.util.Set; + +import org.apache.accumulo.core.cli.Help; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.SiteConfiguration; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.spi.common.ServiceEnvironment; +import org.apache.accumulo.core.spi.compaction.CompactionPlanner; +import org.apache.accumulo.core.spi.compaction.CompactionServiceId; +import org.apache.accumulo.core.util.ConfigurationImpl; +import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams; +import org.apache.accumulo.core.util.compaction.CompactionServicesConfig; +import org.apache.accumulo.start.spi.KeywordExecutable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.beust.jcommander.Parameter; +import com.google.auto.service.AutoService; + +/** + * A command line tool that verifies that a given properties file will correctly configure + * compaction services. + * + * This tool takes, as input, a local path to a properties file containing the properties used to + * configure compaction services. The file is parsed and the user is presented with output detailing + * which (if any) compaction services would be created from the given properties, or an error + * describing why the given properties are incorrect. + */ +@AutoService(KeywordExecutable.class) +public class CheckCompactionConfig implements KeywordExecutable { + + private final static Logger log = LoggerFactory.getLogger(CheckCompactionConfig.class); + + final static String DEFAULT = "default"; + final static String META = "meta"; + final static String ROOT = "root"; + + static class Opts extends Help { + @Parameter(description = "<path> Local path to file containing compaction configuration", + required = true) + String filePath; + } + + @Override + public String keyword() { + return "check-compaction-config"; + } + + @Override + public String description() { + return "Verifies compaction config within a given file"; + } + + public static void main(String[] args) throws Exception { + new CheckCompactionConfig().execute(args); + } + + @Override + public void execute(String[] args) throws Exception { + Opts opts = new Opts(); + opts.parseArgs(keyword(), args); + + if (opts.filePath == null) { + throw new IllegalArgumentException("No properties file was given"); + } + + Path path = Path.of(opts.filePath); + if (!path.toFile().exists()) + throw new FileNotFoundException("File at given path could not be found"); + + AccumuloConfiguration config = SiteConfiguration.fromFile(path.toFile()).build(); + var servicesConfig = new CompactionServicesConfig(config, log::warn); + ServiceEnvironment senv = createServiceEnvironment(config); + + Set<String> defaultServices = Set.of(DEFAULT, META, ROOT); + if (servicesConfig.getPlanners().keySet().equals(defaultServices)) { + log.warn("Only the default compaction services were created - {}", defaultServices); + return; + } + + for (var entry : servicesConfig.getPlanners().entrySet()) { + String serviceId = entry.getKey(); + String plannerClassName = entry.getValue(); + + log.info("Service id: {}, planner class:{}", serviceId, plannerClassName); + + Class<? extends CompactionPlanner> plannerClass = + Class.forName(plannerClassName).asSubclass(CompactionPlanner.class); + CompactionPlanner planner = plannerClass.getDeclaredConstructor().newInstance(); + + var initParams = new CompactionPlannerInitParams(CompactionServiceId.of(serviceId), + servicesConfig.getOptions().get(serviceId), senv); + + planner.init(initParams); + + initParams.getRequestedExecutors() + .forEach((execId, numThreads) -> log.info( + "Compaction service '{}' requested creation of thread pool '{}' with {} threads.", + serviceId, execId, numThreads)); + + initParams.getRequestedExternalExecutors() + .forEach(execId -> log.info( + "Compaction service '{}' requested with external execution queue '{}'", serviceId, + execId)); + + } + + log.info("Properties file has passed all checks."); + } + + private ServiceEnvironment createServiceEnvironment(AccumuloConfiguration config) { + return new ServiceEnvironment() { + + @Override + public <T> T instantiate(TableId tableId, String className, Class<T> base) { + throw new UnsupportedOperationException(); + } + + @Override + public <T> T instantiate(String className, Class<T> base) { + throw new UnsupportedOperationException(); + } + + @Override + public String getTableName(TableId tableId) { + throw new UnsupportedOperationException(); + } + + @Override + public Configuration getConfiguration(TableId tableId) { + return new ConfigurationImpl(config); + } + + @Override + public Configuration getConfiguration() { + return new ConfigurationImpl(config); + } + }; + } +} diff --git a/server/base/src/test/java/org/apache/accumulo/server/conf/CheckCompactionConfigTest.java b/server/base/src/test/java/org/apache/accumulo/server/conf/CheckCompactionConfigTest.java new file mode 100644 index 0000000..01c6fa3 --- /dev/null +++ b/server/base/src/test/java/org/apache/accumulo/server/conf/CheckCompactionConfigTest.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.conf; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileWriter; +import java.io.IOException; + +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + +@SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "path not set by user input") +public class CheckCompactionConfigTest { + + private final static Logger log = LoggerFactory.getLogger(CheckCompactionConfigTest.class); + + @Rule + public TestName testName = new TestName(); + + @ClassRule + public static final TemporaryFolder folder = + new TemporaryFolder(new File(System.getProperty("user.dir") + "/target")); + + @Test + public void testValidInput1() throws Exception { + String inputString = ("tserver.compaction.major.service.cs1.planner=" + + "org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner \n" + + "tserver.compaction.major.service.cs1.planner.opts.executors=\\\n" + + "[{'name':'small','type':'internal','maxSize':'16M','numThreads':8},\\\n" + + "{'name':'medium','type':'internal','maxSize':'128M','numThreads':4},\\\n" + + "{'name':'large','type':'internal','numThreads':2}]").replaceAll("'", "\""); + + String filePath = writeToFileAndReturnPath(inputString); + + CheckCompactionConfig.main(new String[] {filePath}); + } + + @Test + public void testValidInput2() throws Exception { + String inputString = ("tserver.compaction.major.service.cs1.planner=" + + "org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner \n" + + "tserver.compaction.major.service.cs1.planner.opts.executors=\\\n" + + "[{'name':'small','type':'internal','maxSize':'16M','numThreads':8},\\\n" + + "{'name':'medium','type':'internal','maxSize':'128M','numThreads':4},\\\n" + + "{'name':'large','type':'internal','numThreads':2}] \n" + + "tserver.compaction.major.service.cs2.planner=" + + "org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner \n" + + "tserver.compaction.major.service.cs2.planner.opts.executors=\\\n" + + "[{'name':'small','type':'internal','maxSize':'16M','numThreads':7},\\\n" + + "{'name':'medium','type':'internal','maxSize':'128M','numThreads':5},\\\n" + + "{'name':'large','type':'external','queue':'DCQ1'}]").replaceAll("'", "\""); + + String filePath = writeToFileAndReturnPath(inputString); + + CheckCompactionConfig.main(new String[] {filePath}); + } + + @Test + public void testThrowsExternalNumThreadsError() throws IOException { + String inputString = ("tserver.compaction.major.service.cs1.planner=" + + "org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner \n" + + "tserver.compaction.major.service.cs1.planner.opts.executors=\\\n" + + "[{'name':'small','type':'internal','maxSize':'16M','numThreads':8},\\\n" + + "{'name':'medium','type':'external','maxSize':'128M','numThreads':4},\\\n" + + "{'name':'large','type':'internal','numThreads':2}]").replaceAll("'", "\""); + String expectedErrorMsg = "'numThreads' should not be specified for external compactions"; + + String filePath = writeToFileAndReturnPath(inputString); + + var e = assertThrows(IllegalArgumentException.class, + () -> CheckCompactionConfig.main(new String[] {filePath})); + assertEquals(e.getMessage(), expectedErrorMsg); + } + + @Test + public void testNegativeThreadCount() throws IOException { + String inputString = ("tserver.compaction.major.service.cs1.planner=" + + "org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner \n" + + "tserver.compaction.major.service.cs1.planner.opts.executors=\\\n" + + "[{'name':'small','type':'internal','maxSize':'16M','numThreads':8},\\\n" + + "{'name':'medium','type':'internal','maxSize':'128M','numThreads':-4},\\\n" + + "{'name':'large','type':'internal','numThreads':2}]").replaceAll("'", "\""); + String expectedErrorMsg = "Positive number of threads required : -4"; + + String filePath = writeToFileAndReturnPath(inputString); + + var e = assertThrows(IllegalArgumentException.class, + () -> CheckCompactionConfig.main(new String[] {filePath})); + assertEquals(e.getMessage(), expectedErrorMsg); + } + + @Test + public void testNoPlanner() throws Exception { + String inputString = ("tserver.compaction.major.service.cs1.planner.opts.executors=\\\n" + + "[{'name':'small','type':'internal','maxSize':'16M','numThreads':8},\\\n" + + "{'name':'medium','type':'internal','maxSize':'128M','numThreads':4},\\\n" + + "{'name':'large','type':'internal','numThreads':2}]").replaceAll("'", "\""); + String expectedErrorMsg = "Incomplete compaction service definitions, missing planner class"; + + String filePath = writeToFileAndReturnPath(inputString); + + var e = assertThrows(IllegalArgumentException.class, + () -> CheckCompactionConfig.main(new String[] {filePath})); + assertTrue(e.getMessage().startsWith(expectedErrorMsg)); + } + + @Test + public void testBadPropsFilePath() { + String[] args = {"/home/foo/bar/myProperties.properties"}; + String expectedErrorMsg = "File at given path could not be found"; + var e = assertThrows(FileNotFoundException.class, () -> CheckCompactionConfig.main(args)); + assertEquals(expectedErrorMsg, e.getMessage()); + } + + private String writeToFileAndReturnPath(String inputString) throws IOException { + File file = folder.newFile(testName.getMethodName() + ".properties"); + try (FileWriter fileWriter = new FileWriter(file, UTF_8); + BufferedWriter bufferedWriter = new BufferedWriter(fileWriter)) { + bufferedWriter.write(inputString); + } + log.info("Wrote to path: {}\nWith string:\n{}", file.getAbsolutePath(), inputString); + return file.getAbsolutePath(); + } +} diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java index f65a7a0..35d9d7d 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java @@ -18,6 +18,8 @@ */ package org.apache.accumulo.tserver.compactions; +import static org.apache.accumulo.core.util.compaction.CompactionServicesConfig.DEFAULT_SERVICE; + import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -30,8 +32,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.conf.ConfigurationTypeHelper; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; @@ -39,9 +39,9 @@ import org.apache.accumulo.core.spi.compaction.CompactionExecutorId; import org.apache.accumulo.core.spi.compaction.CompactionKind; import org.apache.accumulo.core.spi.compaction.CompactionServiceId; import org.apache.accumulo.core.spi.compaction.CompactionServices; -import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner; import org.apache.accumulo.core.tabletserver.thrift.TCompactionQueueSummary; import org.apache.accumulo.core.util.compaction.CompactionExecutorIdImpl; +import org.apache.accumulo.core.util.compaction.CompactionServicesConfig; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.fate.util.Retry; import org.apache.accumulo.server.ServerContext; @@ -67,14 +67,12 @@ public class CompactionManager { private ServerContext context; - private Config currentCfg; + private CompactionServicesConfig currentCfg; private long lastConfigCheckTime = System.nanoTime(); private CompactionExecutorsMetrics ceMetrics; - public static final CompactionServiceId DEFAULT_SERVICE = CompactionServiceId.of("default"); - private String lastDeprecationWarning = ""; private Map<CompactionExecutorId,ExternalCompactionExecutor> externalExecutors; @@ -91,144 +89,10 @@ public class CompactionManager { } } - private class Config { - Map<String,String> planners = new HashMap<>(); - Map<String,Long> rateLimits = new HashMap<>(); - Map<String,Map<String,String>> options = new HashMap<>(); - long defaultRateLimit = 0; - - @SuppressWarnings("removal") - private long getDefaultThroughput(AccumuloConfiguration aconf) { - if (aconf.isPropertySet(Property.TSERV_MAJC_THROUGHPUT, true)) { - return aconf.getAsBytes(Property.TSERV_MAJC_THROUGHPUT); - } - - return ConfigurationTypeHelper - .getMemoryAsBytes(Property.TSERV_COMPACTION_SERVICE_DEFAULT_RATE_LIMIT.getDefaultValue()); - } - - @SuppressWarnings("removal") - private Map<String,String> getConfiguration(AccumuloConfiguration aconf) { - - Map<String,String> configs = - aconf.getAllPropertiesWithPrefix(Property.TSERV_COMPACTION_SERVICE_PREFIX); - - // check if deprecated properties for compaction executor are set - if (aconf.isPropertySet(Property.TSERV_MAJC_MAXCONCURRENT, true)) { - - String defaultServicePrefix = - Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + DEFAULT_SERVICE.canonical() + "."; - - // check if any properties for the default compaction service are set - boolean defaultServicePropsSet = configs.keySet().stream() - .filter(key -> key.startsWith(defaultServicePrefix)).map(Property::getPropertyByKey) - .anyMatch(prop -> prop == null || aconf.isPropertySet(prop, true)); - - if (defaultServicePropsSet) { - - String warning = String.format( - "The deprecated property %s was set. Properties with the prefix %s " - + "were also set, which replace the deprecated properties. The deprecated " - + "property was therefore ignored.", - Property.TSERV_MAJC_MAXCONCURRENT.getKey(), defaultServicePrefix); - - if (!warning.equals(lastDeprecationWarning)) { - log.warn(warning); - lastDeprecationWarning = warning; - } - } else { - String numThreads = aconf.get(Property.TSERV_MAJC_MAXCONCURRENT); - - // Its possible a user has configured the others compaction services, but not the default - // service. In this case want to produce a config with the default service configs - // overridden using deprecated configs. - - HashMap<String,String> configsCopy = new HashMap<>(configs); - - Map<String,String> defaultServiceConfigs = - Map.of(defaultServicePrefix + "planner", DefaultCompactionPlanner.class.getName(), - defaultServicePrefix + "planner.opts.executors", - "[{'name':'deprecated', 'numThreads':" + numThreads + "}]"); - - configsCopy.putAll(defaultServiceConfigs); - - String warning = String.format( - "The deprecated property %s was set. Properties with the prefix %s " - + "were not set, these should replace the deprecated properties. The old " - + "properties were automatically mapped to the new properties in process " - + "creating : %s.", - Property.TSERV_MAJC_MAXCONCURRENT.getKey(), defaultServicePrefix, - defaultServiceConfigs); - - if (!warning.equals(lastDeprecationWarning)) { - log.warn(warning); - lastDeprecationWarning = warning; - } - - configs = Map.copyOf(configsCopy); - } - } - - return configs; - - } - - Config(AccumuloConfiguration aconf) { - Map<String,String> configs = getConfiguration(aconf); - - configs.forEach((prop, val) -> { - - var suffix = prop.substring(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey().length()); - String[] tokens = suffix.split("\\."); - if (tokens.length == 4 && tokens[1].equals("planner") && tokens[2].equals("opts")) { - options.computeIfAbsent(tokens[0], k -> new HashMap<>()).put(tokens[3], val); - } else if (tokens.length == 2 && tokens[1].equals("planner")) { - planners.put(tokens[0], val); - } else if (tokens.length == 3 && tokens[1].equals("rate") && tokens[2].equals("limit")) { - var eprop = Property.getPropertyByKey(prop); - if (eprop == null || aconf.isPropertySet(eprop, true) - || !isDeprecatedThroughputSet(aconf)) { - rateLimits.put(tokens[0], ConfigurationTypeHelper.getFixedMemoryAsBytes(val)); - } - } else { - throw new IllegalArgumentException("Malformed compaction service property " + prop); - } - }); - - defaultRateLimit = getDefaultThroughput(aconf); - - var diff = Sets.difference(options.keySet(), planners.keySet()); - - if (!diff.isEmpty()) { - throw new IllegalArgumentException( - "Incomplete compaction service definitions, missing planner class " + diff); - } - - } - - @SuppressWarnings("removal") - private boolean isDeprecatedThroughputSet(AccumuloConfiguration aconf) { - return aconf.isPropertySet(Property.TSERV_MAJC_THROUGHPUT, true); - } - - public long getRateLimit(String serviceName) { - return rateLimits.getOrDefault(serviceName, defaultRateLimit); - } - - @Override - public boolean equals(Object o) { - if (o instanceof Config) { - var oc = (Config) o; - return planners.equals(oc.planners) && options.equals(oc.options) - && rateLimits.equals(oc.rateLimits); - } - - return false; - } - - @Override - public int hashCode() { - return Objects.hash(planners, options, rateLimits); + private void warnAboutDeprecation(String warning) { + if (!warning.equals(lastDeprecationWarning)) { + log.warn(warning); + lastDeprecationWarning = warning; } } @@ -320,7 +184,8 @@ public class CompactionManager { CompactionExecutorsMetrics ceMetrics) { this.compactables = compactables; - this.currentCfg = new Config(context.getConfiguration()); + this.currentCfg = + new CompactionServicesConfig(context.getConfiguration(), this::warnAboutDeprecation); this.context = context; @@ -332,16 +197,16 @@ public class CompactionManager { Map<CompactionServiceId,CompactionService> tmpServices = new HashMap<>(); - currentCfg.planners.forEach((serviceName, plannerClassName) -> { + currentCfg.getPlanners().forEach((serviceName, plannerClassName) -> { try { tmpServices.put(CompactionServiceId.of(serviceName), new CompactionService(serviceName, plannerClassName, currentCfg.getRateLimit(serviceName), - currentCfg.options.getOrDefault(serviceName, Map.of()), context, ceMetrics, + currentCfg.getOptions().getOrDefault(serviceName, Map.of()), context, ceMetrics, this::getExternalExecutor)); } catch (RuntimeException e) { log.error("Failed to create compaction service {} with planner:{} options:{}", serviceName, - plannerClassName, currentCfg.options.getOrDefault(serviceName, Map.of()), e); + plannerClassName, currentCfg.getOptions().getOrDefault(serviceName, Map.of()), e); } }); @@ -367,12 +232,13 @@ public class CompactionManager { lastConfigCheckTime = System.nanoTime(); - var tmpCfg = new Config(context.getConfiguration()); + var tmpCfg = + new CompactionServicesConfig(context.getConfiguration(), this::warnAboutDeprecation); if (!currentCfg.equals(tmpCfg)) { Map<CompactionServiceId,CompactionService> tmpServices = new HashMap<>(); - tmpCfg.planners.forEach((serviceName, plannerClassName) -> { + tmpCfg.getPlanners().forEach((serviceName, plannerClassName) -> { try { var csid = CompactionServiceId.of(serviceName); @@ -381,22 +247,22 @@ public class CompactionManager { tmpServices.put(csid, new CompactionService(serviceName, plannerClassName, tmpCfg.getRateLimit(serviceName), - tmpCfg.options.getOrDefault(serviceName, Map.of()), context, ceMetrics, + tmpCfg.getOptions().getOrDefault(serviceName, Map.of()), context, ceMetrics, this::getExternalExecutor)); } else { service.configurationChanged(plannerClassName, tmpCfg.getRateLimit(serviceName), - tmpCfg.options.getOrDefault(serviceName, Map.of())); + tmpCfg.getOptions().getOrDefault(serviceName, Map.of())); tmpServices.put(csid, service); } } catch (RuntimeException e) { throw new RuntimeException("Failed to create or update compaction service " + serviceName + " with planner:" + plannerClassName + " options:" - + tmpCfg.options.getOrDefault(serviceName, Map.of()), e); + + tmpCfg.getOptions().getOrDefault(serviceName, Map.of()), e); } }); var deletedServices = - Sets.difference(currentCfg.planners.keySet(), tmpCfg.planners.keySet()); + Sets.difference(currentCfg.getPlanners().keySet(), tmpCfg.getPlanners().keySet()); for (String serviceName : deletedServices) { services.get(CompactionServiceId.of(serviceName)).stop(); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java index df59d23..0829f26 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java @@ -41,7 +41,6 @@ import java.util.function.Function; import org.apache.accumulo.core.client.admin.compaction.CompactableFile; import org.apache.accumulo.core.conf.ConfigurationTypeHelper; -import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.spi.common.ServiceEnvironment; @@ -52,9 +51,9 @@ import org.apache.accumulo.core.spi.compaction.CompactionPlan; import org.apache.accumulo.core.spi.compaction.CompactionPlanner; import org.apache.accumulo.core.spi.compaction.CompactionPlanner.PlanningParameters; import org.apache.accumulo.core.spi.compaction.CompactionServiceId; -import org.apache.accumulo.core.spi.compaction.ExecutorManager; import org.apache.accumulo.core.util.compaction.CompactionExecutorIdImpl; import org.apache.accumulo.core.util.compaction.CompactionPlanImpl; +import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams; import org.apache.accumulo.core.util.ratelimit.RateLimiter; import org.apache.accumulo.core.util.ratelimit.SharedRateLimiterFactory; import org.apache.accumulo.core.util.threads.ThreadPools; @@ -88,59 +87,6 @@ public class CompactionService { private static final Logger log = LoggerFactory.getLogger(CompactionService.class); - private class CpInitParams implements CompactionPlanner.InitParameters { - - private final Map<String,String> plannerOpts; - private final Map<CompactionExecutorId,Integer> requestedExecutors; - private final Set<CompactionExecutorId> requestedExternalExecutors; - private final ServiceEnvironment senv = new ServiceEnvironmentImpl(context); - - CpInitParams(Map<String,String> plannerOpts) { - this.plannerOpts = plannerOpts; - this.requestedExecutors = new HashMap<>(); - this.requestedExternalExecutors = new HashSet<>(); - } - - @Override - public ServiceEnvironment getServiceEnvironment() { - return senv; - } - - @Override - public Map<String,String> getOptions() { - return plannerOpts; - } - - @Override - public String getFullyQualifiedOption(String key) { - return Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + myId + ".opts." + key; - } - - @Override - public ExecutorManager getExecutorManager() { - return new ExecutorManager() { - @Override - public CompactionExecutorId createExecutor(String executorName, int threads) { - Preconditions.checkArgument(threads > 0, "Positive number of threads required : %s", - threads); - var ceid = CompactionExecutorIdImpl.internalId(myId, executorName); - Preconditions.checkState(!requestedExecutors.containsKey(ceid)); - requestedExecutors.put(ceid, threads); - return ceid; - } - - @Override - public CompactionExecutorId getExternalExecutor(String name) { - var ceid = CompactionExecutorIdImpl.externalId(name); - Preconditions.checkArgument(!requestedExternalExecutors.contains(ceid), - "Duplicate external executor for queue " + name); - requestedExternalExecutors.add(ceid); - return ceid; - } - }; - } - } - public CompactionService(String serviceName, String plannerClass, Long maxRate, Map<String,String> plannerOptions, ServerContext context, CompactionExecutorsMetrics ceMetrics, @@ -155,7 +101,8 @@ public class CompactionService { this.ceMetrics = ceMetrics; this.externExecutorSupplier = externExecutorSupplier; - var initParams = new CpInitParams(plannerOpts); + var initParams = + new CompactionPlannerInitParams(myId, plannerOpts, new ServiceEnvironmentImpl(context)); planner = createPlanner(plannerClass); planner.init(initParams); @@ -168,12 +115,12 @@ public class CompactionService { this.writeLimiter = SharedRateLimiterFactory.getInstance(this.context.getConfiguration()) .create("CS_" + serviceName + "_write", () -> rateLimit.get()); - initParams.requestedExecutors.forEach((ceid, numThreads) -> { + initParams.getRequestedExecutors().forEach((ceid, numThreads) -> { tmpExecutors.put(ceid, new InternalCompactionExecutor(ceid, numThreads, ceMetrics, readLimiter, writeLimiter)); }); - initParams.requestedExternalExecutors.forEach((ceid) -> { + initParams.getRequestedExternalExecutors().forEach((ceid) -> { tmpExecutors.put(ceid, externExecutorSupplier.apply(ceid)); }); @@ -415,13 +362,14 @@ public class CompactionService { if (this.plannerClassName.equals(plannerClassName) && this.plannerOpts.equals(plannerOptions)) return; - var initParams = new CpInitParams(plannerOptions); + var initParams = + new CompactionPlannerInitParams(myId, plannerOptions, new ServiceEnvironmentImpl(context)); var tmpPlanner = createPlanner(plannerClassName); tmpPlanner.init(initParams); Map<CompactionExecutorId,CompactionExecutor> tmpExecutors = new HashMap<>(); - initParams.requestedExecutors.forEach((ceid, numThreads) -> { + initParams.getRequestedExecutors().forEach((ceid, numThreads) -> { InternalCompactionExecutor executor = (InternalCompactionExecutor) executors.get(ceid); if (executor == null) { executor = @@ -432,7 +380,7 @@ public class CompactionService { tmpExecutors.put(ceid, executor); }); - initParams.requestedExternalExecutors.forEach(ceid -> { + initParams.getRequestedExternalExecutors().forEach(ceid -> { ExternalCompactionExecutor executor = (ExternalCompactionExecutor) executors.get(ceid); if (executor == null) { executor = externExecutorSupplier.apply(ceid); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java index 092a543..b2b1eed 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java @@ -66,6 +66,7 @@ import org.apache.accumulo.core.spi.compaction.CompactionServices; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.compaction.CompactionExecutorIdImpl; import org.apache.accumulo.core.util.compaction.CompactionJobImpl; +import org.apache.accumulo.core.util.compaction.CompactionServicesConfig; import org.apache.accumulo.core.util.ratelimit.RateLimiter; import org.apache.accumulo.server.ServiceEnvironmentImpl; import org.apache.accumulo.server.compaction.CompactionStats; @@ -1468,8 +1469,8 @@ public class CompactableImpl implements Compactable { return dispatch.getService(); } catch (RuntimeException e) { log.error("Failed to dispatch compaction {} kind:{} hints:{}, falling back to {} service.", - getExtent(), kind, debugHints, CompactionManager.DEFAULT_SERVICE, e); - return CompactionManager.DEFAULT_SERVICE; + getExtent(), kind, debugHints, CompactionServicesConfig.DEFAULT_SERVICE, e); + return CompactionServicesConfig.DEFAULT_SERVICE; } } diff --git a/test/src/main/java/org/apache/accumulo/test/start/KeywordStartIT.java b/test/src/main/java/org/apache/accumulo/test/start/KeywordStartIT.java index 708632b..6b69f82 100644 --- a/test/src/main/java/org/apache/accumulo/test/start/KeywordStartIT.java +++ b/test/src/main/java/org/apache/accumulo/test/start/KeywordStartIT.java @@ -50,6 +50,7 @@ import org.apache.accumulo.minicluster.MiniAccumuloRunner; import org.apache.accumulo.miniclusterImpl.MiniClusterExecutable; import org.apache.accumulo.monitor.Monitor; import org.apache.accumulo.monitor.MonitorExecutable; +import org.apache.accumulo.server.conf.CheckCompactionConfig; import org.apache.accumulo.server.conf.CheckServerConfig; import org.apache.accumulo.server.init.Initialize; import org.apache.accumulo.server.util.Admin; @@ -106,6 +107,7 @@ public class KeywordStartIT { assumeTrue(new File(System.getProperty("user.dir") + "/src").exists()); TreeMap<String,Class<? extends KeywordExecutable>> expectSet = new TreeMap<>(); expectSet.put("admin", Admin.class); + expectSet.put("check-compaction-config", CheckCompactionConfig.class); expectSet.put("check-server-config", CheckServerConfig.class); expectSet.put("compaction-coordinator", CoordinatorExecutable.class); expectSet.put("compactor", CompactorExecutable.class); @@ -167,6 +169,7 @@ public class KeywordStartIT { HashSet<Class<?>> expectSet = new HashSet<>(); expectSet.add(Admin.class); + expectSet.add(CheckCompactionConfig.class); expectSet.add(CreateToken.class); expectSet.add(Info.class); expectSet.add(Initialize.class); @@ -190,12 +193,11 @@ public class KeywordStartIT { private static boolean hasMain(Class<?> classToCheck) { Method main; try { - main = classToCheck.getMethod("main", new String[0].getClass()); + main = classToCheck.getMethod("main", String[].class); } catch (NoSuchMethodException e) { return false; } - return main != null && Modifier.isPublic(main.getModifiers()) - && Modifier.isStatic(main.getModifiers()); + return Modifier.isPublic(main.getModifiers()) && Modifier.isStatic(main.getModifiers()); } private static class NoOp implements KeywordExecutable {