ACCUMULO-1992 Remove CachedConfiguration from examples
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/7b7521dd
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/7b7521dd
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/7b7521dd

Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 7b7521dd92b81f4c91eae2415f6d835944b15355
Parents: 5f90d0b
Author: Christopher Tubbs <ctubb...@apache.org>
Authored: Mon Dec 9 13:22:21 2013 -0500
Committer: Christopher Tubbs <ctubb...@apache.org>
Committed: Wed Dec 11 16:59:48 2013 -0500

----------------------------------------------------------------------
 .../simple/filedata/CharacterHistogram.java     | 28 +++---
 .../examples/simple/mapreduce/NGramIngest.java  | 36 ++++----
 .../examples/simple/mapreduce/RegexExample.java | 24 ++---
 .../examples/simple/mapreduce/RowHash.java      | 22 ++---
 .../examples/simple/mapreduce/TableToFile.java  | 28 +++---
 .../simple/mapreduce/TeraSortIngest.java        | 97 ++++++++++----------
 .../simple/mapreduce/TokenFileWordCount.java    | 33 +++----
 .../simple/mapreduce/UniqueColumns.java         | 44 ++++-----
 .../examples/simple/mapreduce/WordCount.java    | 29 +++---
 .../mapreduce/bulk/BulkIngestExample.java       | 51 +++++-----
 .../simple/mapreduce/bulk/GenerateTestData.java | 20 ++--
 .../simple/filedata/ChunkInputFormatTest.java   | 58 ++++++------
 12 files changed, 235 insertions(+), 235 deletions(-)
----------------------------------------------------------------------
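[Editor's note] Every file below applies the same mechanical substitution: each ToolRunner.run(CachedConfiguration.getInstance(), ...) call becomes ToolRunner.run(new Configuration(), ...), with the import swapped to match. As a minimal sketch of the Tool pattern the examples converge on — ExampleTool is a hypothetical class for illustration, not part of this commit:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.conf.Configured;
  import org.apache.hadoop.util.Tool;
  import org.apache.hadoop.util.ToolRunner;

  // Hypothetical stand-in for the example tools touched by this commit.
  public class ExampleTool extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
      // ToolRunner parses generic options (-D key=value, -conf file, etc.)
      // out of args and injects the resulting Configuration via setConf(),
      // so the job reads it back with getConf() rather than building another.
      Configuration conf = getConf();
      System.out.println("fs.defaultFS = " + conf.get("fs.defaultFS"));
      return 0;
    }

    public static void main(String[] args) throws Exception {
      // A fresh per-invocation Configuration replaces the process-wide
      // CachedConfiguration.getInstance() singleton used before this commit.
      System.exit(ToolRunner.run(new Configuration(), new ExampleTool(), args));
    }
  }

Besides making each example pick up the Hadoop site files on its own classpath at startup, this removes the examples' last compile-time dependency on the internal org.apache.accumulo.core.util.CachedConfiguration utility.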
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7b7521dd/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/CharacterHistogram.java
----------------------------------------------------------------------
diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/CharacterHistogram.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/CharacterHistogram.java
index 11eda3e..d0662b6 100644
--- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/CharacterHistogram.java
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/CharacterHistogram.java
@@ -29,8 +29,8 @@ import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.user.SummingArrayCombiner;
 import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.examples.simple.mapreduce.JobUtil;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
@@ -46,14 +46,15 @@ import com.beust.jcommander.Parameter;
  */
 public class CharacterHistogram extends Configured implements Tool {
   public static final String VIS = "vis";
-  
+
   public static void main(String[] args) throws Exception {
-    System.exit(ToolRunner.run(CachedConfiguration.getInstance(), new CharacterHistogram(), args));
+    System.exit(ToolRunner.run(new Configuration(), new CharacterHistogram(), args));
   }
-  
+
   public static class HistMapper extends Mapper<List<Entry<Key,Value>>,InputStream,Text,Mutation> {
     private ColumnVisibility cv;
-    
+
+    @Override
     public void map(List<Entry<Key,Value>> k, InputStream v, Context context) throws IOException, InterruptedException {
       Long[] hist = new Long[256];
       for (int i = 0; i < hist.length; i++)
@@ -68,19 +69,18 @@ public class CharacterHistogram extends Configured implements Tool {
       m.put("info", "hist", cv, new Value(SummingArrayCombiner.STRING_ARRAY_ENCODER.encode(Arrays.asList(hist))));
       context.write(new Text(), m);
     }
-    
+
     @Override
     protected void setup(Context context) throws IOException, InterruptedException {
       cv = new ColumnVisibility(context.getConfiguration().get(VIS, ""));
     }
   }
-  
+
   static class Opts extends ClientOnRequiredTable {
-    @Parameter(names="--vis")
+    @Parameter(names = "--vis")
     String visibilities = "";
   }
-  
-  
+
   @Override
   public int run(String[] args) throws Exception {
     Job job = JobUtil.getJob(getConf());
@@ -93,15 +93,15 @@ public class CharacterHistogram extends Configured implements Tool {
     job.setInputFormatClass(ChunkInputFormat.class);
     opts.setAccumuloConfigs(job);
     job.getConfiguration().set(VIS, opts.visibilities.toString());
-    
+
     job.setMapperClass(HistMapper.class);
     job.setMapOutputKeyClass(Text.class);
     job.setMapOutputValueClass(Mutation.class);
-    
+
     job.setNumReduceTasks(0);
-    
+
     job.setOutputFormatClass(AccumuloOutputFormat.class);
-    
+
     job.waitForCompletion(true);
     return job.isSuccessful() ? 0 : 1;
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7b7521dd/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/NGramIngest.java
----------------------------------------------------------------------
diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/NGramIngest.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/NGramIngest.java
index 2f9b01a..93b589d 100644
--- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/NGramIngest.java
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/NGramIngest.java
@@ -24,7 +24,7 @@ import org.apache.accumulo.core.cli.ClientOnRequiredTable;
 import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
@@ -39,19 +39,18 @@ import org.apache.log4j.Logger;
 import com.beust.jcommander.Parameter;

 /**
- * Map job to ingest n-gram files from
- * http://storage.googleapis.com/books/ngrams/books/datasetsv2.html
+ * Map job to ingest n-gram files from http://storage.googleapis.com/books/ngrams/books/datasetsv2.html
  */
-public class NGramIngest extends Configured implements Tool {
-  
+public class NGramIngest extends Configured implements Tool {
+
   private static final Logger log = Logger.getLogger(NGramIngest.class);
-  
-  
+
   static class Opts extends ClientOnRequiredTable {
-    @Parameter(names = "--input", required=true)
+    @Parameter(names = "--input", required = true)
     String inputDirectory;
   }
-  static class NGramMapper extends Mapper<LongWritable, Text, Text, Mutation> {
+
+  static class NGramMapper extends Mapper<LongWritable,Text,Text,Mutation> {
     @Override
     protected void map(LongWritable location, Text value, Context context) throws IOException, InterruptedException {
@@ -75,19 +74,18 @@ public class NGramIngest extends Configured implements Tool {
     Job job = JobUtil.getJob(getConf());
     job.setJobName(getClass().getSimpleName());
     job.setJarByClass(getClass());
-    
+
     opts.setAccumuloConfigs(job);
     job.setInputFormatClass(TextInputFormat.class);
     job.setOutputFormatClass(AccumuloOutputFormat.class);
-    
+
     job.setMapperClass(NGramMapper.class);
     job.setMapOutputKeyClass(Text.class);
     job.setMapOutputValueClass(Mutation.class);
-    
+
     job.setNumReduceTasks(0);
     job.setSpeculativeExecution(false);
-    
-    
+
     if (!opts.getConnector().tableOperations().exists(opts.tableName)) {
       log.info("Creating table " + opts.tableName);
       opts.getConnector().tableOperations().create(opts.tableName);
@@ -95,23 +93,23 @@ public class NGramIngest extends Configured implements Tool {
       String numbers[] = "1 2 3 4 5 6 7 8 9".split("\\s");
       String lower[] = "a b c d e f g h i j k l m n o p q r s t u v w x y z".split("\\s");
       String upper[] = "A B C D E F G H I J K L M N O P Q R S T U V W X Y Z".split("\\s");
-      for (String[] array : new String[][]{numbers, lower, upper}) {
+      for (String[] array : new String[][] {numbers, lower, upper}) {
         for (String s : array) {
           splits.add(new Text(s));
         }
       }
       opts.getConnector().tableOperations().addSplits(opts.tableName, splits);
     }
-    
+
     TextInputFormat.addInputPath(job, new Path(opts.inputDirectory));
     job.waitForCompletion(true);
     return job.isSuccessful() ? 0 : 1;
   }
-  
+
   public static void main(String[] args) throws Exception {
-    int res = ToolRunner.run(CachedConfiguration.getInstance(), new NGramIngest(), args);
+    int res = ToolRunner.run(new Configuration(), new NGramIngest(), args);
     if (res != 0)
       System.exit(res);
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7b7521dd/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/RegexExample.java
----------------------------------------------------------------------
diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/RegexExample.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/RegexExample.java
index 9acc694..47e5879 100644
--- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/RegexExample.java
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/RegexExample.java
@@ -24,7 +24,7 @@ import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.user.RegExFilter;
-import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
@@ -42,7 +42,7 @@ public class RegexExample extends Configured implements Tool {
       context.write(row, data);
     }
   }
-  
+
   static class Opts extends ClientOnRequiredTable {
     @Parameter(names = "--rowRegex")
     String rowRegex;
@@ -55,7 +55,7 @@ public class RegexExample extends Configured implements Tool {
     @Parameter(names = "--output", required = true)
     String destination;
   }
-  
+
   @Override
   public int run(String[] args) throws Exception {
     Opts opts = new Opts();
@@ -64,34 +64,34 @@ public class RegexExample extends Configured implements Tool {
     Job job = JobUtil.getJob(getConf());
     job.setJobName(getClass().getSimpleName());
     job.setJarByClass(getClass());
-    
+
     job.setInputFormatClass(AccumuloInputFormat.class);
     opts.setAccumuloConfigs(job);
-    
+
     IteratorSetting regex = new IteratorSetting(50, "regex", RegExFilter.class);
     RegExFilter.setRegexs(regex, opts.rowRegex, opts.columnFamilyRegex, opts.columnQualifierRegex, opts.valueRegex, false);
     AccumuloInputFormat.addIterator(job, regex);
-    
+
     job.setMapperClass(RegexMapper.class);
     job.setMapOutputKeyClass(Key.class);
     job.setMapOutputValueClass(Value.class);
-    
+
     job.setNumReduceTasks(0);
-    
+
     job.setOutputFormatClass(TextOutputFormat.class);
     TextOutputFormat.setOutputPath(job, new Path(opts.destination));
-    
+
     System.out.println("setRowRegex: " + opts.rowRegex);
     System.out.println("setColumnFamilyRegex: " + opts.columnFamilyRegex);
     System.out.println("setColumnQualifierRegex: " + opts.columnQualifierRegex);
     System.out.println("setValueRegex: " + opts.valueRegex);
-    
+
     job.waitForCompletion(true);
     return job.isSuccessful() ? 0 : 1;
   }
-  
+
   public static void main(String[] args) throws Exception {
-    int res = ToolRunner.run(CachedConfiguration.getInstance(), new RegexExample(), args);
+    int res = ToolRunner.run(new Configuration(), new RegexExample(), args);
     if (res != 0)
       System.exit(res);
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7b7521dd/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/RowHash.java
----------------------------------------------------------------------
diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/RowHash.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/RowHash.java
index 2ca3587..1fa9b8f 100644
--- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/RowHash.java
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/RowHash.java
@@ -25,9 +25,9 @@ import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.Text;
@@ -50,16 +50,16 @@ public class RowHash extends Configured implements Tool {
       context.write(null, m);
       context.progress();
     }
-    
+
     @Override
     public void setup(Context job) {}
   }
-  
+
   private static class Opts extends ClientOnRequiredTable {
     @Parameter(names = "--column", required = true)
     String column = null;
   }
-  
+
   @Override
   public int run(String[] args) throws Exception {
     Job job = JobUtil.getJob(getConf());
@@ -69,27 +69,27 @@ public class RowHash extends Configured implements Tool {
     opts.parseArgs(RowHash.class.getName(), args);
     job.setInputFormatClass(AccumuloInputFormat.class);
     opts.setAccumuloConfigs(job);
-    
+
     String col = opts.column;
     int idx = col.indexOf(":");
     Text cf = new Text(idx < 0 ? col : col.substring(0, idx));
     Text cq = idx < 0 ? null : new Text(col.substring(idx + 1));
     if (cf.getLength() > 0)
       AccumuloInputFormat.fetchColumns(job, Collections.singleton(new Pair<Text,Text>(cf, cq)));
-    
+
     job.setMapperClass(HashDataMapper.class);
     job.setMapOutputKeyClass(Text.class);
     job.setMapOutputValueClass(Mutation.class);
-    
+
     job.setNumReduceTasks(0);
-    
+
     job.setOutputFormatClass(AccumuloOutputFormat.class);
-    
+
     job.waitForCompletion(true);
     return job.isSuccessful() ? 0 : 1;
   }
-  
+
   public static void main(String[] args) throws Exception {
-    ToolRunner.run(CachedConfiguration.getInstance(), new RowHash(), args);
+    ToolRunner.run(new Configuration(), new RowHash(), args);
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7b7521dd/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TableToFile.java
----------------------------------------------------------------------
diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TableToFile.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TableToFile.java
index 8bdc195..3a211e2 100644
--- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TableToFile.java
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TableToFile.java
@@ -25,9 +25,9 @@ import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.format.DefaultFormatter;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
@@ -45,14 +45,14 @@ import com.beust.jcommander.Parameter;
  * <tablename> <column> <hdfs-output-path>
  */
 public class TableToFile extends Configured implements Tool {
-  
+
   static class Opts extends ClientOnRequiredTable {
     @Parameter(names = "--output", description = "output directory", required = true)
     String output;
     @Parameter(names = "--columns", description = "columns to extract, in cf:cq{,cf:cq,...} form")
     String columns = "";
   }
-  
+
   /**
    * The Mapper class that given a row number, will generate the appropriate output line.
    */
@@ -66,12 +66,12 @@ public class TableToFile extends Configured implements Tool {
         public Key getKey() {
           return r;
         }
-        
+
         @Override
         public Value getValue() {
           return v;
         }
-        
+
         @Override
         public Value setValue(Value value) {
           return null;
@@ -81,7 +81,7 @@ public class TableToFile extends Configured implements Tool {
       context.setStatus("Outputed Value");
     }
   }
-  
+
   @Override
   public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException, AccumuloSecurityException {
     Job job = JobUtil.getJob(getConf());
@@ -89,10 +89,10 @@ public class TableToFile extends Configured implements Tool {
     job.setJarByClass(this.getClass());
     Opts opts = new Opts();
     opts.parseArgs(getClass().getName(), args);
-    
+
     job.setInputFormatClass(AccumuloInputFormat.class);
     opts.setAccumuloConfigs(job);
-    
+
     HashSet<Pair<Text,Text>> columnsToFetch = new HashSet<Pair<Text,Text>>();
     for (String col : opts.columns.split(",")) {
       int idx = col.indexOf(":");
@@ -103,20 +103,20 @@ public class TableToFile extends Configured implements Tool {
     }
     if (!columnsToFetch.isEmpty())
       AccumuloInputFormat.fetchColumns(job, columnsToFetch);
-    
+
     job.setMapperClass(TTFMapper.class);
     job.setMapOutputKeyClass(NullWritable.class);
     job.setMapOutputValueClass(Text.class);
-    
+
     job.setNumReduceTasks(0);
-    
+
     job.setOutputFormatClass(TextOutputFormat.class);
     TextOutputFormat.setOutputPath(job, new Path(opts.output));
-    
+
     job.waitForCompletion(true);
     return job.isSuccessful() ? 0 : 1;
   }
-  
+
   /**
    * 
    * @param args
@@ -124,6 +124,6 @@
    * @throws Exception
    */
   public static void main(String[] args) throws Exception {
-    ToolRunner.run(CachedConfiguration.getInstance(), new TableToFile(), args);
+    ToolRunner.run(new Configuration(), new TableToFile(), args);
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7b7521dd/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java
----------------------------------------------------------------------
diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java
index dd2fea4..f9f2d39 100644
--- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java
@@ -30,7 +30,6 @@ import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.LongWritable;
@@ -77,37 +76,37 @@ public class TeraSortIngest extends Configured implements Tool {
   static class RangeInputSplit extends InputSplit implements Writable {
     long firstRow;
     long rowCount;
-    
+
     public RangeInputSplit() {}
-    
+
     public RangeInputSplit(long offset, long length) {
       firstRow = offset;
       rowCount = length;
     }
-    
+
     @Override
     public long getLength() throws IOException {
       return 0;
     }
-    
+
     @Override
     public String[] getLocations() throws IOException {
       return new String[] {};
     }
-    
+
     @Override
     public void readFields(DataInput in) throws IOException {
       firstRow = WritableUtils.readVLong(in);
       rowCount = WritableUtils.readVLong(in);
     }
-    
+
     @Override
     public void write(DataOutput out) throws IOException {
      WritableUtils.writeVLong(out, firstRow);
      WritableUtils.writeVLong(out, rowCount);
     }
   }
-  
+
   /**
    * A record reader that will generate a range of numbers.
    */
@@ -115,36 +114,36 @@
     long startRow;
     long finishedRows;
     long totalRows;
-    
+
     LongWritable currentKey;
-    
+
     public RangeRecordReader(RangeInputSplit split) {
       startRow = split.firstRow;
       finishedRows = 0;
       totalRows = split.rowCount;
     }
-    
+
     @Override
     public void close() throws IOException {}
-    
+
     @Override
     public float getProgress() throws IOException {
       return finishedRows / (float) totalRows;
     }
-    
+
     @Override
     public LongWritable getCurrentKey() throws IOException, InterruptedException {
       return new LongWritable(startRow + finishedRows);
     }
-    
+
     @Override
     public NullWritable getCurrentValue() throws IOException, InterruptedException {
       return NullWritable.get();
     }
-    
+
     @Override
     public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {}
-    
+
     @Override
     public boolean nextKeyValue() throws IOException, InterruptedException {
       if (finishedRows < totalRows) {
@@ -154,13 +153,13 @@
         return false;
       }
     }
-    
+
     @Override
     public RecordReader<LongWritable,NullWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
       // reporter.setStatus("Creating record reader");
       return new RangeRecordReader((RangeInputSplit) split);
     }
-    
+
     /**
      * Create the desired number of splits, dividing the number of rows between the mappers.
      */
@@ -180,12 +179,12 @@
       System.out.println("Done Generating.");
       return splits;
     }
-    
+
   }
-  
+
   private static String NUMSPLITS = "terasort.overridesplits";
   private static String NUMROWS = "terasort.numrows";
-  
+
   static class RandomGenerator {
     private long seed = 0;
     private static final long mask32 = (1l << 32) - 1;
@@ -199,7 +198,7 @@
     private static final long[] seeds = new long[] {0L, 4160749568L, 4026531840L, 3892314112L, 3758096384L, 3623878656L, 3489660928L, 3355443200L, 3221225472L, 3087007744L, 2952790016L, 2818572288L, 2684354560L, 2550136832L, 2415919104L, 2281701376L, 2147483648L, 2013265920L, 1879048192L, 1744830464L, 1610612736L, 1476395008L, 1342177280L, 1207959552L, 1073741824L, 939524096L, 805306368L, 671088640L, 536870912L, 402653184L, 268435456L, 134217728L,};
-    
+
     /**
      * Start the random number generator on the given iteration.
      *
@@ -213,17 +212,17 @@
         next();
       }
     }
-    
+
     RandomGenerator() {
       this(0);
     }
-    
+
     long next() {
       seed = (seed * 3141592621l + 663896637) & mask32;
       return seed;
     }
   }
-  
+
   /**
    * The Mapper class that given a row number, will generate the appropriate output line.
    */
@@ -233,7 +232,7 @@
     private int maxkeylength = 0;
     private int minvaluelength = 0;
     private int maxvaluelength = 0;
-    
+
     private Text key = new Text();
     private Text value = new Text();
     private RandomGenerator rand;
@@ -248,18 +247,18 @@
         }
       }
     }
-    
+
     /**
      * Add a random key to the text
      */
     private Random random = new Random();
-    
+
     private void addKey() {
       int range = random.nextInt(maxkeylength - minkeylength + 1);
       int keylen = range + minkeylength;
       int keyceil = keylen + (4 - (keylen % 4));
       keyBytes = new byte[keyceil];
-      
+
       long temp = 0;
       for (int i = 0; i < keyceil / 4; i++) {
         temp = rand.next() / 52;
@@ -273,7 +272,7 @@
       }
       key.set(keyBytes, 0, keylen);
     }
-    
+
     /**
     * Add the rowid to the row.
     *
@@ -289,7 +288,7 @@
       paddedRowIdString.append(rowid, 0, Math.min(rowid.length, 10));
       return paddedRowIdString;
     }
-    
+
     /**
     * Add the required filler bytes. Each row consists of 7 blocks of 10 characters and 1 block of 8 characters.
    *
@@ -298,22 +297,22 @@
      */
     private void addFiller(long rowId) {
       int base = (int) ((rowId * 8) % 26);
-      
+
       // Get Random var
       Random random = new Random(rand.seed);
-      
+
       int range = random.nextInt(maxvaluelength - minvaluelength + 1);
       int valuelen = range + minvaluelength;
-      
+
       while (valuelen > 10) {
         value.append(filler[(base + valuelen) % 26], 0, 10);
         valuelen -= 10;
       }
-      
+
       if (valuelen > 0)
         value.append(filler[(base + valuelen) % 26], 0, valuelen);
     }
-    
+
     @Override
     public void map(LongWritable row, NullWritable ignored, Context context) throws IOException, InterruptedException {
       context.setStatus("Entering");
@@ -326,18 +325,18 @@
       value.clear();
       // addRowId(rowId);
       addFiller(rowId);
-      
+
       // New
       Mutation m = new Mutation(key);
       m.put(new Text("c"), // column family
           getRowIdString(rowId), // column qual
           new Value(value.toString().getBytes())); // data
-      
+
       context.setStatus("About to add to accumulo");
       context.write(table, m);
       context.setStatus("Added to accumulo " + key.toString());
     }
-    
+
     @Override
     public void setup(Context job) {
       minkeylength = job.getConfiguration().getInt("cloudgen.minkeylength", 0);
@@ -347,11 +346,11 @@
       table = new Text(job.getConfiguration().get("cloudgen.tablename"));
     }
   }
-  
+
   public static void main(String[] args) throws Exception {
-    ToolRunner.run(CachedConfiguration.getInstance(), new TeraSortIngest(), args);
+    ToolRunner.run(new Configuration(), new TeraSortIngest(), args);
   }
-  
+
   static class Opts extends ClientOnRequiredTable {
     @Parameter(names = "--count", description = "number of rows to ingest", required = true)
     long numRows;
@@ -366,7 +365,7 @@
     @Parameter(names = "--splits", description = "number of splits to create in the table")
     int splits = 0;
   }
-  
+
   @Override
   public int run(String[] args) throws Exception {
     Job job = JobUtil.getJob(getConf());
@@ -374,19 +373,19 @@
     job.setJarByClass(this.getClass());
     Opts opts = new Opts();
     opts.parseArgs(TeraSortIngest.class.getName(), args);
-    
+
     job.setInputFormatClass(RangeInputFormat.class);
     job.setMapperClass(SortGenMapper.class);
     job.setMapOutputKeyClass(Text.class);
     job.setMapOutputValueClass(Mutation.class);
-    
+
     job.setNumReduceTasks(0);
-    
+
     job.setOutputFormatClass(AccumuloOutputFormat.class);
     opts.setAccumuloConfigs(job);
     BatchWriterConfig bwConfig = new BatchWriterConfig().setMaxMemory(10L * 1000 * 1000);
     AccumuloOutputFormat.setBatchWriterOptions(job, bwConfig);
-    
+
     Configuration conf = job.getConfiguration();
     conf.setLong(NUMROWS, opts.numRows);
     conf.setInt("cloudgen.minkeylength", opts.minKeyLength);
@@ -394,10 +393,10 @@
     conf.setInt("cloudgen.minvaluelength", opts.minValueLength);
     conf.setInt("cloudgen.maxvaluelength", opts.maxValueLength);
     conf.set("cloudgen.tablename", opts.tableName);
-    
+
     if (args.length > 10)
       conf.setInt(NUMSPLITS, opts.splits);
-    
+
     job.waitForCompletion(true);
     return job.isSuccessful() ? 0 : 1;
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7b7521dd/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TokenFileWordCount.java
----------------------------------------------------------------------
diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TokenFileWordCount.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TokenFileWordCount.java
index fc4b27f..c3f6cdb 100644
--- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TokenFileWordCount.java
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TokenFileWordCount.java
@@ -22,7 +22,7 @@ import org.apache.accumulo.core.client.ClientConfiguration;
 import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -38,17 +38,17 @@ import org.apache.hadoop.util.ToolRunner;
  *
  */
 public class TokenFileWordCount extends Configured implements Tool {
-  
+
   public static class MapClass extends Mapper<LongWritable,Text,Text,Mutation> {
     @Override
     public void map(LongWritable key, Text value, Context output) throws IOException {
       String[] words = value.toString().split("\\s+");
-      
+
       for (String word : words) {
-        
+
         Mutation mutation = new Mutation(new Text(word));
         mutation.put(new Text("count"), new Text("20080906"), new Value("1".getBytes()));
-        
+
         try {
           output.write(null, mutation);
         } catch (InterruptedException e) {
@@ -57,43 +57,44 @@ public class TokenFileWordCount extends Configured implements Tool {
       }
     }
   }
-  
+
+  @Override
   public int run(String[] args) throws Exception {
-    
+
     String instance = args[0];
     String zookeepers = args[1];
     String user = args[2];
     String tokenFile = args[3];
     String input = args[4];
     String tableName = args[5];
-    
+
     Job job = JobUtil.getJob(getConf());
     job.setJobName(TokenFileWordCount.class.getName());
     job.setJarByClass(this.getClass());
-    
+
     job.setInputFormatClass(TextInputFormat.class);
     TextInputFormat.setInputPaths(job, input);
-    
+
     job.setMapperClass(MapClass.class);
-    
+
     job.setNumReduceTasks(0);
-    
+
     job.setOutputFormatClass(AccumuloOutputFormat.class);
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(Mutation.class);
-    
+
     // AccumuloInputFormat not used here, but it uses the same functions.
     AccumuloOutputFormat.setZooKeeperInstance(job, ClientConfiguration.loadDefault().withInstance(instance).withZkHosts(zookeepers));
     AccumuloOutputFormat.setConnectorInfo(job, user, tokenFile);
     AccumuloOutputFormat.setCreateTables(job, true);
     AccumuloOutputFormat.setDefaultTableName(job, tableName);
-    
+
     job.waitForCompletion(true);
     return 0;
   }
-  
+
   public static void main(String[] args) throws Exception {
-    int res = ToolRunner.run(CachedConfiguration.getInstance(), new TokenFileWordCount(), args);
+    int res = ToolRunner.run(new Configuration(), new TokenFileWordCount(), args);
     System.exit(res);
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7b7521dd/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/UniqueColumns.java
----------------------------------------------------------------------
diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/UniqueColumns.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/UniqueColumns.java
index 23d9d47..e0e29ce 100644
--- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/UniqueColumns.java
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/UniqueColumns.java
@@ -26,7 +26,7 @@ import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -44,35 +44,35 @@ import com.beust.jcommander.Parameter;
  * table.
  */
 public class UniqueColumns extends Configured implements Tool {
-  
+
   private static final Text EMPTY = new Text();
-  
+
   public static class UMapper extends Mapper<Key,Value,Text,Text> {
     private Text temp = new Text();
     private static final Text CF = new Text("cf:");
     private static final Text CQ = new Text("cq:");
-    
+
     @Override
     public void map(Key key, Value value, Context context) throws IOException, InterruptedException {
       temp.set(CF);
       ByteSequence cf = key.getColumnFamilyData();
       temp.append(cf.getBackingArray(), cf.offset(), cf.length());
       context.write(temp, EMPTY);
-      
+
       temp.set(CQ);
       ByteSequence cq = key.getColumnQualifierData();
       temp.append(cq.getBackingArray(), cq.offset(), cq.length());
       context.write(temp, EMPTY);
     }
   }
-  
+
   public static class UReducer extends Reducer<Text,Text,Text,Text> {
     @Override
     public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
       context.write(key, EMPTY);
     }
   }
-  
+
   static class Opts extends ClientOnRequiredTable {
     @Parameter(names = "--output", description = "output directory")
     String output;
@@ -81,21 +81,21 @@ public class UniqueColumns extends Configured implements Tool {
     @Parameter(names = "--offline", description = "run against an offline table")
     boolean offline = false;
   }
-  
+
   @Override
   public int run(String[] args) throws Exception {
     Opts opts = new Opts();
     opts.parseArgs(UniqueColumns.class.getName(), args);
-    
+
     String jobName = this.getClass().getSimpleName() + "_" + System.currentTimeMillis();
     Job job = JobUtil.getJob(getConf());
     job.setJobName(jobName);
     job.setJarByClass(this.getClass());
-    
+
     String clone = opts.tableName;
     Connector conn = null;
-    
+
     opts.setAccumuloConfigs(job);
     if (opts.offline) {
@@ -103,41 +103,41 @@ public class UniqueColumns extends Configured implements Tool {
        * this example clones the table and takes it offline. If you plan to run map reduce jobs over a table many times, it may be more efficient to compact the
        * table, clone it, and then keep using the same clone as input for map reduce.
        */
-      
+
       conn = opts.getConnector();
       clone = opts.tableName + "_" + jobName;
       conn.tableOperations().clone(opts.tableName, clone, true, new HashMap<String,String>(), new HashSet<String>());
       conn.tableOperations().offline(clone);
-      
+
       AccumuloInputFormat.setOfflineTableScan(job, true);
       AccumuloInputFormat.setInputTableName(job, clone);
     }
-    
+
     job.setInputFormatClass(AccumuloInputFormat.class);
     job.setMapperClass(UMapper.class);
     job.setMapOutputKeyClass(Text.class);
     job.setMapOutputValueClass(Text.class);
-    
+
     job.setCombinerClass(UReducer.class);
     job.setReducerClass(UReducer.class);
-    
+
     job.setNumReduceTasks(opts.reducers);
-    
+
     job.setOutputFormatClass(TextOutputFormat.class);
     TextOutputFormat.setOutputPath(job, new Path(opts.output));
-    
+
     job.waitForCompletion(true);
-    
+
     if (opts.offline) {
       conn.tableOperations().delete(clone);
     }
-    
+
     return job.isSuccessful() ? 0 : 1;
   }
-  
+
   public static void main(String[] args) throws Exception {
-    int res = ToolRunner.run(CachedConfiguration.getInstance(), new UniqueColumns(), args);
+    int res = ToolRunner.run(new Configuration(), new UniqueColumns(), args);
     System.exit(res);
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7b7521dd/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/WordCount.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/WordCount.java
index 8ca8cbc..220b85c 100644
--- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/WordCount.java
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/WordCount.java
@@ -22,7 +22,7 @@ import org.apache.accumulo.core.cli.ClientOnRequiredTable;
 import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
@@ -40,22 +40,22 @@ import com.beust.jcommander.Parameter;
  *
  */
 public class WordCount extends Configured implements Tool {
-  
+
   static class Opts extends ClientOnRequiredTable {
-    @Parameter(names="--input", description="input directory")
+    @Parameter(names = "--input", description = "input directory")
     String inputDirectory;
   }
-  
+
   public static class MapClass extends Mapper<LongWritable,Text,Text,Mutation> {
     @Override
     public void map(LongWritable key, Text value, Context output) throws IOException {
       String[] words = value.toString().split("\\s+");
-      
+
       for (String word : words) {
-        
+
         Mutation mutation = new Mutation(new Text(word));
         mutation.put(new Text("count"), new Text("20080906"), new Value("1".getBytes()));
-        
+
         try {
           output.write(null, mutation);
         } catch (InterruptedException e) {
@@ -64,7 +64,8 @@ public class WordCount extends Configured implements Tool {
       }
     }
   }
-  
+
+  @Override
   public int run(String[] args) throws Exception {
     Opts opts = new Opts();
     opts.parseArgs(WordCount.class.getName(), args);
@@ -72,14 +73,14 @@ public class WordCount extends Configured implements Tool {
     Job job = JobUtil.getJob(getConf());
     job.setJobName(WordCount.class.getName());
     job.setJarByClass(this.getClass());
-    
+
     job.setInputFormatClass(TextInputFormat.class);
     TextInputFormat.setInputPaths(job, new Path(opts.inputDirectory));
-    
+
     job.setMapperClass(MapClass.class);
-    
+
     job.setNumReduceTasks(0);
-    
+
     job.setOutputFormatClass(AccumuloOutputFormat.class);
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(Mutation.class);
@@ -87,8 +88,8 @@ public class WordCount extends Configured implements Tool {
     job.waitForCompletion(true);
     return 0;
   }
-  
+
   public static void main(String[] args) throws Exception {
-    ToolRunner.run(CachedConfiguration.getInstance(), new WordCount(), args);
+    ToolRunner.run(new Configuration(), new WordCount(), args);
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7b7521dd/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/bulk/BulkIngestExample.java
----------------------------------------------------------------------
diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/bulk/BulkIngestExample.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/bulk/BulkIngestExample.java
index 5f9b975..72bd7eb 100644
--- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/bulk/BulkIngestExample.java
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/bulk/BulkIngestExample.java
@@ -27,7 +27,6 @@ import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
 import org.apache.accumulo.core.client.mapreduce.lib.partition.RangePartitioner;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.TextUtil;
 import org.apache.accumulo.examples.simple.mapreduce.JobUtil;
 import org.apache.commons.codec.binary.Base64;
@@ -53,7 +52,7 @@ public class BulkIngestExample extends Configured implements Tool {
   public static class MapClass extends Mapper<LongWritable,Text,Text,Text> {
     private Text outputKey = new Text();
     private Text outputValue = new Text();
-    
+
     @Override
     public void map(LongWritable key, Text value, Context output) throws IOException, InterruptedException {
       // split on tab
@@ -64,7 +63,7 @@ public class BulkIngestExample extends Configured implements Tool {
           break;
         }
       }
-      
+
       if (index > 0) {
         outputKey.set(value.getBytes(), 0, index);
         outputValue.set(value.getBytes(), index + 1, value.getLength() - (index + 1));
@@ -72,8 +71,9 @@ public class BulkIngestExample extends Configured implements Tool {
       }
     }
   }
-  
+
   public static class ReduceClass extends Reducer<Text,Text,Key,Value> {
+    @Override
     public void reduce(Text key, Iterable<Text> values, Context output) throws IOException, InterruptedException {
       // be careful with the timestamp... if you run on a cluster
       // where the time is whacked you may not see your updates in
@@ -82,82 +82,83 @@ public class BulkIngestExample extends Configured implements Tool {
       // cluster or consider using logical time... one options is
       // to let accumulo set the time
       long timestamp = System.currentTimeMillis();
-      
+
       int index = 0;
       for (Text value : values) {
         Key outputKey = new Key(key, new Text("colf"), new Text(String.format("col_%07d", index)), timestamp);
         index++;
-        
+
         Value outputValue = new Value(value.getBytes(), 0, value.getLength());
         output.write(outputKey, outputValue);
       }
     }
   }
-  
+
   static class Opts extends ClientOnRequiredTable {
-    @Parameter(names="--inputDir", required=true)
+    @Parameter(names = "--inputDir", required = true)
     String inputDir;
-    @Parameter(names="--workDir", required=true)
+    @Parameter(names = "--workDir", required = true)
     String workDir;
   }
-  
+
+  @Override
   public int run(String[] args) {
     Opts opts = new Opts();
     opts.parseArgs(BulkIngestExample.class.getName(), args);
-    
+
     Configuration conf = getConf();
     PrintStream out = null;
     try {
       Job job = JobUtil.getJob(conf);
       job.setJobName("bulk ingest example");
       job.setJarByClass(this.getClass());
-      
+
       job.setInputFormatClass(TextInputFormat.class);
-      
+
       job.setMapperClass(MapClass.class);
       job.setMapOutputKeyClass(Text.class);
       job.setMapOutputValueClass(Text.class);
-      
+
       job.setReducerClass(ReduceClass.class);
       job.setOutputFormatClass(AccumuloFileOutputFormat.class);
       opts.setAccumuloConfigs(job);
-      
+
       Connector connector = opts.getConnector();
-      
+
      TextInputFormat.setInputPaths(job, new Path(opts.inputDir));
       AccumuloFileOutputFormat.setOutputPath(job, new Path(opts.workDir + "/files"));
-      
+
       FileSystem fs = FileSystem.get(conf);
       out = new PrintStream(new BufferedOutputStream(fs.create(new Path(opts.workDir + "/splits.txt"))));
-      
+
       Collection<Text> splits = connector.tableOperations().listSplits(opts.tableName, 100);
       for (Text split : splits)
         out.println(new String(Base64.encodeBase64(TextUtil.getBytes(split))));
-      
+
       job.setNumReduceTasks(splits.size() + 1);
       out.close();
-      
+
       job.setPartitionerClass(RangePartitioner.class);
       RangePartitioner.setSplitFile(job, opts.workDir + "/splits.txt");
-      
+
       job.waitForCompletion(true);
       Path failures = new Path(opts.workDir, "failures");
       fs.delete(failures, true);
       fs.mkdirs(new Path(opts.workDir, "failures"));
       connector.tableOperations().importDirectory(opts.tableName, opts.workDir + "/files", opts.workDir + "/failures", false);
-      
+
     } catch (Exception e) {
       throw new RuntimeException(e);
     } finally {
       if (out != null)
         out.close();
     }
-    
+
     return 0;
   }
-  
+
   public static void main(String[] args) throws Exception {
-    int res = ToolRunner.run(CachedConfiguration.getInstance(), new BulkIngestExample(), args);
+    int res = ToolRunner.run(new Configuration(), new BulkIngestExample(), args);
     System.exit(res);
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7b7521dd/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/bulk/GenerateTestData.java
----------------------------------------------------------------------
diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/bulk/GenerateTestData.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/bulk/GenerateTestData.java
index c1a13b3..5cb4a0b 100644
--- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/bulk/GenerateTestData.java
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/bulk/GenerateTestData.java
@@ -20,34 +20,34 @@ import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.PrintStream;

-import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;

 import com.beust.jcommander.Parameter;

 public class GenerateTestData {
-  
+
   static class Opts extends org.apache.accumulo.core.cli.Help {
-    @Parameter(names="--start-row", required=true)
+    @Parameter(names = "--start-row", required = true)
     int startRow = 0;
-    @Parameter(names="--count", required=true)
+    @Parameter(names = "--count", required = true)
     int numRows = 0;
-    @Parameter(names="--output", required=true)
+    @Parameter(names = "--output", required = true)
     String outputFile;
   }
-  
+
   public static void main(String[] args) throws IOException {
     Opts opts = new Opts();
     opts.parseArgs(GenerateTestData.class.getName(), args);
-    
-    FileSystem fs = FileSystem.get(CachedConfiguration.getInstance());
+
+    FileSystem fs = FileSystem.get(new Configuration());
     PrintStream out = new PrintStream(new BufferedOutputStream(fs.create(new Path(opts.outputFile))));
-    
+
     for (int i = 0; i < opts.numRows; i++) {
       out.println(String.format("row_%010d\tvalue_%010d", i + opts.startRow, i + opts.startRow));
     }
     out.close();
   }
-  
+
 }
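[Editor's note] GenerateTestData is the one class in this set that is not a Tool, so no ToolRunner is involved; it only needs a Configuration to obtain a FileSystem. A minimal sketch of that pattern — FsExample and the /tmp/example.txt path are hypothetical, not part of the commit:

  import java.io.PrintStream;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  // Hypothetical snippet mirroring GenerateTestData's change: build a fresh
  // Configuration (which picks up core-site.xml and friends from the
  // classpath) instead of reading the shared cached instance.
  public class FsExample {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      FileSystem fs = FileSystem.get(conf); // default FS per fs.defaultFS
      PrintStream out = new PrintStream(fs.create(new Path("/tmp/example.txt")));
      out.println("hello");
      out.close();
    }
  }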
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7b7521dd/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java
----------------------------------------------------------------------
diff --git a/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java b/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java
index 3d99838..dab1e10 100644
--- a/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java
+++ b/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java
@@ -34,8 +34,8 @@ import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.examples.simple.mapreduce.JobUtil;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -48,12 +48,12 @@ public class ChunkInputFormatTest extends TestCase {
   private static AssertionError e1 = null;
   private static AssertionError e2 = null;
   private static IOException e3 = null;
-  
+
   private static final Authorizations AUTHS = new Authorizations("A", "B", "C", "D");
-  
+
   private static List<Entry<Key,Value>> data;
   private static List<Entry<Key,Value>> baddata;
-  
+
   {
     data = new ArrayList<Entry<Key,Value>>();
     ChunkInputStreamTest.addData(data, "a", "refs", "ida\0ext", "A&B", "ext");
@@ -71,16 +71,16 @@ public class ChunkInputFormatTest extends TestCase {
     ChunkInputStreamTest.addData(baddata, "c", "refs", "ida\0ext", "A&B", "ext");
     ChunkInputStreamTest.addData(baddata, "c", "refs", "ida\0name", "A&B", "name");
   }
-  
+
   public static void entryEquals(Entry<Key,Value> e1, Entry<Key,Value> e2) {
     assertEquals(e1.getKey(), e2.getKey());
     assertEquals(e1.getValue(), e2.getValue());
   }
-  
+
   public static class CIFTester extends Configured implements Tool {
     public static class TestMapper extends Mapper<List<Entry<Key,Value>>,InputStream,List<Entry<Key,Value>>,InputStream> {
       int count = 0;
-      
+
       @Override
       protected void map(List<Entry<Key,Value>> key, InputStream value, Context context) throws IOException, InterruptedException {
         byte[] b = new byte[20];
@@ -113,7 +113,7 @@ public class ChunkInputFormatTest extends TestCase {
         }
         count++;
       }
-      
+
       @Override
       protected void cleanup(Context context) throws IOException, InterruptedException {
         try {
@@ -123,10 +123,10 @@ public class ChunkInputFormatTest extends TestCase {
         }
       }
     }
-    
+
     public static class TestNoClose extends Mapper<List<Entry<Key,Value>>,InputStream,List<Entry<Key,Value>>,InputStream> {
       int count = 0;
-      
+
       @Override
       protected void map(List<Entry<Key,Value>> key, InputStream value, Context context) throws IOException, InterruptedException {
         byte[] b = new byte[5];
@@ -152,7 +152,7 @@ public class ChunkInputFormatTest extends TestCase {
         }
       }
     }
-    
+
     public static class TestBadData extends Mapper<List<Entry<Key,Value>>,InputStream,List<Entry<Key,Value>>,InputStream> {
       @Override
       protected void map(List<Entry<Key,Value>> key, InputStream value, Context context) throws IOException, InterruptedException {
@@ -182,13 +182,13 @@ public class ChunkInputFormatTest extends TestCase {
         } catch (Exception e) {}
       }
     }
-    
+
     @Override
     public int run(String[] args) throws Exception {
       if (args.length != 5) {
         throw new IllegalArgumentException("Usage : " + CIFTester.class.getName() + " <instance name> <user> <pass> <table> <mapperClass>");
       }
-      
+
       String instance = args[0];
       String user = args[1];
       String pass = args[2];
@@ -197,39 +197,39 @@ public class ChunkInputFormatTest extends TestCase {
       Job job = JobUtil.getJob(getConf());
       job.setJobName(this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
       job.setJarByClass(this.getClass());
-      
+
       job.setInputFormatClass(ChunkInputFormat.class);
-      
+
       ChunkInputFormat.setConnectorInfo(job, user, new PasswordToken(pass));
       ChunkInputFormat.setInputTableName(job, table);
       ChunkInputFormat.setScanAuthorizations(job, AUTHS);
       ChunkInputFormat.setMockInstance(job, instance);
-      
+
       @SuppressWarnings("unchecked")
       Class<? extends Mapper<?,?,?,?>> forName = (Class<? extends Mapper<?,?,?,?>>) Class.forName(args[4]);
       job.setMapperClass(forName);
       job.setMapOutputKeyClass(Key.class);
       job.setMapOutputValueClass(Value.class);
       job.setOutputFormatClass(NullOutputFormat.class);
-      
+
       job.setNumReduceTasks(0);
-      
+
       job.waitForCompletion(true);
-      
+
       return job.isSuccessful() ? 0 : 1;
     }
-    
+
     public static int main(String[] args) throws Exception {
-      return ToolRunner.run(CachedConfiguration.getInstance(), new CIFTester(), args);
+      return ToolRunner.run(new Configuration(), new CIFTester(), args);
     }
   }
-  
+
   public void test() throws Exception {
     MockInstance instance = new MockInstance("instance1");
     Connector conn = instance.getConnector("root", new PasswordToken(""));
     conn.tableOperations().create("test");
     BatchWriter bw = conn.createBatchWriter("test", new BatchWriterConfig());
-    
+
     for (Entry<Key,Value> e : data) {
       Key k = e.getKey();
       Mutation m = new Mutation(k.getRow());
@@ -237,18 +237,18 @@ public class ChunkInputFormatTest extends TestCase {
       bw.addMutation(m);
     }
     bw.close();
-    
+
     assertEquals(0, CIFTester.main(new String[] {"instance1", "root", "", "test", CIFTester.TestMapper.class.getName()}));
     assertNull(e1);
     assertNull(e2);
   }
-  
+
   public void testErrorOnNextWithoutClose() throws Exception {
     MockInstance instance = new MockInstance("instance2");
     Connector conn = instance.getConnector("root", new PasswordToken(""));
     conn.tableOperations().create("test");
     BatchWriter bw = conn.createBatchWriter("test", new BatchWriterConfig());
-    
+
     for (Entry<Key,Value> e : data) {
       Key k = e.getKey();
       Mutation m = new Mutation(k.getRow());
@@ -256,13 +256,13 @@ public class ChunkInputFormatTest extends TestCase {
       bw.addMutation(m);
     }
     bw.close();
-    
+
     assertEquals(1, CIFTester.main(new String[] {"instance2", "root", "", "test", CIFTester.TestNoClose.class.getName()}));
     assertNull(e1);
     assertNull(e2);
     assertNotNull(e3);
   }
-  
+
   public void testInfoWithoutChunks() throws Exception {
     MockInstance instance = new MockInstance("instance3");
     Connector conn = instance.getConnector("root", new PasswordToken(""));
@@ -275,7 +275,7 @@ public class ChunkInputFormatTest extends TestCase {
       bw.addMutation(m);
     }
     bw.close();
-    
+
     assertEquals(0, CIFTester.main(new String[] {"instance3", "root", "", "test", CIFTester.TestBadData.class.getName()}));
     assertNull(e0);
     assertNull(e1);