Basically, in the new API BatchUpdate becomes Put and RowResult becomes Result (both are in org.apache.hadoop.hbase.client).

J-D

On Tue, Feb 1, 2011 at 6:36 PM, Mark Kerzner <[email protected]> wrote:
> Hi,
>
> below is a textbook example of using HBase from a MapReduce job. I am trying
> to rewrite it in the 0.89 API, and I have not succeeded yet. Can
> anyone please give me some pointers?
>
> Thank you very much. Sincerely,
> Mark
>
> import java.io.IOException;
> import java.util.Iterator;
> import java.util.Map;
> import org.apache.hadoop.conf.Configured;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.hbase.HBaseConfiguration;
> import org.apache.hadoop.hbase.io.BatchUpdate;
> import org.apache.hadoop.hbase.io.Cell;
> import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
> import org.apache.hadoop.hbase.io.RowResult;
> import org.apache.hadoop.hbase.mapred.TableMap;
> import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
> import org.apache.hadoop.hbase.mapred.TableReduce;
> import org.apache.hadoop.io.IntWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapred.FileOutputFormat;
> import org.apache.hadoop.mapred.JobClient;
> import org.apache.hadoop.mapred.JobConf;
> import org.apache.hadoop.mapred.OutputCollector;
> import org.apache.hadoop.mapred.Reporter;
> import org.apache.hadoop.util.Tool;
> import org.apache.hadoop.util.ToolRunner;
>
> public class MailboxIdCount extends Configured implements Tool {
>    // Name of this 'program'
>
>    static final String NAME = "mailboxid-counter";
>    private static IntWritable ONE = new IntWritable(1);
>
>    static class RowCounterMapper
>            implements TableMap<Text, IntWritable> {
>
>        private static enum Counters {
>
>            ROWS
>        }
>
>        @Override
>        public void map(ImmutableBytesWritable row, RowResult value,
>                OutputCollector<Text, IntWritable> output,
>                Reporter reporter)
>                throws IOException {
>            for (Map.Entry<byte[], Cell> e : value.entrySet()) {
>                Cell cell = e.getValue();
>                if (cell != null && cell.getValue().length > 0) {
>                    Text text = new Text(cell.getValue());
>                    output.collect(text, ONE);
>                }
>            }
>
>        }
>
>        @Override
>        public void configure(JobConf jc) {
>            // Nothing to do.
>        }
>
>        @Override
>        public void close() throws IOException {
>            // Nothing to do.
>        }
>    }
>
>    public static class RowCounterReducer
>            implements TableReduce<Text, IntWritable> {
>
>        @Override
>        public void configure(JobConf jc) {
>            // do nothing
>        }
>
>        @Override
>        public void close() throws IOException {
>            // do nothing
>        }
>
>        @Override
>        public void reduce(Text k2, Iterator<IntWritable> itrtr,
> OutputCollector<ImmutableBytesWritable, BatchUpdate> oc, Reporter rprtr)
> throws IOException {
>            int sum = 0;
>            while (itrtr.hasNext()) {
>                IntWritable val = itrtr.next();
>                sum += val.get();
>            }
>            String family = "stats:";
>            String familyCell = family + k2.toString();
>            BatchUpdate update = new BatchUpdate(k2.toString());
>            update.put(familyCell, String.valueOf(sum).getBytes());
>            oc.collect(new ImmutableBytesWritable(familyCell.getBytes()),
> update);
>        }
>    }
>
>    public JobConf createSubmittableJob(String[] args) throws IOException {
>        JobConf c = new JobConf(getConf(), getClass());
>        c.setJobName(NAME);
>        // Columns are space delimited
>        StringBuilder sb = new StringBuilder();
>        final int columnoffset = 2;
>        for (int i = columnoffset; i < args.length; i++) {
>            if (i > columnoffset) {
>                sb.append(" ");
>            }
>            sb.append(args[i]);
>        }
>        // Second argument is the table name.
>        TableMapReduceUtil.initTableMapJob(args[1], sb.toString(),
>                RowCounterMapper.class, Text.class, IntWritable.class, c);
>        TableMapReduceUtil.initTableReduceJob("mailbox-status",
> RowCounterReducer.class, c);
>        // First arg is the output directory.
>        FileOutputFormat.setOutputPath(c, new Path(args[0]));
>        return c;
>    }
>
>    static int printUsage() {
>        System.out.println(NAME
>                + " <outputdir> <tablename> <column1> [<column2>...]");
>        return -1;
>    }
>
>    @Override
>    public int run(final String[] args) throws Exception {
>        // Make sure there are at least 3 parameters
>        if (args.length < 3) {
>            System.err.println("ERROR: Wrong number of parameters: " +
> args.length);
>            return printUsage();
>        }
>        JobClient.runJob(createSubmittableJob(args));
>        return 0;
>    }
>
>    public static void main(String[] args) throws Exception {
>        HBaseConfiguration c = new HBaseConfiguration();
>        int errCode = ToolRunner.run(c, new MailboxIdCount(), args);
>        System.exit(errCode);
>    }
> }
>

Reply via email to