public class SentimentCalculationHBaseReducer extends
TableReducer<Text, Text, ImmutableBytesWritable> {
Look at the stack trace: it fails inside Task$NewCombinerRunner, i.e. in the combine
phase, not in the reduce phase. You registered your TableReducer as the combiner
(sentiCalcJob.setCombinerClass(SentimentCalculationHBaseReducer.class)). A combiner must
emit the same key/value types as the map output — (Text, Text) here — but your reducer
emits (ImmutableBytesWritable, Put), which is exactly why you get "wrong key class:
ImmutableBytesWritable is not Text". Remove the setCombinerClass(...) call; a TableReducer
that writes Puts cannot be reused as a combiner.
Cheers
On Wed, Sep 4, 2013 at 11:16 PM, Omkar Joshi <[email protected]> wrote:
> I'm trying to execute a MR code over stand-alone HBase(0.94.11). I had
> read the HBase api and modified my MR code to read data and getting
> exceptions in the Reduce phase.
>
> The exception I get is :
>
> 13/09/05 16:16:17 INFO mapred.JobClient: map 0% reduce 0%
>
> 13/09/05 16:23:31 INFO mapred.JobClient: Task Id :
> attempt_201309051437_0005_m_000000_0, Status : FAILED
>
> java.io.IOException: wrong key class: class
> org.apache.hadoop.hbase.io.ImmutableBytesWritable is not class
> org.apache.hadoop.io.Text
>
> at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:164)
>
> at
> org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:1168)
>
> at
> org.apache.hadoop.mapred.Task$NewCombinerRunner$OutputConverter.write(Task.java:1492)
>
> at
> org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
>
> at
> com.hbase.mapreduce.SentimentCalculationHBaseReducer.reduce(SentimentCalculationHBaseReducer.java:199)
>
> at
> com.hbase.mapreduce.SentimentCalculationHBaseReducer.reduce(SentimentCalculationHBaseReducer.java:1)
>
> at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
>
> at
> org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1513)
>
> at
> org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1436)
>
> at
> org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1298)
>
> at
> org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:699)
>
> at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:766)
>
> at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
>
> at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
>
> at java.security.AccessController.doPrivileged(Native Method)
>
> at javax.security.auth.Subject.doAs(Subject.java:415)
>
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1149)
>
> at org.apache.hadoop.mapred.Child.main(Child.java:249)
>
>
>
> Providing the partial(excluding the business logic) codes
>
>
> Mapper:
>
>
> public class SentimentCalculationHBaseMapper extends TableMapper<Text,
> Text> {
>
>
>
> private Text sentenseOriginal = new Text();
>
> private Text sentenseParsed = new Text();
>
>
>
> @Override
>
> protected void map(
>
> ImmutableBytesWritable key,
>
> Result value,
>
> org.apache.hadoop.mapreduce.Mapper<ImmutableBytesWritable,
> Result, Text, Text>.Context context)
>
> throws IOException, InterruptedException {
>
> context.write(this.sentenseOriginal, this.sentenseParsed);
>
> }
>
> }
>
> Reducer :
>
>
> public class SentimentCalculationHBaseReducer extends
>
> TableReducer<Text, Text, ImmutableBytesWritable> {
>
>
>
> @Override
>
> protected void reduce(
>
> Text key,
>
> java.lang.Iterable<Text> values,
>
> org.apache.hadoop.mapreduce.Reducer<Text, Text,
> ImmutableBytesWritable, org.apache.hadoop.io.Writable>.Context context)
>
> throws IOException, InterruptedException {
>
>
>
> Double mdblSentimentOverall = 0.0;
>
>
>
>
>
> String d3 = key + "@12321@" + s11.replaceFirst(":::", "")
>
> + "@12321@" + mstpositiveWords + "@12321@"
>
> + mstnegativeWords + "@12321@" + mstneutralWords;
>
>
>
> System.out.println("d3 : " + d3 + " , mdblSentimentOverall : "
>
> + mdblSentimentOverall);
>
>
>
> Put put = new Put(d3.getBytes());
>
>
>
> put.add(Bytes.toBytes("word_attributes"),
>
> Bytes.toBytes("mdblSentimentOverall"),
>
> Bytes.toBytes(mdblSentimentOverall));
>
>
>
> System.out.println("Context is " + context);
>
>
>
> context.write(new ImmutableBytesWritable(d3.getBytes()), put);
>
> }
>
> }
>
> SentimentCalculatorHBase - the Tool/main class :
> package com.hbase.mapreduce;
>
> import java.util.Calendar;
>
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.conf.Configured;
> import org.apache.hadoop.hbase.client.Put;
> import org.apache.hadoop.hbase.client.Scan;
> import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
> import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
> import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
> import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.hadoop.util.Tool;
> import org.apache.hadoop.util.ToolRunner;
>
> public class SentimentCalculatorHBase extends Configured implements Tool {
>
> /**
> * @param args
> * @throws Exception
> */
> public static void main(String[] args) throws Exception {
> // TODO Auto-generated method stub
> SentimentCalculatorHBase sentimentCalculatorHBase = new
> SentimentCalculatorHBase();
> ToolRunner.run(sentimentCalculatorHBase, args);
> }
>
> @Override
> public int run(String[] arg0) throws Exception {
> // TODO Auto-generated method stub
>
>
> System.out
> .println("***********************Configuration
> started***********************");
> Configuration configuration = getConf();
> System.out.println("Conf: " + configuration);
>
>
> Job sentiCalcJob = new Job(configuration, "HBase
> SentimentCalculation");
>
> sentiCalcJob.setJarByClass(SentimentCalculatorHBase.class);
> sentiCalcJob.setMapperClass(SentimentCalculationHBaseMapper.class);
>
> sentiCalcJob.setCombinerClass(SentimentCalculationHBaseReducer.class);
>
> sentiCalcJob.setReducerClass(SentimentCalculationHBaseReducer.class);
>
>
> sentiCalcJob.setInputFormatClass(TableInputFormat.class);
> sentiCalcJob.setOutputFormatClass(TableOutputFormat.class);
>
> /* Start : Added out of exasperation! */
> sentiCalcJob.setOutputKeyClass(ImmutableBytesWritable.class);
> sentiCalcJob.setOutputValueClass(Put.class);
> /* End : Added out of exasperation! */
>
> Scan twitterdataUserScan = new Scan();
> twitterdataUserScan.setCaching(500);
>
> twitterdataUserScan.addColumn("word_attributes".getBytes(),
> "TwitterText".getBytes());
>
> TableMapReduceUtil.initTableMapperJob("twitterdata_user",
> twitterdataUserScan, SentimentCalculationHBaseMapper.class,
> Text.class, Text.class, sentiCalcJob);
>
> TableMapReduceUtil.initTableReducerJob("sentiment_output",
> SentimentCalculationHBaseReducer.class, sentiCalcJob);
>
> Calendar beforeJob = Calendar.getInstance();
> System.out.println("Job Time started---------------- "
> + beforeJob.getTime());
> boolean check = sentiCalcJob.waitForCompletion(true);
> if (check == true) {
> System.out
> .println("*******************Job completed-
> SentimentCalculation********************");
> }
> Calendar afterJob = Calendar.getInstance();
> System.out
> .println("Job Time ended
> SentimentCalculation---------------- "
> + afterJob.getTime());
> return 0;
> }
> }
>
>
>
> Regards,
> Omkar Joshi
>
> ________________________________
> The contents of this e-mail and any attachment(s) may contain confidential
> or privileged information for the intended recipient(s). Unintended
> recipients are prohibited from taking action on the basis of information in
> this e-mail and using or disseminating the information, and must notify the
> sender and delete it from their system. L&T Infotech will not accept
> responsibility or liability for the accuracy or completeness of, or the
> presence of any virus or disabling code in this e-mail.
>