Hi, I am new to HBase, so a few noob questions. I created a table in HBase, and a quick scan gives me the following:

hbase(main):001:0> scan 'test'
ROW                          COLUMN+CELL
 row1                        column=cf:word, timestamp=1377298314160, value=foo
 row2                        column=cf:word, timestamp=1377298326124, value=bar
 row3                        column=cf:word, timestamp=1377298332856, value=bar foo
 row4                        column=cf:word, timestamp=1377298347602, value=bar world foo

Now I want to do a word count and write the result back to another HBase table, so I followed the example code from http://hbase.apache.org/book.html#mapreduce (the full snapshot of my code is at the end of this post). When I run it, I get this error:

java.lang.NullPointerException
    at java.lang.String.<init>(String.java:601)
    at org.rdf.HBaseExperiment$MyMapper.map(HBaseExperiment.java:42)
    at org.rdf.HBaseExperiment$MyMapper.map(HBaseExperiment.java:1)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:416)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1093)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)

Line 42 points to:

public static final byte[] ATTR1 = "attr1".getBytes();

I think attr1 is a family qualifier. I am wondering: what exactly is a family qualifier? Do I need to set something when creating the table, just like I set "cf" when I created the 'test' table? Similarly, what do I need to do for the "output" table? In other words, what do I need to do in the hbase shell so that I can run this word count example?

Thanks
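For reference, my current guess, based on the scan output above, is that in column=cf:word the part before the colon (cf) is the column family and the part after it (word) is the qualifier. If that is right, then the shell setup for this example would be something like the lines below. The 'output' table name and the cf family are taken from the reducer in my code further down; the rest is just my guess, so please correct me if I am wrong:

create 'test', 'cf'                     # what I ran originally: 'cf' is the column family
put 'test', 'row1', 'cf:word', 'foo'    # cf = family, word = qualifier (this row already exists in my table)
create 'output', 'cf'                   # my guess: the table the reducer writes cf:count into

My full code follows.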
import java.io.IOException;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.co_occurance.Pair;
import org.co_occurance.PairsMethod;
import org.co_occurance.PairsMethod.MeanReducer;
import org.co_occurance.PairsMethod.PairsMapper;

public class HBaseExperiment {

    public static class MyMapper extends TableMapper<Text, IntWritable> {

        public static final byte[] CF = "cf".getBytes();
        public static final byte[] ATTR1 = "attr1".getBytes();   // line 42, where the stack trace points

        private final IntWritable ONE = new IntWritable(1);
        private Text text = new Text();

        public void map(ImmutableBytesWritable row, Result value, Context context)
                throws IOException, InterruptedException {
            String val = new String(value.getValue(CF, ATTR1));
            //text.set(val);
            // we can only emit Writables...
            text.set(value.toString());
            context.write(text, ONE);
        }
    }

    public static class MyTableReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

        public static final byte[] CF = "cf".getBytes();
        public static final byte[] COUNT = "count".getBytes();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int i = 0;
            for (IntWritable val : values) {
                i += val.get();
            }
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.add(CF, COUNT, Bytes.toBytes(i));
            context.write(null, put);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration config = HBaseConfiguration.create();
        Job job = new Job(config, "ExampleSummary");
        job.setJarByClass(HBaseExperiment.class);    // class that contains mapper and reducer

        Scan scan = new Scan();
        scan.setCaching(500);         // 1 is the default in Scan, which will be bad for MapReduce jobs
        scan.setCacheBlocks(false);   // don't set to true for MR jobs
        // set other scan attrs

        TableMapReduceUtil.initTableMapperJob(
            "test",              // input table
            scan,                // Scan instance to control CF and attribute selection
            MyMapper.class,      // mapper class
            Text.class,          // mapper output key
            IntWritable.class,   // mapper output value
            job);
        TableMapReduceUtil.initTableReducerJob(
            "output",               // output table
            MyTableReducer.class,   // reducer class
            job);
        job.setNumReduceTasks(1);   // at least one, adjust as required

        long start = new Date().getTime();
        boolean b = job.waitForCompletion(true);
        if (!b) {
            throw new IOException("error with job!");
        }
        long end = new Date().getTime();
        System.out.println("Job took " + ((end - start) / 1000) + " seconds");
    }
}
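For what it's worth, my own guess (this part is not from the book) is that the mapper should ask for the qualifier that actually appears in my scan output, word, instead of attr1, and skip rows where that cell is missing. Would a change to MyMapper along these lines be the right direction?

// my guess, not the book's code: use the qualifier from my scan output ("word")
public static final byte[] ATTR1 = "word".getBytes();

public void map(ImmutableBytesWritable row, Result value, Context context)
        throws IOException, InterruptedException {
    byte[] cell = value.getValue(CF, ATTR1);   // getValue returns null if the row has no cf:word cell
    if (cell == null) {
        return;                                // skip the row instead of hitting the NullPointerException
    }
    text.set(new String(cell));
    context.write(text, ONE);
}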
