Basically, BatchUpdate becomes Put, and RowResult becomes Result (both now in org.apache.hadoop.hbase.client). J-D
On Tue, Feb 1, 2011 at 6:36 PM, Mark Kerzner <[email protected]> wrote: > Hi, > > below is a textbook example of using HBase from a MapReduce job. I am trying > to rewrite it in the 0.89 API, and I have not succeeded yet. Can > anyone please give me some pointers? > > Thank you very much. Sincerely, > Mark > > import java.io.IOException; > import java.util.Iterator; > import java.util.Map; > import org.apache.hadoop.conf.Configured; > import org.apache.hadoop.fs.Path; > import org.apache.hadoop.hbase.HBaseConfiguration; > import org.apache.hadoop.hbase.io.BatchUpdate; > import org.apache.hadoop.hbase.io.Cell; > import org.apache.hadoop.hbase.io.ImmutableBytesWritable; > import org.apache.hadoop.hbase.io.RowResult; > import org.apache.hadoop.hbase.mapred.TableMap; > import org.apache.hadoop.hbase.mapred.TableMapReduceUtil; > import org.apache.hadoop.hbase.mapred.TableReduce; > import org.apache.hadoop.io.IntWritable; > import org.apache.hadoop.io.Text; > import org.apache.hadoop.mapred.FileOutputFormat; > import org.apache.hadoop.mapred.JobClient; > import org.apache.hadoop.mapred.JobConf; > import org.apache.hadoop.mapred.OutputCollector; > import org.apache.hadoop.mapred.Reporter; > import org.apache.hadoop.util.Tool; > import org.apache.hadoop.util.ToolRunner; > > public class MailboxIdCount extends Configured implements Tool { > // Name of this 'program' > > static final String NAME = "mailboxid-counter"; > private static IntWritable ONE = new IntWritable(1); > > static class RowCounterMapper > implements TableMap<Text, IntWritable> { > > private static enum Counters { > > ROWS > } > > @Override > public void map(ImmutableBytesWritable row, RowResult value, > OutputCollector<Text, IntWritable> output, > Reporter reporter) > throws IOException { > for (Map.Entry<byte[], Cell> e : value.entrySet()) { > Cell cell = e.getValue(); > if (cell != null && cell.getValue().length > 0) { > Text text = new Text(cell.getValue()); > output.collect(text, ONE); > } > } > > } > 
> @Override > public void configure(JobConf jc) { > // Nothing to do. > } > > @Override > public void close() throws IOException { > // Nothing to do. > } > } > > public static class RowCounterReducer > implements TableReduce<Text, IntWritable> { > > @Override > public void configure(JobConf jc) { > // do nothing > } > > @Override > public void close() throws IOException { > // do nothing > } > > @Override > public void reduce(Text k2, Iterator<IntWritable> itrtr, > OutputCollector<ImmutableBytesWritable, BatchUpdate> oc, Reporter rprtr) > throws IOException { > int sum = 0; > while (itrtr.hasNext()) { > IntWritable val = itrtr.next(); > sum += val.get(); > } > String family = "stats:"; > String familyCell = family + k2.toString(); > BatchUpdate update = new BatchUpdate(k2.toString()); > update.put(familyCell, String.valueOf(sum).getBytes()); > oc.collect(new ImmutableBytesWritable(familyCell.getBytes()), > update); > } > } > > public JobConf createSubmittableJob(String[] args) throws IOException { > JobConf c = new JobConf(getConf(), getClass()); > c.setJobName(NAME); > // Columns are space delimited > StringBuilder sb = new StringBuilder(); > final int columnoffset = 2; > for (int i = columnoffset; i < args.length; i++) { > if (i > columnoffset) { > sb.append(" "); > } > sb.append(args[i]); > } > // Second argument is the table name. > TableMapReduceUtil.initTableMapJob(args[1], sb.toString(), > RowCounterMapper.class, Text.class, IntWritable.class, c); > TableMapReduceUtil.initTableReduceJob("mailbox-status", > RowCounterReducer.class, c); > // First arg is the output directory. 
> FileOutputFormat.setOutputPath(c, new Path(args[0])); > return c; > } > > static int printUsage() { > System.out.println(NAME > + " <outputdir> <tablename> <column1> [<column2>...]"); > return -1; > } > > @Override > public int run(final String[] args) throws Exception { > // Make sure there are at least 3 parameters > if (args.length < 3) { > System.err.println("ERROR: Wrong number of parameters: " + > args.length); > return printUsage(); > } > JobClient.runJob(createSubmittableJob(args)); > return 0; > } > > public static void main(String[] args) throws Exception { > HBaseConfiguration c = new HBaseConfiguration(); > int errCode = ToolRunner.run(c, new MailboxIdCount(), args); > System.exit(errCode); > } > } >
