This is an automated email from the ASF dual-hosted git repository. mmiller pushed a commit to branch 1.9 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1.9 by this push: new e770862 Backport BulkIngest test. Closes #1288 (#1312) e770862 is described below commit e770862036128d963e49e6c9715e703f9b69c270 Author: Mike Miller <mmil...@apache.org> AuthorDate: Thu Aug 15 09:52:57 2019 -0400 Backport BulkIngest test. Closes #1288 (#1312) * Created classes for running the bulk generate job * Created run-bulk-generate.sh * Also updated the example test files --- .../accumulo/test/continuous/BulkIngest.java | 132 ++++++++++++++ .../accumulo/test/continuous/ContinuousIngest.java | 11 +- .../test/continuous/ContinuousInputFormat.java | 194 +++++++++++++++++++++ test/system/continuous/continuous-env.sh.example | 8 + test/system/continuous/ingesters.txt.example | 4 +- test/system/continuous/run-bulk-generate.sh | 48 +++++ test/system/continuous/walkers.txt.example | 4 +- 7 files changed, 393 insertions(+), 8 deletions(-) diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/BulkIngest.java b/test/src/main/java/org/apache/accumulo/test/continuous/BulkIngest.java new file mode 100644 index 0000000..22638b3 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/continuous/BulkIngest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.accumulo.test.continuous; + +import java.io.BufferedOutputStream; +import java.io.PrintStream; +import java.util.Collection; +import java.util.UUID; + +import org.apache.accumulo.core.cli.BatchWriterOpts; +import org.apache.accumulo.core.cli.MapReduceClientOnDefaultTable; +import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat; +import org.apache.accumulo.core.client.mapreduce.lib.partition.KeyRangePartitioner; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.util.Base64; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.beust.jcommander.Parameter; +import com.beust.jcommander.validators.PositiveInteger; + +/** + * Bulk import --mapTasks x --mapNodes random key value pairs. Same format as ContinuousIngest and + * can be verified by running ContinuousVerify. 
+ */ +public class BulkIngest extends Configured implements Tool { + static class Opts extends ContinuousOpts { + @Parameter(names = "--dir", description = "the bulk dir to use", required = true) + String dir; + + @Parameter(names = "--reducers", description = "the number of reducers to use", + validateWith = PositiveInteger.class) + int reducers = 10; + + @Parameter(names = "--mapTasks", description = "the number of map tasks to use", + validateWith = PositiveInteger.class) + int mapTasks = 10; + + @Parameter(names = "--mapNodes", + description = "the number of linked list key value nodes per mapper", + validateWith = PositiveInteger.class) + int mapNodes = 1000; + } + + public static final Logger log = LoggerFactory.getLogger(BulkIngest.class); + + @Override + public int run(String[] args) throws Exception { + String ingestInstanceId = UUID.randomUUID().toString(); + + Job job = Job.getInstance(getConf()); + job.setJobName("BulkIngest_" + ingestInstanceId); + job.setJarByClass(BulkIngest.class); + // very important to prevent guava conflicts + job.getConfiguration().set("mapreduce.job.classloader", "true"); + FileSystem fs = FileSystem.get(job.getConfiguration()); + + log.info(String.format("UUID %d %s", System.currentTimeMillis(), ingestInstanceId)); + + job.setInputFormatClass(ContinuousInputFormat.class); + + // map the generated random longs to key values + job.setMapOutputKeyClass(Key.class); + job.setMapOutputValueClass(Value.class); + + Opts opts = new Opts(); + BatchWriterOpts bwOpts = new BatchWriterOpts(); + MapReduceClientOnDefaultTable clientOpts = new MapReduceClientOnDefaultTable("ci"); + clientOpts.parseArgs(BulkIngest.class.getName(), args, bwOpts, opts); + + fs.mkdirs(new Path(opts.dir)); + + // output RFiles for the import + job.setOutputFormatClass(AccumuloFileOutputFormat.class); + + AccumuloFileOutputFormat.setOutputPath(job, new Path(opts.dir + "/files")); + + ContinuousInputFormat.configure(job.getConfiguration(), ingestInstanceId, opts); + 
+ String tableName = clientOpts.getTableName(); + + // create splits file for KeyRangePartitioner + String splitsFile = opts.dir + "/splits.txt"; + + // make sure splits file is closed before continuing + try (PrintStream out = + new PrintStream(new BufferedOutputStream(fs.create(new Path(splitsFile))))) { + Collection<Text> splits = + clientOpts.getConnector().tableOperations().listSplits(tableName, opts.reducers - 1); + for (Text split : splits) { + out.println(Base64.encodeBase64String(split.copyBytes())); + } + job.setNumReduceTasks(splits.size() + 1); + } + + job.setPartitionerClass(KeyRangePartitioner.class); + KeyRangePartitioner.setSplitFile(job, fs.getUri() + splitsFile); + + job.waitForCompletion(true); + boolean success = job.isSuccessful(); + + return success ? 0 : 1; + } + + public static void main(String[] args) throws Exception { + int ret = ToolRunner.run(new BulkIngest(), args); + System.exit(ret); + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java index b4f6aeb..f1fd4d9 100644 --- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java +++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java @@ -213,10 +213,14 @@ public class ContinuousIngest { Mutation m = new Mutation(new Text(rowString)); m.put(new Text(cfString), new Text(cqString), cv, - createValue(ingestInstanceId, count, prevRow, cksum)); + new Value(createValue(ingestInstanceId, count, prevRow, cksum))); return m; } + public static byte[] genCol(int cfInt) { + return FastFormat.toZeroPaddedString(cfInt, 4, 16, EMPTY_BYTES); + } + public static final long genLong(long min, long max, Random r) { return ((r.nextLong() & 0x7fffffffffffffffl) % (max - min)) + min; } @@ -229,8 +233,7 @@ public class ContinuousIngest { return FastFormat.toZeroPaddedString(rowLong, 16, 16, EMPTY_BYTES); } - private static Value 
createValue(byte[] ingestInstanceId, long count, byte[] prevRow, - Checksum cksum) { + static byte[] createValue(byte[] ingestInstanceId, long count, byte[] prevRow, Checksum cksum) { int dataLen = ingestInstanceId.length + 16 + (prevRow == null ? 0 : prevRow.length) + 3; if (cksum != null) dataLen += 8; @@ -258,6 +261,6 @@ public class ContinuousIngest { // System.out.println("val "+new String(val)); - return new Value(val); + return val; } } diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousInputFormat.java b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousInputFormat.java new file mode 100644 index 0000000..6aed14a --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousInputFormat.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.accumulo.test.continuous; + +import static org.apache.accumulo.test.continuous.ContinuousIngest.genCol; +import static org.apache.accumulo.test.continuous.ContinuousIngest.genLong; +import static org.apache.accumulo.test.continuous.ContinuousIngest.genRow; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.zip.CRC32; +import java.util.zip.Checksum; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * Generates a continuous ingest linked list per map reduce split. Each linked list is of + * configurable length. 
+ */ +public class ContinuousInputFormat extends InputFormat<Key,Value> { + + private static final String PROP_UUID = "mrbulk.uuid"; + private static final String PROP_MAP_TASK = "mrbulk.map.task"; + private static final String PROP_MAP_NODES = "mrbulk.map.nodes"; + private static final String PROP_ROW_MIN = "mrbulk.row.min"; + private static final String PROP_ROW_MAX = "mrbulk.row.max"; + private static final String PROP_FAM_MAX = "mrbulk.fam.max"; + private static final String PROP_QUAL_MAX = "mrbulk.qual.max"; + private static final String PROP_CHECKSUM = "mrbulk.checksum"; + + private static class RandomSplit extends InputSplit implements Writable { + @Override + public void write(DataOutput dataOutput) {} + + @Override + public void readFields(DataInput dataInput) {} + + @Override + public long getLength() { + return 0; + } + + @Override + public String[] getLocations() { + return new String[0]; + } + } + + @Override + public List<InputSplit> getSplits(JobContext jobContext) { + int numTask = jobContext.getConfiguration().getInt(PROP_MAP_TASK, 1); + List<InputSplit> splits = new ArrayList<>(); + for (int i = 0; i < numTask; i++) { + splits.add(new RandomSplit()); + } + return splits; + } + + public static void configure(Configuration conf, String uuid, BulkIngest.Opts opts) { + conf.set(PROP_UUID, uuid); + conf.setInt(PROP_MAP_TASK, opts.mapTasks); + conf.setLong(PROP_MAP_NODES, opts.mapNodes); + conf.setLong(PROP_ROW_MIN, opts.min); + conf.setLong(PROP_ROW_MAX, opts.max); + conf.setInt(PROP_FAM_MAX, opts.maxColF); + conf.setInt(PROP_QUAL_MAX, opts.maxColQ); + conf.setBoolean(PROP_CHECKSUM, opts.checksum); + } + + @Override + public RecordReader<Key,Value> createRecordReader(InputSplit inputSplit, + TaskAttemptContext taskAttemptContext) { + return new RecordReader<Key,Value>() { + long numNodes; + long nodeCount; + private Random random; + + private byte[] uuid; + + long minRow; + long maxRow; + int maxFam; + int maxQual; + boolean checksum; + + Key prevKey; 
+ Key currKey; + Value currValue; + + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext job) { + numNodes = job.getConfiguration().getLong(PROP_MAP_NODES, 1000000); + uuid = job.getConfiguration().get(PROP_UUID).getBytes(StandardCharsets.UTF_8); + + minRow = job.getConfiguration().getLong(PROP_ROW_MIN, 0); + maxRow = job.getConfiguration().getLong(PROP_ROW_MAX, Long.MAX_VALUE); + maxFam = job.getConfiguration().getInt(PROP_FAM_MAX, Short.MAX_VALUE); + maxQual = job.getConfiguration().getInt(PROP_QUAL_MAX, Short.MAX_VALUE); + checksum = job.getConfiguration().getBoolean(PROP_CHECKSUM, false); + + random = new Random(new SecureRandom().nextLong()); + + nodeCount = 0; + } + + private Key genKey(CRC32 cksum) { + + byte[] row = genRow(genLong(minRow, maxRow, random)); + + byte[] fam = genCol(random.nextInt(maxFam)); + byte[] qual = genCol(random.nextInt(maxQual)); + + if (cksum != null) { + cksum.update(row); + cksum.update(fam); + cksum.update(qual); + cksum.update(new byte[0]); // TODO col vis + } + + return new Key(row, fam, qual); + } + + private byte[] createValue(byte[] ingestInstanceId, byte[] prevRow, Checksum cksum) { + return ContinuousIngest.createValue(ingestInstanceId, nodeCount, prevRow, cksum); + } + + @Override + public boolean nextKeyValue() { + + if (nodeCount < numNodes) { + CRC32 cksum = checksum ? new CRC32() : null; + prevKey = currKey; + byte[] prevRow = prevKey != null ? 
prevKey.getRowData().toArray() : null; + currKey = genKey(cksum); + currValue = new Value(createValue(uuid, prevRow, cksum)); + + nodeCount++; + return true; + } else { + return false; + } + } + + @Override + public Key getCurrentKey() { + return currKey; + } + + @Override + public Value getCurrentValue() { + return currValue; + } + + @Override + public float getProgress() { + return nodeCount * 1.0f / numNodes; + } + + @Override + public void close() throws IOException { + + } + }; + } +} diff --git a/test/system/continuous/continuous-env.sh.example b/test/system/continuous/continuous-env.sh.example index 0abd8c3..10cea32 100644 --- a/test/system/continuous/continuous-env.sh.example +++ b/test/system/continuous/continuous-env.sh.example @@ -129,3 +129,11 @@ BATCH_WALKER_THREADS=8 # sleep in seconds SCANNER_SLEEP_TIME=10 SCANNER_ENTRIES=5000 + +# bulk ingest options +BULK_DIR=/bulk +# number of mappers +BULK_MAP_TASKS=10 +# number of key value pairs to generate per mapper +BULK_MAP_NODES=10000 + diff --git a/test/system/continuous/ingesters.txt.example b/test/system/continuous/ingesters.txt.example index b66d790..445ba7d 100644 --- a/test/system/continuous/ingesters.txt.example +++ b/test/system/continuous/ingesters.txt.example @@ -13,5 +13,5 @@ # See the License for the specific language governing permissions and # limitations under the License. -host1 -host2 +localhost + diff --git a/test/system/continuous/run-bulk-generate.sh b/test/system/continuous/run-bulk-generate.sh new file mode 100755 index 0000000..75305b2 --- /dev/null +++ b/test/system/continuous/run-bulk-generate.sh @@ -0,0 +1,48 @@ +#! /usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# usage: run-bulk-generate.sh [bulk-dir] + +# Start: Resolve Script Directory +SOURCE="${BASH_SOURCE[0]}" +while [[ -h "${SOURCE}" ]]; do # resolve $SOURCE until the file is no longer a symlink + bin=$( cd -P "$( dirname "${SOURCE}" )" && pwd ) + SOURCE=$(readlink "${SOURCE}") + [[ "${SOURCE}" != /* ]] && SOURCE="${bin}/${SOURCE}" # if $SOURCE was a relative symlink, we need to resolve it relative to the path where the symlink file was located +done +bin=$( cd -P "$( dirname "${SOURCE}" )" && pwd ) +script=$( basename "${SOURCE}" ) +# Stop: Resolve Script Directory + +CONTINUOUS_CONF_DIR=${CONTINUOUS_CONF_DIR:-${bin}} +. 
"$CONTINUOUS_CONF_DIR/continuous-env.sh" + +# first arg will override bulk dir if set +if [[ -n "$1" ]]; then + echo "Overriding configured BULK_DIR=$BULK_DIR with $1" + BULK_DIR=$1 +fi + +SERVER_LIBJAR="$ACCUMULO_HOME/lib/accumulo-test.jar" + +VIS_OPT='' +[[ -n $VISIBILITIES ]] && VIS_OPT="--visibilities \"$VISIBILITIES\"" + +CHECKSUM_OPT='--addCheckSum' +[[ $CHECKSUM == false ]] && CHECKSUM_OPT='' + +"$ACCUMULO_HOME/bin/tool.sh" "$SERVER_LIBJAR" org.apache.accumulo.test.continuous.BulkIngest -Dmapreduce.job.reduce.slowstart.completedmaps=0.95 -libjars "$SERVER_LIBJAR" "$VIS_OPT" -i $INSTANCE_NAME -z $ZOO_KEEPERS -u $USER -p $PASS --table $TABLE --dir $BULK_DIR --mapTasks $BULK_MAP_TASKS --mapNodes $BULK_MAP_NODES --min $MIN --max $MAX --maxColF $MAX_CF --maxColQ $MAX_CQ $CHECKSUM_OPT diff --git a/test/system/continuous/walkers.txt.example b/test/system/continuous/walkers.txt.example index b59052d..445ba7d 100644 --- a/test/system/continuous/walkers.txt.example +++ b/test/system/continuous/walkers.txt.example @@ -13,5 +13,5 @@ # See the License for the specific language governing permissions and # limitations under the License. -host3 -host4 +localhost +