Hi Manish,
First, instead of "stocks".getBytes() you need to use
Bytes.toBytes("stocks"). Same for the other strings.
Second, in your map task you create two byte arrays for those strings on
each iteration. You don't want that in a production environment. You should
move them outside of the map method and make them static final.
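Something along these lines (just a quick sketch, not compiled; the constant
names are my own):

    public static class map extends TableMapper<ImmutableBytesWritable, FloatWritable>
    {
        // Build the byte arrays once, instead of on every map() call
        private static final byte[] STOCKS_FAMILY = Bytes.toBytes("stocks");
        private static final byte[] OPEN_QUALIFIER = Bytes.toBytes("open");

        @Override
        public void map(ImmutableBytesWritable row, Result value, Context context)
                throws IOException
        {
            byte[] val = value.getValue(STOCKS_FAMILY, OPEN_QUALIFIER);
            // ... rest unchanged
        }
    }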
JM
2013/8/16 manish dunani <[email protected]>
> hello,
>
> I am using apache hadoop 1.1.2 and hbase 0.94.9 in pseudo-distributed mode.
>
> I am trying to find the average open stock value.
>
> *sample dataset in hbase:* *(table name: nyse4)*
>
>
> 2010-02-04 column=stocks:open, timestamp=1376567559424,
> value=2.5
> 2010-02-04 column=stocks:symbol, timestamp=1376567559424,
> value=QXM
> 2010-02-05 column=stocks:open, timestamp=1376567559429,
> value=2.42
> 2010-02-05 column=stocks:symbol, timestamp=1376567559429,
> value=QXM
> 2010-02-08 column=stocks:open, timestamp=1376567559431,
> value=2.33
> 2010-02-08 column=stocks:symbol, timestamp=1376567559431,
> value=QXM
>
> *code:* (please ignore the commented-out lines)
>
>
> > package com.maddy;
> >
> > import java.io.IOException;
> >
> > import org.apache.hadoop.conf.Configuration;
> > import org.apache.hadoop.fs.Path;
> > import org.apache.hadoop.hbase.HBaseConfiguration;
> > import org.apache.hadoop.hbase.client.Put;
> > import org.apache.hadoop.hbase.client.Result;
> > import org.apache.hadoop.hbase.client.Scan;
> > import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
> > import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
> > import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
> > import org.apache.hadoop.hbase.mapreduce.TableMapper;
> > import org.apache.hadoop.hbase.mapreduce.TableReducer;
> > import org.apache.hadoop.hbase.util.Bytes;
> > //import org.apache.hadoop.io.DoubleWritable;
> > import org.apache.hadoop.io.FloatWritable;
> > import org.apache.hadoop.mapreduce.Job;
> > import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
> >
> >
> > public class openaveragestock
> > {
> > public static class map extends
> > TableMapper<ImmutableBytesWritable,FloatWritable>
> > {
> > @Override
> >
> > public void map(ImmutableBytesWritable row,Result value,Context
> > context) throws IOException
> > {
> >
> > byte[] val=(value.getValue("stocks".getBytes(),"open".getBytes()));
> > //byte[] val1=(value.getValue("stocks".getBytes(),"symbol".getBytes()));
> >
> >
> > ImmutableBytesWritable stock_symbol=new ImmutableBytesWritable("symbol".getBytes());
> >
> >
> > try
> > {
> > context.write(stock_symbol,new FloatWritable(Bytes.toFloat(val)));
> > }
> > catch(InterruptedException e)
> >
> > {
> > throw new IOException(e);
> > }
> >
> >
> > }
> >
> >
> > }
> >
> >
> > public static class reduce extends
> > TableReducer<ImmutableBytesWritable,FloatWritable,ImmutableBytesWritable>
> > {
> >
> > @Override
> > public void reduce(ImmutableBytesWritable key, Iterable<FloatWritable> values, Context context)
> > throws IOException, InterruptedException
> > {
> > float sum=0;
> > int count=0;
> > // float average=0;
> > for(FloatWritable val:values)
> > {
> > sum+=val.get();
> > count++;
> > }
> > //average=(sum/count);
> > Put put=new Put(key.get());
> >
> >
> > put.add(Bytes.toBytes("stocks_output"),Bytes.toBytes("average"),Bytes.toBytes(sum/count));
> > System.out.println("For\t"+count+"\t average is:"+(sum/count));
> > context.write(key,put);
> >
> > }
> >
> > }
> >
> > public static void main(String args[]) throws IOException,
> > ClassNotFoundException, InterruptedException
> > {
> > Configuration config=HBaseConfiguration.create();
> > config.addResource("/home/manish/workspace/hbase project/bin/hbase-site.xml");
> > Job job=new Job(config,"openstockaverage1");
> >
> >
> > Scan scan=new Scan();
> > scan.addFamily("stocks".getBytes());
> > scan.setFilter(new FirstKeyOnlyFilter());
> >
> > TableMapReduceUtil.initTableMapperJob("nyse4",
> > scan,
> > map.class,
> > ImmutableBytesWritable.class,
> > FloatWritable.class,
> > job);
> >
> > TableMapReduceUtil.initTableReducerJob("nyse5",
> > reduce.class,
> > job);
> > //job.setReducerClass(reduce.class);
> >
> > FileOutputFormat.setOutputPath(job, new Path(
> > "hdfs://localhost:54310/user/manish/edurekahbasehadoop1"));
> > job.waitForCompletion(true);
> > }
> >
> > }
> >
> >
> >
> > *===> Got stuck on this error:*
> >
> >
> > 13/08/16 03:21:45 INFO mapred.JobClient: Running job: job_local_0001
> > 13/08/16 03:21:46 INFO mapred.JobClient: map 0% reduce 0%
> > 13/08/16 03:21:46 INFO mapreduce.TableOutputFormat: Created table instance for nyse5
> > 13/08/16 03:21:46 INFO util.ProcessTree: setsid exited with exit code 0
> > 13/08/16 03:21:47 INFO mapred.Task: Using ResourceCalculatorPlugin :
> > org.apache.hadoop.util.LinuxResourceCalculatorPlugin@50b77c
> > 13/08/16 03:21:47 INFO mapred.MapTask: io.sort.mb = 100
> > 13/08/16 03:21:53 INFO mapred.MapTask: data buffer = 79691776/99614720
> > 13/08/16 03:21:53 INFO mapred.MapTask: record buffer = 262144/327680
> > 13/08/16 03:21:54 WARN mapred.LocalJobRunner: job_local_0001
> > java.lang.IllegalArgumentException: offset (0) + length (4) exceed the
> > capacity of the array: 3
> > at org.apache.hadoop.hbase.util.Bytes.explainWrongLengthOrOffset(Bytes.java:543)
> > at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:690)
> > at org.apache.hadoop.hbase.util.Bytes.toFloat(Bytes.java:584)
> > at org.apache.hadoop.hbase.util.Bytes.toFloat(Bytes.java:574)
> > at com.maddy.openaveragestock$map.map(openaveragestock.java:41)
> > at com.maddy.openaveragestock$map.map(openaveragestock.java:1)
> > at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
> > at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
> > at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
> > at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:214)
> >
> >
> I cannot figure out where it fails.
> Can you please tell me where I went wrong?
>
>
> Your help will be appreciated.
>
> --
> Regards
>
> *Manish Dunani*
> *Contact No* : +91 9408329137
> *skype id* : manish.dunani
>