Hello Jean, did you find it?
On Sun, Aug 18, 2013 at 8:28 AM, manish dunani <[email protected]> wrote: > But i want my output likewise:: > > > ROW CELL+COLUMN > > QXM column=stocks_output:average, > timestamp=XXXXXXXXXX, val > ue=XXXX > QTM column=stocks_output:average, > timestamp=XXXXXXXXXX, val > ue=XXXX > > > *sample dataset in hbase::**(table name:nyse4)* > > > 2010-02-04 column=stocks:open, timestamp=1376567559424, > value=2.5 > 2010-02-04 column=stocks:symbol, timestamp=1376567559424, > value=QXM > 2010-02-05 column=stocks:open, timestamp=1376567559429, > value=2.42 > 2010-02-05 column=stocks:symbol, timestamp=1376567559429, > value=QXM > > > ===>>In my previous output i didn't get any symbol(qulifier)'s values in > my table as Row key.. > > > > hbase(main):004:0> scan 'nyse5' > ROW COLUMN+CELL > symbol column=stocks_output:average, > timestamp=1376749641978, val > ue=@\xC6o\x11 > > *So,that i changed my programme like wise::* > > > package com.maddy; > > import java.io.IOException; > > import org.apache.hadoop.conf.Configuration; > import org.apache.hadoop.fs.Path; > import org.apache.hadoop.hbase.HBaseConfiguration; > import org.apache.hadoop.hbase.client.Result; > import org.apache.hadoop.hbase.client.Scan; > import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; > import org.apache.hadoop.hbase.io.ImmutableBytesWritable; > import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; > import org.apache.hadoop.hbase.mapreduce.TableMapper; > import org.apache.hadoop.hbase.util.Bytes; > //import org.apache.hadoop.io.DoubleWritable; > import org.apache.hadoop.io.FloatWritable; > import org.apache.hadoop.io.Text; > import org.apache.hadoop.mapreduce.Job; > import org.apache.hadoop.mapreduce.Reducer; > > import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; > > > public class openaveragestock > { > public static class map extends TableMapper<Text,FloatWritable> > { > private static String col_family="stocks"; > private static String qul="open"; > > private static String 
col_family1="stocks"; > private static String qul1="symbol"; > > private static byte[] colfamily2=Bytes.toBytes(col_family); > private static byte[] qul2=Bytes.toBytes(qul); > > private static byte[] colfamily3=Bytes.toBytes(col_family1); > private static byte[] qul3=Bytes.toBytes(qul1); > > // public static float toFloat(int qul2) > // { > // return Float.intBitsToFloat((qul2)); > // > // } > // > private static Text k1=new Text(); > > > > public void map(ImmutableBytesWritable row,Result value,Context > context) throws IOException > { > > > //byte[] > val1=(value.getValue("stocks".getBytes(),"symbol".getBytes())); > //String > k=Bytes.toString(value.getValue(Bytes.toBytes("stocks"),Bytes.toBytes("symbol"))); > byte[] val=value.getValue(colfamily2,qul2); > String k=Bytes.toString(value.getValue(colfamily3,qul3)); > > //ImmutableBytesWritable stock_symbol=new > ImmutableBytesWritable(qul3); > > k1.set(k); > > try > { > > context.write(k1,new > FloatWritable(Float.parseFloat(Bytes.toString(val)))); > > } > > catch(InterruptedException e) > > { > throw new IOException(e); > } > > > } > > > } > > > public static class Reduce extends Reducer<Text,FloatWritable, > Text,FloatWritable> > { > > public void reduce(Text key,Iterable<FloatWritable>values,Context > context) throws IOException, InterruptedException > > { > float sum=0; > int count=0; > float average=0; > for(FloatWritable val:values) > { > sum+=val.get(); > count++; > } > average=(sum/count); > // Put put=new Put(key.getBytes()); > // > put.add(Bytes.toBytes("stocks_output"),Bytes.toBytes("average"),Bytes.toBytes(average)); > > System.out.println("For\t"+count+"\t average is:"+average); > context.write(key,new FloatWritable(average)); > > > } > > } > > public static void main(String args[]) throws IOException, > ClassNotFoundException, InterruptedException > { > Configuration config=HBaseConfiguration.create(); > config.addResource("/home/manish/workspace/hbase > project/bin/hbase-site.xml"); > Job job=new 
Job(config,"openstockaverage1"); > > > Scan scan=new Scan(); > scan.addFamily("stocks".getBytes()); > scan.setFilter(new FirstKeyOnlyFilter()); > > TableMapReduceUtil.initTableMapperJob("nyse4", > scan, > map.class, > ImmutableBytesWritable.class, > FloatWritable.class, > job); > > // TableMapReduceUtil.initTableReducerJob("nyse5", > // Reduce.class, > // job); > job.setReducerClass(Reduce.class); > > FileOutputFormat.setOutputPath(job, new Path( > "hdfs://localhost:54310/user/manish/Full_final_output_5")); > job.waitForCompletion(true); > } > > > > } > > > *It throws the error::* > > 13/08/17 19:37:59 INFO mapred.JobClient: Running job: job_local_0001 > 13/08/17 19:37:59 INFO util.ProcessTree: setsid exited with exit code 0 > 13/08/17 19:37:59 INFO mapred.Task: Using ResourceCalculatorPlugin : > org.apache.hadoop.util.LinuxResourceCalculatorPlugin@bd96dd > 13/08/17 19:37:59 INFO mapred.MapTask: io.sort.mb = 100 > 13/08/17 19:38:00 INFO mapred.JobClient: map 0% reduce 0% > 13/08/17 19:38:00 INFO mapred.MapTask: data buffer = 79691776/99614720 > 13/08/17 19:38:00 INFO mapred.MapTask: record buffer = 262144/327680 > 13/08/17 19:38:00 WARN mapred.LocalJobRunner: job_local_0001 > java.lang.NullPointerException > at org.apache.hadoop.io.Text.encode(Text.java:388) > at org.apache.hadoop.io.Text.set(Text.java:178) > at com.maddy.openaveragestock$map.map(openaveragestock.java:59) > > at com.maddy.openaveragestock$map.map(openaveragestock.java:1) > at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144) > at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764) > at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370) > at > org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:214) > 13/08/17 19:38:01 INFO mapred.JobClient: Job complete: job_local_0001 > 13/08/17 19:38:01 INFO mapred.JobClient: Counters: 0 > > > > > > On Sun, Aug 18, 2013 at 7:55 AM, manish dunani <[email protected]>wrote: > >> *Here is output table===>>> >> >> * >>> >>> 
hbase(main):004:0> scan 'nyse5' >>> ROW >>> COLUMN+CELL >>> symbol column=stocks_output:average, >>> timestamp=1376749641978, val >>> ue=@\xC6o\x11 >>> >> >> >> >> >> *Sample output at my eclipse:::* >> >> 13/08/17 07:27:21 INFO mapred.Merger: Merging 1 sorted segments >> 13/08/17 07:27:21 INFO mapred.Merger: Down to the last merge-pass, with 1 >> segments left of total size: 42242 bytes >> 13/08/17 07:27:21 INFO mapred.LocalJobRunner: >> 13/08/17 07:27:21 INFO mapred.JobClient: map 100% reduce 0% >> * >> For 2640 average is:6.201058* >> >> >> On Sat, Aug 17, 2013 at 11:59 PM, Jean-Marc Spaggiari < >> [email protected]> wrote: >> >>> Are you outputting to a table? From your code, I don't see any output >>> configured. >>> >>> 2013/8/17 manish dunani <[email protected]> >>> >>> > Thanx a lot!! >>> > Jean. >>> > >>> > I am very thankful to you..And off course Ted also doing very good job. >>> > * >>> > >>> > Revised Code ::* >>> > >>> > Package com.maddy; >>> > > >>> > > import java.io.IOException; >>> > > >>> > > import org.apache.hadoop.conf.Configuration; >>> > > import org.apache.hadoop.fs.Path; >>> > > import org.apache.hadoop.hbase.HBaseConfiguration; >>> > > import org.apache.hadoop.hbase.client.Put; >>> > > import org.apache.hadoop.hbase.client.Result; >>> > > import org.apache.hadoop.hbase.client.Scan; >>> > > import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; >>> > > import org.apache.hadoop.hbase.io.ImmutableBytesWritable; >>> > > import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; >>> > > import org.apache.hadoop.hbase.mapreduce.TableMapper; >>> > > import org.apache.hadoop.hbase.mapreduce.TableReducer; >>> > > import org.apache.hadoop.hbase.util.Bytes; >>> > > //import org.apache.hadoop.io.DoubleWritable; >>> > > import org.apache.hadoop.io.FloatWritable; >>> > > import org.apache.hadoop.mapreduce.Job; >>> > > import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; >>> > > >>> > > >>> > > public class openaveragestock >>> > > { 
>>> > > public static class map extends >>> > > TableMapper<ImmutableBytesWritable,FloatWritable> >>> > > { >>> > > private static String col_family="stocks"; >>> > > private static String qul="open"; >>> > > >>> > > private static String col_family1="stocks"; >>> > > private static String qul1="symbol"; >>> > > >>> > > private static byte[] colfamily2=Bytes.toBytes(col_family); >>> > > private static byte[] qul2=Bytes.toBytes(qul); >>> > > >>> > > private static byte[] colfamily3=Bytes.toBytes(col_family1); >>> > > private static byte[] qul3=Bytes.toBytes(qul1); >>> > > >>> > > // public static float toFloat(int qul2) >>> > > // { >>> > > // return Float.intBitsToFloat((qul2)); >>> > > // >>> > > // } >>> > > // >>> > > >>> > > >>> > > >>> > > public void map(ImmutableBytesWritable row,Result >>> value,Context >>> > > context) throws IOException >>> > > { >>> > > >>> > > >>> > > //byte[] >>> > > val1=(value.getValue("stocks".getBytes(),"symbol".getBytes())); >>> > > byte[] val=value.getValue(colfamily2,qul2); >>> > > >>> > > >>> > > ImmutableBytesWritable stock_symbol=new >>> > > ImmutableBytesWritable(qul3); >>> > > >>> > > >>> > > >>> > > try >>> > > { >>> > > >>> > > context.write(stock_symbol,new >>> > > FloatWritable(Float.parseFloat(Bytes.toString(val)))); >>> > > } >>> > > >>> > > catch(InterruptedException e) >>> > > >>> > > { >>> > > throw new IOException(e); >>> > > } >>> > > >>> > > >>> > > } >>> > > >>> > > >>> > > } >>> > > >>> > > >>> > > public static class reduce extends >>> > > >>> TableReducer<ImmutableBytesWritable,FloatWritable,ImmutableBytesWritable> >>> > > { >>> > > >>> > > @Override >>> > > public void reduce(ImmutableBytesWritable >>> > > key,Iterable<FloatWritable>values,Context context) throws >>> IOException, >>> > > InterruptedException >>> > > { >>> > > float sum=0; >>> > > int count=0; >>> > > float average=0; >>> > > for(FloatWritable val:values) >>> > > { >>> > > sum+=val.get(); >>> > > count++; >>> > > } >>> > > 
average=(sum/count); >>> > > Put put=new Put(key.get()); >>> > > >>> > > >>> > >>> put.add(Bytes.toBytes("stocks_output"),Bytes.toBytes("average"),Bytes.toBytes(average)); >>> > > System.out.println("For\t"+count+"\t average >>> is:"+average); >>> > > context.write(key,put); >>> > > >>> > > } >>> > > >>> > > } >>> > > >>> > > public static void main(String args[]) throws IOException, >>> > > ClassNotFoundException, InterruptedException >>> > > { >>> > > Configuration config=HBaseConfiguration.create(); >>> > > config.addResource("/home/manish/workspace/hbase >>> > > project/bin/hbase-site.xml"); >>> > > Job job=new Job(config,"openstockaverage1"); >>> > > >>> > > >>> > > Scan scan=new Scan(); >>> > > scan.addFamily("stocks".getBytes()); >>> > > scan.setFilter(new FirstKeyOnlyFilter()); >>> > > >>> > > TableMapReduceUtil.initTableMapperJob("nyse4", >>> > > scan, >>> > > map.class, >>> > > ImmutableBytesWritable.class, >>> > > FloatWritable.class, >>> > > job); >>> > > >>> > > TableMapReduceUtil.initTableReducerJob("nyse5", >>> > > reduce.class, >>> > > job); >>> > > //job.setReducerClass(reduce.class); >>> > > >>> > > //FileOutputFormat.setOutputPath(job, new Path( >>> > > // >>> > > "hdfs://localhost:54310/user/manish/final_hbase_hadoop")); >>> > > job.waitForCompletion(true); >>> > > } >>> > > >>> > > >>> > > >>> > > } >>> > > >>> > > >>> > *Sample output at my eclipse:::* >>> > >>> > 13/08/17 07:27:21 INFO mapred.Merger: Merging 1 sorted segments >>> > > 13/08/17 07:27:21 INFO mapred.Merger: Down to the last merge-pass, >>> with 1 >>> > > segments left of total size: 42242 bytes >>> > > 13/08/17 07:27:21 INFO mapred.LocalJobRunner: >>> > > 13/08/17 07:27:21 INFO mapred.JobClient: map 100% reduce 0% >>> > > *For 2640 average is:6.201058* >>> > > 13/08/17 07:27:21 INFO mapred.Task: >>> Task:attempt_local_0001_r_000000_0 is >>> > > done. 
And is in the process of commiting >>> > > 13/08/17 07:27:21 INFO mapred.LocalJobRunner: reduce > reduce >>> > > 13/08/17 07:27:21 INFO mapred.Task: Task >>> 'attempt_local_0001_r_000000_0' >>> > > done. >>> > > 13/08/17 07:27:22 WARN mapred.FileOutputCommitter: Output path is >>> null in >>> > > cleanup >>> > > 13/08/17 07:27:22 INFO mapred.JobClient: map 100% reduce 100% >>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Job complete: job_local_0001 >>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Counters: 30 >>> > > 13/08/17 07:27:22 INFO mapred.JobClient: HBase Counters >>> > > 13/08/17 07:27:22 INFO mapred.JobClient: REMOTE_RPC_CALLS=0 >>> > > 13/08/17 07:27:22 INFO mapred.JobClient: RPC_CALLS=2643 >>> > > 13/08/17 07:27:22 INFO mapred.JobClient: RPC_RETRIES=0 >>> > > 13/08/17 07:27:22 INFO mapred.JobClient: >>> > NOT_SERVING_REGION_EXCEPTION=0 >>> > > 13/08/17 07:27:22 INFO mapred.JobClient: NUM_SCANNER_RESTARTS=0 >>> > > 13/08/17 07:27:22 INFO mapred.JobClient: >>> MILLIS_BETWEEN_NEXTS=5849 >>> > > 13/08/17 07:27:22 INFO mapred.JobClient: BYTES_IN_RESULTS=126719 >>> > > 13/08/17 07:27:22 INFO mapred.JobClient: >>> BYTES_IN_REMOTE_RESULTS=0 >>> > > 13/08/17 07:27:22 INFO mapred.JobClient: REGIONS_SCANNED=1 >>> > > 13/08/17 07:27:22 INFO mapred.JobClient: REMOTE_RPC_RETRIES=0 >>> > > 13/08/17 07:27:22 INFO mapred.JobClient: File Output Format >>> Counters >>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Bytes Written=0 >>> > > 13/08/17 07:27:22 INFO mapred.JobClient: FileSystemCounters >>> > > 13/08/17 07:27:22 INFO mapred.JobClient: FILE_BYTES_READ=24176810 >>> > > 13/08/17 07:27:22 INFO mapred.JobClient: >>> FILE_BYTES_WRITTEN=24552208 >>> > > 13/08/17 07:27:22 INFO mapred.JobClient: File Input Format Counters >>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Bytes Read=0 >>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Map-Reduce Framework >>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Reduce input groups=1 >>> > > 13/08/17 07:27:22 INFO mapred.JobClient: 
Map output materialized >>> > > bytes=42246 >>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Combine output records=0 >>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Map input records=2640 >>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Reduce shuffle bytes=0 >>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Physical memory (bytes) >>> > > snapshot=0 >>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Reduce output records=1 >>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Spilled Records=5280 >>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Map output bytes=36960 >>> > > 13/08/17 07:27:22 INFO mapred.JobClient: CPU time spent (ms)=0 >>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Total committed heap >>> usage >>> > > (bytes)=321527808 >>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Virtual memory (bytes) >>> > > snapshot=0 >>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Combine input records=0 >>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Map output records=2640 >>> > > 13/08/17 07:27:22 INFO mapred.JobClient: SPLIT_RAW_BYTES=65 >>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Reduce input >>> records=2640 >>> > > >>> > >>> > * >>> > * >>> > *Question:::(please don't laugh at me if i will ask you a silly >>> question) >>> > >>> > * >>> > Here in code i set output directory.But,when i seen in hdfs directory >>> it is >>> > not contain any part-0000 file.It contains only SUCCESS file.* >>> > * >>> > Can i ask why it is happen.??* >>> > >>> > >>> > >>> > * >>> > >>> > >>> > >>> > >>> > On Sat, Aug 17, 2013 at 5:24 PM, Jean-Marc Spaggiari < >>> > [email protected]> wrote: >>> > >>> > > Hi Manish. >>> > > >>> > > Looking a bit more at this, I think the issue is because you >>> "floats" are >>> > > written in you table as strings and not as floats.... 
>>> > > >>> > > Can you try something link: >>> > > context.write(stock_symbol,new FloatWritable( >>> > > Float.parseFloat(Bytes.toString(val))))); >>> > > >>> > > Also, as asked previously, " can you please paste you code on >>> pastbin? >>> > > Same for the exception."? >>> > > >>> > > Thanks, >>> > > >>> > > JM >>> > > >>> > > 2013/8/17 manish dunani <[email protected]> >>> > > >>> > > > Hey jean, >>> > > > I did it according to you. >>> > > > I convert as u told..But still face the same error. >>> > > > >>> > > > And ted I am new to this don't get an idea how can i use this >>> > method..Can >>> > > > You please show me..? >>> > > > >>> > > > Your Help will be appreciated.. >>> > > > >>> > > > >>> > > > >>> > > > On Fri, Aug 16, 2013 at 10:04 PM, Ted Yu <[email protected]> >>> wrote: >>> > > > >>> > > > > Here is javadoc for toFloat(): >>> > > > > >>> > > > > * Presumes float encoded as IEEE 754 floating-point "single >>> > format" >>> > > > > >>> > > > > * @param bytes byte array >>> > > > > >>> > > > > * @return Float made from passed byte array. >>> > > > > >>> > > > > */ >>> > > > > >>> > > > > public static float toFloat(byte [] bytes) { >>> > > > > >>> > > > > So for values of '2.5', toFloat() is not the proper method. >>> > > > > >>> > > > > You can float conversion provided by Java. >>> > > > > >>> > > > > >>> > > > > Cheers >>> > > > > >>> > > > > >>> > > > > On Fri, Aug 16, 2013 at 6:57 AM, Ted Yu <[email protected]> >>> wrote: >>> > > > > >>> > > > > > Here is code from Bytes: >>> > > > > > >>> > > > > > public static float toFloat(byte [] bytes, int offset) { >>> > > > > > >>> > > > > > return Float.intBitsToFloat(toInt(bytes, offset, >>> SIZEOF_INT)); >>> > > > > > >>> > > > > > Looking at your sample data: >>> > > > > > >>> > > > > > 2010-02-04 column=stocks:open, >>> timestamp=1376567559424, >>> > > > > > value=*2.5* >>> > > > > > >>> > > > > > The length of value didn't match SIZEOF_INT. 
>>> > > > > > >>> > > > > > It seems you need to validate the values first. >>> > > > > > >>> > > > > > >>> > > > > > Cheers >>> > > > > > >>> > > > > > >>> > > > > > On Fri, Aug 16, 2013 at 3:42 AM, manish dunani < >>> > [email protected] >>> > > > > >wrote: >>> > > > > > >>> > > > > >> hello, >>> > > > > >> >>> > > > > >> I am using apache hadoop 1.1.2 and hbase 0.94.9 on pseudo >>> > distibuted >>> > > > > mode. >>> > > > > >> >>> > > > > >> I am trying to find Average open stocks values. >>> > > > > >> >>> > > > > >> *sample dataset in hbase::**(table name:nyse4)* >>> > > > > >> >>> > > > > >> >>> > > > > >> 2010-02-04 column=stocks:open, >>> timestamp=1376567559424, >>> > > > > >> value=2.5 >>> > > > > >> 2010-02-04 column=stocks:symbol, >>> > timestamp=1376567559424, >>> > > > > >> value=QXM >>> > > > > >> 2010-02-05 column=stocks:open, >>> timestamp=1376567559429, >>> > > > > >> value=2.42 >>> > > > > >> 2010-02-05 column=stocks:symbol, >>> > timestamp=1376567559429, >>> > > > > >> value=QXM >>> > > > > >> 2010-02-08 column=stocks:open, >>> timestamp=1376567559431, >>> > > > > >> value=2.33 >>> > > > > >> 2010-02-08 column=stocks:symbol, >>> > timestamp=1376567559431, >>> > > > > >> value=QXM >>> > > > > >> >>> > > > > >> *code:*(please ignores the lines that are commenting) >>> > > > > >> >>> > > > > >> >>> > > > > >> > package com.maddy; >>> > > > > >> > >>> > > > > >> > import java.io.IOException; >>> > > > > >> > >>> > > > > >> > import org.apache.hadoop.conf.Configuration; >>> > > > > >> > import org.apache.hadoop.fs.Path; >>> > > > > >> > import org.apache.hadoop.hbase.HBaseConfiguration; >>> > > > > >> > import org.apache.hadoop.hbase.client.Put; >>> > > > > >> > import org.apache.hadoop.hbase.client.Result; >>> > > > > >> > import org.apache.hadoop.hbase.client.Scan; >>> > > > > >> > import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; >>> > > > > >> > import org.apache.hadoop.hbase.io.ImmutableBytesWritable; >>> > > > > >> > import 
org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; >>> > > > > >> > import org.apache.hadoop.hbase.mapreduce.TableMapper; >>> > > > > >> > import org.apache.hadoop.hbase.mapreduce.TableReducer; >>> > > > > >> > import org.apache.hadoop.hbase.util.Bytes; >>> > > > > >> > //import org.apache.hadoop.io.DoubleWritable; >>> > > > > >> > import org.apache.hadoop.io.FloatWritable; >>> > > > > >> > import org.apache.hadoop.mapreduce.Job; >>> > > > > >> > import >>> org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; >>> > > > > >> > >>> > > > > >> > >>> > > > > >> > public class openaveragestock >>> > > > > >> > { >>> > > > > >> > public static class map extends >>> > > > > >> > TableMapper<ImmutableBytesWritable,FloatWritable> >>> > > > > >> > { >>> > > > > >> > @Override >>> > > > > >> > >>> > > > > >> > public void map(ImmutableBytesWritable row,Result >>> > > > > value,Context >>> > > > > >> > context) throws IOException >>> > > > > >> > { >>> > > > > >> > >>> > > > > >> > byte[] >>> > > > > >> > val=(value.getValue("stocks".getBytes(),"open".getBytes())); >>> > > > > >> > //byte[] >>> > > > > >> > >>> val1=(value.getValue("stocks".getBytes(),"symbol".getBytes())); >>> > > > > >> > >>> > > > > >> > >>> > > > > >> > ImmutableBytesWritable stock_symbol=new >>> > > > > >> > ImmutableBytesWritable("symbol".getBytes()); >>> > > > > >> > >>> > > > > >> > >>> > > > > >> > try >>> > > > > >> > { >>> > > > > >> > context.write(stock_symbol,new >>> > > > > >> > FloatWritable(Bytes.toFloat(val))); >>> > > > > >> > } >>> > > > > >> > catch(InterruptedException e) >>> > > > > >> > >>> > > > > >> > { >>> > > > > >> > throw new IOException(e); >>> > > > > >> > } >>> > > > > >> > >>> > > > > >> > >>> > > > > >> > } >>> > > > > >> > >>> > > > > >> > >>> > > > > >> > } >>> > > > > >> > >>> > > > > >> > >>> > > > > >> > public static class reduce extends >>> > > > > >> > >>> > > > > >> >>> > > > > >>> > > >>> 
TableReducer<ImmutableBytesWritable,FloatWritable,ImmutableBytesWritable> >>> > > > > >> > { >>> > > > > >> > >>> > > > > >> > @Override >>> > > > > >> > public void reduce(ImmutableBytesWritable >>> > > > > >> > key,Iterable<FloatWritable>values,Context context) throws >>> > > > IOException, >>> > > > > >> > InterruptedException >>> > > > > >> > { >>> > > > > >> > float sum=0; >>> > > > > >> > int count=0; >>> > > > > >> > // float average=0; >>> > > > > >> > for(FloatWritable val:values) >>> > > > > >> > { >>> > > > > >> > sum+=val.get(); >>> > > > > >> > count++; >>> > > > > >> > } >>> > > > > >> > //average=(sum/count); >>> > > > > >> > Put put=new Put(key.get()); >>> > > > > >> > >>> > > > > >> > >>> > > > > >> >>> > > > > >>> > > > >>> > > >>> > >>> put.add(Bytes.toBytes("stocks_output"),Bytes.toBytes("average"),Bytes.toBytes(sum/count)); >>> > > > > >> > System.out.println("For\t"+count+"\t average >>> > > > > >> is:"+(sum/count)); >>> > > > > >> > context.write(key,put); >>> > > > > >> > >>> > > > > >> > } >>> > > > > >> > >>> > > > > >> > } >>> > > > > >> > >>> > > > > >> > public static void main(String args[]) throws >>> IOException, >>> > > > > >> > ClassNotFoundException, InterruptedException >>> > > > > >> > { >>> > > > > >> > Configuration config=HBaseConfiguration.create(); >>> > > > > >> > config.addResource("/home/manish/workspace/hbase >>> > > > > >> > project/bin/hbase-site.xml"); >>> > > > > >> > Job job=new Job(config,"openstockaverage1"); >>> > > > > >> > >>> > > > > >> > >>> > > > > >> > Scan scan=new Scan(); >>> > > > > >> > scan.addFamily("stocks".getBytes()); >>> > > > > >> > scan.setFilter(new FirstKeyOnlyFilter()); >>> > > > > >> > >>> > > > > >> > TableMapReduceUtil.initTableMapperJob("nyse4", >>> > > > > >> > scan, >>> > > > > >> > map.class, >>> > > > > >> > ImmutableBytesWritable.class, >>> > > > > >> > FloatWritable.class, >>> > > > > >> > job); >>> > > > > >> > >>> > > > > >> > TableMapReduceUtil.initTableReducerJob("nyse5", >>> > 
> > > >> > reduce.class, >>> > > > > >> > job); >>> > > > > >> > //job.setReducerClass(reduce.class); >>> > > > > >> > >>> > > > > >> > FileOutputFormat.setOutputPath(job, new Path( >>> > > > > >> > >>> > > > > >> "hdfs://localhost:54310/user/manish/edurekahbasehadoop1")); >>> > > > > >> > job.waitForCompletion(true); >>> > > > > >> > } >>> > > > > >> > >>> > > > > >> > } >>> > > > > >> > >>> > > > > >> > >>> > > > > >> > >>> > > > > >> > *===>Got stuck into error:* >>> > > > > >> > >>> > > > > >> > >>> > > > > >> > 13/08/16 03:21:45 INFO mapred.JobClient: Running job: >>> > > job_local_0001 >>> > > > > >> > 13/08/16 03:21:46 INFO mapred.JobClient: map 0% reduce 0% >>> > > > > >> > 13/08/16 03:21:46 INFO mapreduce.TableOutputFormat: Created >>> > table >>> > > > > >> instance >>> > > > > >> > for nyse5 >>> > > > > >> > 13/08/16 03:21:46 INFO util.ProcessTree: setsid exited with >>> exit >>> > > > code >>> > > > > 0 >>> > > > > >> > 13/08/16 03:21:47 INFO mapred.Task: Using >>> > > ResourceCalculatorPlugin >>> > > > : >>> > > > > >> > org.apache.hadoop.util.LinuxResourceCalculatorPlugin@50b77c >>> > > > > >> > 13/08/16 03:21:47 INFO mapred.MapTask: io.sort.mb = 100 >>> > > > > >> > 13/08/16 03:21:53 INFO mapred.MapTask: data buffer = >>> > > > 79691776/99614720 >>> > > > > >> > 13/08/16 03:21:53 INFO mapred.MapTask: record buffer = >>> > > 262144/327680 >>> > > > > >> > 13/08/16 03:21:54 WARN mapred.LocalJobRunner: job_local_0001 >>> > > > > >> > java.lang.IllegalArgumentException: offset (0) + length (4) >>> > exceed >>> > > > the >>> > > > > >> > capacity of the array: 3 >>> > > > > >> > at >>> > > > > >> > >>> > > > > >> >>> > > > > >>> > > > >>> > > >>> > >>> org.apache.hadoop.hbase.util.Bytes.explainWrongLengthOrOffset(Bytes.java:543) >>> > > > > >> > at >>> org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:690) >>> > > > > >> > at >>> > org.apache.hadoop.hbase.util.Bytes.toFloat(Bytes.java:584) >>> > > > > >> > at >>> > 
org.apache.hadoop.hbase.util.Bytes.toFloat(Bytes.java:574) >>> > > > > >> > at >>> > > com.maddy.openaveragestock$map.map(openaveragestock.java:41) >>> > > > > >> > at >>> > com.maddy.openaveragestock$map.map(openaveragestock.java:1) >>> > > > > >> > at >>> org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144) >>> > > > > >> > at >>> > > > org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764) >>> > > > > >> > at >>> org.apache.hadoop.mapred.MapTask.run(MapTask.java:370) >>> > > > > >> > at >>> > > > > >> > >>> > > > > >>> > > >>> org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:214) >>> > > > > >> > >>> > > > > >> > >>> > > > > >> I cannot find where is it fail?? >>> > > > > >> Can you please tell me?? >>> > > > > >> where i was wrong..? >>> > > > > >> >>> > > > > >> >>> > > > > >> Your help will be appreciated. >>> > > > > >> >>> > > > > >> >>> > > > > >> >>> > > > > >> >>> > > > > >> >>> > > > > >> >>> > > > > >> >>> > > > > >> >>> > > > > >> >>> > > > > >> >>> > > > > >> -- >>> > > > > >> Regards >>> > > > > >> >>> > > > > >> *Manish Dunani* >>> > > > > >> *Contact No* : +91 9408329137 >>> > > > > >> *skype id* : manish.dunani* >>> > > > > >> * >>> > > > > >> >>> > > > > > >>> > > > > > >>> > > > > >>> > > > >>> > > > >>> > > > >>> > > > -- >>> > > > Regards >>> > > > >>> > > > *Manish Dunani* >>> > > > *Contact No* : +91 9408329137 >>> > > > *skype id* : manish.dunani* >>> > > > * >>> > > > >>> > > >>> > >>> > >>> > >>> > -- >>> > Regards >>> > >>> > *Manish Dunani* >>> > *Contact No* : +91 9408329137 >>> > *skype id* : manish.dunani* >>> > * >>> > >>> >> >> >> >> -- >> Regards >> >> *Manish Dunani* >> *Contact No* : +91 9408329137 >> *skype id* : manish.dunani* >> * >> >> >> > > > -- > Regards > > *Manish Dunani* > *Contact No* : +91 9408329137 > *skype id* : manish.dunani* > * > > > -- Regards *Manish Dunani* *Contact No* : +91 9408329137 *skype id* : manish.dunani* *
