Ted,
This might be something very basic that I am missing, but why should the OP's
reducer key be of type ImmutableBytesWritable if he is emitting Text in
the mapper? Thanks.
protected void map(
ImmutableBytesWritable key,
Result value,
org.apache.hadoop.mapreduce.Mapper<ImmutableBytesWritable,
Result, Text, Text>.Context context)
throws IOException, InterruptedException {
context.write(this.sentenseOriginal, this.sentenseParsed); // sentenseOriginal is Text
Regards,
Shahab
On Thu, Sep 5, 2013 at 10:34 AM, Ted Yu <[email protected]> wrote:
> public class SentimentCalculationHBaseReducer extends
>
> TableReducer<Text, Text, ImmutableBytesWritable> {
>
> The first type parameter for reducer should be ImmutableBytesWritable
>
> Cheers
>
>
> On Wed, Sep 4, 2013 at 11:16 PM, Omkar Joshi <[email protected]
> >wrote:
>
> > I'm trying to run MapReduce code against a stand-alone HBase (0.94.11). I
> > read the HBase API and modified my MR code to read data, but I am getting
> > exceptions in the Reduce phase.
> >
> > The exception I get is :
> >
> > 13/09/05 16:16:17 INFO mapred.JobClient: map 0% reduce 0%
> >
> > 13/09/05 16:23:31 INFO mapred.JobClient: Task Id :
> > attempt_201309051437_0005_m_000000_0, Status : FAILED
> >
> > java.io.IOException: wrong key class: class
> > org.apache.hadoop.hbase.io.ImmutableBytesWritable is not class
> > org.apache.hadoop.io.Text
> >
> > at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:164)
> >
> > at
> >
> org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:1168)
> >
> > at
> >
> org.apache.hadoop.mapred.Task$NewCombinerRunner$OutputConverter.write(Task.java:1492)
> >
> > at
> >
> org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
> >
> > at
> >
> com.hbase.mapreduce.SentimentCalculationHBaseReducer.reduce(SentimentCalculationHBaseReducer.java:199)
> >
> > at
> >
> com.hbase.mapreduce.SentimentCalculationHBaseReducer.reduce(SentimentCalculationHBaseReducer.java:1)
> >
> > at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
> >
> > at
> > org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1513)
> >
> > at
> >
> org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1436)
> >
> > at
> > org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1298)
> >
> > at
> >
> org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:699)
> >
> > at
> org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:766)
> >
> > at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
> >
> > at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
> >
> > at java.security.AccessController.doPrivileged(Native Method)
> >
> > at javax.security.auth.Subject.doAs(Subject.java:415)
> >
> > at
> >
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1149)
> >
> > at org.apache.hadoop.mapred.Child.main(Child.java:249)
> >
> >
> >
> > Here is the partial code (excluding the business logic):
> >
> >
> > Mapper:
> >
> >
> > public class SentimentCalculationHBaseMapper extends TableMapper<Text,
> > Text> {
> >
> >
> >
> > private Text sentenseOriginal = new Text();
> >
> > private Text sentenseParsed = new Text();
> >
> >
> >
> > @Override
> >
> > protected void map(
> >
> > ImmutableBytesWritable key,
> >
> > Result value,
> >
> > org.apache.hadoop.mapreduce.Mapper<ImmutableBytesWritable,
> > Result, Text, Text>.Context context)
> >
> > throws IOException, InterruptedException {
> >
> > context.write(this.sentenseOriginal, this.sentenseParsed);
> >
> > }
> >
> > }
> >
> > Reducer :
> >
> >
> > public class SentimentCalculationHBaseReducer extends
> >
> > TableReducer<Text, Text, ImmutableBytesWritable> {
> >
> >
> >
> > @Override
> >
> > protected void reduce(
> >
> > Text key,
> >
> > java.lang.Iterable<Text> values,
> >
> > org.apache.hadoop.mapreduce.Reducer<Text, Text,
> > ImmutableBytesWritable, org.apache.hadoop.io.Writable>.Context context)
> >
> > throws IOException, InterruptedException {
> >
> >
> >
> > Double mdblSentimentOverall = 0.0;
> >
> >
> >
> >
> >
> > String d3 = key + "@12321@" + s11.replaceFirst(":::", "")
> >
> > + "@12321@" + mstpositiveWords + "@12321@"
> >
> > + mstnegativeWords + "@12321@" + mstneutralWords;
> >
> >
> >
> > System.out.println("d3 : " + d3 + " , mdblSentimentOverall :
> "
> >
> > + mdblSentimentOverall);
> >
> >
> >
> > Put put = new Put(d3.getBytes());
> >
> >
> >
> > put.add(Bytes.toBytes("word_attributes"),
> >
> > Bytes.toBytes("mdblSentimentOverall"),
> >
> > Bytes.toBytes(mdblSentimentOverall));
> >
> >
> >
> > System.out.println("Context is " + context);
> >
> >
> >
> > context.write(new ImmutableBytesWritable(d3.getBytes()),
> put);
> >
> > }
> >
> > }
> >
> > SentimentCalculatorHBase - the Tool/main class :
> > package com.hbase.mapreduce;
> >
> > import java.util.Calendar;
> >
> > import org.apache.hadoop.conf.Configuration;
> > import org.apache.hadoop.conf.Configured;
> > import org.apache.hadoop.hbase.client.Put;
> > import org.apache.hadoop.hbase.client.Scan;
> > import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
> > import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
> > import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
> > import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
> > import org.apache.hadoop.io.Text;
> > import org.apache.hadoop.mapreduce.Job;
> > import org.apache.hadoop.util.Tool;
> > import org.apache.hadoop.util.ToolRunner;
> >
> > public class SentimentCalculatorHBase extends Configured implements Tool
> {
> >
> > /**
> > * @param args
> > * @throws Exception
> > */
> > public static void main(String[] args) throws Exception {
> > // TODO Auto-generated method stub
> > SentimentCalculatorHBase sentimentCalculatorHBase = new
> > SentimentCalculatorHBase();
> > ToolRunner.run(sentimentCalculatorHBase, args);
> > }
> >
> > @Override
> > public int run(String[] arg0) throws Exception {
> > // TODO Auto-generated method stub
> >
> >
> > System.out
> > .println("***********************Configuration
> > started***********************");
> > Configuration configuration = getConf();
> > System.out.println("Conf: " + configuration);
> >
> >
> > Job sentiCalcJob = new Job(configuration, "HBase
> > SentimentCalculation");
> >
> > sentiCalcJob.setJarByClass(SentimentCalculatorHBase.class);
> >
> sentiCalcJob.setMapperClass(SentimentCalculationHBaseMapper.class);
> >
> > sentiCalcJob.setCombinerClass(SentimentCalculationHBaseReducer.class);
> >
> > sentiCalcJob.setReducerClass(SentimentCalculationHBaseReducer.class);
> >
> >
> > sentiCalcJob.setInputFormatClass(TableInputFormat.class);
> > sentiCalcJob.setOutputFormatClass(TableOutputFormat.class);
> >
> > /* Start : Added out of exasperation! */
> > sentiCalcJob.setOutputKeyClass(ImmutableBytesWritable.class);
> > sentiCalcJob.setOutputValueClass(Put.class);
> > /* End : Added out of exasperation! */
> >
> > Scan twitterdataUserScan = new Scan();
> > twitterdataUserScan.setCaching(500);
> >
> > twitterdataUserScan.addColumn("word_attributes".getBytes(),
> > "TwitterText".getBytes());
> >
> > TableMapReduceUtil.initTableMapperJob("twitterdata_user",
> > twitterdataUserScan,
> SentimentCalculationHBaseMapper.class,
> > Text.class, Text.class, sentiCalcJob);
> >
> > TableMapReduceUtil.initTableReducerJob("sentiment_output",
> > SentimentCalculationHBaseReducer.class, sentiCalcJob);
> >
> > Calendar beforeJob = Calendar.getInstance();
> > System.out.println("Job Time started---------------- "
> > + beforeJob.getTime());
> > boolean check = sentiCalcJob.waitForCompletion(true);
> > if (check == true) {
> > System.out
> > .println("*******************Job completed-
> > SentimentCalculation********************");
> > }
> > Calendar afterJob = Calendar.getInstance();
> > System.out
> > .println("Job Time ended
> > SentimentCalculation---------------- "
> > + afterJob.getTime());
> > return 0;
> > }
> > }
> >
> >
> >
> > Regards,
> > Omkar Joshi
> >
> > ________________________________
> > The contents of this e-mail and any attachment(s) may contain
> confidential
> > or privileged information for the intended recipient(s). Unintended
> > recipients are prohibited from taking action on the basis of information
> in
> > this e-mail and using or disseminating the information, and must notify
> the
> > sender and delete it from their system. L&T Infotech will not accept
> > responsibility or liability for the accuracy or completeness of, or the
> > presence of any virus or disabling code in this e-mail"
> >
>