But I want my output to look like this:
ROW    COLUMN+CELL
QXM    column=stocks_output:average, timestamp=XXXXXXXXXX, value=XXXX
QTM    column=stocks_output:average, timestamp=XXXXXXXXXX, value=XXXX
*Sample dataset in HBase (table name: nyse4):*
2010-02-04    column=stocks:open, timestamp=1376567559424, value=2.5
2010-02-04    column=stocks:symbol, timestamp=1376567559424, value=QXM
2010-02-05    column=stocks:open, timestamp=1376567559429, value=2.42
2010-02-05    column=stocks:symbol, timestamp=1376567559429, value=QXM
In my previous output I did not get any of the symbol (qualifier) values as row keys in my table:
hbase(main):004:0> scan 'nyse5'
ROW       COLUMN+CELL
symbol    column=stocks_output:average, timestamp=1376749641978, value=@\xC6o\x11
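As an aside on reading that scan output: the four bytes shown as @\xC6o\x11 are most likely the binary form of the float average, since the reducer stored it with Bytes.toBytes(average). The sketch below is plain Java with no HBase dependency. It assumes a big-endian IEEE 754 single-precision encoding, and the class name DecodeAverage and the method decodeFloat are made up for illustration.

```java
import java.nio.ByteBuffer;

public class DecodeAverage {
    // Interpret 4 bytes as a big-endian IEEE 754 single-precision float.
    // ByteBuffer uses big-endian byte order by default.
    static float decodeFloat(byte[] raw) {
        if (raw == null || raw.length != 4) {
            throw new IllegalArgumentException("expected exactly 4 bytes");
        }
        return ByteBuffer.wrap(raw).getFloat();
    }

    public static void main(String[] args) {
        // '@' is 0x40 and 'o' is 0x6F, so the shell value @\xC6o\x11 is 40 C6 6F 11.
        byte[] cell = {0x40, (byte) 0xC6, 0x6F, 0x11};
        System.out.println(decodeFloat(cell));
    }
}
```

The decoded number should come out close to the 6.201058 average printed in the Eclipse log, which would suggest the stored value itself is fine and only the row key is wrong.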
*So I changed my program as follows:*
package com.maddy;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
//import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class openaveragestock
{
    public static class map extends TableMapper<Text,FloatWritable>
    {
        private static String col_family="stocks";
        private static String qul="open";
        private static String col_family1="stocks";
        private static String qul1="symbol";
        private static byte[] colfamily2=Bytes.toBytes(col_family);
        private static byte[] qul2=Bytes.toBytes(qul);
        private static byte[] colfamily3=Bytes.toBytes(col_family1);
        private static byte[] qul3=Bytes.toBytes(qul1);
        // public static float toFloat(int qul2)
        // {
        //     return Float.intBitsToFloat((qul2));
        //
        // }
        //
        private static Text k1=new Text();

        public void map(ImmutableBytesWritable row,Result value,Context context) throws IOException
        {
            //byte[] val1=(value.getValue("stocks".getBytes(),"symbol".getBytes()));
            //String k=Bytes.toString(value.getValue(Bytes.toBytes("stocks"),Bytes.toBytes("symbol")));
            byte[] val=value.getValue(colfamily2,qul2);
            String k=Bytes.toString(value.getValue(colfamily3,qul3));
            //ImmutableBytesWritable stock_symbol=new ImmutableBytesWritable(qul3);
            k1.set(k);
            try
            {
                context.write(k1,new FloatWritable(Float.parseFloat(Bytes.toString(val))));
            }
            catch(InterruptedException e)
            {
                throw new IOException(e);
            }
        }
    }

    public static class Reduce extends Reducer<Text,FloatWritable,Text,FloatWritable>
    {
        public void reduce(Text key,Iterable<FloatWritable>values,Context context) throws IOException, InterruptedException
        {
            float sum=0;
            int count=0;
            float average=0;
            for(FloatWritable val:values)
            {
                sum+=val.get();
                count++;
            }
            average=(sum/count);
            // Put put=new Put(key.getBytes());
            // put.add(Bytes.toBytes("stocks_output"),Bytes.toBytes("average"),Bytes.toBytes(average));
            System.out.println("For\t"+count+"\t average is:"+average);
            context.write(key,new FloatWritable(average));
        }
    }

    public static void main(String args[]) throws IOException, ClassNotFoundException, InterruptedException
    {
        Configuration config=HBaseConfiguration.create();
        config.addResource("/home/manish/workspace/hbase project/bin/hbase-site.xml");
        Job job=new Job(config,"openstockaverage1");
        Scan scan=new Scan();
        scan.addFamily("stocks".getBytes());
        scan.setFilter(new FirstKeyOnlyFilter());
        TableMapReduceUtil.initTableMapperJob("nyse4",
            scan,
            map.class,
            ImmutableBytesWritable.class,
            FloatWritable.class,
            job);
        // TableMapReduceUtil.initTableReducerJob("nyse5",
        //     Reduce.class,
        //     job);
        job.setReducerClass(Reduce.class);
        FileOutputFormat.setOutputPath(job, new Path(
            "hdfs://localhost:54310/user/manish/Full_final_output_5"));
        job.waitForCompletion(true);
    }
}
*It throws this error:*
13/08/17 19:37:59 INFO mapred.JobClient: Running job: job_local_0001
13/08/17 19:37:59 INFO util.ProcessTree: setsid exited with exit code 0
13/08/17 19:37:59 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@bd96dd
13/08/17 19:37:59 INFO mapred.MapTask: io.sort.mb = 100
13/08/17 19:38:00 INFO mapred.JobClient: map 0% reduce 0%
13/08/17 19:38:00 INFO mapred.MapTask: data buffer = 79691776/99614720
13/08/17 19:38:00 INFO mapred.MapTask: record buffer = 262144/327680
13/08/17 19:38:00 WARN mapred.LocalJobRunner: job_local_0001
java.lang.NullPointerException
    at org.apache.hadoop.io.Text.encode(Text.java:388)
    at org.apache.hadoop.io.Text.set(Text.java:178)
    at com.maddy.openaveragestock$map.map(openaveragestock.java:59)
    at com.maddy.openaveragestock$map.map(openaveragestock.java:1)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:214)
13/08/17 19:38:01 INFO mapred.JobClient: Job complete: job_local_0001
13/08/17 19:38:01 INFO mapred.JobClient: Counters: 0
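One possible reading of this trace, offered as a guess rather than a confirmed diagnosis: Text.set fails because k is null. The scan uses FirstKeyOnlyFilter, which is documented to return only the first KeyValue of each row, so stocks:symbol may never reach the mapper while stocks:open (which sorts first) does. The sketch below imitates that behaviour in plain Java with no HBase dependency. FirstCellOnlyDemo, sampleRow, firstCellOnly and decode are hypothetical names, and the sorted map only stands in for an HBase row.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

public class FirstCellOnlyDemo {
    // Stand-in for one HBase row: cells keyed by "family:qualifier", kept in sorted order.
    static Map<String, byte[]> sampleRow() {
        Map<String, byte[]> row = new TreeMap<>();
        row.put("stocks:symbol", "QXM".getBytes(StandardCharsets.UTF_8));
        row.put("stocks:open", "2.5".getBytes(StandardCharsets.UTF_8));
        return row;
    }

    // Keep only the first cell of the row (a rough imitation of a first-key-only filter).
    static Map<String, byte[]> firstCellOnly(Map<String, byte[]> row) {
        Map<String, byte[]> kept = new LinkedHashMap<>();
        for (Map.Entry<String, byte[]> cell : row.entrySet()) {
            kept.put(cell.getKey(), cell.getValue());
            break;
        }
        return kept;
    }

    // Null-safe decode: a missing cell yields null instead of throwing.
    static String decode(byte[] raw) {
        return raw == null ? null : new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Map<String, byte[]> filtered = firstCellOnly(sampleRow());
        String open = decode(filtered.get("stocks:open"));
        String symbol = decode(filtered.get("stocks:symbol"));
        if (symbol == null) {
            // In the mapper, this is the point where k1.set(k) would receive null.
            System.out.println("symbol cell missing; open=" + open);
        } else {
            System.out.println(symbol + " -> " + Float.parseFloat(open));
        }
    }
}
```

If this is what is happening, removing the filter from the scan, or checking for null before calling k1.set(k), would be the first things to try.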
On Sun, Aug 18, 2013 at 7:55 AM, manish dunani <[email protected]> wrote:
> *Here is the output table:*
>
> hbase(main):004:0> scan 'nyse5'
> ROW       COLUMN+CELL
> symbol    column=stocks_output:average, timestamp=1376749641978, value=@\xC6o\x11
>
> *Sample output in Eclipse:*
>
> 13/08/17 07:27:21 INFO mapred.Merger: Merging 1 sorted segments
> 13/08/17 07:27:21 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 42242 bytes
> 13/08/17 07:27:21 INFO mapred.LocalJobRunner:
> 13/08/17 07:27:21 INFO mapred.JobClient: map 100% reduce 0%
> *For 2640 average is:6.201058*
>
>
> On Sat, Aug 17, 2013 at 11:59 PM, Jean-Marc Spaggiari <
> [email protected]> wrote:
>
>> Are you outputting to a table? From your code, I don't see any output
>> configured.
>>
>> 2013/8/17 manish dunani <[email protected]>
>>
>> > Thanks a lot, Jean!
>> >
>> > I am very thankful to you. And of course Ted is also doing a very good job.
>> >
>> > *Revised code:*
>> >
>> > > package com.maddy;
>> > >
>> > > import java.io.IOException;
>> > >
>> > > import org.apache.hadoop.conf.Configuration;
>> > > import org.apache.hadoop.fs.Path;
>> > > import org.apache.hadoop.hbase.HBaseConfiguration;
>> > > import org.apache.hadoop.hbase.client.Put;
>> > > import org.apache.hadoop.hbase.client.Result;
>> > > import org.apache.hadoop.hbase.client.Scan;
>> > > import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
>> > > import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
>> > > import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
>> > > import org.apache.hadoop.hbase.mapreduce.TableMapper;
>> > > import org.apache.hadoop.hbase.mapreduce.TableReducer;
>> > > import org.apache.hadoop.hbase.util.Bytes;
>> > > //import org.apache.hadoop.io.DoubleWritable;
>> > > import org.apache.hadoop.io.FloatWritable;
>> > > import org.apache.hadoop.mapreduce.Job;
>> > > import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
>> > >
>> > >
>> > > public class openaveragestock
>> > > {
>> > > public static class map extends
>> > > TableMapper<ImmutableBytesWritable,FloatWritable>
>> > > {
>> > > private static String col_family="stocks";
>> > > private static String qul="open";
>> > >
>> > > private static String col_family1="stocks";
>> > > private static String qul1="symbol";
>> > >
>> > > private static byte[] colfamily2=Bytes.toBytes(col_family);
>> > > private static byte[] qul2=Bytes.toBytes(qul);
>> > >
>> > > private static byte[] colfamily3=Bytes.toBytes(col_family1);
>> > > private static byte[] qul3=Bytes.toBytes(qul1);
>> > >
>> > > // public static float toFloat(int qul2)
>> > > // {
>> > > // return Float.intBitsToFloat((qul2));
>> > > //
>> > > // }
>> > > //
>> > >
>> > >
>> > >
>> > > public void map(ImmutableBytesWritable row,Result value,Context context) throws IOException
>> > > {
>> > >
>> > >
>> > > //byte[] val1=(value.getValue("stocks".getBytes(),"symbol".getBytes()));
>> > > byte[] val=value.getValue(colfamily2,qul2);
>> > >
>> > >
>> > > ImmutableBytesWritable stock_symbol=new
>> > > ImmutableBytesWritable(qul3);
>> > >
>> > >
>> > >
>> > > try
>> > > {
>> > >
>> > > context.write(stock_symbol,new
>> > > FloatWritable(Float.parseFloat(Bytes.toString(val))));
>> > > }
>> > >
>> > > catch(InterruptedException e)
>> > >
>> > > {
>> > > throw new IOException(e);
>> > > }
>> > >
>> > >
>> > > }
>> > >
>> > >
>> > > }
>> > >
>> > >
>> > > public static class reduce extends TableReducer<ImmutableBytesWritable,FloatWritable,ImmutableBytesWritable>
>> > > {
>> > >
>> > > @Override
>> > > public void reduce(ImmutableBytesWritable
>> > > key,Iterable<FloatWritable>values,Context context) throws IOException,
>> > > InterruptedException
>> > > {
>> > > float sum=0;
>> > > int count=0;
>> > > float average=0;
>> > > for(FloatWritable val:values)
>> > > {
>> > > sum+=val.get();
>> > > count++;
>> > > }
>> > > average=(sum/count);
>> > > Put put=new Put(key.get());
>> > > put.add(Bytes.toBytes("stocks_output"),Bytes.toBytes("average"),Bytes.toBytes(average));
>> > > System.out.println("For\t"+count+"\t average is:"+average);
>> > > context.write(key,put);
>> > >
>> > > }
>> > >
>> > > }
>> > >
>> > > public static void main(String args[]) throws IOException,
>> > > ClassNotFoundException, InterruptedException
>> > > {
>> > > Configuration config=HBaseConfiguration.create();
>> > > config.addResource("/home/manish/workspace/hbase project/bin/hbase-site.xml");
>> > > Job job=new Job(config,"openstockaverage1");
>> > >
>> > >
>> > > Scan scan=new Scan();
>> > > scan.addFamily("stocks".getBytes());
>> > > scan.setFilter(new FirstKeyOnlyFilter());
>> > >
>> > > TableMapReduceUtil.initTableMapperJob("nyse4",
>> > > scan,
>> > > map.class,
>> > > ImmutableBytesWritable.class,
>> > > FloatWritable.class,
>> > > job);
>> > >
>> > > TableMapReduceUtil.initTableReducerJob("nyse5",
>> > > reduce.class,
>> > > job);
>> > > //job.setReducerClass(reduce.class);
>> > >
>> > > //FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:54310/user/manish/final_hbase_hadoop"));
>> > > job.waitForCompletion(true);
>> > > }
>> > >
>> > >
>> > >
>> > > }
>> > >
>> > >
>> > *Sample output in Eclipse:*
>> >
>> > > 13/08/17 07:27:21 INFO mapred.Merger: Merging 1 sorted segments
>> > > 13/08/17 07:27:21 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 42242 bytes
>> > > 13/08/17 07:27:21 INFO mapred.LocalJobRunner:
>> > > 13/08/17 07:27:21 INFO mapred.JobClient: map 100% reduce 0%
>> > > *For 2640 average is:6.201058*
>> > > 13/08/17 07:27:21 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
>> > > 13/08/17 07:27:21 INFO mapred.LocalJobRunner: reduce > reduce
>> > > 13/08/17 07:27:21 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
>> > > 13/08/17 07:27:22 WARN mapred.FileOutputCommitter: Output path is null in cleanup
>> > > 13/08/17 07:27:22 INFO mapred.JobClient: map 100% reduce 100%
>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Job complete: job_local_0001
>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Counters: 30
>> > > 13/08/17 07:27:22 INFO mapred.JobClient: HBase Counters
>> > > 13/08/17 07:27:22 INFO mapred.JobClient: REMOTE_RPC_CALLS=0
>> > > 13/08/17 07:27:22 INFO mapred.JobClient: RPC_CALLS=2643
>> > > 13/08/17 07:27:22 INFO mapred.JobClient: RPC_RETRIES=0
>> > > 13/08/17 07:27:22 INFO mapred.JobClient: NOT_SERVING_REGION_EXCEPTION=0
>> > > 13/08/17 07:27:22 INFO mapred.JobClient: NUM_SCANNER_RESTARTS=0
>> > > 13/08/17 07:27:22 INFO mapred.JobClient: MILLIS_BETWEEN_NEXTS=5849
>> > > 13/08/17 07:27:22 INFO mapred.JobClient: BYTES_IN_RESULTS=126719
>> > > 13/08/17 07:27:22 INFO mapred.JobClient: BYTES_IN_REMOTE_RESULTS=0
>> > > 13/08/17 07:27:22 INFO mapred.JobClient: REGIONS_SCANNED=1
>> > > 13/08/17 07:27:22 INFO mapred.JobClient: REMOTE_RPC_RETRIES=0
>> > > 13/08/17 07:27:22 INFO mapred.JobClient: File Output Format Counters
>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Bytes Written=0
>> > > 13/08/17 07:27:22 INFO mapred.JobClient: FileSystemCounters
>> > > 13/08/17 07:27:22 INFO mapred.JobClient: FILE_BYTES_READ=24176810
>> > > 13/08/17 07:27:22 INFO mapred.JobClient: FILE_BYTES_WRITTEN=24552208
>> > > 13/08/17 07:27:22 INFO mapred.JobClient: File Input Format Counters
>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Bytes Read=0
>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Map-Reduce Framework
>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Reduce input groups=1
>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Map output materialized bytes=42246
>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Combine output records=0
>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Map input records=2640
>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Reduce shuffle bytes=0
>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Physical memory (bytes) snapshot=0
>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Reduce output records=1
>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Spilled Records=5280
>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Map output bytes=36960
>> > > 13/08/17 07:27:22 INFO mapred.JobClient: CPU time spent (ms)=0
>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Total committed heap usage (bytes)=321527808
>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Virtual memory (bytes) snapshot=0
>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Combine input records=0
>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Map output records=2640
>> > > 13/08/17 07:27:22 INFO mapred.JobClient: SPLIT_RAW_BYTES=65
>> > > 13/08/17 07:27:22 INFO mapred.JobClient: Reduce input records=2640
>> > >
>> >
>> > *Question (please don't laugh at me if this is a silly question):*
>> >
>> > *In the code I set an output directory. But when I looked at that HDFS
>> > directory, it did not contain any part-0000 file. It contained only the
>> > SUCCESS file.*
>> >
>> > *Can I ask why this happens?*
>> >
>> >
>> > On Sat, Aug 17, 2013 at 5:24 PM, Jean-Marc Spaggiari <
>> > [email protected]> wrote:
>> >
>> > > Hi Manish.
>> > >
>> > > Looking a bit more at this, I think the issue is that your "floats" are
>> > > written in your table as strings and not as floats.
>> > >
>> > > Can you try something like:
>> > > context.write(stock_symbol,new FloatWritable(
>> > > Float.parseFloat(Bytes.toString(val))));
>> > >
>> > > Also, as asked previously, can you please paste your code on pastebin?
>> > > Same for the exception.
>> > >
>> > > Thanks,
>> > >
>> > > JM
>> > >
>> > > 2013/8/17 manish dunani <[email protected]>
>> > >
>> > > > Hey Jean,
>> > > > I did as you suggested.
>> > > > I converted it as you told me, but I still face the same error.
>> > > >
>> > > > And Ted, I am new to this and don't understand how I can use this
>> > > > method. Can you please show me?
>> > > >
>> > > > Your help will be appreciated.
>> > > >
>> > > >
>> > > >
>> > > > On Fri, Aug 16, 2013 at 10:04 PM, Ted Yu <[email protected]>
>> wrote:
>> > > >
>> > > > > Here is the javadoc for toFloat():
>> > > > >
>> > > > > /**
>> > > > > * Presumes float encoded as IEEE 754 floating-point "single format"
>> > > > > * @param bytes byte array
>> > > > > * @return Float made from passed byte array.
>> > > > > */
>> > > > > public static float toFloat(byte [] bytes) {
>> > > > >
>> > > > > So for values such as '2.5', toFloat() is not the proper method.
>> > > > >
>> > > > > You can use the float conversion provided by Java.
>> > > > >
>> > > > >
>> > > > > Cheers
>> > > > >
>> > > > >
>> > > > > On Fri, Aug 16, 2013 at 6:57 AM, Ted Yu <[email protected]>
>> wrote:
>> > > > >
>> > > > > > Here is the code from Bytes:
>> > > > > >
>> > > > > > public static float toFloat(byte [] bytes, int offset) {
>> > > > > > return Float.intBitsToFloat(toInt(bytes, offset, SIZEOF_INT));
>> > > > > >
>> > > > > > Looking at your sample data:
>> > > > > >
>> > > > > > 2010-02-04 column=stocks:open, timestamp=1376567559424, value=*2.5*
>> > > > > >
>> > > > > > The length of the value doesn't match SIZEOF_INT.
>> > > > > >
>> > > > > > It seems you need to validate the values first.
>> > > > > >
>> > > > > >
>> > > > > > Cheers
>> > > > > >
>> > > > > >
>> > > > > > On Fri, Aug 16, 2013 at 3:42 AM, manish dunani <[email protected]> wrote:
>> > > > > >
>> > > > > >> Hello,
>> > > > > >>
>> > > > > >> I am using Apache Hadoop 1.1.2 and HBase 0.94.9 in pseudo-distributed mode.
>> > > > > >>
>> > > > > >> I am trying to find the average of the open stock values.
>> > > > > >>
>> > > > > >> *Sample dataset in HBase (table name: nyse4):*
>> > > > > >>
>> > > > > >> 2010-02-04 column=stocks:open, timestamp=1376567559424, value=2.5
>> > > > > >> 2010-02-04 column=stocks:symbol, timestamp=1376567559424, value=QXM
>> > > > > >> 2010-02-05 column=stocks:open, timestamp=1376567559429, value=2.42
>> > > > > >> 2010-02-05 column=stocks:symbol, timestamp=1376567559429, value=QXM
>> > > > > >> 2010-02-08 column=stocks:open, timestamp=1376567559431, value=2.33
>> > > > > >> 2010-02-08 column=stocks:symbol, timestamp=1376567559431, value=QXM
>> > > > > >>
>> > > > > >> *Code:* (please ignore the commented-out lines)
>> > > > > >>
>> > > > > >>
>> > > > > >> > package com.maddy;
>> > > > > >> >
>> > > > > >> > import java.io.IOException;
>> > > > > >> >
>> > > > > >> > import org.apache.hadoop.conf.Configuration;
>> > > > > >> > import org.apache.hadoop.fs.Path;
>> > > > > >> > import org.apache.hadoop.hbase.HBaseConfiguration;
>> > > > > >> > import org.apache.hadoop.hbase.client.Put;
>> > > > > >> > import org.apache.hadoop.hbase.client.Result;
>> > > > > >> > import org.apache.hadoop.hbase.client.Scan;
>> > > > > >> > import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
>> > > > > >> > import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
>> > > > > >> > import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
>> > > > > >> > import org.apache.hadoop.hbase.mapreduce.TableMapper;
>> > > > > >> > import org.apache.hadoop.hbase.mapreduce.TableReducer;
>> > > > > >> > import org.apache.hadoop.hbase.util.Bytes;
>> > > > > >> > //import org.apache.hadoop.io.DoubleWritable;
>> > > > > >> > import org.apache.hadoop.io.FloatWritable;
>> > > > > >> > import org.apache.hadoop.mapreduce.Job;
>> > > > > >> > import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
>> > > > > >> >
>> > > > > >> >
>> > > > > >> > public class openaveragestock
>> > > > > >> > {
>> > > > > >> > public static class map extends
>> > > > > >> > TableMapper<ImmutableBytesWritable,FloatWritable>
>> > > > > >> > {
>> > > > > >> > @Override
>> > > > > >> >
>> > > > > >> > public void map(ImmutableBytesWritable row,Result value,Context context) throws IOException
>> > > > > >> > {
>> > > > > >> >
>> > > > > >> > byte[] val=(value.getValue("stocks".getBytes(),"open".getBytes()));
>> > > > > >> > //byte[] val1=(value.getValue("stocks".getBytes(),"symbol".getBytes()));
>> > > > > >> >
>> > > > > >> >
>> > > > > >> > ImmutableBytesWritable stock_symbol=new
>> > > > > >> > ImmutableBytesWritable("symbol".getBytes());
>> > > > > >> >
>> > > > > >> >
>> > > > > >> > try
>> > > > > >> > {
>> > > > > >> > context.write(stock_symbol,new
>> > > > > >> > FloatWritable(Bytes.toFloat(val)));
>> > > > > >> > }
>> > > > > >> > catch(InterruptedException e)
>> > > > > >> >
>> > > > > >> > {
>> > > > > >> > throw new IOException(e);
>> > > > > >> > }
>> > > > > >> >
>> > > > > >> >
>> > > > > >> > }
>> > > > > >> >
>> > > > > >> >
>> > > > > >> > }
>> > > > > >> >
>> > > > > >> >
>> > > > > >> > public static class reduce extends TableReducer<ImmutableBytesWritable,FloatWritable,ImmutableBytesWritable>
>> > > > > >> > {
>> > > > > >> >
>> > > > > >> > @Override
>> > > > > >> > public void reduce(ImmutableBytesWritable key,Iterable<FloatWritable>values,Context context) throws IOException, InterruptedException
>> > > > > >> > {
>> > > > > >> > float sum=0;
>> > > > > >> > int count=0;
>> > > > > >> > // float average=0;
>> > > > > >> > for(FloatWritable val:values)
>> > > > > >> > {
>> > > > > >> > sum+=val.get();
>> > > > > >> > count++;
>> > > > > >> > }
>> > > > > >> > //average=(sum/count);
>> > > > > >> > Put put=new Put(key.get());
>> > > > > >> > put.add(Bytes.toBytes("stocks_output"),Bytes.toBytes("average"),Bytes.toBytes(sum/count));
>> > > > > >> > System.out.println("For\t"+count+"\t average is:"+(sum/count));
>> > > > > >> > context.write(key,put);
>> > > > > >> >
>> > > > > >> > }
>> > > > > >> >
>> > > > > >> > }
>> > > > > >> >
>> > > > > >> > public static void main(String args[]) throws IOException, ClassNotFoundException, InterruptedException
>> > > > > >> > {
>> > > > > >> > Configuration config=HBaseConfiguration.create();
>> > > > > >> > config.addResource("/home/manish/workspace/hbase project/bin/hbase-site.xml");
>> > > > > >> > Job job=new Job(config,"openstockaverage1");
>> > > > > >> >
>> > > > > >> >
>> > > > > >> > Scan scan=new Scan();
>> > > > > >> > scan.addFamily("stocks".getBytes());
>> > > > > >> > scan.setFilter(new FirstKeyOnlyFilter());
>> > > > > >> >
>> > > > > >> > TableMapReduceUtil.initTableMapperJob("nyse4",
>> > > > > >> > scan,
>> > > > > >> > map.class,
>> > > > > >> > ImmutableBytesWritable.class,
>> > > > > >> > FloatWritable.class,
>> > > > > >> > job);
>> > > > > >> >
>> > > > > >> > TableMapReduceUtil.initTableReducerJob("nyse5",
>> > > > > >> > reduce.class,
>> > > > > >> > job);
>> > > > > >> > //job.setReducerClass(reduce.class);
>> > > > > >> >
>> > > > > >> > FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:54310/user/manish/edurekahbasehadoop1"));
>> > > > > >> > job.waitForCompletion(true);
>> > > > > >> > }
>> > > > > >> >
>> > > > > >> > }
>> > > > > >> >
>> > > > > >> >
>> > > > > >> >
>> > > > > >> > *I got stuck on this error:*
>> > > > > >> >
>> > > > > >> > 13/08/16 03:21:45 INFO mapred.JobClient: Running job: job_local_0001
>> > > > > >> > 13/08/16 03:21:46 INFO mapred.JobClient: map 0% reduce 0%
>> > > > > >> > 13/08/16 03:21:46 INFO mapreduce.TableOutputFormat: Created table instance for nyse5
>> > > > > >> > 13/08/16 03:21:46 INFO util.ProcessTree: setsid exited with exit code 0
>> > > > > >> > 13/08/16 03:21:47 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@50b77c
>> > > > > >> > 13/08/16 03:21:47 INFO mapred.MapTask: io.sort.mb = 100
>> > > > > >> > 13/08/16 03:21:53 INFO mapred.MapTask: data buffer = 79691776/99614720
>> > > > > >> > 13/08/16 03:21:53 INFO mapred.MapTask: record buffer = 262144/327680
>> > > > > >> > 13/08/16 03:21:54 WARN mapred.LocalJobRunner: job_local_0001
>> > > > > >> > java.lang.IllegalArgumentException: offset (0) + length (4) exceed the capacity of the array: 3
>> > > > > >> > at org.apache.hadoop.hbase.util.Bytes.explainWrongLengthOrOffset(Bytes.java:543)
>> > > > > >> > at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:690)
>> > > > > >> > at org.apache.hadoop.hbase.util.Bytes.toFloat(Bytes.java:584)
>> > > > > >> > at org.apache.hadoop.hbase.util.Bytes.toFloat(Bytes.java:574)
>> > > > > >> > at com.maddy.openaveragestock$map.map(openaveragestock.java:41)
>> > > > > >> > at com.maddy.openaveragestock$map.map(openaveragestock.java:1)
>> > > > > >> > at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
>> > > > > >> > at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
>> > > > > >> > at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
>> > > > > >> > at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:214)
>> > > > > >> >
>> > > > > >> >
>> > > > > >> I cannot find where it fails.
>> > > > > >> Can you please tell me where I went wrong?
>> > > > > >>
>> > > > > >>
>> > > > > >> Your help will be appreciated.
>> > > > > >>
>> > > > > >>
>> > > > > >>
>> > > > > >>
>> > > > > >>
>> > > > > >>
>> > > > > >>
>> > > > > >>
>> > > > > >>
>> > > > > >>
>> > > > > >> --
>> > > > > >> Regards
>> > > > > >>
>> > > > > >> *Manish Dunani*
>> > > > > >> *Contact No* : +91 9408329137
>> > > > > >> *skype id* : manish.dunani*
>> > > > > >> *
--
Regards
*Manish Dunani*
*Contact No* : +91 9408329137
*skype id* : manish.dunani*
*