This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch 1.9
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1.9 by this push:
     new e770862  Backport BulkIngest test. Closes #1288 (#1312)
e770862 is described below

commit e770862036128d963e49e6c9715e703f9b69c270
Author: Mike Miller <mmil...@apache.org>
AuthorDate: Thu Aug 15 09:52:57 2019 -0400

    Backport BulkIngest test. Closes #1288 (#1312)
    
    * Created classes for running the bulk generate job
    * Created run-bulk-generate.sh
    * Also updated the example test files
---
 .../accumulo/test/continuous/BulkIngest.java       | 132 ++++++++++++++
 .../accumulo/test/continuous/ContinuousIngest.java |  11 +-
 .../test/continuous/ContinuousInputFormat.java     | 194 +++++++++++++++++++++
 test/system/continuous/continuous-env.sh.example   |   8 +
 test/system/continuous/ingesters.txt.example       |   4 +-
 test/system/continuous/run-bulk-generate.sh        |  48 +++++
 test/system/continuous/walkers.txt.example         |   4 +-
 7 files changed, 393 insertions(+), 8 deletions(-)

diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/BulkIngest.java b/test/src/main/java/org/apache/accumulo/test/continuous/BulkIngest.java
new file mode 100644
index 0000000..22638b3
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/BulkIngest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.test.continuous;
+
+import java.io.BufferedOutputStream;
+import java.io.PrintStream;
+import java.util.Collection;
+import java.util.UUID;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.MapReduceClientOnDefaultTable;
+import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
+import org.apache.accumulo.core.client.mapreduce.lib.partition.KeyRangePartitioner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.Base64;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.validators.PositiveInteger;
+
+/**
+ * Bulk import a million random key value pairs. Same format as ContinuousIngest and can be verified
+ * by running ContinuousVerify.
+ */
+public class BulkIngest extends Configured implements Tool {
+  static class Opts extends ContinuousOpts {
+    @Parameter(names = "--dir", description = "the bulk dir to use", required 
= true)
+    String dir;
+
+    @Parameter(names = "--reducers", description = "the number of reducers to 
use",
+        validateWith = PositiveInteger.class)
+    int reducers = 10;
+
+    @Parameter(names = "--mapTasks", description = "the number of map tasks to 
use",
+        validateWith = PositiveInteger.class)
+    int mapTasks = 10;
+
+    @Parameter(names = "--mapNodes",
+        description = "the number of linked list key value nodes per mapper",
+        validateWith = PositiveInteger.class)
+    int mapNodes = 1000;
+  }
+
+  public static final Logger log = LoggerFactory.getLogger(BulkIngest.class);
+
+  @Override
+  public int run(String[] args) throws Exception {
+    String ingestInstanceId = UUID.randomUUID().toString();
+
+    Job job = Job.getInstance(getConf());
+    job.setJobName("BulkIngest_" + ingestInstanceId);
+    job.setJarByClass(BulkIngest.class);
+    // very important to prevent guava conflicts
+    job.getConfiguration().set("mapreduce.job.classloader", "true");
+    FileSystem fs = FileSystem.get(job.getConfiguration());
+
+    log.info(String.format("UUID %d %s", System.currentTimeMillis(), ingestInstanceId));
+
+    job.setInputFormatClass(ContinuousInputFormat.class);
+
+    // map the generated random longs to key values
+    job.setMapOutputKeyClass(Key.class);
+    job.setMapOutputValueClass(Value.class);
+
+    Opts opts = new Opts();
+    BatchWriterOpts bwOpts = new BatchWriterOpts();
+    MapReduceClientOnDefaultTable clientOpts = new MapReduceClientOnDefaultTable("ci");
+    clientOpts.parseArgs(BulkIngest.class.getName(), args, bwOpts, opts);
+
+    fs.mkdirs(new Path(opts.dir));
+
+    // output RFiles for the import
+    job.setOutputFormatClass(AccumuloFileOutputFormat.class);
+
+    AccumuloFileOutputFormat.setOutputPath(job, new Path(opts.dir + "/files"));
+
+    ContinuousInputFormat.configure(job.getConfiguration(), ingestInstanceId, opts);
+
+    String tableName = clientOpts.getTableName();
+
+    // create splits file for KeyRangePartitioner
+    String splitsFile = opts.dir + "/splits.txt";
+
+    // make sure splits file is closed before continuing
+    try (PrintStream out =
+        new PrintStream(new BufferedOutputStream(fs.create(new Path(splitsFile))))) {
+      Collection<Text> splits =
+          clientOpts.getConnector().tableOperations().listSplits(tableName, opts.reducers - 1);
+      for (Text split : splits) {
+        out.println(Base64.encodeBase64String(split.copyBytes()));
+      }
+      job.setNumReduceTasks(splits.size() + 1);
+    }
+
+    job.setPartitionerClass(KeyRangePartitioner.class);
+    KeyRangePartitioner.setSplitFile(job, fs.getUri() + splitsFile);
+
+    job.waitForCompletion(true);
+    boolean success = job.isSuccessful();
+
+    return success ? 0 : 1;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int ret = ToolRunner.run(new BulkIngest(), args);
+    System.exit(ret);
+  }
+}
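
Note: BulkIngest only writes RFiles under <dir>/files and a Base64-encoded splits.txt for the KeyRangePartitioner; actually importing those files into the table is a separate step not included in this commit. A minimal sketch of that follow-up import against the 1.9 client API — the class name, instance name, zookeepers, credentials, and paths are all hypothetical placeholders:

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.ZooKeeperInstance;
    import org.apache.accumulo.core.client.security.tokens.PasswordToken;

    public class ImportBulkFiles {
      public static void main(String[] args) throws Exception {
        // Hypothetical connection details; substitute your own instance/zookeepers.
        Connector conn = new ZooKeeperInstance("instance", "zk1:2181")
            .getConnector("root", new PasswordToken("secret"));
        // BulkIngest wrote RFiles to <bulk dir>/files. The failures directory
        // must already exist and be empty before the import.
        conn.tableOperations().importDirectory("ci", "/bulk/files", "/bulk/failures", false);
      }
    }

The same import can also be done interactively from the Accumulo shell with the importdirectory command.
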
diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java
index b4f6aeb..f1fd4d9 100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java
@@ -213,10 +213,14 @@ public class ContinuousIngest {
     Mutation m = new Mutation(new Text(rowString));
 
     m.put(new Text(cfString), new Text(cqString), cv,
-        createValue(ingestInstanceId, count, prevRow, cksum));
+        new Value(createValue(ingestInstanceId, count, prevRow, cksum)));
     return m;
   }
 
+  public static byte[] genCol(int cfInt) {
+    return FastFormat.toZeroPaddedString(cfInt, 4, 16, EMPTY_BYTES);
+  }
+
   public static final long genLong(long min, long max, Random r) {
     return ((r.nextLong() & 0x7fffffffffffffffl) % (max - min)) + min;
   }
@@ -229,8 +233,7 @@ public class ContinuousIngest {
     return FastFormat.toZeroPaddedString(rowLong, 16, 16, EMPTY_BYTES);
   }
 
-  private static Value createValue(byte[] ingestInstanceId, long count, byte[] prevRow,
-      Checksum cksum) {
+  static byte[] createValue(byte[] ingestInstanceId, long count, byte[] prevRow, Checksum cksum) {
     int dataLen = ingestInstanceId.length + 16 + (prevRow == null ? 0 : prevRow.length) + 3;
     if (cksum != null)
       dataLen += 8;
@@ -258,6 +261,6 @@ public class ContinuousIngest {
 
     // System.out.println("val "+new String(val));
 
-    return new Value(val);
+    return val;
   }
 }
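
The ContinuousIngest changes above widen visibility so ContinuousInputFormat can reuse the same encoding: the new genCol() zero-pads a column number to four hex digits, and createValue() now returns the raw byte[] rather than wrapping it in a Value. A small standalone sketch of what genCol() produces (the demo class is hypothetical; FastFormat is the same utility the code above calls):

    import java.nio.charset.StandardCharsets;

    import org.apache.accumulo.core.util.FastFormat;

    public class GenColDemo {
      private static final byte[] EMPTY_BYTES = new byte[0];

      public static void main(String[] args) {
        // Same call genCol(255) makes: zero-padded, width 4, radix 16.
        byte[] col = FastFormat.toZeroPaddedString(255, 4, 16, EMPTY_BYTES);
        System.out.println(new String(col, StandardCharsets.UTF_8)); // prints 00ff
      }
    }

genRow() uses the same helper with width 16 to format row values.
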
diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousInputFormat.java b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousInputFormat.java
new file mode 100644
index 0000000..6aed14a
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousInputFormat.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.test.continuous;
+
+import static org.apache.accumulo.test.continuous.ContinuousIngest.genCol;
+import static org.apache.accumulo.test.continuous.ContinuousIngest.genLong;
+import static org.apache.accumulo.test.continuous.ContinuousIngest.genRow;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Generates a continuous ingest linked list per map reduce split. Each linked list is of
+ * configurable length.
+ */
+public class ContinuousInputFormat extends InputFormat<Key,Value> {
+
+  private static final String PROP_UUID = "mrbulk.uuid";
+  private static final String PROP_MAP_TASK = "mrbulk.map.task";
+  private static final String PROP_MAP_NODES = "mrbulk.map.nodes";
+  private static final String PROP_ROW_MIN = "mrbulk.row.min";
+  private static final String PROP_ROW_MAX = "mrbulk.row.max";
+  private static final String PROP_FAM_MAX = "mrbulk.fam.max";
+  private static final String PROP_QUAL_MAX = "mrbulk.qual.max";
+  private static final String PROP_CHECKSUM = "mrbulk.checksum";
+
+  private static class RandomSplit extends InputSplit implements Writable {
+    @Override
+    public void write(DataOutput dataOutput) {}
+
+    @Override
+    public void readFields(DataInput dataInput) {}
+
+    @Override
+    public long getLength() {
+      return 0;
+    }
+
+    @Override
+    public String[] getLocations() {
+      return new String[0];
+    }
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext jobContext) {
+    int numTask = jobContext.getConfiguration().getInt(PROP_MAP_TASK, 1);
+    List<InputSplit> splits = new ArrayList<>();
+    for (int i = 0; i < numTask; i++) {
+      splits.add(new RandomSplit());
+    }
+    return splits;
+  }
+
+  public static void configure(Configuration conf, String uuid, BulkIngest.Opts opts) {
+    conf.set(PROP_UUID, uuid);
+    conf.setInt(PROP_MAP_TASK, opts.mapTasks);
+    conf.setLong(PROP_MAP_NODES, opts.mapNodes);
+    conf.setLong(PROP_ROW_MIN, opts.min);
+    conf.setLong(PROP_ROW_MAX, opts.max);
+    conf.setInt(PROP_FAM_MAX, opts.maxColF);
+    conf.setInt(PROP_QUAL_MAX, opts.maxColQ);
+    conf.setBoolean(PROP_CHECKSUM, opts.checksum);
+  }
+
+  @Override
+  public RecordReader<Key,Value> createRecordReader(InputSplit inputSplit,
+      TaskAttemptContext taskAttemptContext) {
+    return new RecordReader<Key,Value>() {
+      long numNodes;
+      long nodeCount;
+      private Random random;
+
+      private byte[] uuid;
+
+      long minRow;
+      long maxRow;
+      int maxFam;
+      int maxQual;
+      boolean checksum;
+
+      Key prevKey;
+      Key currKey;
+      Value currValue;
+
+      @Override
+      public void initialize(InputSplit inputSplit, TaskAttemptContext job) {
+        numNodes = job.getConfiguration().getLong(PROP_MAP_NODES, 1000000);
+        uuid = job.getConfiguration().get(PROP_UUID).getBytes(StandardCharsets.UTF_8);
+
+        minRow = job.getConfiguration().getLong(PROP_ROW_MIN, 0);
+        maxRow = job.getConfiguration().getLong(PROP_ROW_MAX, Long.MAX_VALUE);
+        maxFam = job.getConfiguration().getInt(PROP_FAM_MAX, Short.MAX_VALUE);
+        maxQual = job.getConfiguration().getInt(PROP_QUAL_MAX, Short.MAX_VALUE);
+        checksum = job.getConfiguration().getBoolean(PROP_CHECKSUM, false);
+
+        random = new Random(new SecureRandom().nextLong());
+
+        nodeCount = 0;
+      }
+
+      private Key genKey(CRC32 cksum) {
+
+        byte[] row = genRow(genLong(minRow, maxRow, random));
+
+        byte[] fam = genCol(random.nextInt(maxFam));
+        byte[] qual = genCol(random.nextInt(maxQual));
+
+        if (cksum != null) {
+          cksum.update(row);
+          cksum.update(fam);
+          cksum.update(qual);
+          cksum.update(new byte[0]); // TODO col vis
+        }
+
+        return new Key(row, fam, qual);
+      }
+
+      private byte[] createValue(byte[] ingestInstanceId, byte[] prevRow, Checksum cksum) {
+        return ContinuousIngest.createValue(ingestInstanceId, nodeCount, prevRow, cksum);
+      }
+
+      @Override
+      public boolean nextKeyValue() {
+
+        if (nodeCount < numNodes) {
+          CRC32 cksum = checksum ? new CRC32() : null;
+          prevKey = currKey;
+          byte[] prevRow = prevKey != null ? prevKey.getRowData().toArray() : null;
+          currKey = genKey(cksum);
+          currValue = new Value(createValue(uuid, prevRow, cksum));
+
+          nodeCount++;
+          return true;
+        } else {
+          return false;
+        }
+      }
+
+      @Override
+      public Key getCurrentKey() {
+        return currKey;
+      }
+
+      @Override
+      public Value getCurrentValue() {
+        return currValue;
+      }
+
+      @Override
+      public float getProgress() {
+        return nodeCount * 1.0f / numNodes;
+      }
+
+      @Override
+      public void close() throws IOException {
+
+      }
+    };
+  }
+}
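
Each record reader above emits mapNodes key value pairs in which every value embeds the previous key's row, forming the linked list that ContinuousVerify walks. Because the reader never touches its InputSplit, it can be exercised outside a full MapReduce job; a rough smoke-test sketch, assuming Hadoop's TaskAttemptContextImpl is on the classpath and reusing the mrbulk.* property names set by configure() (the test class itself is hypothetical):

    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.test.continuous.ContinuousInputFormat;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

    public class ReaderSmokeTest {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("mrbulk.uuid", "smoke-test");
        conf.setLong("mrbulk.map.nodes", 5); // emit only five entries
        TaskAttemptContextImpl ctx = new TaskAttemptContextImpl(conf, new TaskAttemptID());
        RecordReader<Key,Value> reader =
            new ContinuousInputFormat().createRecordReader(null, ctx);
        reader.initialize(null, ctx); // the split is unused, so null is fine here
        while (reader.nextKeyValue()) {
          System.out.println(reader.getCurrentKey() + " " + reader.getCurrentValue());
        }
        reader.close();
      }
    }

Run it with the test jar and the Hadoop client libraries on the classpath.
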
diff --git a/test/system/continuous/continuous-env.sh.example b/test/system/continuous/continuous-env.sh.example
index 0abd8c3..10cea32 100644
--- a/test/system/continuous/continuous-env.sh.example
+++ b/test/system/continuous/continuous-env.sh.example
@@ -129,3 +129,11 @@ BATCH_WALKER_THREADS=8
 # sleep in seconds
 SCANNER_SLEEP_TIME=10
 SCANNER_ENTRIES=5000
+
+# bulk ingest options
+BULK_DIR=/bulk
+# number of mappers
+BULK_MAP_TASKS=10
+# number of key value pairs to generate per mapper
+BULK_MAP_NODES=10000
+
diff --git a/test/system/continuous/ingesters.txt.example b/test/system/continuous/ingesters.txt.example
index b66d790..445ba7d 100644
--- a/test/system/continuous/ingesters.txt.example
+++ b/test/system/continuous/ingesters.txt.example
@@ -13,5 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-host1
-host2
+localhost
+
diff --git a/test/system/continuous/run-bulk-generate.sh b/test/system/continuous/run-bulk-generate.sh
new file mode 100755
index 0000000..75305b2
--- /dev/null
+++ b/test/system/continuous/run-bulk-generate.sh
@@ -0,0 +1,48 @@
+#! /usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# usage: run-bulk-generate.sh [bulk-dir]
+
+# Start: Resolve Script Directory
+SOURCE="${BASH_SOURCE[0]}"
+while [[ -h "${SOURCE}" ]]; do # resolve $SOURCE until the file is no longer a symlink
+   bin=$( cd -P "$( dirname "${SOURCE}" )" && pwd )
+   SOURCE=$(readlink "${SOURCE}")
+   [[ "${SOURCE}" != /* ]] && SOURCE="${bin}/${SOURCE}" # if $SOURCE was a 
relative symlink, we need to resolve it relative to the path where the symlink 
file was located
+done
+bin=$( cd -P "$( dirname "${SOURCE}" )" && pwd )
+script=$( basename "${SOURCE}" )
+# Stop: Resolve Script Directory
+
+CONTINUOUS_CONF_DIR=${CONTINUOUS_CONF_DIR:-${bin}}
+. "$CONTINUOUS_CONF_DIR/continuous-env.sh"
+
+# first arg will override bulk dir if set
+if [[ -n "$1" ]]; then
+   echo "Overriding configured BULK_DIR=$BULK_DIR with $1"
+   BULK_DIR=$1
+fi
+
+SERVER_LIBJAR="$ACCUMULO_HOME/lib/accumulo-test.jar"
+
+VIS_OPT=''
+[[ -n $VISIBILITIES ]] && VIS_OPT="--visibilities \"$VISIBILITIES\""
+
+CHECKSUM_OPT='--addCheckSum'
+[[ $CHECKSUM == false ]] && CHECKSUM_OPT=''
+
+"$ACCUMULO_HOME/bin/tool.sh" "$SERVER_LIBJAR" 
org.apache.accumulo.test.continuous.BulkIngest 
-Dmapreduce.job.reduce.slowstart.completedmaps=0.95 -libjars "$SERVER_LIBJAR" 
"$VIS_OPT" -i $INSTANCE_NAME -z $ZOO_KEEPERS -u $USER -p $PASS --table $TABLE 
--dir $BULK_DIR --mapTasks $BULK_MAP_TASKS --mapNodes $BULK_MAP_NODES --min 
$MIN --max $MAX --maxColF $MAX_CF --maxColQ $MAX_CQ $CHECKSUM_OPT
diff --git a/test/system/continuous/walkers.txt.example b/test/system/continuous/walkers.txt.example
index b59052d..445ba7d 100644
--- a/test/system/continuous/walkers.txt.example
+++ b/test/system/continuous/walkers.txt.example
@@ -13,5 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-host3
-host4
+localhost
+
