

import java.io.IOException;
import java.text.ParseException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueTestUtil;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.joda.time.DateTime;






public class HbaseTest {
	public static HTablePool pool;

	public static class TestMapper extends
			TableMapper<Text, Text> {
		private Text maprow = new Text();
		private Text mapout = new Text();
	
		public void map(ImmutableBytesWritable row, Result value,
				Context context) throws IOException, InterruptedException {
			String date  = null;
			String pid  = null;
			String did = null;
			String v = null;
			boolean flag = false;
			
			byte[] btime = value.getValue(Bytes.toBytes("f"),
					Bytes.toBytes("time"));
			byte[] bproductid = value.getValue(Bytes.toBytes("f"),
					Bytes.toBytes("productid"));
			byte[] bversion = value.getValue(Bytes.toBytes("f"),
					Bytes.toBytes("version"));
		
			byte[] bdevice = value.getValue(Bytes.toBytes("f"),
					Bytes.toBytes("device"));
		
			if(bproductid != null)
			{
				pid = new String(bproductid);
		
			}
			else
			{
				flag = true;
			}
			if(btime != null)
			{
				date = new String(btime);
			
			}
			else
			{
				flag = true;
			}
			if(bdevice != null)
			{
				did = new String(bdevice);
			
			}
			else
			{
				flag = true;
			}
			if(bversion != null)
			{
				v = new String(bversion);
		
			}
			else
			{
				flag = true;
			}
			
			if(!flag)
			{
//				maprow.set(pid);
//		
//				mapout.set(date + "-" + v + "-" + did);
//			
//				context.write(maprow, mapout);
			}		
		}
	}
	
	public static class TestReducer extends TableReducer<Text,Text,ImmutableBytesWritable>
	{
		public void reduce(Text key,Iterable<Text> values,Context context)
		{
			String reducekey = key.toString();
			String date = null;
			String version = null;
			String device = null;
			String []list = null;
			String mapout = null;		
			
			for(Text val : values)
			{
				mapout = val.toString();
				list = mapout.split("-");
				date = list[0];
				version = list[1];
				device = list[2];
			}
			
			String rowkey = reducekey + "-" + version;
			Put put = new Put(Bytes.toBytes(rowkey));
			put.add(Bytes.toBytes("f"),Bytes.toBytes("date"),Bytes.toBytes(date));
			put.add(Bytes.toBytes("f"),Bytes.toBytes("deviceid"),Bytes.toBytes(device));
			try {
				context.write(null, put);
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
//			
		}
	}
	public static boolean run_Test( Scan scan) {
		
		Configuration config = Config.GetConfig();
		boolean b=false;
		try {
			
			Job job = new Job(config, "Test2");
			job.setJarByClass(HbaseTest.class);
			pool = new HTablePool(config, Integer.MAX_VALUE);
			TableMapReduceUtil.initTableMapperJob("error", // input table
					scan, // Scan instance to control CF and attribute selection
					TestMapper.class, // mapper class
					Text.class, // mapper output key
					Text.class, // mapper output value
					job);

			TableMapReduceUtil.initTableReducerJob("test2", // output table
					TestReducer.class, // reducer class
					job);
			job.setNumReduceTasks(1); // at least one, adjust as required

		    b = job.waitForCompletion(true);
			pool.close();
			if (!b) {
				throw new IOException("error with job!");
			}
		} catch (IOException e) {

			e.printStackTrace();
		} catch (InterruptedException e) {

			e.printStackTrace();
		} catch (ClassNotFoundException e) {

			e.printStackTrace();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {

		}
		return b;
	}

	public static void main(String[] args) throws Exception {
	
		Scan scan = new Scan();
		scan.setCaching(20000);
		scan.setCacheBlocks(false);

//		String starthour = Formatter.startHour(args[0]);
//		String endhour = Formatter.endHour(args[0]);
//		scan.setStartRow(Bytes.toBytes(Formatter.StartRowLong(starthour)));
//		scan.setStopRow(Bytes.toBytes(Formatter.EndRowLong(endhour)));
		
		run_Test(scan);
	}



}
