Hi,
Below is a textbook example of using HBase from a MapReduce job. I am trying
to rewrite it against the 0.89 API but have not succeeded yet. Could
anyone please give me some pointers?
Thank you very much. Sincerely,
Mark
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.mapred.TableMap;
import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * MapReduce job that counts how often each distinct non-empty cell value occurs
 * in the given columns of an HBase table. Each total is written back to the
 * {@code mailbox-status} table under row {@code <value>}, column
 * {@code stats:<value>}.
 *
 * <p>Usage: {@code mailboxid-counter <outputdir> <tablename> <column1> [<column2>...]}
 *
 * <p>Written against the old {@code org.apache.hadoop.hbase.mapred} API
 * ({@link RowResult}, {@link Cell}, {@link BatchUpdate}).
 */
public class MailboxIdCount extends Configured implements Tool {

  /** Name of this 'program'; used as the job name and in the usage message. */
  static final String NAME = "mailboxid-counter";

  /** Table the reducer writes its per-value totals into. */
  private static final String OUTPUT_TABLE = "mailbox-status";

  /** Column family (with trailing ':') that holds the totals. */
  private static final String STATS_FAMILY = "stats:";

  /** Explicit charset for String -> byte[] conversions, instead of the platform default. */
  private static final String UTF8 = "UTF-8";

  /**
   * The count emitted for every non-empty cell. Every map call emits this same
   * instance, so it is final and must never be mutated via {@code set}.
   */
  private static final IntWritable ONE = new IntWritable(1);

  /** Emits {@code (cellValue, 1)} for every non-empty cell of each scanned row. */
  static class RowCounterMapper implements TableMap<Text, IntWritable> {

    /** Job counters reported by this mapper. */
    private enum Counters {
      /** Number of input rows handed to the mapper. */
      ROWS
    }

    @Override
    public void map(ImmutableBytesWritable row, RowResult value,
        OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {
      // Previously declared but never incremented.
      reporter.incrCounter(Counters.ROWS, 1);
      for (Map.Entry<byte[], Cell> entry : value.entrySet()) {
        Cell cell = entry.getValue();
        if (cell != null && cell.getValue().length > 0) {
          output.collect(new Text(cell.getValue()), ONE);
        }
      }
    }

    @Override
    public void configure(JobConf jc) {
      // Nothing to do.
    }

    @Override
    public void close() throws IOException {
      // Nothing to do.
    }
  }

  /**
   * Sums the counts for each cell value and writes the total into row
   * {@code <value>}, column {@code stats:<value>} of the output table.
   */
  public static class RowCounterReducer implements TableReduce<Text, IntWritable> {

    @Override
    public void configure(JobConf jc) {
      // do nothing
    }

    @Override
    public void close() throws IOException {
      // do nothing
    }

    @Override
    public void reduce(Text key, Iterator<IntWritable> values,
        OutputCollector<ImmutableBytesWritable, BatchUpdate> output, Reporter reporter)
        throws IOException {
      int sum = 0;
      while (values.hasNext()) {
        sum += values.next().get();
      }
      String rowKey = key.toString();
      BatchUpdate update = new BatchUpdate(rowKey);
      update.put(STATS_FAMILY + rowKey, String.valueOf(sum).getBytes(UTF8));
      // Key the output record by the row being updated (previously the column
      // name was used), encoded explicitly rather than with the platform charset.
      output.collect(new ImmutableBytesWritable(rowKey.getBytes(UTF8)), update);
    }
  }

  /**
   * Builds the job configuration from the command-line arguments.
   *
   * @param args {@code <outputdir> <tablename> <column1> [<column2>...]}
   * @return the configured, not yet submitted, job
   * @throws IOException if the table map/reduce setup fails
   */
  public JobConf createSubmittableJob(String[] args) throws IOException {
    JobConf c = new JobConf(getConf(), getClass());
    c.setJobName(NAME);
    // Columns (third argument onwards) are passed to the scanner space delimited.
    final int columnoffset = 2;
    StringBuilder sb = new StringBuilder();
    for (int i = columnoffset; i < args.length; i++) {
      if (i > columnoffset) {
        sb.append(" ");
      }
      sb.append(args[i]);
    }
    // Second argument is the table name.
    TableMapReduceUtil.initTableMapJob(args[1], sb.toString(),
        RowCounterMapper.class, Text.class, IntWritable.class, c);
    TableMapReduceUtil.initTableReduceJob(OUTPUT_TABLE, RowCounterReducer.class, c);
    // First arg is the output directory.
    FileOutputFormat.setOutputPath(c, new Path(args[0]));
    return c;
  }

  /**
   * Prints the usage line to standard output.
   *
   * @return {@code -1}, the exit code for a usage error
   */
  static int printUsage() {
    System.out.println(NAME
        + " <outputdir> <tablename> <column1> [<column2>...]");
    return -1;
  }

  @Override
  public int run(final String[] args) throws Exception {
    // Make sure there are at least 3 parameters
    if (args.length < 3) {
      System.err.println("ERROR: Wrong number of parameters: " +
          args.length);
      return printUsage();
    }
    JobClient.runJob(createSubmittableJob(args));
    return 0;
  }

  /** Entry point: runs the tool with an HBase-aware configuration and exits with its code. */
  public static void main(String[] args) throws Exception {
    HBaseConfiguration c = new HBaseConfiguration();
    int errCode = ToolRunner.run(c, new MailboxIdCount(), args);
    System.exit(errCode);
  }
}