*I want my desire output likewise!!!!!!!!*
ROW CELL+COLUMN
QXM column=stocks_output:average,
timestamp=XXXXXXXXXX, val
ue=XXXX
QTM column=stocks_output:average,
timestamp=XXXXXXXXXX, val
ue=XXXX
*sample dataset in hbase::**(table name:nyse4)*
2010-02-04 column=stocks:open, timestamp=1376567559424,
value=2.5
2010-02-04 column=stocks:symbol, timestamp=1376567559424,
value=QXM
2010-02-05 column=stocks:open, timestamp=1376567559429,
value=2.42
2010-02-05 column=stocks:symbol, timestamp=1376567559429,
value=QXM
===>>In my previous output i didn't get any symbol(qulifier)'s values in my
table as Row key..
hbase(main):004:0> scan 'nyse5'
ROW COLUMN+CELL
symbol column=stocks_output:average, timestamp=1376749641978,
val
ue=@\xC6o\x11
*So,that i changed my programme like wise::*
package com.maddy;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
//import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class openaveragestock
{
public static class map extends TableMapper<Text,FloatWritable>
{
private static String col_family="stocks";
private static String qul="open";
private static String col_family1="stocks";
private static String qul1="symbol";
private static byte[] colfamily2=Bytes.toBytes(col_family);
private static byte[] qul2=Bytes.toBytes(qul);
private static byte[] colfamily3=Bytes.toBytes(col_family1);
private static byte[] qul3=Bytes.toBytes(qul1);
// public static float toFloat(int qul2)
// {
// return Float.intBitsToFloat((qul2));
//
// }
//
private static Text k1=new Text();
public void map(ImmutableBytesWritable row,Result value,Context
context) throws IOException
{
//byte[]
val1=(value.getValue("stocks".getBytes(),"symbol".getBytes()));
//String
k=Bytes.toString(value.getValue(Bytes.toBytes("stocks"),Bytes.toBytes("symbol")));
byte[] val=value.getValue(colfamily2,qul2);
String k=Bytes.toString(value.getValue(colfamily3,qul3));
//ImmutableBytesWritable stock_symbol=new
ImmutableBytesWritable(qul3);
k1.set(k);
try
{
context.write(k1,new
FloatWritable(Float.parseFloat(Bytes.toString(val))));
}
catch(InterruptedException e)
{
throw new IOException(e);
}
}
}
public static class Reduce extends Reducer<Text,FloatWritable,
Text,FloatWritable>
{
public void reduce(Text key,Iterable<FloatWritable>values,Context
context) throws IOException, InterruptedException
{
float sum=0;
int count=0;
float average=0;
for(FloatWritable val:values)
{
sum+=val.get();
count++;
}
average=(sum/count);
// Put put=new Put(key.getBytes());
//
put.add(Bytes.toBytes("stocks_output"),Bytes.toBytes("average"),Bytes.toBytes(average));
System.out.println("For\t"+count+"\t average is:"+average);
context.write(key,new FloatWritable(average));
}
}
public static void main(String args[]) throws IOException,
ClassNotFoundException, InterruptedException
{
Configuration config=HBaseConfiguration.create();
config.addResource("/home/manish/workspace/hbase
project/bin/hbase-site.xml");
Job job=new Job(config,"openstockaverage1");
Scan scan=new Scan();
scan.addFamily("stocks".getBytes());
scan.setFilter(new FirstKeyOnlyFilter());
TableMapReduceUtil.initTableMapperJob("nyse4",
scan,
map.class,
ImmutableBytesWritable.class,
FloatWritable.class,
job);
// TableMapReduceUtil.initTableReducerJob("nyse5",
// Reduce.class,
// job);
job.setReducerClass(Reduce.class);
FileOutputFormat.setOutputPath(job, new Path(
"hdfs://localhost:54310/user/manish/Full_final_output_5"));
job.waitForCompletion(true);
}
}
*It throws the error::*
13/08/17 19:37:59 INFO mapred.JobClient: Running job: job_local_0001
13/08/17 19:37:59 INFO util.ProcessTree: setsid exited with exit code 0
13/08/17 19:37:59 INFO mapred.Task: Using ResourceCalculatorPlugin :
org.apache.hadoop.util.
LinuxResourceCalculatorPlugin@bd96dd
13/08/17 19:37:59 INFO mapred.MapTask: io.sort.mb = 100
13/08/17 19:38:00 INFO mapred.JobClient: map 0% reduce 0%
13/08/17 19:38:00 INFO mapred.MapTask: data buffer = 79691776/99614720
13/08/17 19:38:00 INFO mapred.MapTask: record buffer = 262144/327680
13/08/17 19:38:00 WARN mapred.LocalJobRunner: job_local_0001
java.lang.NullPointerException
at org.apache.hadoop.io.Text.encode(Text.java:388)
at org.apache.hadoop.io.Text.set(Text.java:178)
at com.maddy.openaveragestock$map.map(openaveragestock.java:59)
at com.maddy.openaveragestock$map.map(openaveragestock.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
at
org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:214)
13/08/17 19:38:01 INFO mapred.JobClient: Job complete: job_local_0001
13/08/17 19:38:01 INFO mapred.JobClient: Counters: 0
==>>Can anyone please help me out from this why it is happen??
--
Regards
*Manish Dunani*
*Contact No* : +91 9408329137
*skype id* : manish.dunani*
*