Hi Everyone,
I have been trying to run a MapReduce job on HBase 0.20.2, with HBase tables as both source and sink. When I run the job, the map phase completes successfully, but before the reduce starts I hit the IOException below:
2011-07-26 23:06:07,337 WARN [Thread-11] mapred.LocalJobRunner$Job(255): job_local_0001
java.io.IOException: Pass a Delete or a Put
    at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:96)
    at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:55)
    at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
    at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:154)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:174)
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:563)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:408)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:215)
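From the top frames, the exception is raised inside TableOutputFormat's record writer, which (going by the message) only accepts Put or Delete values. A minimal, HBase-free sketch of that kind of type gate, using hypothetical Put/Delete stand-in classes rather than the real HBase ones:

```java
import java.io.IOException;

// Hypothetical stand-ins; not the real org.apache.hadoop.hbase.client classes.
class Put {}
class Delete {}

public class SinkCheckSketch {
    // Sketch of the kind of check behind "Pass a Delete or a Put":
    // the table sink only knows how to apply these two mutation types.
    static String write(Object value) throws IOException {
        if (value instanceof Put) return "applied put";
        if (value instanceof Delete) return "applied delete";
        throw new IOException("Pass a Delete or a Put");
    }

    public static void main(String[] args) throws IOException {
        System.out.println(write(new Put())); // accepted
        try {
            write("some other Writable");     // anything else is rejected
        } catch (IOException e) {
            System.out.println(e.getMessage()); // prints "Pass a Delete or a Put"
        }
    }
}
```

So whatever value actually reaches context.write() in the reduce step must be a Put or a Delete.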
*My Mapper class:*
public static class Mapper1 extends TableMapper<Text, IntArrayWritable>
{
    public void map(ImmutableBytesWritable row, Result values, Context context)
        throws IOException
    {
        ......
        ......
        context.write(cookie, out); // out is an IntArrayWritable
    }
}
---------------------------------------------------------------------------------------------------------------------------------------
*Reducer Class:*
public static class Reducer1 extends TableReducer<Text, IntArrayWritable, Text>
{
    public void reduce(Text key, Iterator<IntArrayWritable> values, Context context)
        throws IOException, InterruptedException
    {
        ....
        ....
        Put put = new Put(rowid.getBytes());
        put.add(Bytes.toBytes("cf"), Bytes.toBytes("stats"), Bytes.toBytes(val));
        context.write(new Text(rowid), put);
    }
}
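One detail I noticed in the trace above is the frame `org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:154)`, which is the base class's reduce rather than Reducer1's. A minimal, HBase-free sketch (hypothetical BaseReducer/SubReducer names) of how a reduce declared with a different parameter type overloads rather than overrides, so the inherited implementation keeps running:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a framework base class whose reduce takes an Iterable.
class BaseReducer {
    public String reduce(Iterable<Integer> values) {
        return "base (identity) reduce ran";
    }
}

// Declaring reduce with Iterator instead of Iterable creates an overload,
// not an override, so Iterable arguments still dispatch to the base method.
class SubReducer extends BaseReducer {
    public String reduce(Iterator<Integer> values) {
        return "custom reduce ran";
    }
}

public class OverrideSketch {
    public static void main(String[] args) {
        BaseReducer r = new SubReducer();
        List<Integer> values = Arrays.asList(1, 2, 3);
        // The caller passes an Iterable, so the base version runs.
        System.out.println(r.reduce(values)); // prints "base (identity) reduce ran"
    }
}
```

In this sketch, adding an @Override annotation to SubReducer.reduce would fail to compile, which is one way to surface such a mismatch.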
-----------------------------------------------------------------------------------------------------------------------------------------
*Main():*
public static void main(String[] args) throws Exception
{
    HBaseConfiguration conf = new HBaseConfiguration();
    Job job = new Job(conf, "CookieJob");
    job.setJarByClass(CookieJob.class);

    Scan scan = new Scan();
    scan.addFamily("cf".getBytes()); // The input table has only one column family, cf.
    scan.setMaxVersions();

    TableMapReduceUtil.initTableMapperJob("InputTable", scan, Mapper1.class,
        Text.class, IntArrayWritable.class, job);
    TableMapReduceUtil.initTableReducerJob("OutputTable", Reducer1.class, job);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
---------------------------------------------------------------------------------------------------------------------------------------
Does anyone know what could be causing this? I have been stuck on this exception for a long time. If somebody could shed light on why it occurs before the reduce starts, I would be very grateful. Please let me know if you need any other information to debug this issue.
Regards,
Narayanan