import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.NewHadoopRDD;

import com.google.common.collect.Lists;

public class SparkHBaseMain {

	public static void main(String[] args) {

		try {
			
			List<String> jars = Lists.newArrayList("/home/akhld/mobi/localcluster/x/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly-0.9.1-hadoop2.2.0.jar",
					"/home/akhld/Downloads/hbase-server-0.96.0-hadoop2.jar",
					"/home/akhld/Downloads/hbase-protocol-0.96.0-hadoop2.jar",
					"/home/akhld/Downloads/hbase-hadoop2-compat-0.96.0-hadoop2.jar",
					"/home/akhld/Downloads/hbase-common-0.96.0-hadoop2.jar",
					"/home/akhld/Downloads/hbase-client-0.96.0-hadoop2.jar",
					"/home/akhld/Downloads/htrace-core-2.02.jar");

			// Configure a local Spark context and ship the Spark assembly plus the HBase client jars to the executors.
			SparkConf spconf = new SparkConf();
			spconf.setMaster("local");
			spconf.setAppName("HBaser");
			spconf.setSparkHome("/home/akhld/mobi/localcluster/x/spark-0.9.1-bin-hadoop2");
			spconf.setJars(jars.toArray(new String[jars.size()]));
			spconf.set("spark.executor.memory", "1g");

			final JavaSparkContext sc = new JavaSparkContext(spconf);
						
			// Point the HBase client at the cluster and tell TableInputFormat which table to scan.
			// hbase-site.xml lives on the local filesystem, so it is added as a Path
			// (addResource(String) would look the name up on the classpath instead).
			Configuration conf = HBaseConfiguration.create();
			conf.addResource(new Path("/home/akhld/mobi/temp/sung/hbase-site.xml"));
			conf.set(TableInputFormat.INPUT_TABLE, "blogposts");
						
			// Wrap the HBase table as an RDD of (row key, Result) pairs via the new Hadoop InputFormat API.
			NewHadoopRDD<ImmutableBytesWritable, Result> rdd = new NewHadoopRDD<ImmutableBytesWritable, Result>(
					JavaSparkContext.toSparkContext(sc), TableInputFormat.class,
					ImmutableBytesWritable.class, Result.class, conf);
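
			// For reference, the Java API can expose the same pairs as a JavaPairRDD without touching
			// the Scala NewHadoopRDD class directly; a sketch (the variable name is illustrative):
			// JavaPairRDD<ImmutableBytesWritable, Result> pairRdd =
			//         sc.newAPIHadoopRDD(conf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);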
			
			/*import scala.collection.JavaConverters._
			rdd
			  .map(tuple => tuple._2)
			  .map(result => result.getColumn("columnFamily".getBytes(), "columnQualifier".getBytes()))
			  .map(keyValues => {
			  keyValues.asScala.reduceLeft {
			    (a, b) => if (a.getTimestamp > b.getTimestamp) a else b
			  }.getValue
			})*/
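			// latestValue(...) below gives a plain-Java sketch of the same transformation.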
						
			// count() forces the scan to run and prints how many rows were read from the table.
			System.out.println(rdd.count());
						
			
		} catch (Exception e) {
			e.printStackTrace();
			System.out.println("Craaaashed : " + e);
		}
	}
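
	/*
	 * A plain-Java sketch of the transformation outlined in the commented-out Scala snippet
	 * above: pick the value of the most recent cell for one column family/qualifier of a Result.
	 * The method name and the byte[] parameters are illustrative additions, not part of the
	 * original job; with HBase 0.96 it could be applied inside a map() over the RDD's Result values.
	 */
	private static byte[] latestValue(Result result, byte[] family, byte[] qualifier) {
		List<KeyValue> keyValues = result.getColumn(family, qualifier);
		if (keyValues == null || keyValues.isEmpty()) {
			return null;
		}
		// Keep the KeyValue with the highest timestamp, mirroring the reduceLeft in the Scala snippet.
		KeyValue latest = keyValues.get(0);
		for (KeyValue kv : keyValues) {
			if (kv.getTimestamp() > latest.getTimestamp()) {
				latest = kv;
			}
		}
		return latest.getValue();
	}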
}
