Thanks Ted.

Regards,
Shahab

On Thu, Sep 5, 2013 at 12:57 PM, Ted Yu wrote:

> The reducer also serves as the combiner, whose output would be sent to the reducer.
>
>     org.apache.hadoop.mapreduce.Reducer<Text, Text,
>             ImmutableBytesWritable, org.apache.hadoop.io.Writable>.Context context)
>
> So the type parameters above should facilitate this.
> Take a look at the PutCombiner from HBase source code:
>
>     public class PutCombiner<K> extends Reducer<K, Put, K, Put> {
>
> Cheers
>
> On Thu, Sep 5, 2013 at 9:46 AM, Shahab Yunus wrote:
>
> > Ted,
> >
> > Might be something very basic that I am missing, but why should OP's
> > reducer's key be of type ImmutableBytesWritable if he is emitting Text in
> > the mapper? Thanks.
> >
> >     protected void map(
> >             ImmutableBytesWritable key,
> >             Result value,
> >             org.apache.hadoop.mapreduce.Mapper<ImmutableBytesWritable,
> >                     Result, Text, Text>.Context context)
> >             throws IOException, InterruptedException {
> >         context.write(this.sentenseOriginal, this.sentenseParsed);
> >         // sentenseOriginal is Text
> >
> > Regards,
> > Shahab
> >
> > On Thu, Sep 5, 2013 at 10:34 AM, Ted Yu wrote:
> >
> > > public class SentimentCalculationHBaseReducer extends
> > >         TableReducer<Text, Text, ImmutableBytesWritable> {
> > >
> > > The first type parameter for reducer should be ImmutableBytesWritable
> > >
> > > Cheers
> > >
> > > On Wed, Sep 4, 2013 at 11:16 PM, Omkar Joshi wrote:
> > >
> > > > I'm trying to run MR code over stand-alone HBase (0.94.11). I read
> > > > the HBase API and modified my MR code to read data, and I'm getting
> > > > exceptions in the Reduce phase.
> > > >
> > > > The exception I get is:
> > > >
> > > > 13/09/05 16:16:17 INFO mapred.JobClient: map 0% reduce 0%
> > > > 13/09/05 16:23:31 INFO mapred.JobClient: Task Id : attempt_201309051437_0005_m_000000_0, Status : FAILED
> > > > java.io.IOException: wrong key class: class org.apache.hadoop.hbase.io.ImmutableBytesWritable is not class org.apache.hadoop.io.Text
> > > >     at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:164)
> > > >     at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:1168)
> > > >     at org.apache.hadoop.mapred.Task$NewCombinerRunner$OutputConverter.write(Task.java:1492)
> > > >     at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
> > > >     at com.hbase.mapreduce.SentimentCalculationHBaseReducer.reduce(SentimentCalculationHBaseReducer.java:199)
> > > >     at com.hbase.mapreduce.SentimentCalculationHBaseReducer.reduce(SentimentCalculationHBaseReducer.java:1)
> > > >     at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
> > > >     at org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1513)
> > > >     at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1436)
> > > >     at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1298)
> > > >     at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:699)
> > > >     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:766)
> > > >     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
> > > >     at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
> > > >     at java.security.AccessController.doPrivileged(Native Method)
> > > >     at javax.security.auth.Subject.doAs(Subject.java:415)
> > > >     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1149)
> > > >     at org.apache.hadoop.mapred.Child.main(Child.java:249)
> > > >
> > > > Providing the partial code (excluding the business logic):
> > > >
> > > > Mapper:
> > > >
> > > > public class SentimentCalculationHBaseMapper extends TableMapper<Text, Text> {
> > > >
> > > >     private Text sentenseOriginal = new Text();
> > > >     private Text sentenseParsed = new Text();
> > > >
> > > >     @Override
> > > >     protected void map(
> > > >             ImmutableBytesWritable key,
> > > >             Result value,
> > > >             org.apache.hadoop.mapreduce.Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)
> > > >             throws IOException, InterruptedException {
> > > >         context.write(this.sentenseOriginal, this.sentenseParsed);
> > > >     }
> > > > }
> > > >
> > > > Reducer:
> > > >
> > > > public class SentimentCalculationHBaseReducer extends
> > > >         TableReducer<Text, Text, ImmutableBytesWritable> {
> > > >
> > > >     @Override
> > > >     protected void reduce(
> > > >             Text key,
> > > >             java.lang.Iterable<Text> values,
> > > >             org.apache.hadoop.mapreduce.Reducer<Text, Text, ImmutableBytesWritable, org.apache.hadoop.io.Writable>.Context context)
> > > >             throws IOException, InterruptedException {
> > > >
> > > >         Double mdblSentimentOverall = 0.0;
> > > >
> > > >         String d3 = key + "@12321@" + s11.replaceFirst(":::", "")
> > > >                 + "@12321@" + mstpositiveWords + "@12321@"
> > > >                 + mstnegativeWords + "@12321@" + mstneutralWords;
> > > >
> > > >         System.out.println("d3 : " + d3 + " , mdblSentimentOverall : "
> > > >                 + mdblSentimentOverall);
> > > >
> > > >         Put put = new Put(d3.getBytes());
> > > >
> > > >         put.add(Bytes.toBytes("word_attributes"),
> > > >                 Bytes.toBytes("mdblSentimentOverall"),
> > > >                 Bytes.toBytes(mdblSentimentOverall));
> > > >
> > > >         System.out.println("Context is " + context);
> > > >
> > > >         context.write(new ImmutableBytesWritable(d3.getBytes()), put);
> > > >     }
> > > > }
> > > >
> > > > SentimentCalculatorHBase - the Tool/main class:
> > > >
> > > > package com.hbase.mapreduce;
> > > >
> > > > import java.util.Calendar;
> > > >
> > > > import org.apache.hadoop.conf.Configuration;
> > > > import org.apache.hadoop.conf.Configured;
> > > > import org.apache.hadoop.hbase.client.Put;
> > > > import org.apache.hadoop.hbase.client.Scan;
> > > > import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
> > > > import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
> > > > import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
> > > > import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
> > > > import org.apache.hadoop.io.Text;
> > > > import org.apache.hadoop.mapreduce.Job;
> > > > import org.apache.hadoop.util.Tool;
> > > > import org.apache.hadoop.util.ToolRunner;
> > > >
> > > > public class SentimentCalculatorHBase extends Configured implements Tool {
> > > >
> > > >     /**
> > > >      * @param args
> > > >      * @throws Exception
> > > >      */
> > > >     public static void main(String[] args) throws Exception {
> > > >         // TODO Auto-generated method stub
> > > >         SentimentCalculatorHBase sentimentCalculatorHBase = new SentimentCalculatorHBase();
> > > >         ToolRunner.run(sentimentCalculatorHBase, args);
> > > >     }
> > > >
> > > >     @Override
> > > >     public int run(String[] arg0) throws Exception {
> > > >         // TODO Auto-generated method stub
> > > >
> > > >         System.out
> > > >                 .println("***********************Configuration started***********************");
> > > >         Configuration configuration = getConf();
> > > >         System.out.println("Conf: " + configuration);
> > > >
> > > >         Job sentiCalcJob = new Job(configuration, "HBase SentimentCalculation");
> > > >
> > > >         sentiCalcJob.setJarByClass(SentimentCalculatorHBase.class);
> > > >         sentiCalcJob.setMapperClass(SentimentCalculationHBaseMapper.class);
> > > >         sentiCalcJob.setCombinerClass(SentimentCalculationHBaseReducer.class);
> > > >         sentiCalcJob.setReducerClass(SentimentCalculationHBaseReducer.class);
> > > >
> > > >         sentiCalcJob.setInputFormatClass(TableInputFormat.class);
> > > >         sentiCalcJob.setOutputFormatClass(TableOutputFormat.class);
> > > >
> > > >         /* Start : Added out of exasperation! */
> > > >         sentiCalcJob.setOutputKeyClass(ImmutableBytesWritable.class);
> > > >         sentiCalcJob.setOutputValueClass(Put.class);
> > > >         /* End : Added out of exasperation! */
> > > >
> > > >         Scan twitterdataUserScan = new Scan();
> > > >         twitterdataUserScan.setCaching(500);
> > > >
> > > >         twitterdataUserScan.addColumn("word_attributes".getBytes(),
> > > >                 "TwitterText".getBytes());
> > > >
> > > >         TableMapReduceUtil.initTableMapperJob("twitterdata_user",
> > > >                 twitterdataUserScan, SentimentCalculationHBaseMapper.class,
> > > >                 Text.class, Text.class, sentiCalcJob);
> > > >
> > > >         TableMapReduceUtil.initTableReducerJob("sentiment_output",
> > > >                 SentimentCalculationHBaseReducer.class, sentiCalcJob);
> > > >
> > > >         Calendar beforeJob = Calendar.getInstance();
> > > >         System.out.println("Job Time started---------------- "
> > > >                 + beforeJob.getTime());
> > > >         boolean check = sentiCalcJob.waitForCompletion(true);
> > > >         if (check == true) {
> > > >             System.out
> > > >                     .println("*******************Job completed- SentimentCalculation********************");
> > > >         }
> > > >         Calendar afterJob = Calendar.getInstance();
> > > >         System.out
> > > >                 .println("Job Time ended SentimentCalculation---------------- "
> > > >                         + afterJob.getTime());
> > > >         return 0;
> > > >     }
> > > > }
> > > >
> > > > Regards,
> > > > Omkar Joshi
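
The "wrong key class" failure in the stack trace above is raised on the combiner path (Task$NewCombinerRunner): the job registers SentimentCalculationHBaseReducer as the combiner, so its ImmutableBytesWritable keys reach a map-side writer that expects the mapper's Text keys. What follows is a minimal plain-Java sketch of that check, not Hadoop code. SpillWriter, TextKey and RowKey are hypothetical stand-ins for IFile.Writer, Text and ImmutableBytesWritable, and the exact-class comparison is an assumption inferred from the exception message.

```java
import java.io.IOException;

// Simplified model of the map-side key check. All types here are
// hypothetical stand-ins, not the real Hadoop/HBase classes.
class CombinerTypeCheckSketch {

    // Stand-in for org.apache.hadoop.io.Text (the mapper's output key type).
    static final class TextKey {
        final String value;
        TextKey(String value) { this.value = value; }
    }

    // Stand-in for ImmutableBytesWritable (the reducer's output key type).
    static final class RowKey {
        final byte[] bytes;
        RowKey(byte[] bytes) { this.bytes = bytes; }
    }

    // Stand-in for the spill writer: it is configured with the map output
    // key class and rejects keys of any other class (assumed behavior).
    static final class SpillWriter {
        private final Class<?> keyClass;
        SpillWriter(Class<?> keyClass) { this.keyClass = keyClass; }

        void append(Object key) throws IOException {
            if (key.getClass() != keyClass) {
                throw new IOException("wrong key class: " + key.getClass()
                        + " is not " + keyClass);
            }
        }
    }

    // True if a combiner emitting `key` is accepted by a writer that was
    // set up for `mapOutputKeyClass`.
    static boolean accepted(Class<?> mapOutputKeyClass, Object key) {
        SpillWriter writer = new SpillWriter(mapOutputKeyClass);
        try {
            writer.append(key);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Combiner that keeps the map output key type: accepted.
        System.out.println(accepted(TextKey.class, new TextKey("tweet")));       // prints true
        // Reducer reused as combiner, emitting a row key: rejected.
        System.out.println(accepted(TextKey.class, new RowKey(new byte[] {1}))); // prints false
    }
}
```

Under this reading, the job would need either no setCombinerClass(...) call at all or a combiner whose input and output types both match the map output (Text, Text here), in the spirit of the PutCombiner signature Ted quotes, where the key and value types are the same on both sides.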
