Hi,
I am getting the following error while using HBase as the sink of a MapReduce job.
Error:
java.io.IOException: Pass a Delete or a Put
    at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:125)
    at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:84)
    at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:587)
    at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
    at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:156)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:417)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)
Below is my code.
I am using the following versions:
HBase = 0.94
Hadoop = 1.0.3
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.*;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
public class DailyAggMapReduce {
public static void main(String args[]) throws Exception {
Configuration config = HBaseConfiguration.create();
Job job = new Job(config, "DailyAverageMR");
job.setJarByClass(DailyAggMapReduce.class);
Scan scan = new Scan();
// 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCaching(500);
// don't set to true for MR jobs
scan.setCacheBlocks(false);
TableMapReduceUtil.initTableMapperJob(
"HTASDB", // input table
scan, // Scan instance to control CF and
attribute selection
DailySumMapper.class, // mapper class
Text.class, // mapper output key
Text.class, // mapper output value
job);
TableMapReduceUtil.initTableReducerJob(
"DA", // output table
DailySumReducer.class, // reducer class
job);
//job.setOutputValueClass(Put.class);
job.setNumReduceTasks(1); // at least one, adjust as required
boolean b = job.waitForCompletion(true);
if (!b) {
throw new IOException("error with job!");
}
}
public static class DailySumMapper extends TableMapper<Text, Text> {
public void map(ImmutableBytesWritable row, Result value,
Mapper.Context context) throws IOException, InterruptedException {
List<String> key = getRowKey(row.get());
Text rowKey = new Text(key.get(0));
int time = Integer.parseInt(key.get(1));
//limiting the time for one day (Aug 04 2012) -- Testing, Not a
good way
if (time <= 1344146400) {
List<KeyValue> data = value.list();
long inbound = 0l;
long outbound = 0l;
for (KeyValue kv : data) {
List<Long> values = getValues(kv.getValue());
if (values.get(0) != -1) {
inbound = inbound + values.get(0);
}
if (values.get(1) != -1) {
outbound = outbound + values.get(1);
}
}
context.write(rowKey, new Text(String.valueOf(inbound) +
"-" + String.valueOf(outbound)));
}
}
private static List<Long> getValues(byte[] data) {
List<Long> values = new ArrayList<Long>();
ByteBuffer buffer = ByteBuffer.wrap(data);
values.add(buffer.getLong());
values.add(buffer.getLong());
return values;
}
private static List<String> getRowKey(byte[] key) {
List<String> keys = new ArrayList<String>();
ByteBuffer buffer = ByteBuffer.wrap(key);
StringBuilder sb = new StringBuilder();
sb.append(buffer.getInt());
sb.append("-");
if (key.length == 13) {
sb.append(buffer.getInt());
sb.append("-");
}
sb.append(buffer.get());
keys.add(sb.toString());
keys.add(String.valueOf(buffer.getInt()));
return keys;
}
}
public static class DailySumReducer extends TableReducer<Text, Text,
Put> {
private int count = 0;
public void reduce(Text key, Iterable<Text> values, Reducer.Context
context) throws IOException, InterruptedException {
long inbound = 0l;
long outbound = 0l;
for (Text val : values) {
String text = val.toString();
int index = text.indexOf("-");
String in = text.substring(0,index);
String out = text.substring(index+1,text.length());
inbound = inbound + Long.parseLong(in);
outbound = outbound + Long.parseLong(out);
}
ByteBuffer data = ByteBuffer.wrap(new byte[16]);
data.putLong(inbound);
data.putLong(outbound);
Put put = new Put(Bytes.toBytes(key.toString()+20120804));
put.add(Bytes.toBytes("t"), Bytes.toBytes("s"),data.array());
context.setStatus("Emitting Put " + count++);
context.write(key, put);
}
}
}
Thanks
Jothikumar