Thanks a lot, Jean!!
I am very thankful to you. And of course Ted is also doing a very good job.
*Revised Code:*
> package com.maddy;
>
> import java.io.IOException;
>
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.hbase.HBaseConfiguration;
> import org.apache.hadoop.hbase.client.Put;
> import org.apache.hadoop.hbase.client.Result;
> import org.apache.hadoop.hbase.client.Scan;
> import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
> import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
> import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
> import org.apache.hadoop.hbase.mapreduce.TableMapper;
> import org.apache.hadoop.hbase.mapreduce.TableReducer;
> import org.apache.hadoop.hbase.util.Bytes;
> //import org.apache.hadoop.io.DoubleWritable;
> import org.apache.hadoop.io.FloatWritable;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
>
>
> public class openaveragestock
> {
>     public static class map extends TableMapper<ImmutableBytesWritable, FloatWritable>
>     {
>         private static String col_family = "stocks";
>         private static String qul = "open";
>
>         private static String col_family1 = "stocks";
>         private static String qul1 = "symbol";
>
>         private static byte[] colfamily2 = Bytes.toBytes(col_family);
>         private static byte[] qul2 = Bytes.toBytes(qul);
>
>         private static byte[] colfamily3 = Bytes.toBytes(col_family1);
>         private static byte[] qul3 = Bytes.toBytes(qul1);
>
>         // public static float toFloat(int qul2)
>         // {
>         //     return Float.intBitsToFloat(qul2);
>         // }
>
>         @Override
>         public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException
>         {
>             //byte[] val1 = (value.getValue("stocks".getBytes(), "symbol".getBytes()));
>             // Raw bytes of the stocks:open cell, stored as text such as "2.5".
>             byte[] val = value.getValue(colfamily2, qul2);
>
>             // Note: this key is the literal bytes of the string "symbol",
>             // so every record falls into the same reduce group.
>             ImmutableBytesWritable stock_symbol = new ImmutableBytesWritable(qul3);
>
>             try
>             {
>                 context.write(stock_symbol, new FloatWritable(Float.parseFloat(Bytes.toString(val))));
>             }
>             catch (InterruptedException e)
>             {
>                 throw new IOException(e);
>             }
>         }
>     }
>
>
>     public static class reduce extends TableReducer<ImmutableBytesWritable, FloatWritable, ImmutableBytesWritable>
>     {
>         @Override
>         public void reduce(ImmutableBytesWritable key, Iterable<FloatWritable> values, Context context) throws IOException, InterruptedException
>         {
>             float sum = 0;
>             int count = 0;
>             float average = 0;
>             for (FloatWritable val : values)
>             {
>                 sum += val.get();
>                 count++;
>             }
>             average = (sum / count);
>             Put put = new Put(key.get());
>             put.add(Bytes.toBytes("stocks_output"), Bytes.toBytes("average"), Bytes.toBytes(average));
>             System.out.println("For\t" + count + "\t average is:" + average);
>             context.write(key, put);
>         }
>     }
>
>     public static void main(String args[]) throws IOException, ClassNotFoundException, InterruptedException
>     {
>         Configuration config = HBaseConfiguration.create();
>         config.addResource("/home/manish/workspace/hbase project/bin/hbase-site.xml");
>         Job job = new Job(config, "openstockaverage1");
>
>         Scan scan = new Scan();
>         scan.addFamily("stocks".getBytes());
>         // FirstKeyOnlyFilter returns only the first KeyValue of each row;
>         // here that is stocks:open, since "open" sorts before "symbol".
>         scan.setFilter(new FirstKeyOnlyFilter());
>
>         TableMapReduceUtil.initTableMapperJob("nyse4",
>                 scan,
>                 map.class,
>                 ImmutableBytesWritable.class,
>                 FloatWritable.class,
>                 job);
>
>         TableMapReduceUtil.initTableReducerJob("nyse5",
>                 reduce.class,
>                 job);
>         //job.setReducerClass(reduce.class);
>
>         //FileOutputFormat.setOutputPath(job, new Path(
>         //        "hdfs://localhost:54310/user/manish/final_hbase_hadoop"));
>         job.waitForCompletion(true);
>     }
> }
>
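A note on the keying above: because the map key is the literal bytes of the string "symbol" (qul3), all records fall into a single reduce group, which is why the log below shows "Reduce input groups=1" and a single overall average. If per-symbol averages were wanted instead, a minimal sketch of the mapper body (hypothetical, not the code above; the FirstKeyOnlyFilter would also have to be dropped from the Scan so the stocks:symbol cell is still returned) could look like this:

    // Hypothetical variant: key each open price by the row's actual
    // stocks:symbol value, so the reducer emits one average per symbol.
    byte[] symbolBytes = value.getValue(colfamily3, qul3);   // e.g. "QXM"
    byte[] openBytes = value.getValue(colfamily2, qul2);     // e.g. "2.5"
    if (symbolBytes != null && openBytes != null)
    {
        try
        {
            context.write(new ImmutableBytesWritable(symbolBytes),
                    new FloatWritable(Float.parseFloat(Bytes.toString(openBytes))));
        }
        catch (InterruptedException e)
        {
            throw new IOException(e);
        }
    }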
*Sample output in my Eclipse console:*
> 13/08/17 07:27:21 INFO mapred.Merger: Merging 1 sorted segments
> 13/08/17 07:27:21 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 42242 bytes
> 13/08/17 07:27:21 INFO mapred.LocalJobRunner:
> 13/08/17 07:27:21 INFO mapred.JobClient: map 100% reduce 0%
> *For 2640 average is:6.201058*
> 13/08/17 07:27:21 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
> 13/08/17 07:27:21 INFO mapred.LocalJobRunner: reduce > reduce
> 13/08/17 07:27:21 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
> 13/08/17 07:27:22 WARN mapred.FileOutputCommitter: Output path is null in cleanup
> 13/08/17 07:27:22 INFO mapred.JobClient: map 100% reduce 100%
> 13/08/17 07:27:22 INFO mapred.JobClient: Job complete: job_local_0001
> 13/08/17 07:27:22 INFO mapred.JobClient: Counters: 30
> 13/08/17 07:27:22 INFO mapred.JobClient: HBase Counters
> 13/08/17 07:27:22 INFO mapred.JobClient: REMOTE_RPC_CALLS=0
> 13/08/17 07:27:22 INFO mapred.JobClient: RPC_CALLS=2643
> 13/08/17 07:27:22 INFO mapred.JobClient: RPC_RETRIES=0
> 13/08/17 07:27:22 INFO mapred.JobClient: NOT_SERVING_REGION_EXCEPTION=0
> 13/08/17 07:27:22 INFO mapred.JobClient: NUM_SCANNER_RESTARTS=0
> 13/08/17 07:27:22 INFO mapred.JobClient: MILLIS_BETWEEN_NEXTS=5849
> 13/08/17 07:27:22 INFO mapred.JobClient: BYTES_IN_RESULTS=126719
> 13/08/17 07:27:22 INFO mapred.JobClient: BYTES_IN_REMOTE_RESULTS=0
> 13/08/17 07:27:22 INFO mapred.JobClient: REGIONS_SCANNED=1
> 13/08/17 07:27:22 INFO mapred.JobClient: REMOTE_RPC_RETRIES=0
> 13/08/17 07:27:22 INFO mapred.JobClient: File Output Format Counters
> 13/08/17 07:27:22 INFO mapred.JobClient: Bytes Written=0
> 13/08/17 07:27:22 INFO mapred.JobClient: FileSystemCounters
> 13/08/17 07:27:22 INFO mapred.JobClient: FILE_BYTES_READ=24176810
> 13/08/17 07:27:22 INFO mapred.JobClient: FILE_BYTES_WRITTEN=24552208
> 13/08/17 07:27:22 INFO mapred.JobClient: File Input Format Counters
> 13/08/17 07:27:22 INFO mapred.JobClient: Bytes Read=0
> 13/08/17 07:27:22 INFO mapred.JobClient: Map-Reduce Framework
> 13/08/17 07:27:22 INFO mapred.JobClient: Reduce input groups=1
> 13/08/17 07:27:22 INFO mapred.JobClient: Map output materialized bytes=42246
> 13/08/17 07:27:22 INFO mapred.JobClient: Combine output records=0
> 13/08/17 07:27:22 INFO mapred.JobClient: Map input records=2640
> 13/08/17 07:27:22 INFO mapred.JobClient: Reduce shuffle bytes=0
> 13/08/17 07:27:22 INFO mapred.JobClient: Physical memory (bytes) snapshot=0
> 13/08/17 07:27:22 INFO mapred.JobClient: Reduce output records=1
> 13/08/17 07:27:22 INFO mapred.JobClient: Spilled Records=5280
> 13/08/17 07:27:22 INFO mapred.JobClient: Map output bytes=36960
> 13/08/17 07:27:22 INFO mapred.JobClient: CPU time spent (ms)=0
> 13/08/17 07:27:22 INFO mapred.JobClient: Total committed heap usage (bytes)=321527808
> 13/08/17 07:27:22 INFO mapred.JobClient: Virtual memory (bytes) snapshot=0
> 13/08/17 07:27:22 INFO mapred.JobClient: Combine input records=0
> 13/08/17 07:27:22 INFO mapred.JobClient: Map output records=2640
> 13/08/17 07:27:22 INFO mapred.JobClient: SPLIT_RAW_BYTES=65
> 13/08/17 07:27:22 INFO mapred.JobClient: Reduce input records=2640
>
*Question:* (please don't laugh at me if I ask a silly question)

In the code I set an output directory, but when I look in the HDFS directory it does not contain any part-00000 file; it contains only the _SUCCESS file.

Can I ask why this happens?
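For what it's worth, the reducer's real output goes elsewhere: initTableReducerJob wires the job to TableOutputFormat, which sends the Puts to the "nyse5" HBase table, not to HDFS part files. A minimal read-back sketch, assuming the same Configuration as in main() and the 0.94-era client API:

    // Hypothetical check: scan the output table instead of looking for part files.
    // Uses org.apache.hadoop.hbase.client.HTable and ResultScanner.
    HTable outputTable = new HTable(config, "nyse5");
    ResultScanner results = outputTable.getScanner(new Scan());
    for (Result r : results)
    {
        byte[] avg = r.getValue(Bytes.toBytes("stocks_output"), Bytes.toBytes("average"));
        // The reducer stored the float with Bytes.toBytes(average), so decode with Bytes.toFloat.
        System.out.println(Bytes.toString(r.getRow()) + " -> " + Bytes.toFloat(avg));
    }
    results.close();
    outputTable.close();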
On Sat, Aug 17, 2013 at 5:24 PM, Jean-Marc Spaggiari <[email protected]> wrote:
> Hi Manish.
>
> Looking a bit more at this, I think the issue is that your "floats" are
> written in your table as strings and not as floats...
>
> Can you try something like:
> context.write(stock_symbol, new FloatWritable(Float.parseFloat(Bytes.toString(val))));
>
> Also, as asked previously: "can you please paste your code on pastebin?
> Same for the exception."
>
> Thanks,
>
> JM
>
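A small illustration of the point above, assuming the cell holds the text "2.5" as in the sample data further down:

    byte[] raw = Bytes.toBytes("2.5");    // 3 bytes of UTF-8 text, not an IEEE 754 float
    float parsed = Float.parseFloat(Bytes.toString(raw));   // 2.5f -- works on the text form
    // float broken = Bytes.toFloat(raw); // would throw IllegalArgumentException:
    //                                    // offset (0) + length (4) exceed the capacity of the array: 3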
> 2013/8/17 manish dunani <[email protected]>
>
> > Hey Jean,
> > I did it according to your suggestion.
> > I converted it as you told me, but I still face the same error.
> >
> > And Ted, I am new to this and don't have an idea of how I can use this
> > method. Can you please show me?
> >
> > Your help will be appreciated.
> >
> >
> >
> > On Fri, Aug 16, 2013 at 10:04 PM, Ted Yu <[email protected]> wrote:
> >
> > > Here is the javadoc for toFloat():
> > >
> > > * Presumes float encoded as IEEE 754 floating-point "single format"
> > >
> > > * @param bytes byte array
> > >
> > > * @return Float made from passed byte array.
> > >
> > > */
> > >
> > > public static float toFloat(byte [] bytes) {
> > >
> > > So for values of '2.5', toFloat() is not the proper method.
> > >
> > > You can use the float conversion provided by Java.
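In other words, Bytes.toFloat() is only the inverse of Bytes.toBytes(float); a minimal round trip:

    byte[] encoded = Bytes.toBytes(2.5f);    // exactly 4 bytes, IEEE 754 "single format"
    float decoded = Bytes.toFloat(encoded);  // 2.5f -- round-trips cleanly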
> > >
> > >
> > > Cheers
> > >
> > >
> > > On Fri, Aug 16, 2013 at 6:57 AM, Ted Yu <[email protected]> wrote:
> > >
> > > > Here is code from Bytes:
> > > >
> > > > public static float toFloat(byte [] bytes, int offset) {
> > > >
> > > > return Float.intBitsToFloat(toInt(bytes, offset, SIZEOF_INT));
> > > >
> > > > Looking at your sample data:
> > > >
> > > > 2010-02-04 column=stocks:open, timestamp=1376567559424, value=*2.5*
> > > >
> > > > The length of the value doesn't match SIZEOF_INT.
> > > >
> > > > It seems you need to validate the values first.
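A minimal validation sketch along those lines (hypothetical; it skips rows whose stocks:open cell is missing or not a parseable number instead of failing the whole task):

    byte[] val = value.getValue("stocks".getBytes(), "open".getBytes());
    if (val != null)
    {
        try
        {
            context.write(stock_symbol, new FloatWritable(Float.parseFloat(Bytes.toString(val))));
        }
        catch (NumberFormatException nfe)
        {
            // skip this row: stocks:open is not a parseable number
        }
        catch (InterruptedException e)
        {
            throw new IOException(e);
        }
    }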
> > > >
> > > >
> > > > Cheers
> > > >
> > > >
> > > > On Fri, Aug 16, 2013 at 3:42 AM, manish dunani <[email protected]> wrote:
> > > >
> > > >> Hello,
> > > >>
> > > >> I am using Apache Hadoop 1.1.2 and HBase 0.94.9 in pseudo-distributed mode.
> > > >>
> > > >> I am trying to find the average of the open stock values.
> > > >>
> > > >> *Sample dataset in HBase* (table name: nyse4):
> > > >>
> > > >> 2010-02-04   column=stocks:open,   timestamp=1376567559424, value=2.5
> > > >> 2010-02-04   column=stocks:symbol, timestamp=1376567559424, value=QXM
> > > >> 2010-02-05   column=stocks:open,   timestamp=1376567559429, value=2.42
> > > >> 2010-02-05   column=stocks:symbol, timestamp=1376567559429, value=QXM
> > > >> 2010-02-08   column=stocks:open,   timestamp=1376567559431, value=2.33
> > > >> 2010-02-08   column=stocks:symbol, timestamp=1376567559431, value=QXM
> > > >>
> > > >> *Code:* (please ignore the commented-out lines)
> > > >>
> > > >>
> > > >> > package com.maddy;
> > > >> >
> > > >> > import java.io.IOException;
> > > >> >
> > > >> > import org.apache.hadoop.conf.Configuration;
> > > >> > import org.apache.hadoop.fs.Path;
> > > >> > import org.apache.hadoop.hbase.HBaseConfiguration;
> > > >> > import org.apache.hadoop.hbase.client.Put;
> > > >> > import org.apache.hadoop.hbase.client.Result;
> > > >> > import org.apache.hadoop.hbase.client.Scan;
> > > >> > import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
> > > >> > import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
> > > >> > import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
> > > >> > import org.apache.hadoop.hbase.mapreduce.TableMapper;
> > > >> > import org.apache.hadoop.hbase.mapreduce.TableReducer;
> > > >> > import org.apache.hadoop.hbase.util.Bytes;
> > > >> > //import org.apache.hadoop.io.DoubleWritable;
> > > >> > import org.apache.hadoop.io.FloatWritable;
> > > >> > import org.apache.hadoop.mapreduce.Job;
> > > >> > import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
> > > >> >
> > > >> >
> > > >> > public class openaveragestock
> > > >> > {
> > > >> >     public static class map extends TableMapper<ImmutableBytesWritable, FloatWritable>
> > > >> >     {
> > > >> >         @Override
> > > >> >         public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException
> > > >> >         {
> > > >> >             byte[] val = (value.getValue("stocks".getBytes(), "open".getBytes()));
> > > >> >             //byte[] val1 = (value.getValue("stocks".getBytes(), "symbol".getBytes()));
> > > >> >
> > > >> >             ImmutableBytesWritable stock_symbol = new ImmutableBytesWritable("symbol".getBytes());
> > > >> >
> > > >> >             try
> > > >> >             {
> > > >> >                 context.write(stock_symbol, new FloatWritable(Bytes.toFloat(val)));
> > > >> >             }
> > > >> >             catch (InterruptedException e)
> > > >> >             {
> > > >> >                 throw new IOException(e);
> > > >> >             }
> > > >> >         }
> > > >> >     }
> > > >> >
> > > >> >
> > > >> >     public static class reduce extends TableReducer<ImmutableBytesWritable, FloatWritable, ImmutableBytesWritable>
> > > >> >     {
> > > >> >         @Override
> > > >> >         public void reduce(ImmutableBytesWritable key, Iterable<FloatWritable> values, Context context) throws IOException, InterruptedException
> > > >> >         {
> > > >> >             float sum = 0;
> > > >> >             int count = 0;
> > > >> >             // float average = 0;
> > > >> >             for (FloatWritable val : values)
> > > >> >             {
> > > >> >                 sum += val.get();
> > > >> >                 count++;
> > > >> >             }
> > > >> >             //average = (sum / count);
> > > >> >             Put put = new Put(key.get());
> > > >> >             put.add(Bytes.toBytes("stocks_output"), Bytes.toBytes("average"), Bytes.toBytes(sum / count));
> > > >> >             System.out.println("For\t" + count + "\t average is:" + (sum / count));
> > > >> >             context.write(key, put);
> > > >> >         }
> > > >> >     }
> > > >> >
> > > >> >     public static void main(String args[]) throws IOException, ClassNotFoundException, InterruptedException
> > > >> >     {
> > > >> >         Configuration config = HBaseConfiguration.create();
> > > >> >         config.addResource("/home/manish/workspace/hbase project/bin/hbase-site.xml");
> > > >> >         Job job = new Job(config, "openstockaverage1");
> > > >> >
> > > >> >         Scan scan = new Scan();
> > > >> >         scan.addFamily("stocks".getBytes());
> > > >> >         scan.setFilter(new FirstKeyOnlyFilter());
> > > >> >
> > > >> >         TableMapReduceUtil.initTableMapperJob("nyse4",
> > > >> >                 scan,
> > > >> >                 map.class,
> > > >> >                 ImmutableBytesWritable.class,
> > > >> >                 FloatWritable.class,
> > > >> >                 job);
> > > >> >
> > > >> >         TableMapReduceUtil.initTableReducerJob("nyse5",
> > > >> >                 reduce.class,
> > > >> >                 job);
> > > >> >         //job.setReducerClass(reduce.class);
> > > >> >
> > > >> >         FileOutputFormat.setOutputPath(job, new Path(
> > > >> >                 "hdfs://localhost:54310/user/manish/edurekahbasehadoop1"));
> > > >> >         job.waitForCompletion(true);
> > > >> >     }
> > > >> > }
> > > >> >
> > > >> >
> > > >> > *===> Got stuck on this error:*
> > > >> >
> > > >> > 13/08/16 03:21:45 INFO mapred.JobClient: Running job: job_local_0001
> > > >> > 13/08/16 03:21:46 INFO mapred.JobClient: map 0% reduce 0%
> > > >> > 13/08/16 03:21:46 INFO mapreduce.TableOutputFormat: Created table instance for nyse5
> > > >> > 13/08/16 03:21:46 INFO util.ProcessTree: setsid exited with exit code 0
> > > >> > 13/08/16 03:21:47 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@50b77c
> > > >> > 13/08/16 03:21:47 INFO mapred.MapTask: io.sort.mb = 100
> > > >> > 13/08/16 03:21:53 INFO mapred.MapTask: data buffer = 79691776/99614720
> > > >> > 13/08/16 03:21:53 INFO mapred.MapTask: record buffer = 262144/327680
> > > >> > 13/08/16 03:21:54 WARN mapred.LocalJobRunner: job_local_0001
> > > >> > java.lang.IllegalArgumentException: offset (0) + length (4) exceed the capacity of the array: 3
> > > >> >     at org.apache.hadoop.hbase.util.Bytes.explainWrongLengthOrOffset(Bytes.java:543)
> > > >> >     at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:690)
> > > >> >     at org.apache.hadoop.hbase.util.Bytes.toFloat(Bytes.java:584)
> > > >> >     at org.apache.hadoop.hbase.util.Bytes.toFloat(Bytes.java:574)
> > > >> >     at com.maddy.openaveragestock$map.map(openaveragestock.java:41)
> > > >> >     at com.maddy.openaveragestock$map.map(openaveragestock.java:1)
> > > >> >     at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
> > > >> >     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
> > > >> >     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
> > > >> >     at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:214)
> > > >> >
> > > >> >
> > > >> I cannot find where it fails.
> > > >> Can you please tell me where I went wrong?
> > > >>
> > > >>
> > > >> Your help will be appreciated.
> > > >>
> > > >> --
> > > >> Regards
> > > >>
> > > >> *Manish Dunani*
> > > >> *Contact No* : +91 9408329137
> > > >> *skype id* : manish.dunani
> > > >>
> > > >
> > > >
> > >
> >
> >
> >
> > --
> > Regards
> >
> > *Manish Dunani*
> > *Contact No* : +91 9408329137
> > *skype id* : manish.dunani
> >
>
--
Regards
*Manish Dunani*
*Contact No* : +91 9408329137
*skype id* : manish.dunani