package net.test.cs;

import java.io.IOException;
import java.net.URL;
import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.iterate.ConcatResultIterator;
import org.apache.phoenix.iterate.LookAheadResultIterator;
import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
import org.apache.phoenix.iterate.PeekingResultIterator;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.iterate.RoundRobinResultIterator;
import org.apache.phoenix.iterate.SequenceResultIterator;
import org.apache.phoenix.iterate.TableResultIterator;
import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.mapreduce.PhoenixInputSplit;
import org.apache.phoenix.mapreduce.util.ConnectionUtil;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.monitoring.MetricType;
import org.apache.phoenix.monitoring.ReadMetricQueue;
import org.apache.phoenix.query.KeyRange;

import com.google.common.collect.Lists;

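/**
 * Loads the splits of a Phoenix query plan and scans them in parallel on an
 * Apache Ignite compute grid: one IgniteCallable per input split, each iterating
 * over that split's HBase scans and printing the rows it reads.
 */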
public class Load {
	
	public static void main(String[] args) {
		try {
			// Point the HBase client at the cluster's ZooKeeper quorum.
			Configuration configuration = HBaseConfiguration.create();
			configuration.set("hbase.zookeeper.quorum", "192.168.56.101");
			configuration.set("hbase.zookeeper.property.clientPort", "2181");
			// Register the input table, query, and schema type with Phoenix's
			// MapReduce configuration utilities.
			PhoenixConfigurationUtil.setInputTableName(configuration, "TEST");
			PhoenixConfigurationUtil.setInputQuery(configuration, "SELECT TYPE, ID FROM TEST where type='test'");
			PhoenixConfigurationUtil.setSchemaType(configuration, PhoenixConfigurationUtil.SchemaType.QUERY);
			
			final Connection connection = ConnectionUtil.getInputConnection(configuration, new Properties());
			final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);

			final Statement statement = connection.createStatement();
			final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
			// Optimize the query plan so that we potentially use secondary indexes.
			final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement);
			// Initialize the query plan so it sets up the parallel scans.
			queryPlan.iterator(MapReduceParallelScanGrouper.getInstance());

			final List<KeyRange> allSplits = queryPlan.getSplits();
			List<InputSplit> splits = generateSplits(queryPlan, allSplits);
			System.out.println("split size - " + splits.size());
			List<IgniteCallable<Boolean>> calls = new ArrayList<IgniteCallable<Boolean>>();
			// Build one callable per input split and broadcast the whole batch to the grid.
			for (final InputSplit ip : splits) {
				System.out.println("Start row - " + Bytes.toString(((PhoenixInputSplit) ip).getKeyRange().getLowerRange()));
				System.out.println("Stop row - " + Bytes.toString(((PhoenixInputSplit) ip).getKeyRange().getUpperRange()));
				calls.add(new IgniteCallable<Boolean>() {

					@Override
					public Boolean call() {
						try {
							final PhoenixInputSplit pSplit = (PhoenixInputSplit) ip;
							final List<Scan> scans = pSplit.getScans();

							List<PeekingResultIterator> iterators = Lists.newArrayListWithExpectedSize(scans.size());
							StatementContext ctx = queryPlan.getContext();
							ReadMetricQueue readMetrics = ctx.getReadMetricsQueue();
							String tableName = queryPlan.getTableRef().getTable().getPhysicalName().getString();
							// Open one table iterator per scan of this split.
							for (Scan scan : scans) {
								final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext(),
										queryPlan.getTableRef(), scan, readMetrics.allotMetric(MetricType.SCAN_BYTES, tableName));
								PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator);
								iterators.add(peekingResultIterator);
							}
							// Combine the per-scan iterators into a single result iterator.
							ResultIterator iterator = queryPlan.useRoundRobinIterator()
									? RoundRobinResultIterator.newIterator(iterators, queryPlan)
									: ConcatResultIterator.newIterator(iterators);
							if (queryPlan.getContext().getSequenceManager().getSequenceCount() > 0) {
								iterator = new SequenceResultIterator(iterator, queryPlan.getContext().getSequenceManager());
							}

							// Clone the row projector as it's not thread safe and would otherwise
							// be used simultaneously by multiple threads.
							PhoenixResultSet resultSet = new PhoenixResultSet(iterator, queryPlan.getProjector().cloneIfNecessary(), queryPlan.getContext());
							while (resultSet.next()) {
								System.out.println(Thread.currentThread().getName() + " - type " + resultSet.getString("TYPE") + " id - " + resultSet.getString("ID"));
							}

							return true;
						} catch (Exception ex) {
							// Surface the failure instead of swallowing it silently.
							ex.printStackTrace();
							return false;
						}
					}
				});
			}
	        
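			// The grid name must match the Ignite configuration. A minimal sketch of the
			// assumed ignite.xml (not shown in this listing):
			//   <bean class="org.apache.ignite.configuration.IgniteConfiguration">
			//     <property name="igniteInstanceName" value="my-grid"/>
			//   </bean>
			// (older Ignite releases use the "gridName" property instead)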
			URL resource = Load.class.getResource("/ignite.xml");
			Ignition.start(resource);
			Ignite ignite = Ignition.ignite("my-grid");
			// call() blocks until every split has been scanned.
			ignite.compute().call(calls);
			connection.close();
		} catch (Exception ex) {
			ex.printStackTrace();
		}
	}
	
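	/**
	 * Builds one PhoenixInputSplit per group of scans produced by the query plan,
	 * mirroring what Phoenix's MapReduce PhoenixInputFormat does. The KeyRange
	 * list is only consulted to pre-size the result list.
	 */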
	private static List<InputSplit> generateSplits(final QueryPlan qplan, final List<KeyRange> splits) throws IOException {
		final List<InputSplit> psplits = Lists.newArrayListWithExpectedSize(splits.size());
		for (List<Scan> scans : qplan.getScans()) {
			psplits.add(new PhoenixInputSplit(scans));
		}
		return psplits;
	}

}
