package com.test;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.RowKeySchema;
import org.apache.phoenix.schema.RowKeySchema.RowKeySchemaBuilder;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.ValueSchema.Field;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PUnsignedLong;
import org.apache.phoenix.schema.types.PVarchar;
import org.apache.phoenix.util.ByteUtil;

import net.juniper.cs.column.types.GenericPDatum;

/**
 * Static helpers for decoding Phoenix row keys of the tables known to this class.
 *
 * <p>Only {@link #TABLE_NAME} is currently supported; every other table name
 * yields {@code null} from both methods.
 */
public class PhoenixUtils {

	/** Name of the only table whose row-key layout is described here. */
	public static final String TABLE_NAME = "TEST_TABLE";

	/** Number of row-key fields registered for {@link #TABLE_NAME}. */
	private static final int TEST_TABLE_KEY_FIELD_COUNT = 7;

	/**
	 * Builds the row-key schema for the given table.
	 *
	 * <p>For {@link #TABLE_NAME} (compared case-insensitively) the schema has
	 * seven fields, in order: four VARCHARs, one VARCHAR registered with the
	 * nullable flag set, one UNSIGNED_LONG and a final VARCHAR. All fields use
	 * the default sort order. A new schema instance is built on every call.
	 *
	 * @param tableName name of the table; may be {@code null}
	 * @return the row-key schema, or {@code null} if the table is not known
	 */
	public static RowKeySchema getSchema(String tableName){
		if (!TABLE_NAME.equalsIgnoreCase(tableName)) {
			return null;
		}

		SortOrder order = SortOrder.getDefault();
		RowKeySchemaBuilder builder = new RowKeySchemaBuilder(TEST_TABLE_KEY_FIELD_COUNT);
		builder.addField(GenericPDatum.pDatum(PVarchar.INSTANCE), false, order);
		builder.addField(GenericPDatum.pDatum(PVarchar.INSTANCE), false, order);
		builder.addField(GenericPDatum.pDatum(PVarchar.INSTANCE), false, order);
		builder.addField(GenericPDatum.pDatum(PVarchar.INSTANCE), false, order);
		// The fifth key column is the only one registered as nullable.
		builder.addField(GenericPDatum.pDatum(PVarchar.INSTANCE).nullble(true), true, order);
		builder.addField(GenericPDatum.pDatum(PUnsignedLong.INSTANCE), false, order);
		builder.addField(GenericPDatum.pDatum(PVarchar.INSTANCE), false, order);
		builder.rowKeyOrderOptimizable(false);
		return builder.build();
	}

	/**
	 * Decodes a serialized row key into one Java object per row-key field.
	 *
	 * <p>The returned array always has one slot per schema field. A slot is
	 * {@code null} when the field is empty in the key or when the key ends
	 * before that field is reached.
	 *
	 * @param tableName name of the table the row key belongs to
	 * @param rowKey serialized row key bytes; must not be {@code null} when the
	 *               table is known
	 * @return the decoded field values in schema order, or {@code null} if the
	 *         table is not known
	 * @throws NullPointerException if the table is known and {@code rowKey} is
	 *                              {@code null}
	 */
	public static Object[] getColumnValues(String tableName, byte[] rowKey){
		RowKeySchema schema = getSchema(tableName);
		if (schema == null) {
			return null;
		}
		if (rowKey == null) {
			throw new NullPointerException("rowKey must not be null");
		}

		ImmutableBytesWritable ptr = new ImmutableBytesWritable();
		// Position ptr at the start of the key; maxOffset bounds the iteration.
		int maxOffset = schema.iterator(rowKey, 0, rowKey.length, ptr);
		int fieldCount = schema.getFieldCount();
		Object[] values = new Object[fieldCount];
		for (int i = 0; i < fieldCount; i++) {
			Boolean hasValue = schema.next(ptr, i, maxOffset);
			if (hasValue == null) {
				// End of the key reached: the remaining slots stay null.
				break;
			}
			if (!hasValue) {
				// Field is present but empty: leave its slot null.
				continue;
			}
			Field field = schema.getField(i);
			PDataType type = field.getDataType();
			values[i] = type.toObject(ptr, type, field.getSortOrder());
		}
		return values;
	}
}
