This is an automated email from the ASF dual-hosted git repository.

mwalch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git
The following commit(s) were added to refs/heads/master by this push:
     new d87d448  Refactored RowHash and TeraSortIngest (#68)
d87d448 is described below

commit d87d448278d43750cc978c9f37bacaf7e8591bb9
Author: Mike Walch <mwa...@apache.org>
AuthorDate: Thu Mar 21 16:05:39 2019 -0400

    Refactored RowHash and TeraSortIngest (#68)
---
 bin/mapred                                         | 67 ++++++++++++++++++
 conf/accumulo-testing.properties.example           | 29 ++++++++
 .../org/apache/accumulo/testing/TestProps.java     | 16 +++++
 .../apache/accumulo/testing/mapreduce/RowHash.java | 34 ++++-----
 .../accumulo/testing/mapreduce/TeraSortIngest.java | 82 +++++++++-------------
 .../randomwalk/ReplicationRandomWalkIT.java        |  1 -
 6 files changed, 160 insertions(+), 69 deletions(-)

diff --git a/bin/mapred b/bin/mapred
new file mode 100755
index 0000000..f943b45
--- /dev/null
+++ b/bin/mapred
@@ -0,0 +1,67 @@
+#! /usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+bin_dir=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
+at_home=$( cd "$( dirname "$bin_dir" )" && pwd )
+
+function print_usage() {
+  cat <<EOF
+
+Usage: mapred <application> {-o test.<prop>=<value>}
+
+Available applications:
+
+    terasort        Run Terasort
+    rowhash         Run RowHash
+EOF
+}
+
+if [ -f "$at_home/conf/env.sh" ]; then
+  . "$at_home"/conf/env.sh
+else
+  . "$at_home"/conf/env.sh.example
+fi
+
+if [ -z "$1" ]; then
+  echo "ERROR: <application> needs to be set"
+  print_usage
+  exit 1
+fi
+
+mr_package="org.apache.accumulo.testing.mapreduce"
+case "$1" in
+  terasort)
+    mr_main="${mr_package}.TeraSortIngest"
+    ;;
+  rowhash)
+    mr_main="${mr_package}.RowHash"
+    ;;
+  *)
+    echo "Unknown application: $1"
+    print_usage
+    exit 1
+esac
+
+export CLASSPATH="$TEST_JAR_PATH:$HADOOP_API_JAR:$HADOOP_RUNTIME_JAR:$CLASSPATH"
+
+if [ ! -z $HADOOP_HOME ]; then
+  export HADOOP_USE_CLIENT_CLASSLOADER=true
+  "$HADOOP_HOME"/bin/yarn jar "$TEST_JAR_PATH" "$mr_main" "$TEST_PROPS" "$ACCUMULO_CLIENT_PROPS" "${@:2}"
+else
+  echo "Hadoop must be installed and HADOOP_HOME must be set!"
+  exit 1
+fi
diff --git a/conf/accumulo-testing.properties.example b/conf/accumulo-testing.properties.example
index 502bcde..9dbe4e0 100644
--- a/conf/accumulo-testing.properties.example
+++ b/conf/accumulo-testing.properties.example
@@ -119,3 +119,32 @@ test.ci.bulk.map.nodes=1000000
 # produce a bulk import file.
 test.ci.bulk.reducers.max=1024
+#################
+# MapReduce Tests
+#################
+
+# RowHash test
+# ------------
+# Table containing input data
+test.rowhash.input.table = terasort
+# Table where data will be output to
+test.rowhash.output.table = rowhash
+# Column that is fetched in input table
+test.rowhash.column = c
+
+# TeraSort ingest
+# ---------------
+# Table to ingest into
+test.terasort.table = terasort
+# Number of rows to ingest
+test.terasort.num.rows = 10000
+# Minimum key size
+test.terasort.min.keysize = 10
+# Maximum key size
+test.terasort.max.keysize = 10
+# Minimum value size
+test.terasort.min.valuesize = 78
+# Maximum value size
+test.terasort.max.valuesize = 78
+# Number of table splits
+test.terasort.num.splits = 4
diff --git a/src/main/java/org/apache/accumulo/testing/TestProps.java b/src/main/java/org/apache/accumulo/testing/TestProps.java
index 3f2ca15..49ea718 100644
--- a/src/main/java/org/apache/accumulo/testing/TestProps.java
+++ b/src/main/java/org/apache/accumulo/testing/TestProps.java
@@ -33,6 +33,8 @@ public class TestProps {
   private static final String CI_SCANNER = CI + "scanner.";
   private static final String CI_VERIFY = CI + "verify.";
   private static final String CI_BULK = CI + "bulk.";
+  public static final String TERASORT = PREFIX + "terasort.";
+  public static final String ROWHASH = PREFIX + "rowhash.";
 
   /** Common properties **/
   // HDFS root path. Should match 'fs.defaultFS' property in Hadoop's core-site.xml
@@ -122,6 +124,20 @@ public class TestProps {
   public static final String CI_BULK_MAP_NODES = CI_BULK + "map.nodes";
   public static final String CI_BULK_REDUCERS = CI_BULK + "reducers.max";
 
+  /** TeraSort **/
+  public static final String TERASORT_TABLE = TERASORT + "table";
+  public static final String TERASORT_NUM_ROWS = TERASORT + "num.rows";
+  public static final String TERASORT_MIN_KEYSIZE = TERASORT + "min.keysize";
+  public static final String TERASORT_MAX_KEYSIZE = TERASORT + "max.keysize";
+  public static final String TERASORT_MIN_VALUESIZE = TERASORT + "min.valuesize";
+  public static final String TERASORT_MAX_VALUESIZE = TERASORT + "max.valuesize";
+  public static final String TERASORT_NUM_SPLITS = TERASORT + "num.splits";
+
+  /** RowHash **/
+  public static final String ROWHASH_INPUT_TABLE = ROWHASH + "input.table";
+  public static final String ROWHASH_OUTPUT_TABLE = ROWHASH + "output.table";
+  public static final String ROWHASH_COLUMN = ROWHASH + "column";
+
   public static Properties loadFromFile(String propsFilePath) {
     try {
       return loadFromStream(new FileInputStream(propsFilePath));
diff --git a/src/main/java/org/apache/accumulo/testing/mapreduce/RowHash.java b/src/main/java/org/apache/accumulo/testing/mapreduce/RowHash.java
index dca7128..4722799 100644
--- a/src/main/java/org/apache/accumulo/testing/mapreduce/RowHash.java
+++ b/src/main/java/org/apache/accumulo/testing/mapreduce/RowHash.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.Base64;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Properties;
 
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.data.Key;
@@ -27,8 +28,8 @@ import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
 import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat;
-import org.apache.accumulo.testing.cli.ClientOpts;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.accumulo.testing.TestEnv;
+import org.apache.accumulo.testing.TestProps;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.Text;
@@ -37,8 +38,6 @@ import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
-import com.beust.jcommander.Parameter;
-
 public class RowHash extends Configured implements Tool {
   /**
    * The Mapper class that given a row number, will generate the appropriate output line.
@@ -57,23 +56,16 @@ public class RowHash extends Configured implements Tool {
     public void setup(Context job) {}
   }
 
-  private static class Opts extends ClientOpts {
-    @Parameter(names = "--column", required = true)
-    String column;
-    @Parameter(names = {"-t", "--table"}, required = true, description = "table to use")
-    String tableName;
-  }
-
   @Override
   public int run(String[] args) throws Exception {
+    TestEnv env = new TestEnv(args);
     Job job = Job.getInstance(getConf());
     job.setJobName(this.getClass().getName());
     job.setJarByClass(this.getClass());
-    Opts opts = new Opts();
-    opts.parseArgs(RowHash.class.getName(), args);
     job.setInputFormatClass(AccumuloInputFormat.class);
-    String col = opts.column;
+    Properties props = env.getTestProperties();
+    String col = props.getProperty(TestProps.ROWHASH_COLUMN);
     int idx = col.indexOf(":");
     Text cf = new Text(idx < 0 ? col : col.substring(0, idx));
     Text cq = idx < 0 ? null : new Text(col.substring(idx + 1));
@@ -81,12 +73,15 @@ public class RowHash extends Configured implements Tool {
     if (cf.getLength() > 0)
       cols = Collections.singleton(new IteratorSetting.Column(cf, cq));
 
-    AccumuloInputFormat.configure().clientProperties(opts.getClientProps()).table(opts.tableName)
-        .auths(opts.auths).fetchColumns(cols).store(job);
+    String inputTable = props.getProperty(TestProps.ROWHASH_INPUT_TABLE);
+    String outputTable = props.getProperty(TestProps.ROWHASH_OUTPUT_TABLE);
 
-    AccumuloOutputFormat.configure().clientProperties(opts.getClientProps())
-        .defaultTable(opts.tableName).createTables(true).store(job);
+    AccumuloInputFormat.configure().clientProperties(env.getClientProps()).table(inputTable)
+        .fetchColumns(cols).store(job);
+    AccumuloOutputFormat.configure().clientProperties(env.getClientProps())
+        .defaultTable(outputTable).createTables(true).store(job);
 
+    job.getConfiguration().set("mapreduce.job.classloader", "true");
     job.setMapperClass(HashDataMapper.class);
     job.setMapOutputKeyClass(Text.class);
     job.setMapOutputValueClass(Mutation.class);
@@ -100,6 +95,7 @@ public class RowHash extends Configured implements Tool {
   }
 
   public static void main(String[] args) throws Exception {
-    ToolRunner.run(new Configuration(), new RowHash(), args);
+    TestEnv env = new TestEnv(args);
+    ToolRunner.run(env.getHadoopConfiguration(), new RowHash(), args);
   }
 }
diff --git a/src/main/java/org/apache/accumulo/testing/mapreduce/TeraSortIngest.java b/src/main/java/org/apache/accumulo/testing/mapreduce/TeraSortIngest.java
index cd47e1e..ee2b5d0 100644
--- a/src/main/java/org/apache/accumulo/testing/mapreduce/TeraSortIngest.java
+++ b/src/main/java/org/apache/accumulo/testing/mapreduce/TeraSortIngest.java
@@ -23,12 +23,15 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
 import java.util.Random;
 
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat;
-import org.apache.accumulo.testing.cli.ClientOpts;
+import org.apache.accumulo.testing.TestEnv;
+import org.apache.accumulo.testing.TestProps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.LongWritable;
@@ -46,8 +49,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
-import com.beust.jcommander.Parameter;
-
 /**
  * Generate the *almost* official terasort input data set. (See below) The user specifies the number
  * of rows and the output directory and this class runs a map/reduce program to generate the data.
@@ -79,6 +80,8 @@ public class TeraSortIngest extends Configured implements Tool {
     long firstRow;
     long rowCount;
 
+    RangeInputSplit() {}
+
     public RangeInputSplit(long offset, long length) {
       firstRow = offset;
       rowCount = length;
@@ -165,8 +168,8 @@ public class TeraSortIngest extends Configured implements Tool {
      */
     @Override
     public List<InputSplit> getSplits(JobContext job) {
-      long totalRows = job.getConfiguration().getLong(NUMROWS, 0);
-      int numSplits = job.getConfiguration().getInt(NUMSPLITS, 1);
+      long totalRows = job.getConfiguration().getLong(TestProps.TERASORT_NUM_ROWS, 0);
+      int numSplits = job.getConfiguration().getInt(TestProps.TERASORT_NUM_SPLITS, 1);
       long rowsPerSplit = totalRows / numSplits;
       System.out.println(
           "Generating " + totalRows + " using " + numSplits + " maps with step of " + rowsPerSplit);
@@ -183,12 +186,9 @@ public class TeraSortIngest extends Configured implements Tool {
     }
 
-  private static String NUMSPLITS = "terasort.overridesplits";
-  private static String NUMROWS = "terasort.numrows";
-
   static class RandomGenerator {
     private long seed = 0;
-    private static final long mask32 = (1l << 32) - 1;
+    private static final long mask32 = (1L << 32) - 1;
 
     /**
      * The number of iterations separating the precomputed seeds.
      */
@@ -343,65 +343,49 @@ public class TeraSortIngest extends Configured implements Tool {
     @Override
     public void setup(Context job) {
-      minkeylength = job.getConfiguration().getInt("cloudgen.minkeylength", 0);
-      maxkeylength = job.getConfiguration().getInt("cloudgen.maxkeylength", 0);
-      minvaluelength = job.getConfiguration().getInt("cloudgen.minvaluelength", 0);
-      maxvaluelength = job.getConfiguration().getInt("cloudgen.maxvaluelength", 0);
-      tableName = new Text(job.getConfiguration().get("cloudgen.tablename"));
+      minkeylength = job.getConfiguration().getInt(TestProps.TERASORT_MIN_KEYSIZE, 0);
+      maxkeylength = job.getConfiguration().getInt(TestProps.TERASORT_MAX_KEYSIZE, 0);
+      minvaluelength = job.getConfiguration().getInt(TestProps.TERASORT_MIN_VALUESIZE, 0);
+      maxvaluelength = job.getConfiguration().getInt(TestProps.TERASORT_MAX_VALUESIZE, 0);
+      tableName = new Text(job.getConfiguration().get(TestProps.TERASORT_TABLE));
     }
   }
 
   public static void main(String[] args) throws Exception {
-    ToolRunner.run(new Configuration(), new TeraSortIngest(), args);
-  }
-
-  static class Opts extends ClientOpts {
-    @Parameter(names = {"-t", "--table"}, required = true, description = "table to use")
-    String tableName;
-    @Parameter(names = "--count", description = "number of rows to ingest", required = true)
-    long numRows;
-    @Parameter(names = {"-nk", "--minKeySize"}, description = "miniumum key size", required = true)
-    int minKeyLength;
-    @Parameter(names = {"-xk", "--maxKeySize"}, description = "maximum key size", required = true)
-    int maxKeyLength;
-    @Parameter(names = {"-nv", "--minValueSize"}, description = "minimum key size", required = true)
-    int minValueLength;
-    @Parameter(names = {"-xv", "--maxValueSize"}, description = "maximum key size", required = true)
-    int maxValueLength;
-    @Parameter(names = "--splits", description = "number of splits to create in the table")
-    int splits = 0;
+    TestEnv env = new TestEnv(args);
+    ToolRunner.run(env.getHadoopConfiguration(), new TeraSortIngest(), args);
   }
 
   @Override
   public int run(String[] args) throws Exception {
+
+    TestEnv env = new TestEnv(args);
+
     Job job = Job.getInstance(getConf());
-    job.setJobName("TeraSortCloud");
+    job.setJobName("TeraSortIngest");
     job.setJarByClass(this.getClass());
-    Opts opts = new Opts();
-    opts.parseArgs(TeraSortIngest.class.getName(), args);
-
     job.setInputFormatClass(RangeInputFormat.class);
     job.setMapperClass(SortGenMapper.class);
     job.setMapOutputKeyClass(Text.class);
     job.setMapOutputValueClass(Mutation.class);
-
     job.setNumReduceTasks(0);
-
     job.setOutputFormatClass(AccumuloOutputFormat.class);
-    AccumuloOutputFormat.configure().clientProperties(opts.getClientProps()).createTables(true)
-        .defaultTable(opts.tableName);
+    Properties testProps = env.getTestProperties();
+    String tableName = testProps.getProperty(TestProps.TERASORT_TABLE);
+    Objects.requireNonNull(tableName);
+
+    AccumuloOutputFormat.configure().clientProperties(env.getClientProps()).createTables(true)
+        .defaultTable(tableName).store(job);
 
     Configuration conf = job.getConfiguration();
-    conf.setLong(NUMROWS, opts.numRows);
-    conf.setInt("cloudgen.minkeylength", opts.minKeyLength);
-    conf.setInt("cloudgen.maxkeylength", opts.maxKeyLength);
-    conf.setInt("cloudgen.minvaluelength", opts.minValueLength);
-    conf.setInt("cloudgen.maxvaluelength", opts.maxValueLength);
-    conf.set("cloudgen.tablename", opts.tableName);
-
-    if (args.length > 10)
-      conf.setInt(NUMSPLITS, opts.splits);
+    conf.set("mapreduce.job.classloader", "true");
+    for (Object keyObj : testProps.keySet()) {
+      String key = (String) keyObj;
+      if (key.startsWith(TestProps.TERASORT)) {
+        conf.set(key, testProps.getProperty(key));
+      }
+    }
 
     job.waitForCompletion(true);
     return job.isSuccessful() ? 0 : 1;
diff --git a/src/test/java/org/apache/accumulo/testing/randomwalk/ReplicationRandomWalkIT.java b/src/test/java/org/apache/accumulo/testing/randomwalk/ReplicationRandomWalkIT.java
index 6271fd4..972206a 100644
--- a/src/test/java/org/apache/accumulo/testing/randomwalk/ReplicationRandomWalkIT.java
+++ b/src/test/java/org/apache/accumulo/testing/randomwalk/ReplicationRandomWalkIT.java
@@ -50,5 +50,4 @@ public class ReplicationRandomWalkIT extends ConfigurableMacBase {
 
     r.visit(null, env, null);
   }
-
 }
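
A quick way to exercise the new entry point (a sketch, assuming conf/env.sh defines
TEST_JAR_PATH, HADOOP_HOME, TEST_PROPS, and ACCUMULO_CLIENT_PROPS as the script expects,
and that the "-o test.<prop>=<value>" overrides shown in the usage text are honored by TestEnv):

    # ingest rows with TeraSortIngest, then hash them with RowHash
    ./bin/mapred terasort -o test.terasort.num.rows=100000
    ./bin/mapred rowhash -o test.rowhash.input.table=terasort -o test.rowhash.output.table=rowhash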