import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hive.ql.io.orc.*;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;

import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;

public class OrcReader{

  private static Configuration conf = new Configuration();
  public static Reader reader;
  public static RecordReader rr;

  public static class OrcRow {
    public Object[] columns ;

    OrcRow (int colCount) {
      columns = new Object[colCount] ;
    }

    void setFieldValue(int FieldIndex,Object value) {
      columns[FieldIndex] = value ;
    }

    void setNumFields(int newSize) {
      if (newSize != columns.length) {
        Object[] oldColumns = columns;
        columns = new Object[newSize];
        System.arraycopy(oldColumns, 0, columns, 0, oldColumns.length);
      }
    }
	Object getFieldValue(int FieldIndex) {
	return columns[FieldIndex] ;
	}

  }
    static class OrcField implements StructField {
    private final String name;
    private final ObjectInspector inspector;
    private final int offset;

    OrcField(String name, ObjectInspector inspector, int offset) {
      this.name = name;
      this.inspector = inspector;
      this.offset = offset;
    }

    @Override
    public String getFieldName() {
      return name;
    }

    @Override
    public ObjectInspector getFieldObjectInspector() {
      return inspector;
    }

    @Override
    public int getFieldID() {
      return offset;
    }

    @Override
    public String getFieldComment() {
      return null;
    }
  }

    static class OrcRowInspector extends SettableStructObjectInspector {
    private List<StructField> fields;

    public OrcRowInspector(StructField... fields) {
      super();
      this.fields = Arrays.asList(fields);
    }

    @Override
    public List<StructField> getAllStructFieldRefs() {
      return fields;
    }

    @Override
    public StructField getStructFieldRef(String s) {
      for(StructField field: fields) {
        if (field.getFieldName().equalsIgnoreCase(s)) {
          return field;
        }
      }
      return null;
    }

    @Override
    public Object getStructFieldData(Object object, StructField field) {
      if (object == null) {
        return null;
      }
      int offset = ((OrcField) field).offset;
      OrcRow struct = (OrcRow) object;
      if (offset >= struct.columns.length) {
        return null;
      }

      return struct.columns[offset];
    }

    @Override
    public List<Object> getStructFieldsDataAsList(Object object) {
      if (object == null) {
        return null;
      }
      OrcRow struct = (OrcRow) object;
      List<Object> result = new ArrayList<Object>(struct.columns.length);
      for (Object child: struct.columns) {
        result.add(child);
      }
      return result;
    }

    @Override
    public String getTypeName() {
      StringBuilder buffer = new StringBuilder();
      buffer.append("struct<");
      for(int i=0; i < fields.size(); ++i) {
        StructField field = fields.get(i);
        if (i != 0) {
          buffer.append(",");
        }
        buffer.append(field.getFieldName());
        buffer.append(":");
        buffer.append(field.getFieldObjectInspector().getTypeName());
      }
      buffer.append(">");
      return buffer.toString();
    }

    @Override
    public Category getCategory() {
      return Category.STRUCT;
    }

    @Override
    public Object create() {
      return new OrcRow(0);
    }

    @Override
    public Object setStructFieldData(Object struct, StructField field,
                                     Object fieldValue) {
      OrcRow orcStruct = (OrcRow) struct;
      int offset = ((OrcField) field).offset;

      // if the offset is bigger than our current number of fields, grow it
      if (orcStruct.columns.length <= offset) {
        orcStruct.setNumFields(offset+1);
      }
      orcStruct.setFieldValue(offset, fieldValue);
      return struct;
    }

    @Override
    public boolean equals(Object o) {
      if (o == null || o.getClass() != getClass()) {
        return false;
      } else if (o == this) {
        return true;
      } else {
        List<StructField> other = ((OrcRowInspector) o).fields;
        if (other.size() != fields.size()) {
          return false;
        }
        for(int i = 0; i < fields.size(); ++i) {
          StructField left = other.get(i);
          StructField right = fields.get(i);
          if (!(left.getFieldName().equalsIgnoreCase(right.getFieldName()) &&
                left.getFieldObjectInspector()
                  .equals(right.getFieldObjectInspector()))) {
            return false;
          }
        }
        return true;
      }
    }

  }
    public static void main(String[] args) throws IOException,
                                                InterruptedException,
                                                ClassNotFoundException {

    String path = "/tmp/file3.orc";

    try 
    {

      conf = new Configuration();
      FileSystem fs = FileSystem.getLocal(conf);
	
	reader = OrcFile.createReader(new Path(path),
                                    OrcFile.readerOptions(conf).filesystem(fs)) ;

	OrcRowInspector readerInspector = (OrcRowInspector) reader.getObjectInspector() ;
	List<StructField> fields = readerInspector.getAllStructFieldRefs(); 
	Reader.Options ropts = new Reader.Options().range(0L, Long.MAX_VALUE).include(new boolean[]{true,true,true}) ;
	RecordReader rr = reader.rowsOptions(ropts) ;

	// Read the first row
	rr.seekToRow(1) ;
	OrcRow orw = (OrcRow) rr.next(null) ;

	IntObjectInspector intobj = (IntObjectInspector) readerInspector.getStructFieldRef("field1").getFieldObjectInspector(); 
	int a = intobj.get(readerInspector.getStructFieldData(orw, fields.get(0))) ;

	System.out.println("field1= "+ a) ;
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}
