package net.juniper.cs;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.iterate.ConcatResultIterator;
import org.apache.phoenix.iterate.LookAheadResultIterator;
import org.apache.phoenix.iterate.PeekingResultIterator;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.iterate.RoundRobinResultIterator;
import org.apache.phoenix.iterate.SequenceResultIterator;
import org.apache.phoenix.iterate.TableResultIterator;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.mapreduce.PhoenixInputSplit;
import org.apache.phoenix.monitoring.MetricType;
import org.apache.phoenix.monitoring.ReadMetricQueue;

import com.google.common.collect.Lists;

public class IgniteRegionLoader<Boolean> implements IgniteCallable<Boolean>, Externalizable{

	private QueryPlan queryPlan ; 
	private PhoenixConnection connection;
	private PhoenixInputSplit inputsplit;
	private String tableName;
	
	private Scan scan;
	private Configuration conf;
	
	
	/*public IgniteRegionLoader(Configuration conf, String tableName, Scan scan){
		this.connection = connection;
		this.inputsplit = ip;
		
		this.conf = conf;
		this.tableName = tableName;
		this.scan = scan;
	}*/
	
	public IgniteRegionLoader(){
		
	}
	public IgniteRegionLoader(QueryPlan queryPlan, String tableName, PhoenixInputSplit ip){		
		this.inputsplit = ip;
		this.queryPlan = queryPlan;
		this.tableName = tableName;
	}
	
	@Override
	public Boolean call() throws Exception {
		final PhoenixInputSplit pSplit = (PhoenixInputSplit)inputsplit;
        final List<Scan> scans = pSplit.getScans();
        
        List<PeekingResultIterator> iterators = Lists.newArrayListWithExpectedSize(scans.size());
        StatementContext ctx = queryPlan.getContext();
        ReadMetricQueue readMetrics = ctx.getReadMetricsQueue();
        String tableName = queryPlan.getTableRef().getTable().getPhysicalName().getString();
        for (Scan scan : scans) {
            final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext(),
                    queryPlan.getTableRef(), scan, readMetrics.allotMetric(MetricType.SCAN_BYTES, tableName));
            PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator);
            iterators.add(peekingResultIterator);
        }
        ResultIterator iterator = queryPlan.useRoundRobinIterator() ? RoundRobinResultIterator.newIterator(iterators, queryPlan) : ConcatResultIterator.newIterator(iterators);
        if(queryPlan.getContext().getSequenceManager().getSequenceCount() > 0) {
            iterator = new SequenceResultIterator(iterator, queryPlan.getContext().getSequenceManager());
        }
       
        PhoenixResultSet resultSet = new PhoenixResultSet(iterator, queryPlan.getProjector().cloneIfNecessary(), queryPlan.getContext());
        while(resultSet.next()){
        	System.out.println(Thread.currentThread().getName() + " - Edge type " + resultSet.getString("EDGETYPE") + " srcvertexid - "+ resultSet.getString("SRCVERTEXID"));
        }
        
        return null;
	}

	@Override
	public void writeExternal(ObjectOutput out) throws IOException {
		out.writeObject(queryPlan);
		out.writeObject(tableName);
		out.writeObject(inputsplit);
		
	}

	@Override
	public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
		queryPlan = (QueryPlan)in.readObject();
		tableName = (String) tableName;
		inputsplit = (PhoenixInputSplit) inputsplit;
		
	}

}
