Hi,

I am getting the error below while using HBase as the sink of a MapReduce job.

Error:

java.io.IOException: Pass a Delete or a Put
        at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:125)
        at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:84)
        at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:587)
        at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
        at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:156)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
        at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:417)
        at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
        at org.apache.hadoop.mapred.Child.main(Child.java:249)
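
If I read the trace right, the record writer behind TableOutputFormat only accepts Put or Delete values and throws this exception for anything else. A rough paraphrase of that check as I understand it (a minimal sketch, not the actual 0.94 source; the class name TableSinkCheckSketch is just for illustration):

import java.io.IOException;

import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.io.Writable;

// Sketch of the value check that appears to be throwing: the sink applies
// Put and Delete values to the table and rejects everything else.
class TableSinkCheckSketch {
    private final HTable table;

    TableSinkCheckSketch(HTable table) {
        this.table = table;
    }

    void write(Writable value) throws IOException {
        if (value instanceof Put) {
            table.put((Put) value);        // a Put is applied to the table
        } else if (value instanceof Delete) {
            table.delete((Delete) value);  // a Delete is applied to the table
        } else {
            throw new IOException("Pass a Delete or a Put");
        }
    }
}

So whatever value is reaching the output format is apparently neither a Put nor a Delete.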



I am using the following versions:

HBase 0.94
Hadoop 1.0.3

Below is my code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.*;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class DailyAggMapReduce {

    public static void main(String args[]) throws Exception {
        Configuration config = HBaseConfiguration.create();
        Job job = new Job(config, "DailyAverageMR");
        job.setJarByClass(DailyAggMapReduce.class);
        Scan scan = new Scan();
        // 1 is the default in Scan, which will be bad for MapReduce jobs
        scan.setCaching(500);
        // don't set to true for MR jobs
        scan.setCacheBlocks(false);

        TableMapReduceUtil.initTableMapperJob(
                "HTASDB",        // input table
                scan,               // Scan instance to control CF and attribute selection
                DailySumMapper.class,     // mapper class
                Text.class,         // mapper output key
                Text.class,  // mapper output value
                job);

        TableMapReduceUtil.initTableReducerJob(
                "DA",        // output table
                DailySumReducer.class,    // reducer class
                job);

        //job.setOutputValueClass(Put.class);
        job.setNumReduceTasks(1);   // at least one, adjust as required

        boolean b = job.waitForCompletion(true);
        if (!b) {
            throw new IOException("error with job!");
        }

    }


    public static class DailySumMapper extends TableMapper<Text, Text> {

        public void map(ImmutableBytesWritable row, Result value, Mapper.Context context) throws IOException, InterruptedException {
            List<String> key = getRowKey(row.get());
            Text rowKey = new Text(key.get(0));
            int time = Integer.parseInt(key.get(1));
            // limiting the time to one day (Aug 04 2012) -- for testing only, not a good way
            if (time <= 1344146400) {
                List<KeyValue> data = value.list();
                long inbound = 0L;
                long outbound = 0L;
                for (KeyValue kv : data) {
                    List<Long> values = getValues(kv.getValue());
                    if (values.get(0) != -1) {
                        inbound = inbound + values.get(0);
                    }
                    if (values.get(1) != -1) {
                        outbound = outbound + values.get(1);
                    }
                }
                context.write(rowKey, new Text(String.valueOf(inbound) + "-" + String.valueOf(outbound)));
            }
        }

        private static List<Long> getValues(byte[] data) {
            List<Long> values = new ArrayList<Long>();
            ByteBuffer buffer = ByteBuffer.wrap(data);
            values.add(buffer.getLong());
            values.add(buffer.getLong());
            return values;
        }

        private static List<String> getRowKey(byte[] key) {
            List<String> keys = new ArrayList<String>();
            ByteBuffer buffer = ByteBuffer.wrap(key);
            StringBuilder sb = new StringBuilder();
            sb.append(buffer.getInt());
            sb.append("-");
            if (key.length == 13) {
                sb.append(buffer.getInt());
                sb.append("-");
            }
            sb.append(buffer.get());
            keys.add(sb.toString());
            keys.add(String.valueOf(buffer.getInt()));
            return keys;
        }
    }

    public static class DailySumReducer extends TableReducer<Text, Text, Put> {
        private int count = 0;
        public void reduce(Text key, Iterable<Text> values, Reducer.Context context) throws IOException, InterruptedException {
            long inbound = 0L;
            long outbound = 0L;
            for (Text val : values) {
                String text = val.toString();
                int index = text.indexOf("-");
                String in = text.substring(0,index);
                String out = text.substring(index+1,text.length());
                inbound = inbound + Long.parseLong(in);
                outbound = outbound + Long.parseLong(out);
            }
            ByteBuffer data = ByteBuffer.wrap(new byte[16]);
            data.putLong(inbound);
            data.putLong(outbound);
            Put put = new Put(Bytes.toBytes(key.toString()+20120804));
            put.add(Bytes.toBytes("t"), Bytes.toBytes("s"),data.array());
            context.setStatus("Emitting Put " + count++);
            context.write(key, put);
        }
    }
}

Thanks,
Jothikumar
