This is an automated email from the ASF dual-hosted git repository.

mwalch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push:
     new c482550  Simplified option parsing (#1010)
c482550 is described below

commit c482550082a4b2386e4270ec4b78a3ee1c402b76
Author: Mike Walch <mwa...@apache.org>
AuthorDate: Tue Mar 5 13:17:26 2019 -0500

    Simplified option parsing (#1010)

    * Removed setters in ClientOpts
    * Simplify RowHash options
---
 .../org/apache/accumulo/core/cli/ClientOpts.java   |  36 +----
 .../java/org/apache/accumulo/core/util/Merge.java  |  11 +-
 .../apache/accumulo/core/cli/TestClientOpts.java   |  13 +-
 .../lib/MapReduceClientOnDefaultTable.java         |  51 ------
 .../lib/MapReduceClientOnRequiredTable.java        |  47 ------
 .../mapreduce/lib/MapReduceClientOpts.java         |  31 ++--
 .../accumulo/hadoop/its/mapreduce/RowHashIT.java   |  13 +-
 .../apache/accumulo/server/cli/ServerUtilOpts.java |   5 +-
 .../server/util/CheckForMetadataProblems.java      |   3 +-
 .../apache/accumulo/server/util/LocalityCheck.java |   4 +-
 .../apache/accumulo/server/util/RandomWriter.java  |  11 +-
 .../accumulo/server/util/TableDiskUsage.java       |   3 +-
 .../server/util/VerifyTabletAssignments.java       |   3 +-
 .../apache/accumulo/master/state/MergeStats.java   |   3 +-
 .../java/org/apache/accumulo/tracer/TraceDump.java |   5 +-
 .../apache/accumulo/tracer/TraceTableStats.java    |   3 +-
 .../org/apache/accumulo/test/TestBinaryRows.java   |   3 +-
 .../java/org/apache/accumulo/test/TestIngest.java  | 174 +++++++++++++--------
 .../apache/accumulo/test/TestMultiTableIngest.java |   9 +-
 .../apache/accumulo/test/TestRandomDeletes.java    |   7 +-
 .../org/apache/accumulo/test/VerifyIngest.java     |  77 ++++++---
 .../BalanceInPresenceOfOfflineTableIT.java         |  13 +-
 .../apache/accumulo/test/functional/BulkIT.java    |  48 +++---
 .../test/functional/BulkSplitOptimizationIT.java   |  24 ++-
 .../test/functional/ChaoticBalancerIT.java         |  15 +-
 .../accumulo/test/functional/CompactionIT.java     |  29 ++--
 .../apache/accumulo/test/functional/DeleteIT.java  |  22 +--
 .../test/functional/DynamicThreadPoolsIT.java      |  10 +-
 .../accumulo/test/functional/FateStarvationIT.java |  16 +-
 .../test/functional/FunctionalTestUtils.java       |  18 +--
 .../test/functional/GarbageCollectorIT.java        |  20 ++-
 .../test/functional/HalfDeadTServerIT.java         |   7 +-
 .../accumulo/test/functional/MasterFailoverIT.java |  13 +-
 .../apache/accumulo/test/functional/MaxOpenIT.java |  17 +-
 .../accumulo/test/functional/ReadWriteIT.java      |  37 ++---
 .../apache/accumulo/test/functional/RenameIT.java  |  23 ++-
 .../apache/accumulo/test/functional/RestartIT.java |  59 +++----
 .../accumulo/test/functional/RestartStressIT.java  |  15 +-
 .../test/functional/SimpleBalancerFairnessIT.java  |   7 +-
 .../apache/accumulo/test/functional/SplitIT.java   |  15 +-
 .../apache/accumulo/test/functional/TableIT.java   |  16 +-
 .../accumulo/test/functional/WriteAheadLogIT.java  |  16 +-
 .../accumulo/test/functional/WriteLotsIT.java      |  19 +--
 .../apache/accumulo/test/mapreduce/RowHash.java    | 105 +++---------
 .../test/performance/ContinuousIngest.java         |   3 +-
 .../test/performance/scan/CollectTabletStats.java  |   3 +-
 46 files changed, 448 insertions(+), 634 deletions(-)
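The pattern behind most of these changes: callers no longer go through the removed ClientOpts.createClient() and getPrincipal() helpers, but work from the parsed client properties directly. A minimal sketch of the new call path, not code from this commit (MyTool and its output are hypothetical; the individual calls are the ones the diff below switches to):

    import java.util.Properties;

    import org.apache.accumulo.core.cli.ClientOpts;
    import org.apache.accumulo.core.client.Accumulo;
    import org.apache.accumulo.core.client.AccumuloClient;
    import org.apache.accumulo.core.conf.ClientProperty;

    public class MyTool {
      public static void main(String[] args) throws Exception {
        ClientOpts opts = new ClientOpts();
        opts.parseArgs(MyTool.class.getName(), args);
        // Connection settings now flow through the parsed properties.
        Properties props = opts.getClientProps();
        // getPrincipal() is gone; read the principal from the properties.
        String principal = ClientProperty.AUTH_PRINCIPAL.getValue(props);
        // createClient() is gone; build the client explicitly and close it.
        try (AccumuloClient client = Accumulo.newClient().from(props).build()) {
          System.out.println(principal + " connected as " + client.whoami());
        }
      }
    }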
diff --git a/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java b/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java
index 1e66d90..e821283 100644
--- a/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java
+++ b/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java
@@ -26,12 +26,9 @@ import java.util.Map;
 import java.util.Properties;
 
 import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.Accumulo;
-import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.accumulo.core.clientImpl.ClientInfoImpl;
 import org.apache.accumulo.core.conf.ClientProperty;
-import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.htrace.NullScope;
@@ -47,13 +44,6 @@ import com.beust.jcommander.converters.IParameterSplitter;
 
 public class ClientOpts extends Help {
 
-  public static class MemoryConverter implements IStringConverter<Long> {
-    @Override
-    public Long convert(String value) {
-      return ConfigurationTypeHelper.getFixedMemoryAsBytes(value);
-    }
-  }
-
   public static class AuthConverter implements IStringConverter<Authorizations> {
     @Override
     public Authorizations convert(String value) {
@@ -96,14 +86,14 @@ public class ClientOpts extends Help {
   }
 
   @Parameter(names = {"-u", "--user"}, description = "Connection user")
-  private String principal = null;
+  public String principal = null;
 
   @Parameter(names = "--password", converter = PasswordConverter.class,
       description = "Enter the connection password", password = true)
   private Password securePassword = null;
 
   public AuthenticationToken getToken() {
-    return ClientProperty.getAuthenticationToken(getClientProperties());
+    return ClientProperty.getAuthenticationToken(getClientProps());
   }
 
   @Parameter(names = {"-auths", "--auths"}, converter = AuthConverter.class,
@@ -146,26 +136,6 @@ public class ClientOpts extends Help {
 
   private Properties cachedProps = null;
 
-  public String getPrincipal() {
-    return ClientProperty.AUTH_PRINCIPAL.getValue(getClientProperties());
-  }
-
-  public void setPrincipal(String principal) {
-    this.principal = principal;
-  }
-
-  public void setClientProperties(Properties clientProps) {
-    ClientProperty.validate(clientProps);
-    this.cachedProps = clientProps;
-  }
-
-  /**
-   * @return {@link AccumuloClient} that must be closed by user
-   */
-  public AccumuloClient createClient() {
-    return Accumulo.newClient().from(getClientProperties()).build();
-  }
-
   public String getClientConfigFile() {
     if (clientConfigFile == null) {
       URL clientPropsUrl = ClientOpts.class.getClassLoader()
@@ -177,7 +147,7 @@ public class ClientOpts extends Help {
     return clientConfigFile;
   }
 
-  public Properties getClientProperties() {
+  public Properties getClientProps() {
     if (cachedProps == null) {
       cachedProps = new Properties();
       if (getClientConfigFile() != null) {
diff --git a/core/src/main/java/org/apache/accumulo/core/util/Merge.java b/core/src/main/java/org/apache/accumulo/core/util/Merge.java
index 1a1c402..d3ff399 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/Merge.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/Merge.java
@@ -21,11 +21,13 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.clientImpl.ClientContext;
 import org.apache.accumulo.core.clientImpl.Tables;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -56,6 +58,13 @@ public class Merge {
     log.info(String.format(format, args));
   }
 
+  public static class MemoryConverter implements IStringConverter<Long> {
+    @Override
+    public Long convert(String value) {
+      return ConfigurationTypeHelper.getFixedMemoryAsBytes(value);
+    }
+  }
+
   static class TextConverter implements IStringConverter<Text> {
     @Override
     public Text convert(String value) {
@@ -84,7 +93,7 @@ public class Merge {
     Opts opts = new Opts();
 
     try (TraceScope clientTrace = opts.parseArgsAndTrace(Merge.class.getName(), args)) {
-      try (AccumuloClient client = opts.createClient()) {
+      try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build()) {
 
         if (!client.tableOperations().exists(opts.tableName)) {
           System.err.println("table " + opts.tableName + " does not exist");
diff --git a/core/src/test/java/org/apache/accumulo/core/cli/TestClientOpts.java b/core/src/test/java/org/apache/accumulo/core/cli/TestClientOpts.java
index f1e55ba..90e4ff5 100644
--- a/core/src/test/java/org/apache/accumulo/core/cli/TestClientOpts.java
+++ b/core/src/test/java/org/apache/accumulo/core/cli/TestClientOpts.java
@@ -19,7 +19,10 @@ package org.apache.accumulo.core.cli;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.util.Properties;
+
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.ClientProperty;
 import org.junit.Test;
 
 public class TestClientOpts {
@@ -31,16 +34,18 @@
         "instance.zookeepers=zoo1,zoo2", "-o", "auth.type=password", "-o", "auth.principal=user123",
         "-o", "auth.token=mypass"};
     opts.parseArgs("test", args);
-    assertEquals("user123", opts.getPrincipal());
+    Properties props = opts.getClientProps();
+    assertEquals("user123", ClientProperty.AUTH_PRINCIPAL.getValue(props));
     assertTrue(opts.getToken() instanceof PasswordToken);
-    assertEquals("myinst", opts.getClientProperties().getProperty("instance.name"));
+    assertEquals("myinst", props.getProperty("instance.name"));
 
     opts = new ClientOpts();
     args = new String[] {"-o", "instance.name=myinst", "-o", "instance.zookeepers=zoo1,zoo2", "-o",
         "auth.type=password", "-o", "auth.token=mypass", "-u", "userabc"};
     opts.parseArgs("test", args);
-    assertEquals("userabc", opts.getPrincipal());
+    props = opts.getClientProps();
+    assertEquals("userabc", ClientProperty.AUTH_PRINCIPAL.getValue(props));
     assertTrue(opts.getToken() instanceof PasswordToken);
-    assertEquals("myinst", opts.getClientProperties().getProperty("instance.name"));
+    assertEquals("myinst", props.getProperty("instance.name"));
   }
 }
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnDefaultTable.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnDefaultTable.java
deleted file mode 100644
index 92d5fb7..0000000
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnDefaultTable.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.hadoopImpl.mapreduce.lib;
-
-import java.util.Properties;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
-import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat;
-import org.apache.hadoop.mapreduce.Job;
-
-import com.beust.jcommander.Parameter;
-
-public class MapReduceClientOnDefaultTable extends MapReduceClientOpts {
-  @Parameter(names = "--table", description = "table to use")
-  public String tableName;
-
-  public MapReduceClientOnDefaultTable(String table) {
-    this.tableName = table;
-  }
-
-  public String getTableName() {
-    return tableName;
-  }
-
-  @Override
-  public void setAccumuloConfigs(Job job) throws AccumuloException, AccumuloSecurityException {
-    final String tableName = getTableName();
-    final Properties clientProps = getClientProperties();
-    AccumuloInputFormat.configure().clientProperties(clientProps).table(tableName).auths(auths)
-        .store(job);
-    AccumuloOutputFormat.configure().clientProperties(clientProps).defaultTable(tableName)
-        .createTables(true).store(job);
-  }
-
-}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnRequiredTable.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnRequiredTable.java
deleted file mode 100644
index 18ff939..0000000
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOnRequiredTable.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.hadoopImpl.mapreduce.lib;
-
-import java.util.Properties;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
-import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat;
-import org.apache.hadoop.mapreduce.Job;
-
-import com.beust.jcommander.Parameter;
-
-public class MapReduceClientOnRequiredTable extends MapReduceClientOpts {
-
-  @Parameter(names = {"-t", "--table"}, required = true, description = "table to use")
-  private String tableName;
-
-  @Override
-  public void setAccumuloConfigs(Job job) throws AccumuloException, AccumuloSecurityException {
-    final String tableName = getTableName();
-    final Properties clientProps = getClientProperties();
-    AccumuloInputFormat.configure().clientProperties(clientProps).table(tableName).auths(auths)
-        .store(job);
-    AccumuloOutputFormat.configure().clientProperties(clientProps).defaultTable(tableName)
-        .createTables(true).store(job);
-  }
-
-  public String getTableName() {
-    return tableName;
-  }
-}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOpts.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOpts.java
index e376697..b1cfdf8 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOpts.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/MapReduceClientOpts.java
@@ -16,16 +16,16 @@
  */
 package org.apache.accumulo.hadoopImpl.mapreduce.lib;
 
+import java.util.Properties;
+
 import org.apache.accumulo.core.cli.ClientOpts;
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.admin.DelegationTokenConfig;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.accumulo.core.client.security.tokens.KerberosToken;
+import org.apache.accumulo.core.conf.ClientProperty;
 import org.apache.accumulo.core.security.SystemPermission;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,17 +33,15 @@ import org.slf4j.LoggerFactory;
 /**
  * Adds some MR awareness to the ClientOpts
  */
-public abstract class MapReduceClientOpts extends ClientOpts {
-  private static final Logger log = LoggerFactory.getLogger(MapReduceClientOpts.class);
+public class MapReduceClientOpts extends ClientOpts {
 
-  public abstract void setAccumuloConfigs(Job job)
-      throws AccumuloException, AccumuloSecurityException;
+  private static final Logger log = LoggerFactory.getLogger(MapReduceClientOpts.class);
 
-  @Override
-  public AuthenticationToken getToken() {
-    AuthenticationToken authToken = super.getToken();
+  public Properties getClientProps() {
+    Properties props = super.getClientProps();
     // For MapReduce, Kerberos credentials don't make it to the Mappers and Reducers,
     // so we need to request a delegation token and use that instead.
+    AuthenticationToken authToken = ClientProperty.getAuthenticationToken(props);
     if (authToken instanceof KerberosToken) {
       log.info("Received KerberosToken, fetching DelegationToken for MapReduce");
       final KerberosToken krbToken = (KerberosToken) authToken;
@@ -57,9 +55,8 @@ public abstract class MapReduceClientOpts extends ClientOpts {
         String newPrincipal = user.getUserName();
         log.info("Obtaining delegation token for {}", newPrincipal);
 
-        setPrincipal(newPrincipal);
-        try (AccumuloClient client = Accumulo.newClient().from(getClientProperties())
-            .as(newPrincipal, krbToken).build()) {
+        try (AccumuloClient client = Accumulo.newClient().from(props).as(newPrincipal, krbToken)
+            .build()) {
 
           // Do the explicit check to see if the user has the permission to get a delegation token
           if (!client.securityOperations().hasSystemPermission(client.whoami(),
@@ -75,7 +72,11 @@ public abstract class MapReduceClientOpts extends ClientOpts {
           }
 
           // Get the delegation token from Accumulo
-          return client.securityOperations().getDelegationToken(new DelegationTokenConfig());
+          AuthenticationToken token = client.securityOperations()
+              .getDelegationToken(new DelegationTokenConfig());
+
+          props.setProperty(ClientProperty.AUTH_PRINCIPAL.getKey(), newPrincipal);
+          ClientProperty.setAuthenticationToken(props, token);
         }
       } catch (Exception e) {
         final String msg = "Failed to acquire DelegationToken for use with MapReduce";
@@ -83,6 +84,6 @@ public abstract class MapReduceClientOpts extends ClientOpts {
         throw new RuntimeException(msg, e);
       }
     }
-    return authToken;
+    return props;
  }
 }
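With the abstract setAccumuloConfigs() gone, a job driver now wires the input and output formats up itself, and the Kerberos-to-delegation-token swap above happens inside getClientProps(). A rough sketch of a driver against the new shape, not code from this commit (MyJobDriver and the table name are made up; the configure() chains mirror the ones in RowHashIT below):

    import java.util.Properties;

    import org.apache.accumulo.core.security.Authorizations;
    import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
    import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat;
    import org.apache.accumulo.hadoopImpl.mapreduce.lib.MapReduceClientOpts;
    import org.apache.hadoop.mapreduce.Job;

    public class MyJobDriver {
      public static void main(String[] args) throws Exception {
        MapReduceClientOpts opts = new MapReduceClientOpts();
        opts.parseArgs(MyJobDriver.class.getName(), args);
        Job job = Job.getInstance();
        // For Kerberos users this call also fetches a DelegationToken and
        // rewrites auth.principal/auth.token in the returned properties.
        Properties props = opts.getClientProps();
        AccumuloInputFormat.configure().clientProperties(props).table("mytable")
            .auths(Authorizations.EMPTY).store(job);
        AccumuloOutputFormat.configure().clientProperties(props).defaultTable("mytable")
            .createTables(true).store(job);
        // Mapper/reducer setup omitted; see RowHashIT below for a full job.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }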
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/RowHashIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/RowHashIT.java
index a8823ba..06ba291 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/RowHashIT.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/RowHashIT.java
@@ -41,7 +41,7 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
 import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat;
-import org.apache.accumulo.hadoopImpl.mapreduce.lib.MapReduceClientOnRequiredTable;
+import org.apache.accumulo.hadoopImpl.mapreduce.lib.MapReduceClientOpts;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl;
 import org.apache.accumulo.test.functional.ConfigurableMacBase;
 import org.apache.hadoop.conf.Configuration;
@@ -128,9 +128,11 @@ public class RowHashIT extends ConfigurableMacBase {
     public void setup(Context job) {}
   }
 
-  public class Opts extends MapReduceClientOnRequiredTable {
+  public class Opts extends MapReduceClientOpts {
     @Parameter(names = "--column", required = true)
     String column;
+    @Parameter(names = {"-t", "--table"}, required = true, description = "table to use")
+    String tableName;
   }
 
   @Override
@@ -141,15 +143,14 @@
     RowHash.Opts opts = new RowHash.Opts();
     opts.parseArgs(RowHash.class.getName(), args);
     job.setInputFormatClass(AccumuloInputFormat.class);
-    opts.setAccumuloConfigs(job);
 
     String col = opts.column;
     int idx = col.indexOf(":");
     Text cf = new Text(idx < 0 ? col : col.substring(0, idx));
     Text cq = idx < 0 ? null : new Text(col.substring(idx + 1));
     if (cf.getLength() > 0)
-      AccumuloInputFormat.configure().clientProperties(opts.getClientProperties())
-          .table(opts.getTableName()).auths(Authorizations.EMPTY)
+      AccumuloInputFormat.configure().clientProperties(opts.getClientProps())
+          .table(opts.tableName).auths(Authorizations.EMPTY)
           .fetchColumns(Collections.singleton(new IteratorSetting.Column(cf, cq))).store(job);
 
     job.setMapperClass(RowHash.HashDataMapper.class);
@@ -159,7 +160,7 @@
     job.setNumReduceTasks(0);
 
     job.setOutputFormatClass(AccumuloOutputFormat.class);
-    AccumuloOutputFormat.configure().clientProperties(opts.getClientProperties()).store(job);
+    AccumuloOutputFormat.configure().clientProperties(opts.getClientProps()).store(job);
 
     job.waitForCompletion(true);
     return job.isSuccessful() ? 0 : 1;
diff --git a/server/base/src/main/java/org/apache/accumulo/server/cli/ServerUtilOpts.java b/server/base/src/main/java/org/apache/accumulo/server/cli/ServerUtilOpts.java
index 0085093..1578b9a 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/cli/ServerUtilOpts.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/cli/ServerUtilOpts.java
@@ -21,9 +21,6 @@ import org.apache.accumulo.core.conf.SiteConfiguration;
 import org.apache.accumulo.server.ServerContext;
 
 public class ServerUtilOpts extends ClientOpts {
-  {
-    setPrincipal("root");
-  }
 
   private ServerContext context;
 
@@ -32,7 +29,7 @@
       if (getClientConfigFile() == null) {
         context = new ServerContext(new SiteConfiguration());
       } else {
-        context = new ServerContext(new SiteConfiguration(), getClientProperties());
+        context = new ServerContext(new SiteConfiguration(), getClientProps());
       }
     }
     return context;
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/CheckForMetadataProblems.java b/server/base/src/main/java/org/apache/accumulo/server/util/CheckForMetadataProblems.java
index 0fe3fdf..1762364 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/CheckForMetadataProblems.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/CheckForMetadataProblems.java
@@ -23,6 +23,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeSet;
 
+import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.data.Key;
@@ -96,7 +97,7 @@ public class CheckForMetadataProblems {
     System.out.println("Checking table: " + tableNameToCheck);
     Map<String,TreeSet<KeyExtent>> tables = new HashMap<>();
 
-    try (AccumuloClient client = opts.createClient()) {
+    try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build()) {
 
       Scanner scanner = client.createScanner(tableNameToCheck, Authorizations.EMPTY);
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/LocalityCheck.java b/server/base/src/main/java/org/apache/accumulo/server/util/LocalityCheck.java
index 8998323..087701e 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/LocalityCheck.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/LocalityCheck.java
@@ -21,6 +21,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.data.Key;
@@ -46,7 +47,8 @@ public class LocalityCheck {
     try (TraceScope clientSpan = opts.parseArgsAndTrace(LocalityCheck.class.getName(), args)) {
       VolumeManager fs = opts.getServerContext().getVolumeManager();
-      try (AccumuloClient accumuloClient = opts.createClient()) {
+      try (AccumuloClient accumuloClient = Accumulo.newClient().from(opts.getClientProps())
+          .build()) {
         Scanner scanner = accumuloClient.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
         scanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
         scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/RandomWriter.java b/server/base/src/main/java/org/apache/accumulo/server/util/RandomWriter.java
index c0d3a84..1c1241b 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/RandomWriter.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/RandomWriter.java
@@ -18,11 +18,14 @@ package org.apache.accumulo.server.util;
 
 import java.security.SecureRandom;
 import java.util.Iterator;
+import java.util.Properties;
 import java.util.Random;
 
 import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.conf.ClientProperty;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.hadoop.io.Text;
@@ -91,11 +94,13 @@ public class RandomWriter {
 
   public static void main(String[] args) throws Exception {
     Opts opts = new Opts();
-    opts.setPrincipal("root");
+    opts.principal = "root";
     try (TraceScope clientSpan = opts.parseArgsAndTrace(RandomWriter.class.getName(), args)) {
       long start = System.currentTimeMillis();
-      log.info("starting at {} for user {}", start, opts.getPrincipal());
-      try (AccumuloClient accumuloClient = opts.createClient();
+      Properties clientProps = opts.getClientProps();
+      String principal = ClientProperty.AUTH_PRINCIPAL.getValue(clientProps);
+      log.info("starting at {} for user {}", start, principal);
+      try (AccumuloClient accumuloClient = Accumulo.newClient().from(clientProps).build();
           BatchWriter bw = accumuloClient.createBatchWriter(opts.tableName)) {
         log.info("Writing {} mutations...", opts.count);
         bw.addMutations(new RandomMutationGenerator(opts.count));
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/TableDiskUsage.java b/server/base/src/main/java/org/apache/accumulo/server/util/TableDiskUsage.java
index 789e69c..c4085f7 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/TableDiskUsage.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/TableDiskUsage.java
@@ -30,6 +30,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
+import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
@@ -298,7 +299,7 @@ public class TableDiskUsage {
 
   public static void main(String[] args) throws Exception {
     Opts opts = new Opts();
     try (TraceScope clientSpan = opts.parseArgsAndTrace(TableDiskUsage.class.getName(), args)) {
-      try (AccumuloClient client = opts.createClient()) {
+      try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build()) {
         VolumeManager fs = opts.getServerContext().getVolumeManager();
         org.apache.accumulo.server.util.TableDiskUsage.printDiskUsage(opts.tables, fs, client,
             false);
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java b/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java
index 4346eba..dda58f4 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -74,7 +75,7 @@ public class VerifyTabletAssignments {
     Opts opts = new Opts();
     try (TraceScope clientSpan = opts.parseArgsAndTrace(VerifyTabletAssignments.class.getName(),
         args)) {
-      try (AccumuloClient client = opts.createClient()) {
+      try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build()) {
         for (String table : client.tableOperations().list())
           checkTable((ClientContext) client, opts, table, null);
       }
diff --git a/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java b/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
index efd9422..ccbe13a 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
@@ -21,6 +21,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
@@ -263,7 +264,7 @@ public class MergeStats {
     ServerUtilOpts opts = new ServerUtilOpts();
     try (TraceScope clientSpan = opts.parseArgsAndTrace(MergeStats.class.getName(), args)) {
-      try (AccumuloClient client = opts.createClient()) {
+      try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build()) {
         Map<String,String> tableIdMap = client.tableOperations().tableIdMap();
         ZooReaderWriter zooReaderWriter = opts.getServerContext().getZooReaderWriter();
         for (Entry<String,String> entry : tableIdMap.entrySet()) {
diff --git a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceDump.java b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceDump.java
index 8658eb5..4ebbf60 100644
--- a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceDump.java
+++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceDump.java
@@ -28,6 +28,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.data.Key;
@@ -77,7 +78,7 @@ public class TraceDump {
     PrintStream out = System.out;
     long endTime = System.currentTimeMillis();
     long startTime = endTime - opts.length;
-    try (AccumuloClient client = opts.createClient()) {
+    try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build()) {
       Scanner scanner = client.createScanner(opts.tableName, opts.auths);
       Range range = new Range(new Text("start:" + Long.toHexString(startTime)),
           new Text("start:" + Long.toHexString(endTime)));
@@ -100,7 +101,7 @@ public class TraceDump {
   private static int dumpTrace(Opts opts) throws Exception {
     final PrintStream out = System.out;
     int count = 0;
-    try (AccumuloClient client = opts.createClient()) {
+    try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build()) {
       for (String traceId : opts.traceIds) {
         Scanner scanner = client.createScanner(opts.tableName, opts.auths);
         Range range = new Range(new Text(traceId));
diff --git a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceTableStats.java b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceTableStats.java
index 5a7e56d..85de1b6 100644
--- a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceTableStats.java
+++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceTableStats.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
@@ -82,7 +83,7 @@ public class TraceTableStats {
     double maxSpanLength = 0;
     double maxSpanLengthMS = 0;
-    try (AccumuloClient client = opts.createClient()) {
+    try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build()) {
       Scanner scanner = client.createScanner(opts.tableName, Authorizations.EMPTY);
       scanner.setRange(new Range(null, true, "idx:", false));
       for (Entry<Key,Value> entry : scanner) {
diff --git a/test/src/main/java/org/apache/accumulo/test/TestBinaryRows.java b/test/src/main/java/org/apache/accumulo/test/TestBinaryRows.java
index fe214b1..b3c3fc3 100644
--- a/test/src/main/java/org/apache/accumulo/test/TestBinaryRows.java
+++ b/test/src/main/java/org/apache/accumulo/test/TestBinaryRows.java
@@ -24,6 +24,7 @@ import java.util.Map.Entry;
 import java.util.Random;
 import java.util.TreeSet;
 
+import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.Scanner;
@@ -228,7 +229,7 @@ public class TestBinaryRows {
     Opts opts = new Opts();
     opts.parseArgs(TestBinaryRows.class.getName(), args);
 
-    try (AccumuloClient client = opts.createClient()) {
+    try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build()) {
       runTest(client, opts);
     } catch (Exception e) {
       throw new RuntimeException(e);
diff --git a/test/src/main/java/org/apache/accumulo/test/TestIngest.java b/test/src/main/java/org/apache/accumulo/test/TestIngest.java
index b75889a..e3a5823 100644
--- a/test/src/main/java/org/apache/accumulo/test/TestIngest.java
+++ b/test/src/main/java/org/apache/accumulo/test/TestIngest.java
@@ -20,11 +20,13 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 
 import java.io.IOException;
 import java.util.Map.Entry;
+import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
 
 import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -35,6 +37,7 @@ import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.security.SecurityErrorCode;
 import org.apache.accumulo.core.clientImpl.ClientContext;
 import org.apache.accumulo.core.clientImpl.TabletServerBatchWriter;
+import org.apache.accumulo.core.conf.ClientProperty;
 import org.apache.accumulo.core.conf.DefaultConfiguration;
 import org.apache.accumulo.core.crypto.CryptoServiceFactory;
 import org.apache.accumulo.core.data.ConstraintViolationSummary;
@@ -48,7 +51,6 @@ import org.apache.accumulo.core.file.rfile.RFile;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.accumulo.core.util.FastFormat;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Level;
@@ -61,71 +63,114 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 
 public class TestIngest {
   public static final Authorizations AUTHS = new Authorizations("L1", "L2", "G1", "GROUP2");
 
+  public static class IngestParams {
+    public Properties clientProps = new Properties();
+    public String tableName = "test_ingest";
+    public boolean createTable = false;
+    public int numsplits = 1;
+    public int startRow = 0;
+    public int rows = 100000;
+    public int cols = 1;
+    public Integer random = null;
+    public int dataSize = 1000;
+    public boolean delete = false;
+    public long timestamp = -1;
+    public String outputFile = null;
+    public int stride;
+    public String columnFamily = "colf";
+    public ColumnVisibility columnVisibility = new ColumnVisibility();
+
+    public IngestParams(Properties props) {
+      clientProps = props;
+    }
+
+    public IngestParams(Properties props, String table) {
+      this(props);
+      tableName = table;
+    }
+
+    public IngestParams(Properties props, String table, int rows) {
+      this(props, table);
+      this.rows = rows;
+    }
+  }
+
   public static class Opts extends ClientOpts {
 
     @Parameter(names = "--table", description = "table to use")
     String tableName = "test_ingest";
 
     @Parameter(names = "--createTable")
-    public boolean createTable = false;
+    boolean createTable = false;
 
     @Parameter(names = "--splits",
         description = "the number of splits to use when creating the table")
-    public int numsplits = 1;
+    int numsplits = 1;
 
     @Parameter(names = "--start", description = "the starting row number")
-    public int startRow = 0;
+    int startRow = 0;
 
     @Parameter(names = "--rows", description = "the number of rows to ingest")
-    public int rows = 100000;
+    int rows = 100000;
 
     @Parameter(names = "--cols", description = "the number of columns to ingest per row")
-    public int cols = 1;
+    int cols = 1;
 
     @Parameter(names = "--random", description = "insert random rows and use"
        + " the given number to seed the psuedo-random number generator")
-    public Integer random = null;
+    Integer random = null;
 
     @Parameter(names = "--size", description = "the size of the value to ingest")
-    public int dataSize = 1000;
+    int dataSize = 1000;
 
     @Parameter(names = "--delete", description = "delete values instead of inserting them")
-    public boolean delete = false;
+    boolean delete = false;
 
     @Parameter(names = {"-ts", "--timestamp"}, description = "timestamp to use for all values")
-    public long timestamp = -1;
+    long timestamp = -1;
 
     @Parameter(names = "--rfile", description = "generate data into a file that can be imported")
-    public String outputFile = null;
+    String outputFile = null;
 
     @Parameter(names = "--stride", description = "the difference between successive row ids")
-    public int stride;
+    int stride;
 
     @Parameter(names = {"-cf",
"--columnFamily"}, description = "place columns in this column family") - public String columnFamily = "colf"; + String columnFamily = "colf"; @Parameter(names = {"-cv", "--columnVisibility"}, description = "place columns in this column family", converter = VisibilityConverter.class) - public ColumnVisibility columnVisibility = new ColumnVisibility(); - - public Configuration conf = null; - public FileSystem fs = null; - - public void setTableName(String tableName) { - this.tableName = tableName; + ColumnVisibility columnVisibility = new ColumnVisibility(); + + public IngestParams getIngestPrams() { + IngestParams params = new IngestParams(getClientProps(), tableName); + params.createTable = createTable; + params.numsplits = numsplits; + params.startRow = startRow; + params.rows = rows; + params.cols = cols; + params.random = random; + params.dataSize = dataSize; + params.delete = delete; + params.timestamp = timestamp; + params.outputFile = outputFile; + params.stride = stride; + params.columnFamily = columnFamily; + params.columnVisibility = columnVisibility; + return params; } } - public static void createTable(AccumuloClient client, Opts args) + public static void createTable(AccumuloClient client, IngestParams params) throws AccumuloException, AccumuloSecurityException, TableExistsException { - if (args.createTable) { - TreeSet<Text> splits = getSplitPoints(args.startRow, args.startRow + args.rows, - args.numsplits); + if (params.createTable) { + TreeSet<Text> splits = getSplitPoints(params.startRow, params.startRow + params.rows, + params.numsplits); - if (!client.tableOperations().exists(args.tableName)) - client.tableOperations().create(args.tableName); + if (!client.tableOperations().exists(params.tableName)) + client.tableOperations().create(params.tableName); try { - client.tableOperations().addSplits(args.tableName, splits); + client.tableOperations().addSplits(params.tableName, splits); } catch (TableNotFoundException ex) { // unlikely throw new RuntimeException(ex); @@ -190,67 +235,68 @@ public class TestIngest { if (opts.debug) Logger.getLogger(TabletServerBatchWriter.class.getName()).setLevel(Level.TRACE); - try (AccumuloClient client = opts.createClient()) { - ingest(client, opts); + try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build()) { + ingest(client, opts.getIngestPrams()); } } @SuppressFBWarnings(value = "PREDICTABLE_RANDOM", justification = "predictable random is okay for testing") - public static void ingest(AccumuloClient accumuloClient, FileSystem fs, Opts opts) + public static void ingest(AccumuloClient accumuloClient, FileSystem fs, IngestParams params) throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException, MutationsRejectedException, TableExistsException { long stopTime; - byte[][] bytevals = generateValues(opts.dataSize); + byte[][] bytevals = generateValues(params.dataSize); - byte[] randomValue = new byte[opts.dataSize]; + byte[] randomValue = new byte[params.dataSize]; Random random = new Random(); long bytesWritten = 0; - createTable(accumuloClient, opts); + createTable(accumuloClient, params); BatchWriter bw = null; FileSKVWriter writer = null; - if (opts.outputFile != null) { + if (params.outputFile != null) { ClientContext cc = (ClientContext) accumuloClient; writer = FileOperations.getInstance().newWriterBuilder() - .forFile(opts.outputFile + "." + RFile.EXTENSION, fs, cc.getHadoopConf(), + .forFile(params.outputFile + "." 
               CryptoServiceFactory.newDefaultInstance())
           .withTableConfiguration(DefaultConfiguration.getInstance()).build();
       writer.startDefaultLocalityGroup();
     } else {
-      bw = accumuloClient.createBatchWriter(opts.tableName);
-      accumuloClient.securityOperations().changeUserAuthorizations(opts.getPrincipal(), AUTHS);
+      bw = accumuloClient.createBatchWriter(params.tableName);
+      String principal = ClientProperty.AUTH_PRINCIPAL.getValue(params.clientProps);
+      accumuloClient.securityOperations().changeUserAuthorizations(principal, AUTHS);
     }
-    Text labBA = new Text(opts.columnVisibility.getExpression());
+    Text labBA = new Text(params.columnVisibility.getExpression());
 
     long startTime = System.currentTimeMillis();
-    for (int i = 0; i < opts.rows; i++) {
+    for (int i = 0; i < params.rows; i++) {
       int rowid;
-      if (opts.stride > 0) {
-        rowid = ((i % opts.stride) * (opts.rows / opts.stride)) + (i / opts.stride);
+      if (params.stride > 0) {
+        rowid = ((i % params.stride) * (params.rows / params.stride)) + (i / params.stride);
       } else {
         rowid = i;
       }
 
-      Text row = generateRow(rowid, opts.startRow);
+      Text row = generateRow(rowid, params.startRow);
       Mutation m = new Mutation(row);
-      for (int j = 0; j < opts.cols; j++) {
-        Text colf = new Text(opts.columnFamily);
+      for (int j = 0; j < params.cols; j++) {
+        Text colf = new Text(params.columnFamily);
         Text colq = new Text(FastFormat.toZeroPaddedString(j, 7, 10, COL_PREFIX));
 
         if (writer != null) {
           Key key = new Key(row, colf, colq, labBA);
-          if (opts.timestamp >= 0) {
-            key.setTimestamp(opts.timestamp);
+          if (params.timestamp >= 0) {
+            key.setTimestamp(params.timestamp);
           } else {
             key.setTimestamp(startTime);
           }
 
-          if (opts.delete) {
+          if (params.delete) {
            key.setDeleted(true);
           } else {
            key.setDeleted(false);
@@ -258,12 +304,13 @@ public class TestIngest {
 
           bytesWritten += key.getSize();
 
-          if (opts.delete) {
+          if (params.delete) {
            writer.append(key, new Value(new byte[0]));
           } else {
             byte[] value;
-            if (opts.random != null) {
-              value = genRandomValue(random, randomValue, opts.random, rowid + opts.startRow, j);
+            if (params.random != null) {
+              value = genRandomValue(random, randomValue, params.random, rowid + params.startRow,
+                  j);
             } else {
               value = bytevals[j % bytevals.length];
             }
@@ -277,24 +324,25 @@ public class TestIngest {
           Key key = new Key(row, colf, colq, labBA);
           bytesWritten += key.getSize();
 
-          if (opts.delete) {
-            if (opts.timestamp >= 0)
-              m.putDelete(colf, colq, opts.columnVisibility, opts.timestamp);
+          if (params.delete) {
+            if (params.timestamp >= 0)
+              m.putDelete(colf, colq, params.columnVisibility, params.timestamp);
             else
-              m.putDelete(colf, colq, opts.columnVisibility);
+              m.putDelete(colf, colq, params.columnVisibility);
           } else {
             byte[] value;
-            if (opts.random != null) {
-              value = genRandomValue(random, randomValue, opts.random, rowid + opts.startRow, j);
+            if (params.random != null) {
+              value = genRandomValue(random, randomValue, params.random, rowid + params.startRow,
+                  j);
             } else {
               value = bytevals[j % bytevals.length];
             }
             bytesWritten += value.length;
 
-            if (opts.timestamp >= 0) {
-              m.put(colf, colq, opts.columnVisibility, opts.timestamp, new Value(value, true));
+            if (params.timestamp >= 0) {
+              m.put(colf, colq, params.columnVisibility, params.timestamp, new Value(value, true));
             } else {
-              m.put(colf, colq, opts.columnVisibility, new Value(value, true));
+              m.put(colf, colq, params.columnVisibility, new Value(value, true));
             }
           }
 
@@ -303,7 +351,6 @@ public class TestIngest {
       }
       if (bw != null)
         bw.addMutation(m);
-
     }
 
     if (writer != null) {
@@ -325,14 +372,13 @@ public class TestIngest {
             System.err.println("ERROR : Constraint violates : " + cvs);
           }
         }
-
         throw e;
       }
     }
 
     stopTime = System.currentTimeMillis();
 
-    int totalValues = opts.rows * opts.cols;
+    int totalValues = params.rows * params.cols;
     double elapsed = (stopTime - startTime) / 1000.0;
 
     System.out.printf(
@@ -342,10 +388,10 @@ public class TestIngest {
         elapsed);
   }
 
-  public static void ingest(AccumuloClient c, Opts opts)
+  public static void ingest(AccumuloClient c, IngestParams params)
       throws MutationsRejectedException, IOException, AccumuloException, AccumuloSecurityException,
       TableNotFoundException, TableExistsException {
     ClientContext cc = (ClientContext) c;
-    ingest(c, FileSystem.get(cc.getHadoopConf()), opts);
+    ingest(c, FileSystem.get(cc.getHadoopConf()), params);
   }
 }
diff --git a/test/src/main/java/org/apache/accumulo/test/TestMultiTableIngest.java b/test/src/main/java/org/apache/accumulo/test/TestMultiTableIngest.java
index b9f982d..f913d7a 100644
--- a/test/src/main/java/org/apache/accumulo/test/TestMultiTableIngest.java
+++ b/test/src/main/java/org/apache/accumulo/test/TestMultiTableIngest.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map.Entry;
 
 import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.MultiTableBatchWriter;
 import org.apache.accumulo.core.client.MutationsRejectedException;
@@ -75,18 +76,18 @@ public class TestMultiTableIngest {
     Opts opts = new Opts();
     opts.parseArgs(TestMultiTableIngest.class.getName(), args);
     // create the test table within accumulo
-    try (AccumuloClient accumuloClient = opts.createClient()) {
+    try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build()) {
       for (int i = 0; i < opts.tables; i++) {
         tableNames.add(String.format(opts.prefix + "%04d", i));
       }
 
       if (!opts.readonly) {
         for (String table : tableNames)
-          accumuloClient.tableOperations().create(table);
+          client.tableOperations().create(table);
 
         MultiTableBatchWriter b;
         try {
-          b = accumuloClient.createMultiTableBatchWriter();
+          b = client.createMultiTableBatchWriter();
         } catch (Exception e) {
           throw new RuntimeException(e);
         }
@@ -105,7 +106,7 @@ public class TestMultiTableIngest {
       }
     }
     try {
-      readBack(opts, accumuloClient, tableNames);
+      readBack(opts, client, tableNames);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
diff --git a/test/src/main/java/org/apache/accumulo/test/TestRandomDeletes.java b/test/src/main/java/org/apache/accumulo/test/TestRandomDeletes.java
index b0e8e72..cef264a 100644
--- a/test/src/main/java/org/apache/accumulo/test/TestRandomDeletes.java
+++ b/test/src/main/java/org/apache/accumulo/test/TestRandomDeletes.java
@@ -23,6 +23,7 @@ import java.util.Set;
 import java.util.TreeSet;
 
 import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.Scanner;
@@ -81,7 +82,7 @@ public class TestRandomDeletes {
   private static TreeSet<RowColumn> scanAll(TestOpts opts) throws Exception {
     TreeSet<RowColumn> result = new TreeSet<>();
-    try (AccumuloClient client = opts.createClient();
+    try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build();
         Scanner scanner = client.createScanner(opts.tableName, auths)) {
       for (Entry<Key,Value> entry : scanner) {
         Key key = entry.getKey();
@@ -100,8 +101,8 @@ public class TestRandomDeletes {
     ArrayList<RowColumn> entries = new ArrayList<>(rows);
     java.util.Collections.shuffle(entries);
 
-    try (AccumuloClient accumuloClient = opts.createClient();
-        BatchWriter bw = accumuloClient.createBatchWriter(opts.tableName)) {
+    try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build();
+        BatchWriter bw = client.createBatchWriter(opts.tableName)) {
 
       for (int i = 0; i < (entries.size() + 1) / 2; i++) {
         RowColumn rc = entries.get(i);
diff --git a/test/src/main/java/org/apache/accumulo/test/VerifyIngest.java b/test/src/main/java/org/apache/accumulo/test/VerifyIngest.java
index 21f0714..ea164c5 100644
--- a/test/src/main/java/org/apache/accumulo/test/VerifyIngest.java
+++ b/test/src/main/java/org/apache/accumulo/test/VerifyIngest.java
@@ -22,11 +22,13 @@ import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Random;
 
+import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.ClientProperty;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.PartialKey;
 import org.apache.accumulo.core.data.Range;
@@ -57,9 +59,31 @@ public class VerifyIngest {
     return Integer.parseInt(k.getColumnQualifier().toString().split("_")[1]);
   }
 
+  public static class VerifyParams extends TestIngest.IngestParams {
+    public boolean useGet = false;
+
+    public VerifyParams(Properties props) {
+      super(props);
+    }
+
+    public VerifyParams(Properties props, String table) {
+      super(props, table);
+    }
+
+    public VerifyParams(Properties props, String table, int rows) {
+      super(props, table, rows);
+    }
+  }
+
   public static class Opts extends TestIngest.Opts {
     @Parameter(names = "-useGet",
         description = "fetches values one at a time, instead of scanning")
     public boolean useGet = false;
+
+    public VerifyParams getVerifyParams() {
+      VerifyParams params = new VerifyParams(getClientProps(), tableName);
+      params.useGet = useGet;
+      return params;
+    }
   }
 
   public static void main(String[] args) throws Exception {
@@ -74,8 +98,8 @@ public class VerifyIngest {
       if (span != null)
         span.addKVAnnotation("cmdLine", Arrays.asList(args).toString());
 
-      try (AccumuloClient client = opts.createClient()) {
-        verifyIngest(client, opts);
+      try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build()) {
+        verifyIngest(client, opts.getVerifyParams());
       }
 
     } finally {
@@ -85,32 +109,33 @@ public class VerifyIngest {
 
   @SuppressFBWarnings(value = "PREDICTABLE_RANDOM",
       justification = "predictable random is okay for testing")
-  public static void verifyIngest(AccumuloClient accumuloClient, Opts opts)
+  public static void verifyIngest(AccumuloClient accumuloClient, VerifyParams params)
       throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
-    byte[][] bytevals = TestIngest.generateValues(opts.dataSize);
+    byte[][] bytevals = TestIngest.generateValues(params.dataSize);
 
     Authorizations labelAuths = new Authorizations("L1", "L2", "G1", "GROUP2");
-    accumuloClient.securityOperations().changeUserAuthorizations(opts.getPrincipal(), labelAuths);
+    String principal = ClientProperty.AUTH_PRINCIPAL.getValue(params.clientProps);
+    accumuloClient.securityOperations().changeUserAuthorizations(principal, labelAuths);
 
-    int expectedRow = opts.startRow;
+    int expectedRow = params.startRow;
     int expectedCol = 0;
     int recsRead = 0;
 
     long bytesRead = 0;
     long t1 = System.currentTimeMillis();
 
-    byte[] randomValue = new byte[opts.dataSize];
+    byte[] randomValue = new byte[params.dataSize];
     Random random = new Random();
 
-    Key endKey = new Key(new Text("row_" + String.format("%010d", opts.rows + opts.startRow)));
+    Key endKey = new Key(new Text("row_" + String.format("%010d", params.rows + params.startRow)));
 
     int errors = 0;
 
-    while (expectedRow < (opts.rows + opts.startRow)) {
+    while (expectedRow < (params.rows + params.startRow)) {
 
-      if (opts.useGet) {
-        Text rowKey = new Text("row_" + String.format("%010d", expectedRow + opts.startRow));
-        Text colf = new Text(opts.columnFamily);
+      if (params.useGet) {
+        Text rowKey = new Text("row_" + String.format("%010d", expectedRow + params.startRow));
+        Text colf = new Text(params.columnFamily);
         Text colq = new Text("col_" + String.format("%07d", expectedCol));
 
         try (Scanner scanner = accumuloClient.createScanner("test_ingest", labelAuths)) {
@@ -128,8 +153,8 @@ public class VerifyIngest {
           }
 
           byte[] ev;
-          if (opts.random != null) {
-            ev = TestIngest.genRandomValue(random, randomValue, opts.random, expectedRow,
+          if (params.random != null) {
+            ev = TestIngest.genRandomValue(random, randomValue, params.random, expectedRow,
                 expectedCol);
           } else {
             ev = bytevals[expectedCol % bytevals.length];
@@ -150,7 +175,7 @@ public class VerifyIngest {
           }
 
           expectedCol++;
-          if (expectedCol >= opts.cols) {
+          if (expectedCol >= params.cols) {
             expectedCol = 0;
             expectedRow++;
           }
@@ -159,10 +184,10 @@ public class VerifyIngest {
 
         Key startKey = new Key(new Text("row_" + String.format("%010d", expectedRow)));
 
-        try (Scanner scanner = accumuloClient.createScanner(opts.tableName, labelAuths)) {
+        try (Scanner scanner = accumuloClient.createScanner(params.tableName, labelAuths)) {
           scanner.setRange(new Range(startKey, endKey));
-          for (int j = 0; j < opts.cols; j++) {
-            scanner.fetchColumn(new Text(opts.columnFamily),
+          for (int j = 0; j < params.cols; j++) {
+            scanner.fetchColumn(new Text(params.columnFamily),
                 new Text("col_" + String.format("%07d", j)));
           }
 
@@ -190,18 +215,18 @@ public class VerifyIngest {
               errors++;
             }
 
-            if (expectedRow >= (opts.rows + opts.startRow)) {
+            if (expectedRow >= (params.rows + params.startRow)) {
               log.error(
                   "expectedRow ({}) >= (ingestArgs.rows + ingestArgs.startRow) ({}), get"
                       + " batch returned data passed end key",
-                  expectedRow, (opts.rows + opts.startRow));
+                  expectedRow, (params.rows + params.startRow));
               errors++;
               break;
             }
 
             byte[] value;
-            if (opts.random != null) {
-              value = TestIngest.genRandomValue(random, randomValue, opts.random, expectedRow,
+            if (params.random != null) {
+              value = TestIngest.genRandomValue(random, randomValue, params.random, expectedRow,
                  colNum);
             } else {
               value = bytevals[colNum % bytevals.length];
@@ -214,14 +239,14 @@ public class VerifyIngest {
              errors++;
             }
 
-            if (opts.timestamp >= 0 && entry.getKey().getTimestamp() != opts.timestamp) {
+            if (params.timestamp >= 0 && entry.getKey().getTimestamp() != params.timestamp) {
              log.error("unexpected timestamp {}, rowNum : {} colNum : {}",
                  entry.getKey().getTimestamp(), rowNum, colNum);
              errors++;
            }
 
            expectedCol++;
-            if (expectedCol >= opts.cols) {
+            if (expectedCol >= params.cols) {
              expectedCol = 0;
              expectedRow++;
            }
@@ -242,9 +267,9 @@ public class VerifyIngest {
      throw new AccumuloException("saw " + errors + " errors ");
    }
 
-    if (expectedRow != (opts.rows + opts.startRow)) {
+    if (expectedRow != (params.rows + params.startRow)) {
       throw new AccumuloException("Did not read expected number of rows. Saw "
-          + (expectedRow - opts.startRow) + " expected " + opts.rows);
+          + (expectedRow - params.startRow) + " expected " + params.rows);
     } else {
       System.out.printf(
           "%,12d records read | %,8d records/sec | %,12d bytes read |"
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java
index b7bdc79..a86962f 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java
@@ -44,6 +44,7 @@ import org.apache.accumulo.harness.AccumuloClusterHarness;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.test.TestIngest;
 import org.apache.accumulo.test.VerifyIngest;
+import org.apache.accumulo.test.VerifyIngest.VerifyParams;
 import org.apache.commons.lang.math.NumberUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
@@ -128,16 +129,10 @@ public class BalanceInPresenceOfOfflineTableIT extends AccumuloClusterHarness {
 
     log.debug("starting test ingestion");
 
-    TestIngest.Opts opts = new TestIngest.Opts();
-    VerifyIngest.Opts vopts = new VerifyIngest.Opts();
-    opts.setClientProperties(getClientProperties());
-    vopts.setClientProperties(getClientProperties());
-    vopts.rows = opts.rows = 200000;
-    opts.setTableName(TEST_TABLE);
-    TestIngest.ingest(accumuloClient, opts);
+    VerifyParams params = new VerifyParams(getClientProperties(), TEST_TABLE, 200_000);
+    TestIngest.ingest(accumuloClient, params);
     accumuloClient.tableOperations().flush(TEST_TABLE, null, null, true);
-    vopts.setTableName(TEST_TABLE);
-    VerifyIngest.verifyIngest(accumuloClient, vopts);
+    VerifyIngest.verifyIngest(accumuloClient, params);
 
     log.debug("waiting for balancing, up to ~5 minutes to allow for migration cleanup.");
     final long startTime = System.currentTimeMillis();
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkIT.java
index b03db9e..1828c1a 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkIT.java
@@ -25,9 +25,9 @@ import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.clientImpl.ClientInfo;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
 import org.apache.accumulo.test.TestIngest;
-import org.apache.accumulo.test.TestIngest.Opts;
+import org.apache.accumulo.test.TestIngest.IngestParams;
 import org.apache.accumulo.test.VerifyIngest;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.accumulo.test.VerifyIngest.VerifyParams;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.junit.Test;
@@ -72,40 +72,32 @@ public class BulkIT extends AccumuloClusterHarness {
     fs.mkdirs(bulkFailures);
     fs.mkdirs(files);
 
-    Opts opts = new Opts();
-    opts.timestamp = 1;
-    opts.random = 56;
-    opts.rows = N;
-    opts.cols = 1;
-    opts.setTableName(tableName);
-    opts.setClientProperties(info.getProperties());
-    opts.conf = new Configuration(false);
-    opts.fs = fs;
+    IngestParams params = new IngestParams(info.getProperties(), tableName, N);
params.timestamp = 1; + params.random = 56; + params.cols = 1; String fileFormat = filePrefix + "rf%02d"; for (int i = 0; i < COUNT; i++) { - opts.outputFile = new Path(files, String.format(fileFormat, i)).toString(); - opts.startRow = N * i; - TestIngest.ingest(c, fs, opts); + params.outputFile = new Path(files, String.format(fileFormat, i)).toString(); + params.startRow = N * i; + TestIngest.ingest(c, fs, params); } - opts.outputFile = new Path(files, String.format(fileFormat, N)).toString(); - opts.startRow = N; - opts.rows = 1; + params.outputFile = new Path(files, String.format(fileFormat, N)).toString(); + params.startRow = N; + params.rows = 1; // create an rfile with one entry, there was a bug with this: - TestIngest.ingest(c, fs, opts); + TestIngest.ingest(c, fs, params); bulkLoad(c, tableName, bulkFailures, files, useOld); - VerifyIngest.Opts vopts = new VerifyIngest.Opts(); - vopts.setTableName(tableName); - vopts.random = 56; - vopts.setClientProperties(info.getProperties()); + VerifyParams verifyParams = new VerifyParams(info.getProperties(), tableName, N); + verifyParams.random = 56; for (int i = 0; i < COUNT; i++) { - vopts.startRow = i * N; - vopts.rows = N; - VerifyIngest.verifyIngest(c, vopts); + verifyParams.startRow = i * N; + VerifyIngest.verifyIngest(c, verifyParams); } - vopts.startRow = N; - vopts.rows = 1; - VerifyIngest.verifyIngest(c, vopts); + verifyParams.startRow = N; + verifyParams.rows = 1; + VerifyIngest.verifyIngest(c, verifyParams); } @SuppressWarnings("deprecation") diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java index 63b41fe..defb91d 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java @@ -26,6 +26,7 @@ import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.test.VerifyIngest; +import org.apache.accumulo.test.VerifyIngest.VerifyParams; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -76,8 +77,8 @@ public class BulkSplitOptimizationIT extends AccumuloClusterHarness { } } - static final int ROWS = 100000; - static final int SPLITS = 99; + private static final int ROWS = 100000; + private static final int SPLITS = 99; @Test public void testBulkSplitOptimization() throws Exception { @@ -108,21 +109,16 @@ public class BulkSplitOptimizationIT extends AccumuloClusterHarness { } FunctionalTestUtils.checkSplits(c, tableName, 50, 100); - VerifyIngest.Opts opts = new VerifyIngest.Opts(); - opts.timestamp = 1; - opts.dataSize = 50; - opts.random = 56; - opts.rows = 100000; - opts.startRow = 0; - opts.cols = 1; - opts.setTableName(tableName); - - opts.setClientProperties(getClientProperties()); - VerifyIngest.verifyIngest(c, opts); + VerifyParams params = new VerifyParams(getClientProperties(), tableName, ROWS); + params.timestamp = 1; + params.dataSize = 50; + params.random = 56; + params.startRow = 0; + params.cols = 1; + VerifyIngest.verifyIngest(c, params); // ensure each tablet does not have all map files, should be ~2.5 files per tablet FunctionalTestUtils.checkRFiles(c, tableName, 50, 100, 1, 4); } } - } diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/ChaoticBalancerIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ChaoticBalancerIT.java index 25eac32..2fbb038 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ChaoticBalancerIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ChaoticBalancerIT.java @@ -29,6 +29,7 @@ import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.server.master.balancer.ChaoticLoadBalancer; import org.apache.accumulo.test.TestIngest; import org.apache.accumulo.test.VerifyIngest; +import org.apache.accumulo.test.VerifyIngest.VerifyParams; import org.apache.hadoop.conf.Configuration; import org.junit.Test; @@ -58,19 +59,13 @@ public class ChaoticBalancerIT extends AccumuloClusterHarness { ntc.setProperties(Stream .of(new Pair<>(Property.TABLE_SPLIT_THRESHOLD.getKey(), "10K"), new Pair<>(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "1K")) - .collect(Collectors.toMap(k -> k.getFirst(), v -> v.getSecond()))); + .collect(Collectors.toMap(Pair::getFirst, Pair::getSecond))); c.tableOperations().create(tableName, ntc); - TestIngest.Opts opts = new TestIngest.Opts(); - VerifyIngest.Opts vopts = new VerifyIngest.Opts(); - vopts.rows = opts.rows = 20000; - opts.setTableName(tableName); - vopts.setTableName(tableName); - opts.setClientProperties(getClientProperties()); - vopts.setClientProperties(getClientProperties()); - TestIngest.ingest(c, opts); + VerifyParams params = new VerifyParams(getClientProperties(), tableName, 20_000); + TestIngest.ingest(c, params); c.tableOperations().flush(tableName, null, null, true); - VerifyIngest.verifyIngest(c, vopts); + VerifyIngest.verifyIngest(c, params); } } } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java index ceca72c..33ca650 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java @@ -36,6 +36,7 @@ import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.test.VerifyIngest; +import org.apache.accumulo.test.VerifyIngest.VerifyParams; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -127,23 +128,17 @@ public class CompactionIT extends AccumuloClusterHarness { final int span = 500000 / 59; for (int i = 0; i < 500000; i += 500000 / 59) { final int finalI = i; - Runnable r = new Runnable() { - @Override - public void run() { - try { - VerifyIngest.Opts opts = new VerifyIngest.Opts(); - opts.startRow = finalI; - opts.rows = span; - opts.random = 56; - opts.dataSize = 50; - opts.cols = 1; - opts.setTableName(tableName); - opts.setClientProperties(getClientProperties()); - VerifyIngest.verifyIngest(c, opts); - } catch (Exception ex) { - log.warn("Got exception verifying data", ex); - fail.set(true); - } + Runnable r = () -> { + try { + VerifyParams params = new VerifyParams(getClientProperties(), tableName, span); + params.startRow = finalI; + params.random = 56; + params.dataSize = 50; + params.cols = 1; + VerifyIngest.verifyIngest(c, params); + } catch (Exception ex) { + log.warn("Got exception verifying data", ex); + fail.set(true); } }; executor.execute(r); diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/DeleteIT.java b/test/src/main/java/org/apache/accumulo/test/functional/DeleteIT.java index 483b0de..0f5aa22 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/DeleteIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/DeleteIT.java @@ -24,6 +24,7 @@ import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.test.TestIngest; import org.apache.accumulo.test.TestRandomDeletes; import org.apache.accumulo.test.VerifyIngest; +import org.apache.accumulo.test.VerifyIngest.VerifyParams; import org.junit.Test; public class DeleteIT extends AccumuloClusterHarness { @@ -44,23 +45,14 @@ public class DeleteIT extends AccumuloClusterHarness { public static void deleteTest(AccumuloClient c, AccumuloCluster cluster, String tableName) throws Exception { - VerifyIngest.Opts vopts = new VerifyIngest.Opts(); - TestIngest.Opts opts = new TestIngest.Opts(); - vopts.setTableName(tableName); - opts.setTableName(tableName); - vopts.rows = opts.rows = 1000; - vopts.cols = opts.cols = 1; - vopts.random = opts.random = 56; - - opts.setClientProperties(getClientProperties()); - vopts.setClientProperties(getClientProperties()); - - TestIngest.ingest(c, opts); + VerifyParams params = new VerifyParams(getClientProperties(), tableName, 1000); + params.cols = 1; + params.random = 56; + TestIngest.ingest(c, params); assertEquals(0, cluster.getClusterControl().exec(TestRandomDeletes.class, new String[] {"-c", cluster.getClientPropsPath(), "--table", tableName})); - TestIngest.ingest(c, opts); - VerifyIngest.verifyIngest(c, vopts); + TestIngest.ingest(c, params); + VerifyIngest.verifyIngest(c, params); } - } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/DynamicThreadPoolsIT.java b/test/src/main/java/org/apache/accumulo/test/functional/DynamicThreadPoolsIT.java index 72d7504..3de219e 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/DynamicThreadPoolsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/DynamicThreadPoolsIT.java @@ -37,6 +37,7 @@ import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.test.TestIngest; +import org.apache.accumulo.test.TestIngest.IngestParams; import org.apache.hadoop.conf.Configuration; import org.junit.After; import org.junit.Before; @@ -84,12 +85,9 @@ public class DynamicThreadPoolsIT extends AccumuloClusterHarness { String firstTable = tables[0]; try (AccumuloClient c = createAccumuloClient()) { c.instanceOperations().setProperty(Property.TSERV_MAJC_MAXCONCURRENT.getKey(), "5"); - TestIngest.Opts opts = new TestIngest.Opts(); - opts.rows = 500 * 1000; - opts.createTable = true; - opts.setTableName(firstTable); - opts.setClientProperties(getClientProperties()); - TestIngest.ingest(c, opts); + IngestParams params = new IngestParams(getClientProperties(), firstTable, 500_000); + params.createTable = true; + TestIngest.ingest(c, params); c.tableOperations().flush(firstTable, null, null, true); for (int i = 1; i < tables.length; i++) c.tableOperations().clone(firstTable, tables[i], true, null, null); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FateStarvationIT.java b/test/src/main/java/org/apache/accumulo/test/functional/FateStarvationIT.java index 37d9b66..5745ad0 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/FateStarvationIT.java +++ 
b/test/src/main/java/org/apache/accumulo/test/functional/FateStarvationIT.java @@ -25,6 +25,7 @@ import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.test.TestIngest; +import org.apache.accumulo.test.TestIngest.IngestParams; import org.apache.hadoop.io.Text; import org.junit.Test; @@ -46,15 +47,12 @@ public class FateStarvationIT extends AccumuloClusterHarness { c.tableOperations().addSplits(tableName, TestIngest.getSplitPoints(0, 100000, 50)); - TestIngest.Opts opts = new TestIngest.Opts(); - opts.random = 89; - opts.timestamp = 7; - opts.dataSize = 50; - opts.rows = 100000; - opts.cols = 1; - opts.setTableName(tableName); - opts.setClientProperties(getClientProperties()); - TestIngest.ingest(c, opts); + IngestParams params = new IngestParams(getClientProperties(), tableName, 100_000); + params.random = 89; + params.timestamp = 7; + params.dataSize = 50; + params.cols = 1; + TestIngest.ingest(c, params); c.tableOperations().flush(tableName, null, null, true); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java index 6a76d48..676b57d 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java @@ -126,17 +126,17 @@ public class FunctionalTestUtils { ExecutorService threadPool = Executors.newFixedThreadPool(threads); final AtomicBoolean fail = new AtomicBoolean(false); for (int i = 0; i < rows; i += rows / splits) { - final TestIngest.Opts opts = new TestIngest.Opts(); - opts.outputFile = String.format("%s/mf%s", path, i); - opts.random = 56; - opts.timestamp = 1; - opts.dataSize = 50; - opts.rows = rows / splits; - opts.startRow = i; - opts.cols = 1; + TestIngest.IngestParams params = new TestIngest.IngestParams(c.properties()); + params.outputFile = String.format("%s/mf%s", path, i); + params.random = 56; + params.timestamp = 1; + params.dataSize = 50; + params.rows = rows / splits; + params.startRow = i; + params.cols = 1; threadPool.execute(() -> { try { - TestIngest.ingest(c, fs, opts); + TestIngest.ingest(c, fs, params); } catch (Exception e) { fail.set(true); } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java b/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java index 05aa813..91b924d 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java @@ -58,6 +58,7 @@ import org.apache.accumulo.miniclusterImpl.ProcessNotFoundException; import org.apache.accumulo.miniclusterImpl.ProcessReference; import org.apache.accumulo.test.TestIngest; import org.apache.accumulo.test.VerifyIngest; +import org.apache.accumulo.test.VerifyIngest.VerifyParams; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RawLocalFileSystem; @@ -109,17 +110,14 @@ public class GarbageCollectorIT extends ConfigurableMacBase { @Test public void gcTest() throws Exception { killMacGc(); + final String table = "test_ingest"; try (AccumuloClient c = createClient()) { - c.tableOperations().create("test_ingest"); - c.tableOperations().setProperty("test_ingest", Property.TABLE_SPLIT_THRESHOLD.getKey(), "5K"); - 
TestIngest.Opts opts = new TestIngest.Opts(); - VerifyIngest.Opts vopts = new VerifyIngest.Opts(); - vopts.rows = opts.rows = 10000; - vopts.cols = opts.cols = 1; - opts.setClientProperties(getClientProperties()); - vopts.setClientProperties(getClientProperties()); - TestIngest.ingest(c, cluster.getFileSystem(), opts); - c.tableOperations().compact("test_ingest", null, null, true, true); + c.tableOperations().create(table); + c.tableOperations().setProperty(table, Property.TABLE_SPLIT_THRESHOLD.getKey(), "5K"); + VerifyParams params = new VerifyParams(getClientProperties(), table, 10_000); + params.cols = 1; + TestIngest.ingest(c, cluster.getFileSystem(), params); + c.tableOperations().compact(table, null, null, true, true); int before = countFiles(); while (true) { sleepUninterruptibly(1, TimeUnit.SECONDS); @@ -133,7 +131,7 @@ public class GarbageCollectorIT extends ConfigurableMacBase { getCluster().start(); sleepUninterruptibly(15, TimeUnit.SECONDS); int after = countFiles(); - VerifyIngest.verifyIngest(c, vopts); + VerifyIngest.verifyIngest(c, params); assertTrue(after < before); } } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/HalfDeadTServerIT.java b/test/src/main/java/org/apache/accumulo/test/functional/HalfDeadTServerIT.java index 27be097..9ffbeaf 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/HalfDeadTServerIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/HalfDeadTServerIT.java @@ -174,10 +174,9 @@ public class HalfDeadTServerIT extends ConfigurableMacBase { if (seconds <= 10) { assertEquals(0, ingest.waitFor()); - VerifyIngest.Opts vopts = new VerifyIngest.Opts(); - vopts.rows = rows; - vopts.setClientProperties(getClientProperties()); - VerifyIngest.verifyIngest(c, vopts); + VerifyIngest.VerifyParams params = new VerifyIngest.VerifyParams(getClientProperties()); + params.rows = rows; + VerifyIngest.verifyIngest(c, params); } else { sleepUninterruptibly(5, TimeUnit.SECONDS); tserver.waitFor(); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MasterFailoverIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MasterFailoverIT.java index 9c2bef8..d92bcd1 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/MasterFailoverIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/MasterFailoverIT.java @@ -27,6 +27,7 @@ import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.test.TestIngest; import org.apache.accumulo.test.VerifyIngest; +import org.apache.accumulo.test.VerifyIngest.VerifyParams; import org.apache.hadoop.conf.Configuration; import org.junit.Test; @@ -50,10 +51,8 @@ public class MasterFailoverIT extends AccumuloClusterHarness { try (AccumuloClient c = createAccumuloClient()) { String[] names = getUniqueNames(2); c.tableOperations().create(names[0]); - TestIngest.Opts opts = new TestIngest.Opts(); - opts.setTableName(names[0]); - opts.setClientProperties(getClientProperties()); - TestIngest.ingest(c, opts); + VerifyParams params = new VerifyParams(getClientProperties(), names[0]); + TestIngest.ingest(c, params); ClusterControl control = cluster.getClusterControl(); control.stopAllServers(ServerType.MASTER); @@ -61,10 +60,8 @@ public class MasterFailoverIT extends AccumuloClusterHarness { control.startAllServers(ServerType.MASTER); // talk to it c.tableOperations().rename(names[0], names[1]); - VerifyIngest.Opts vopts = new VerifyIngest.Opts(); - 
vopts.setTableName(names[1]); - vopts.setClientProperties(getClientProperties()); - VerifyIngest.verifyIngest(c, vopts); + params.tableName = names[1]; + VerifyIngest.verifyIngest(c, params); } } } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MaxOpenIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MaxOpenIT.java index 22b3bba..5bedf8d 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/MaxOpenIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/MaxOpenIT.java @@ -32,6 +32,7 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.test.TestIngest; +import org.apache.accumulo.test.TestIngest.IngestParams; import org.apache.accumulo.test.VerifyIngest; import org.apache.hadoop.conf.Configuration; import org.junit.After; @@ -103,15 +104,13 @@ public class MaxOpenIT extends AccumuloClusterHarness { // the following loop should create three tablets in each map file for (int i = 0; i < 3; i++) { - TestIngest.Opts opts = new TestIngest.Opts(); - opts.timestamp = i; - opts.dataSize = 50; - opts.rows = NUM_TO_INGEST; - opts.cols = 1; - opts.random = i; - opts.setTableName(tableName); - opts.setClientProperties(getClientProperties()); - TestIngest.ingest(c, opts); + IngestParams params = new IngestParams(getClientProperties(), tableName, NUM_TO_INGEST); + params.timestamp = i; + params.dataSize = 50; + params.rows = NUM_TO_INGEST; + params.cols = 1; + params.random = i; + TestIngest.ingest(c, params); c.tableOperations().flush(tableName, null, null, true); FunctionalTestUtils.checkRFiles(c, tableName, NUM_TABLETS, NUM_TABLETS, i + 1, i + 1); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java index dc90fc2..0c58c24 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java @@ -80,8 +80,10 @@ import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.test.TestIngest; +import org.apache.accumulo.test.TestIngest.IngestParams; import org.apache.accumulo.test.TestMultiTableIngest; import org.apache.accumulo.test.VerifyIngest; +import org.apache.accumulo.test.VerifyIngest.VerifyParams; import org.apache.accumulo.test.categories.StandaloneCapableClusterTests; import org.apache.accumulo.test.categories.SunnyDayTests; import org.apache.hadoop.conf.Configuration; @@ -206,17 +208,13 @@ public class ReadWriteIT extends AccumuloClusterHarness { public static void ingest(AccumuloClient accumuloClient, ClientInfo info, int rows, int cols, int width, int offset, String colf, String tableName) throws Exception { - TestIngest.Opts opts = new TestIngest.Opts(); - opts.rows = rows; - opts.cols = cols; - opts.dataSize = width; - opts.startRow = offset; - opts.columnFamily = colf; - opts.createTable = true; - opts.setTableName(tableName); - opts.setClientProperties(info.getProperties()); - - TestIngest.ingest(accumuloClient, opts); + IngestParams params = new IngestParams(info.getProperties(), tableName, rows); + params.cols = cols; + params.dataSize = width; + params.startRow = offset; + params.columnFamily = colf; + params.createTable = true; + 
TestIngest.ingest(accumuloClient, params); } public static void verify(AccumuloClient accumuloClient, ClientInfo info, int rows, int cols, @@ -226,16 +224,13 @@ public class ReadWriteIT extends AccumuloClusterHarness { private static void verify(AccumuloClient accumuloClient, ClientInfo info, int rows, int cols, int width, int offset, String colf, String tableName) throws Exception { - VerifyIngest.Opts opts = new VerifyIngest.Opts(); - opts.rows = rows; - opts.cols = cols; - opts.dataSize = width; - opts.startRow = offset; - opts.columnFamily = colf; - opts.setTableName(tableName); - opts.setClientProperties(info.getProperties()); - - VerifyIngest.verifyIngest(accumuloClient, opts); + VerifyParams params = new VerifyParams(info.getProperties(), tableName, rows); + params.rows = rows; + params.dataSize = width; + params.startRow = offset; + params.columnFamily = colf; + params.cols = cols; + VerifyIngest.verifyIngest(accumuloClient, params); } public static String[] args(String... args) { diff --git a/test/src/main/java/org/apache/accumulo/test/functional/RenameIT.java b/test/src/main/java/org/apache/accumulo/test/functional/RenameIT.java index 07c48e8..0697070 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/RenameIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/RenameIT.java @@ -21,6 +21,7 @@ import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.test.TestIngest; import org.apache.accumulo.test.VerifyIngest; +import org.apache.accumulo.test.VerifyIngest.VerifyParams; import org.junit.Test; public class RenameIT extends AccumuloClusterHarness { @@ -35,26 +36,20 @@ public class RenameIT extends AccumuloClusterHarness { String[] tableNames = getUniqueNames(2); String name1 = tableNames[0]; String name2 = tableNames[1]; - TestIngest.Opts opts = new TestIngest.Opts(); - opts.createTable = true; - opts.setTableName(name1); - opts.setClientProperties(cluster.getClientProperties()); + VerifyParams params = new VerifyParams(cluster.getClientProperties(), name1); + params.createTable = true; try (AccumuloClient c = createAccumuloClient()) { - TestIngest.ingest(c, opts); + TestIngest.ingest(c, params); c.tableOperations().rename(name1, name2); - TestIngest.ingest(c, opts); - VerifyIngest.Opts vopts = new VerifyIngest.Opts(); - vopts.setClientProperties(cluster.getClientProperties()); - vopts.setTableName(name2); - VerifyIngest.verifyIngest(c, vopts); + TestIngest.ingest(c, params); + params.tableName = name2; + VerifyIngest.verifyIngest(c, params); c.tableOperations().delete(name1); c.tableOperations().rename(name2, name1); - vopts.setTableName(name1); - VerifyIngest.verifyIngest(c, vopts); - + params.tableName = name1; + VerifyIngest.verifyIngest(c, params); FunctionalTestUtils.assertNoDanglingFateLocks((ClientContext) c, getCluster()); } } - } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/RestartIT.java b/test/src/main/java/org/apache/accumulo/test/functional/RestartIT.java index 83204e8..fc778e0 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/RestartIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/RestartIT.java @@ -42,7 +42,9 @@ import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.test.TestIngest; +import org.apache.accumulo.test.TestIngest.IngestParams; 
import org.apache.accumulo.test.VerifyIngest; +import org.apache.accumulo.test.VerifyIngest.VerifyParams; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.RawLocalFileSystem; import org.junit.After; @@ -68,12 +70,6 @@ public class RestartIT extends AccumuloClusterHarness { hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); } - private static final VerifyIngest.Opts VOPTS = new VerifyIngest.Opts(); - private static final TestIngest.Opts OPTS = new TestIngest.Opts(); - static { - OPTS.rows = VOPTS.rows = 10 * 1000; - } - private ExecutorService svc; @Before @@ -102,13 +98,13 @@ public class RestartIT extends AccumuloClusterHarness { final String tableName = getUniqueNames(1)[0]; c.tableOperations().create(tableName); final ClusterControl control = getCluster().getClusterControl(); - VOPTS.setTableName(tableName); - VOPTS.setClientProperties(getClientProperties()); + + VerifyParams params = new VerifyParams(getClientProperties(), tableName, 10_000); Future<Integer> ret = svc.submit(() -> { try { return control.exec(TestIngest.class, new String[] {"-c", cluster.getClientPropsPath(), - "--rows", "" + OPTS.rows, "--table", tableName}); + "--rows", "" + params.rows, "--table", tableName}); } catch (IOException e) { log.error("Error running TestIngest", e); return -1; @@ -118,7 +114,7 @@ public class RestartIT extends AccumuloClusterHarness { control.stopAllServers(ServerType.MASTER); control.startAllServers(ServerType.MASTER); assertEquals(0, ret.get().intValue()); - VerifyIngest.verifyIngest(c, VOPTS); + VerifyIngest.verifyIngest(c, params); } } @@ -127,11 +123,8 @@ public class RestartIT extends AccumuloClusterHarness { try (AccumuloClient c = createAccumuloClient()) { String tableName = getUniqueNames(1)[0]; c.tableOperations().create(tableName); - OPTS.setTableName(tableName); - VOPTS.setTableName(tableName); - OPTS.setClientProperties(getClientProperties()); - VOPTS.setClientProperties(getClientProperties()); - TestIngest.ingest(c, OPTS); + VerifyParams params = new VerifyParams(getClientProperties(), tableName, 10_000); + TestIngest.ingest(c, params); ClusterControl control = getCluster().getClusterControl(); // TODO implement a kill all too? 
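The RestartIT hunks above show this refactor at its clearest: the static OPTS/VOPTS objects that every test method mutated in place give way to a parameter object constructed locally per test, which also keeps the tests safe to run concurrently. The IngestParams and VerifyParams definitions themselves are not part of this excerpt; judging from the call sites, and from the import lines showing both are nested classes (TestIngest.IngestParams, VerifyIngest.VerifyParams), they plausibly look like the sketch below. This is an inferred shape, not code from the commit: the field names come from the call sites in this diff, while the types, defaults, and constructor chaining are assumptions.

    // Inferred sketch of TestIngest.IngestParams as it would appear nested in
    // TestIngest; 'Properties' is java.util.Properties. Defaults marked
    // "guessed" are assumptions; everything else mirrors the call sites above.
    public static class IngestParams {
      public final Properties clientProps;
      public String tableName = "test_ingest"; // several ITs rely on this default
      public int rows = 100_000;               // guessed default
      public int cols = 50;                    // guessed default
      public int startRow = 0;
      public int dataSize = 1000;              // guessed default
      public Integer random;                   // null means non-random cell values
      public long timestamp = -1;              // negative means "unset"
      public boolean createTable = false;
      public String outputFile;                // when set, write RFiles for bulk import
      public String columnFamily = "colf";     // guessed default

      public IngestParams(Properties props) {
        clientProps = props;
      }

      public IngestParams(Properties props, String table) {
        this(props);
        tableName = table;
      }

      public IngestParams(Properties props, String table, int rows) {
        this(props, table);
        this.rows = rows;
      }
    }

    // VerifyParams is handed to both TestIngest.ingest() and
    // VerifyIngest.verifyIngest() throughout this commit, so it presumably
    // extends IngestParams, adding little beyond matching constructors.
    public static class VerifyParams extends IngestParams {
      public VerifyParams(Properties props) { super(props); }
      public VerifyParams(Properties props, String table) { super(props, table); }
      public VerifyParams(Properties props, String table, int rows) { super(props, table, rows); }
    }

With one object carrying both ingest and verify settings, call sites shrink to "new VerifyParams(props, table, rows); ingest; verifyIngest" and the two phases can no longer drift apart, which is exactly the change repeated across the ITs in this commit.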
@@ -169,7 +162,7 @@ public class RestartIT extends AccumuloClusterHarness { } } while (masterLockData != null); cluster.start(); - VerifyIngest.verifyIngest(c, VOPTS); + VerifyIngest.verifyIngest(c, params); } } @@ -181,13 +174,12 @@ public class RestartIT extends AccumuloClusterHarness { c.tableOperations().create(tableName); c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "5K"); - VOPTS.setTableName(tableName); - VOPTS.setClientProperties(getClientProperties()); + VerifyParams params = new VerifyParams(getClientProperties(), tableName, 10_000); Future<Integer> ret = svc.submit(() -> { try { return control.exec(TestIngest.class, new String[] {"-c", cluster.getClientPropsPath(), - "--rows", "" + VOPTS.rows, "--table", tableName}); + "--rows", "" + params.rows, "--table", tableName}); } catch (Exception e) { log.error("Error running TestIngest", e); return -1; @@ -211,7 +203,7 @@ public class RestartIT extends AccumuloClusterHarness { cluster.start(); assertEquals(0, ret.get().intValue()); - VerifyIngest.verifyIngest(c, VOPTS); + VerifyIngest.verifyIngest(c, params); } } @@ -220,15 +212,12 @@ public class RestartIT extends AccumuloClusterHarness { try (AccumuloClient c = createAccumuloClient()) { String tableName = getUniqueNames(1)[0]; c.tableOperations().create(tableName); - OPTS.setTableName(tableName); - VOPTS.setTableName(tableName); - OPTS.setClientProperties(getClientProperties()); - VOPTS.setClientProperties(getClientProperties()); - TestIngest.ingest(c, OPTS); - VerifyIngest.verifyIngest(c, VOPTS); + VerifyParams params = new VerifyParams(getClientProperties(), tableName, 10_000); + TestIngest.ingest(c, params); + VerifyIngest.verifyIngest(c, params); cluster.getClusterControl().stopAllServers(ServerType.TABLET_SERVER); cluster.start(); - VerifyIngest.verifyIngest(c, VOPTS); + VerifyIngest.verifyIngest(c, params); } } @@ -253,9 +242,8 @@ public class RestartIT extends AccumuloClusterHarness { try (AccumuloClient c = createAccumuloClient()) { String tableName = getUniqueNames(1)[0]; c.tableOperations().create(tableName); - OPTS.setTableName(tableName); - OPTS.setClientProperties(getClientProperties()); - TestIngest.ingest(c, OPTS); + IngestParams params = new IngestParams(getClientProperties(), tableName, 10_000); + TestIngest.ingest(c, params); try { getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER); getCluster().getClusterControl().adminStopAll(); @@ -269,9 +257,7 @@ public class RestartIT extends AccumuloClusterHarness { public void shutdownDuringCompactingSplitting() throws Exception { try (AccumuloClient c = createAccumuloClient()) { String tableName = getUniqueNames(1)[0]; - VOPTS.setTableName(tableName); - OPTS.setClientProperties(getClientProperties()); - VOPTS.setClientProperties(getClientProperties()); + VerifyParams params = new VerifyParams(getClientProperties(), tableName, 10_000); c.tableOperations().create(tableName); c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "10K"); String splitThreshold = null; @@ -285,12 +271,9 @@ public class RestartIT extends AccumuloClusterHarness { try { c.tableOperations().setProperty(MetadataTable.NAME, Property.TABLE_SPLIT_THRESHOLD.getKey(), "20K"); - TestIngest.Opts opts = new TestIngest.Opts(); - opts.setTableName(tableName); - opts.setClientProperties(getClientProperties()); - TestIngest.ingest(c, opts); + TestIngest.ingest(c, params); c.tableOperations().flush(tableName, null, null, false); - VerifyIngest.verifyIngest(c, VOPTS); + 
VerifyIngest.verifyIngest(c, params); getCluster().stop(); } finally { if (getClusterType() == ClusterType.STANDALONE) { diff --git a/test/src/main/java/org/apache/accumulo/test/functional/RestartStressIT.java b/test/src/main/java/org/apache/accumulo/test/functional/RestartStressIT.java index 41bb33a..9335d8d 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/RestartStressIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/RestartStressIT.java @@ -33,6 +33,7 @@ import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.test.TestIngest; import org.apache.accumulo.test.VerifyIngest; +import org.apache.accumulo.test.VerifyIngest.VerifyParams; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.RawLocalFileSystem; import org.junit.After; @@ -83,12 +84,6 @@ public class RestartStressIT extends AccumuloClusterHarness { } } - private static final VerifyIngest.Opts VOPTS; - static { - VOPTS = new VerifyIngest.Opts(); - VOPTS.rows = 10 * 1000; - } - @Test public void test() throws Exception { try (AccumuloClient c = createAccumuloClient()) { @@ -97,10 +92,12 @@ public class RestartStressIT extends AccumuloClusterHarness { c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "500K"); final ClusterControl control = getCluster().getClusterControl(); + VerifyParams params = new VerifyParams(getClientProperties(), tableName, 10_000); + Future<Integer> retCode = svc.submit(() -> { try { return control.exec(TestIngest.class, new String[] {"-c", cluster.getClientPropsPath(), - "--rows", "" + VOPTS.rows, "--table", tableName}); + "--rows", "" + params.rows, "--table", tableName}); } catch (Exception e) { log.error("Error running TestIngest", e); return -1; @@ -113,9 +110,7 @@ public class RestartStressIT extends AccumuloClusterHarness { control.startAllServers(ServerType.TABLET_SERVER); } assertEquals(0, retCode.get().intValue()); - VOPTS.setTableName(tableName); - VOPTS.setClientProperties(getClientProperties()); - VerifyIngest.verifyIngest(c, VOPTS); + VerifyIngest.verifyIngest(c, params); } } } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SimpleBalancerFairnessIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SimpleBalancerFairnessIT.java index 95008cb..97f3b37 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/SimpleBalancerFairnessIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/SimpleBalancerFairnessIT.java @@ -71,10 +71,9 @@ public class SimpleBalancerFairnessIT extends ConfigurableMacBase { log.info("Creating {} splits", splits.size()); c.tableOperations().addSplits("unused", splits); List<String> tservers = c.instanceOperations().getTabletServers(); - TestIngest.Opts opts = new TestIngest.Opts(); - opts.rows = 50000; - opts.setClientProperties(getClientProperties()); - TestIngest.ingest(c, opts); + TestIngest.IngestParams params = new TestIngest.IngestParams(getClientProperties()); + params.rows = 50000; + TestIngest.ingest(c, params); c.tableOperations().flush("test_ingest", null, null, false); sleepUninterruptibly(45, TimeUnit.SECONDS); Credentials creds = new Credentials("root", new PasswordToken(ROOT_PASSWORD)); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java index c23cb9f..002a594 100644 --- 
a/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java @@ -42,6 +42,7 @@ import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.server.util.CheckForMetadataProblems; import org.apache.accumulo.test.TestIngest; import org.apache.accumulo.test.VerifyIngest; +import org.apache.accumulo.test.VerifyIngest.VerifyParams; import org.apache.hadoop.conf.Configuration; import org.junit.After; import org.junit.Assume; @@ -124,17 +125,9 @@ public class SplitIT extends AccumuloClusterHarness { c.tableOperations().setProperty(table, Property.TABLE_SPLIT_THRESHOLD.getKey(), "256K"); c.tableOperations().setProperty(table, Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "1K"); - TestIngest.Opts opts = new TestIngest.Opts(); - VerifyIngest.Opts vopts = new VerifyIngest.Opts(); - opts.rows = 100000; - opts.setTableName(table); - opts.setClientProperties(getClientProperties()); - - TestIngest.ingest(c, opts); - vopts.rows = opts.rows; - vopts.setTableName(table); - vopts.setClientProperties(getClientProperties()); - VerifyIngest.verifyIngest(c, vopts); + VerifyParams params = new VerifyParams(getClientProperties(), table, 100_000); + TestIngest.ingest(c, params); + VerifyIngest.verifyIngest(c, params); while (c.tableOperations().listSplits(table).size() < 10) { sleepUninterruptibly(15, TimeUnit.SECONDS); } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/TableIT.java b/test/src/main/java/org/apache/accumulo/test/functional/TableIT.java index 3f196c8..c385a18 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/TableIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/TableIT.java @@ -35,6 +35,7 @@ import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl; import org.apache.accumulo.test.TestIngest; import org.apache.accumulo.test.VerifyIngest; +import org.apache.accumulo.test.VerifyIngest.VerifyParams; import org.apache.accumulo.test.categories.MiniClusterOnlyTests; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -66,15 +67,10 @@ public class TableIT extends AccumuloClusterHarness { String tableName = getUniqueNames(1)[0]; to.create(tableName); - TestIngest.Opts opts = new TestIngest.Opts(); - VerifyIngest.Opts vopts = new VerifyIngest.Opts(); - opts.setClientProperties(getClientProperties()); - vopts.setClientProperties(getClientProperties()); - opts.setTableName(tableName); - TestIngest.ingest(c, opts); + VerifyParams params = new VerifyParams(getClientProperties(), tableName); + TestIngest.ingest(c, params); to.flush(tableName, null, null, true); - vopts.setTableName(tableName); - VerifyIngest.verifyIngest(c, vopts); + VerifyIngest.verifyIngest(c, params); TableId id = TableId.of(to.tableIdMap().get(tableName)); try (Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) { s.setRange(new KeyExtent(id, null, null).toMetadataRange()); @@ -93,8 +89,8 @@ public class TableIT extends AccumuloClusterHarness { } assertNull(to.tableIdMap().get(tableName)); to.create(tableName); - TestIngest.ingest(c, opts); - VerifyIngest.verifyIngest(c, vopts); + TestIngest.ingest(c, params); + VerifyIngest.verifyIngest(c, params); to.delete(tableName); } } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/WriteAheadLogIT.java b/test/src/main/java/org/apache/accumulo/test/functional/WriteAheadLogIT.java index 
76f23fd..f738eb1 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/WriteAheadLogIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/WriteAheadLogIT.java @@ -23,6 +23,7 @@ import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.test.TestIngest; import org.apache.accumulo.test.VerifyIngest; +import org.apache.accumulo.test.VerifyIngest.VerifyParams; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.RawLocalFileSystem; import org.junit.Test; @@ -59,18 +60,11 @@ public class WriteAheadLogIT extends AccumuloClusterHarness { public static void testWAL(AccumuloClient c, String tableName) throws Exception { c.tableOperations().create(tableName); c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "750K"); - TestIngest.Opts opts = new TestIngest.Opts(); - VerifyIngest.Opts vopts = new VerifyIngest.Opts(); - opts.setTableName(tableName); - opts.setClientProperties(getClientProperties()); - vopts.setClientProperties(getClientProperties()); - - TestIngest.ingest(c, opts); - vopts.setTableName(tableName); - VerifyIngest.verifyIngest(c, vopts); + VerifyParams params = new VerifyParams(getClientProperties(), tableName); + TestIngest.ingest(c, params); + VerifyIngest.verifyIngest(c, params); getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER); getCluster().getClusterControl().startAllServers(ServerType.TABLET_SERVER); - VerifyIngest.verifyIngest(c, vopts); + VerifyIngest.verifyIngest(c, params); } - } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/WriteLotsIT.java b/test/src/main/java/org/apache/accumulo/test/functional/WriteLotsIT.java index 9bfa724..a10595e 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/WriteLotsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/WriteLotsIT.java @@ -26,7 +26,9 @@ import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.test.TestIngest; +import org.apache.accumulo.test.TestIngest.IngestParams; import org.apache.accumulo.test.VerifyIngest; +import org.apache.accumulo.test.VerifyIngest.VerifyParams; import org.junit.Test; public class WriteLotsIT extends AccumuloClusterHarness { @@ -53,12 +55,9 @@ public class WriteLotsIT extends AccumuloClusterHarness { final int index = i; Runnable r = () -> { try { - TestIngest.Opts opts = new TestIngest.Opts(); - opts.startRow = index * 10000; - opts.rows = 10000; - opts.setTableName(tableName); - opts.setClientProperties(getClientProperties()); - TestIngest.ingest(c, opts); + IngestParams ingestParams = new IngestParams(getClientProperties(), tableName, 10_000); + ingestParams.startRow = index * 10000; + TestIngest.ingest(c, ingestParams); } catch (Exception ex) { ref.set(ex); } @@ -70,12 +69,8 @@ public class WriteLotsIT extends AccumuloClusterHarness { if (ref.get() != null) { throw ref.get(); } - VerifyIngest.Opts vopts = new VerifyIngest.Opts(); - vopts.rows = 10000 * THREADS; - vopts.setTableName(tableName); - vopts.setClientProperties(getClientProperties()); - VerifyIngest.verifyIngest(c, vopts); + VerifyParams params = new VerifyParams(getClientProperties(), tableName, 10_000 * THREADS); + VerifyIngest.verifyIngest(c, params); } } - } diff --git a/test/src/main/java/org/apache/accumulo/test/mapreduce/RowHash.java 
b/test/src/main/java/org/apache/accumulo/test/mapreduce/RowHash.java index 6282f7a..fc027e1 100644 --- a/test/src/main/java/org/apache/accumulo/test/mapreduce/RowHash.java +++ b/test/src/main/java/org/apache/accumulo/test/mapreduce/RowHash.java @@ -23,11 +23,11 @@ import java.util.Collections; import org.apache.accumulo.core.cli.ClientOpts; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; -import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.admin.DelegationTokenConfig; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.client.security.tokens.KerberosToken; import org.apache.accumulo.core.clientImpl.ClientConfConverter; +import org.apache.accumulo.core.conf.ClientProperty; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; @@ -70,90 +70,11 @@ public class RowHash extends Configured implements Tool { } private static class Opts extends ClientOpts { - private static final Logger log = LoggerFactory.getLogger(Opts.class); - @Parameter(names = "--column", required = true) String column; @Parameter(names = {"-t", "--table"}, required = true, description = "table to use") - private String tableName; - - public String getTableName() { - return tableName; - } - - public void setAccumuloConfigs(Job job) throws AccumuloSecurityException { - org.apache.accumulo.core.client.ClientConfiguration clientConf = ClientConfConverter - .toClientConf(this.getClientProperties()); - org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.setZooKeeperInstance(job, - clientConf); - - org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat.setZooKeeperInstance(job, - clientConf); - - final String principal = getPrincipal(); - getTableName(); - - AuthenticationToken token = getToken(); - org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.setConnectorInfo(job, principal, - token); - org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat.setConnectorInfo(job, - principal, token); - org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.setInputTableName(job, - getTableName()); - org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.setScanAuthorizations(job, - auths); - org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat.setCreateTables(job, true); - org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat.setDefaultTableName(job, - getTableName()); - } - - @Override - public AuthenticationToken getToken() { - AuthenticationToken authToken = super.getToken(); - // For MapReduce, Kerberos credentials don't make it to the Mappers and Reducers, - // so we need to request a delegation token and use that instead. 
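The comment being deleted above captures the constraint that motivated this whole block: Kerberos credentials do not travel with a MapReduce job to its mappers and reducers, so the submitting client must trade them for an Accumulo delegation token up front. Stripped of RowHash's permission check and logging, the exchange reduces to roughly the sketch below; 'props' and 'principal' are placeholders for the parsed client configuration, not commit code, and all class names appear in this excerpt's import lists.

    // Sketch of the delegation-token exchange, assuming the JVM already holds
    // a Kerberos login.
    KerberosToken krbToken = new KerberosToken(); // wraps the current Kerberos user
    try (AccumuloClient client =
        Accumulo.newClient().from(props).as(principal, krbToken).build()) {
      // Ship this token with the job in place of the user's Kerberos credentials.
      AuthenticationToken forMapReduce =
          client.securityOperations().getDelegationToken(new DelegationTokenConfig());
    }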
- if (authToken instanceof KerberosToken) { - log.info("Received KerberosToken, fetching DelegationToken for MapReduce"); - final KerberosToken krbToken = (KerberosToken) authToken; - - try { - UserGroupInformation user = UserGroupInformation.getCurrentUser(); - if (!user.hasKerberosCredentials()) { - throw new IllegalStateException("Expected current user to have Kerberos credentials"); - } - - String newPrincipal = user.getUserName(); - log.info("Obtaining delegation token for {}", newPrincipal); - - setPrincipal(newPrincipal); - AccumuloClient client = Accumulo.newClient().from(getClientProperties()) - .as(newPrincipal, krbToken).build(); - - // Do the explicit check to see if the user has the permission to get a delegation token - if (!client.securityOperations().hasSystemPermission(client.whoami(), - SystemPermission.OBTAIN_DELEGATION_TOKEN)) { - log.error( - "{} doesn't have the {} SystemPermission neccesary to obtain a delegation" - + " token. MapReduce tasks cannot automatically use the client's" - + " credentials on remote servers. Delegation tokens provide a means to run" - + " MapReduce without distributing the user's credentials.", - user.getUserName(), SystemPermission.OBTAIN_DELEGATION_TOKEN.name()); - throw new IllegalStateException( - client.whoami() + " does not have permission to obtain a delegation token"); - } - - // Get the delegation token from Accumulo - return client.securityOperations().getDelegationToken(new DelegationTokenConfig()); - } catch (Exception e) { - final String msg = "Failed to acquire DelegationToken for use with MapReduce"; - log.error(msg, e); - throw new RuntimeException(msg, e); - } - } - return authToken; - } - + String tableName; } @Override @@ -163,8 +84,28 @@ public class RowHash extends Configured implements Tool { job.setJarByClass(this.getClass()); Opts opts = new Opts(); opts.parseArgs(RowHash.class.getName(), args); + job.setInputFormatClass(org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.class); - opts.setAccumuloConfigs(job); + org.apache.accumulo.core.client.ClientConfiguration clientConf = ClientConfConverter + .toClientConf(opts.getClientProps()); + org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.setZooKeeperInstance(job, + clientConf); + org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat.setZooKeeperInstance(job, + clientConf); + + final String principal = ClientProperty.AUTH_PRINCIPAL.getValue(opts.getClientProps()); + AuthenticationToken token = opts.getToken(); + org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.setConnectorInfo(job, principal, + token); + org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat.setConnectorInfo(job, + principal, token); + org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.setInputTableName(job, + opts.tableName); + org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.setScanAuthorizations(job, + opts.auths); + org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat.setCreateTables(job, true); + org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat.setDefaultTableName(job, + opts.tableName); String col = opts.column; int idx = col.indexOf(":"); diff --git a/test/src/main/java/org/apache/accumulo/test/performance/ContinuousIngest.java b/test/src/main/java/org/apache/accumulo/test/performance/ContinuousIngest.java index 028a5b7..631bc26 100644 --- a/test/src/main/java/org/apache/accumulo/test/performance/ContinuousIngest.java +++ 
b/test/src/main/java/org/apache/accumulo/test/performance/ContinuousIngest.java @@ -30,6 +30,7 @@ import java.util.zip.CRC32; import java.util.zip.Checksum; import org.apache.accumulo.core.cli.ClientOpts; +import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.MutationsRejectedException; @@ -96,7 +97,7 @@ public class ContinuousIngest { if (opts.min < 0 || opts.max < 0 || opts.max <= opts.min) { throw new IllegalArgumentException("bad min and max"); } - try (AccumuloClient client = clientOpts.createClient()) { + try (AccumuloClient client = Accumulo.newClient().from(clientOpts.getClientProps()).build()) { if (!client.tableOperations().exists(clientOpts.tableName)) { throw new TableNotFoundException(null, clientOpts.tableName, diff --git a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java index e00cc0a..3c58f1b 100644 --- a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java +++ b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java @@ -34,6 +34,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.clientImpl.ClientContext; @@ -215,7 +216,7 @@ public class CollectTabletStats { runTest("read tablet files w/ table iter stack", tests, opts.numThreads, threadPool); } - try (AccumuloClient client = opts.createClient()) { + try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build()) { for (int i = 0; i < opts.iterations; i++) { ArrayList<Test> tests = new ArrayList<>(); for (final KeyExtent ke : tabletsToTest) {
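The final two hunks apply one more uniform substitution: ContinuousIngest and CollectTabletStats stop going through opts.createClient() and build their client via the public Accumulo entry point instead, fed with the properties that ClientOpts already parsed. The idiom common to both call sites pairs the builder with try-with-resources so the client is always closed; a minimal sketch, where 'opts' stands for the parsed ClientOpts instance as in the hunks above:

    try (AccumuloClient client =
        Accumulo.newClient().from(opts.getClientProps()).build()) {
      // scanners, writers, and admin operations go here; close() is automatic
    }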