HIVE-12159: Create vectorized readers for the complex types (Owen O'Malley,
reviewed by Matt McCline and Prasanth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0ac424f0
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0ac424f0
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0ac424f0

Branch: refs/heads/llap
Commit: 0ac424f0a17b341efe299da167791112e4a953e9
Parents: bf44f3e
Author: Owen O'Malley <omal...@apache.org>
Authored: Tue Apr 26 08:15:00 2016 -0700
Committer: Owen O'Malley <omal...@apache.org>
Committed: Tue Apr 26 08:17:24 2016 -0700

----------------------------------------------------------------------
 itests/qtest/pom.xml                            |    2 +-
 .../llap/io/decode/OrcEncodedDataConsumer.java  |   46 +-
 .../llap/io/encoded/OrcEncodedDataReader.java   |   62 +-
 .../llap/io/metadata/OrcStripeMetadata.java     |    6 +-
 .../TestIncrementalObjectSizeEstimator.java     |   31 +-
 orc/src/java/org/apache/orc/DataReader.java     |   24 +-
 .../java/org/apache/orc/DataReaderFactory.java  |    9 -
 .../org/apache/orc/MetadataReaderFactory.java   |   12 -
 orc/src/java/org/apache/orc/OrcUtils.java       |   78 +
 orc/src/java/org/apache/orc/Reader.java         |   29 +-
 orc/src/java/org/apache/orc/RecordReader.java   |    8 +-
 .../java/org/apache/orc/TypeDescription.java    |   62 +-
 .../org/apache/orc/impl/BitFieldReader.java     |    5 +-
 .../apache/orc/impl/DataReaderProperties.java   |   41 +-
 .../orc/impl/DefaultMetadataReaderFactory.java  |   14 -
 orc/src/java/org/apache/orc/impl/InStream.java  |    4 +-
 .../java/org/apache/orc/impl/IntegerReader.java |   26 +-
 .../org/apache/orc/impl/MetadataReader.java     |   33 -
 .../org/apache/orc/impl/MetadataReaderImpl.java |  120 --
 .../orc/impl/MetadataReaderProperties.java      |   96 -
 .../apache/orc/impl/RunLengthByteReader.java    |   36 +-
 .../apache/orc/impl/RunLengthIntegerReader.java |   31 +-
 .../orc/impl/RunLengthIntegerReaderV2.java      |   33 +-
 .../java/org/apache/orc/impl/WriterImpl.java    |   47 +-
 .../orc/impl/TestDataReaderProperties.java      |   12 +-
 .../orc/impl/TestMetadataReaderProperties.java  |   72 -
 .../ql/exec/vector/VectorizedRowBatchCtx.java   |   13 +-
 .../ql/io/orc/DefaultDataReaderFactory.java     |   14 -
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   |   43 +-
 .../hive/ql/io/orc/OrcRawRecordMerger.java      |    3 +-
 .../hadoop/hive/ql/io/orc/ReaderImpl.java       |   65 +-
 .../hadoop/hive/ql/io/orc/RecordReaderImpl.java |  231 +--
 .../hive/ql/io/orc/RecordReaderUtils.java       |  127 +-
 .../hadoop/hive/ql/io/orc/SchemaEvolution.java  |  234 +--
 .../hive/ql/io/orc/TreeReaderFactory.java       |  838 +++++----
 .../ql/io/orc/VectorizedOrcInputFormat.java     |   32 +-
 .../hadoop/hive/ql/io/orc/WriterImpl.java       |    2 -
 .../hive/ql/io/orc/TestRecordReaderImpl.java    |   55 +-
 .../hive/ql/io/orc/TestTypeDescription.java     |    4 +-
 .../hive/ql/io/orc/TestVectorOrcFile.java       | 1647 +++++++++---------
 .../hive/ql/io/orc/TestVectorizedORCReader.java |    7 +-
 .../hive/ql/exec/vector/BytesColumnVector.java  |   13 +-
 .../ql/exec/vector/DecimalColumnVector.java     |    2 +-
 .../hive/ql/exec/vector/DoubleColumnVector.java |    2 +-
 .../hive/ql/exec/vector/LongColumnVector.java   |    2 +-
 .../ql/exec/vector/MultiValuedColumnVector.java |    2 +-
 .../ql/exec/vector/TimestampColumnVector.java   |    2 +-
 .../hive/ql/exec/vector/UnionColumnVector.java  |    2 -
 48 files changed, 2133 insertions(+), 2146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/itests/qtest/pom.xml
----------------------------------------------------------------------
diff --git a/itests/qtest/pom.xml b/itests/qtest/pom.xml
index b042774..a479557 100644
--- a/itests/qtest/pom.xml
+++ b/itests/qtest/pom.xml
@@ -431,7 +431,7 @@
                   templatePath="${basedir}/${hive.path.to.root}/ql/src/test/templates/" template="TestCliDriver.vm"
                   queryDirectory="${basedir}/${hive.path.to.root}/ql/src/test/queries/clientpositive/"
                   queryFile="${qfile}"
-                  excludeQueryFile="${minillap.query.files},${minimr.query.files},${minitez.query.files},${encrypted.query.files},${spark.only.query.files},${disabled.query.files},regexp_extract.q"
+                  excludeQueryFile="${minillap.query.files},${minimr.query.files},${minitez.query.files},${encrypted.query.files},${spark.only.query.files},${disabled.query.files}"
                   queryFileRegex="${qfile_regex}"
                   clusterMode="${clustermode}"
                   runDisabled="${run_disabled}"

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
index 0651557..a689f10 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.llap.io.decode;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch;
 import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
@@ -27,7 +28,12 @@ import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
 import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
 import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.orc.CompressionCodec;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
@@ -71,6 +77,35 @@ public class OrcEncodedDataConsumer
     stripes[m.getStripeIx()] = m;
   }
 
+  private static ColumnVector createColumn(OrcProto.Type type,
+                                           int batchSize) {
+    switch (type.getKind()) {
+      case BOOLEAN:
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+      case DATE:
+        return new LongColumnVector(batchSize);
+      case FLOAT:
+      case DOUBLE:
+        return new DoubleColumnVector(batchSize);
+      case BINARY:
+      case STRING:
+      case CHAR:
+      case VARCHAR:
+        return new BytesColumnVector(batchSize);
+      case TIMESTAMP:
+        return new TimestampColumnVector(batchSize);
+      case DECIMAL:
+        return new DecimalColumnVector(batchSize, type.getPrecision(),
+            type.getScale());
+      default:
+        throw new IllegalArgumentException("LLAP does not support " +
+            type.getKind());
+    }
+  }
+
   @Override
   protected void decodeBatch(OrcEncodedColumnBatch batch,
       Consumer<ColumnVectorBatch> downstreamConsumer) {
@@ -112,9 +147,16 @@ public class OrcEncodedDataConsumer
         ColumnVectorBatch cvb = cvbPool.take();
        assert cvb.cols.length == batch.getColumnIxs().length; // Must be constant per split.
        cvb.size = batchSize;
-
+        List<OrcProto.Type> types = fileMetadata.getTypes();
+        int[] columnMapping = batch.getColumnIxs();
        for (int idx = 0; idx < batch.getColumnIxs().length; idx++) {
-          cvb.cols[idx] = (ColumnVector)columnReaders[idx].nextVector(cvb.cols[idx], batchSize);
+          if (cvb.cols[idx] == null) {
+            // skip over the top level struct, but otherwise assume no complex
+            // types
+            cvb.cols[idx] = createColumn(types.get(columnMapping[idx]), batchSize);
+          }
+          cvb.cols[idx].ensureSize(batchSize, false);
+          columnReaders[idx].nextVector(cvb.cols[idx], null, batchSize);
         }
 
         // we are done reading a batch, send it to consumer for processing

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 406f8f6..83011fb 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -27,6 +27,8 @@ import java.util.List;
 
 import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics;
+import org.apache.orc.impl.DataReaderProperties;
+import org.apache.orc.impl.OrcIndex;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -58,7 +60,6 @@ import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HdfsUtils;
 import org.apache.orc.CompressionKind;
 import org.apache.orc.DataReader;
-import org.apache.orc.impl.MetadataReader;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile.ReaderOptions;
 import org.apache.orc.OrcConf;
@@ -144,8 +145,9 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
   // Read state.
   private int stripeIxFrom;
   private OrcFileMetadata fileMetadata;
+  private Path path;
   private Reader orcReader;
-  private MetadataReader metadataReader;
+  private DataReader metadataReader;
   private EncodedReader stripeReader;
   private Object fileKey;
   private FileSystem fs;
@@ -555,13 +557,6 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
    * Closes the stripe readers (on error).
    */
   private void cleanupReaders() {
-    if (metadataReader != null) {
-      try {
-        metadataReader.close();
-      } catch (IOException ex) {
-        // Ignore.
-      }
-    }
     if (stripeReader != null) {
       try {
         stripeReader.close();
@@ -576,7 +571,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
    */
   private void ensureOrcReader() throws IOException {
     if (orcReader != null) return;
-    Path path = split.getPath();
+    path = split.getPath();
     if (fileKey instanceof Long && HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_USE_FILEID_PATH)) {
       path = HdfsUtils.getFileIdPath(fs, path, (long)fileKey);
     }
@@ -661,7 +656,16 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     ensureOrcReader();
     if (metadataReader != null) return;
     long startTime = counters.startTimeCounter();
-    metadataReader = orcReader.metadata();
+    boolean useZeroCopy = (conf != null) && OrcConf.USE_ZEROCOPY.getBoolean(conf);
+    metadataReader = RecordReaderUtils.createDefaultDataReader(
+        DataReaderProperties.builder()
+        .withBufferSize(orcReader.getCompressionSize())
+        .withCompression(orcReader.getCompressionKind())
+        .withFileSystem(fs)
+        .withPath(path)
+        .withTypeCount(orcReader.getSchema().getMaximumId() + 1)
+        .withZeroCopy(useZeroCopy)
+        .build());
     counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
   }
 
@@ -805,13 +809,13 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
   private class DataWrapperForOrc implements DataReader, DataCache {
     private final DataReader orcDataReader;
 
-    public DataWrapperForOrc() {
-      boolean useZeroCopy = (conf != null) && OrcConf.USE_ZEROCOPY.getBoolean(conf);
-      if (useZeroCopy && !getAllocator().isDirectAlloc()) {
-        throw new UnsupportedOperationException("Cannot use zero-copy reader with non-direct cache "
-            + "buffers; either disable zero-copy or enable direct cache allocation");
-      }
-      this.orcDataReader = orcReader.createDefaultDataReader(useZeroCopy);
+    private DataWrapperForOrc(DataWrapperForOrc other) {
+      orcDataReader = other.orcDataReader.clone();
+    }
+
+    public DataWrapperForOrc() throws IOException {
+      ensureMetadataReader();
+      this.orcDataReader = metadataReader.clone();
     }
 
     @Override
@@ -881,10 +885,32 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     }
 
     @Override
+    public DataWrapperForOrc clone() {
+      return new DataWrapperForOrc(this);
+    }
+
+    @Override
     public void open() throws IOException {
       long startTime = counters.startTimeCounter();
       orcDataReader.open();
       counters.recordHdfsTime(startTime);
     }
+
+    @Override
+    public OrcIndex readRowIndex(StripeInformation stripe,
+                                 OrcProto.StripeFooter footer,
+                                 boolean[] included,
+                                 OrcProto.RowIndex[] indexes,
+                                 boolean[] sargColumns,
+                                 OrcProto.BloomFilterIndex[] bloomFilterIndices
+                                 ) throws IOException {
+      return orcDataReader.readRowIndex(stripe, footer, included, indexes,
+          sargColumns, bloomFilterIndices);
+    }
+
+    @Override
+    public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException {
+      return orcDataReader.readStripeFooter(stripe);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
index 82187bd..6874586 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
@@ -28,9 +28,9 @@ import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
 import org.apache.hadoop.hive.llap.cache.LlapCacheableBuffer;
 import org.apache.hadoop.hive.ql.io.SyntheticFileId;
 import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey;
+import org.apache.orc.DataReader;
 import org.apache.orc.OrcProto;
 import org.apache.orc.StripeInformation;
-import org.apache.orc.impl.MetadataReader;
 import org.apache.orc.impl.OrcIndex;
 
 public class OrcStripeMetadata extends LlapCacheableBuffer {
@@ -54,7 +54,7 @@ public class OrcStripeMetadata extends LlapCacheableBuffer {
     SIZE_ESTIMATOR = SIZE_ESTIMATORS.get(OrcStripeMetadata.class);
   }
 
-  public OrcStripeMetadata(OrcBatchKey stripeKey, MetadataReader mr, StripeInformation stripe,
+  public OrcStripeMetadata(OrcBatchKey stripeKey, DataReader mr, StripeInformation stripe,
       boolean[] includes, boolean[] sargColumns) throws IOException {
     this.stripeKey = stripeKey;
     OrcProto.StripeFooter footer = mr.readStripeFooter(stripe);
@@ -95,7 +95,7 @@ public class OrcStripeMetadata extends LlapCacheableBuffer {
     return true;
   }
 
-  public void loadMissingIndexes(MetadataReader mr, StripeInformation stripe, boolean[] includes,
+  public void loadMissingIndexes(DataReader mr, StripeInformation stripe, boolean[] includes,
       boolean[] sargColumns) throws IOException {
     // TODO: should we save footer to avoid a read here?
     rowIndex = mr.readRowIndex(stripe, null, includes, rowIndex.getRowGroupIndex(),

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestIncrementalObjectSizeEstimator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestIncrementalObjectSizeEstimator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestIncrementalObjectSizeEstimator.java
index a078f73..183fb1b 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestIncrementalObjectSizeEstimator.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestIncrementalObjectSizeEstimator.java
@@ -21,18 +21,20 @@ import static org.junit.Assert.*;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.List;
 import java.util.ArrayList;
 import java.util.LinkedHashSet;
 
+import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.orc.DataReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator;
 import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator.ObjectEstimator;
 import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
 import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
-import org.apache.orc.impl.MetadataReader;
 import org.apache.orc.impl.OrcIndex;
 import org.apache.orc.StripeInformation;
 import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey;
@@ -46,11 +48,16 @@ import com.google.protobuf.CodedOutputStream;
 public class TestIncrementalObjectSizeEstimator {
   private static final Logger LOG = LoggerFactory.getLogger(TestIncrementalObjectSizeEstimator.class);
 
-  private static class DummyMetadataReader implements MetadataReader  {
+  private static class DummyMetadataReader implements DataReader {
     public boolean doStreamStep = false;
     public boolean isEmpty;
 
     @Override
+    public void open() throws IOException {
+
+    }
+
+    @Override
     public OrcIndex readRowIndex(StripeInformation stripe,
                               OrcProto.StripeFooter footer,
         boolean[] included, OrcProto.RowIndex[] indexes, boolean[] sargColumns,
@@ -118,6 +125,26 @@ public class TestIncrementalObjectSizeEstimator {
     }
 
     @Override
+    public DiskRangeList readFileData(DiskRangeList range, long baseOffset, boolean doForceDirect) throws IOException {
+      return null;
+    }
+
+    @Override
+    public boolean isTrackingDiskRanges() {
+      return false;
+    }
+
+    @Override
+    public void releaseBuffer(ByteBuffer toRelease) {
+
+    }
+
+    @Override
+    public DataReader clone() {
+      return null;
+    }
+
+    @Override
     public void close() throws IOException {
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/DataReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/DataReader.java b/orc/src/java/org/apache/orc/DataReader.java
index b70f26b..a5dbb76 100644
--- a/orc/src/java/org/apache/orc/DataReader.java
+++ b/orc/src/java/org/apache/orc/DataReader.java
@@ -18,18 +18,27 @@
 
 package org.apache.orc;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.orc.impl.OrcIndex;
 
 /** An abstract data reader that IO formats can use to read bytes from underlying storage. */
-public interface DataReader extends Closeable {
+public interface DataReader extends AutoCloseable {
 
   /** Opens the DataReader, making it ready to use. */
   void open() throws IOException;
 
+  OrcIndex readRowIndex(StripeInformation stripe,
+                        OrcProto.StripeFooter footer,
+                        boolean[] included, OrcProto.RowIndex[] indexes,
+                        boolean[] sargColumns,
+                        OrcProto.BloomFilterIndex[] bloomFilterIndices
+                        ) throws IOException;
+
+  OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException;
+
   /** Reads the data.
    *
    * Note that for the cases such as zero-copy read, caller must release the disk ranges
@@ -53,4 +62,15 @@ public interface DataReader extends Closeable {
    * @param toRelease The buffer to release.
    */
   void releaseBuffer(ByteBuffer toRelease);
+
+  /**
+   * Clone the entire state of the DataReader with the assumption that the
+   * clone will be closed at a different time. Thus, any file handles in the
+   * implementation need to be cloned.
+   * @return a new instance
+   */
+  DataReader clone();
+
+  @Override
+  public void close() throws IOException;
 }
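
For illustration, a caller-side sketch of the new clone() contract (assumes `stripe` is a
StripeInformation and `reader` is an already-opened DataReader; neither comes from this
patch). Because each clone owns its own file handles, the copy can be closed without
affecting the original:

    DataReader copy = reader.clone();
    try {
      copy.open();
      OrcProto.StripeFooter footer = copy.readStripeFooter(stripe);
      System.out.println("stripe has " + footer.getStreamsCount() + " streams");
    } finally {
      copy.close();   // only the copy's handles are released here
    }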

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/DataReaderFactory.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/DataReaderFactory.java b/orc/src/java/org/apache/orc/DataReaderFactory.java
deleted file mode 100644
index ec3a0e9..0000000
--- a/orc/src/java/org/apache/orc/DataReaderFactory.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package org.apache.orc;
-
-import org.apache.orc.impl.DataReaderProperties;
-
-public interface DataReaderFactory {
-
-  DataReader create(DataReaderProperties properties);
-
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/MetadataReaderFactory.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/MetadataReaderFactory.java b/orc/src/java/org/apache/orc/MetadataReaderFactory.java
deleted file mode 100644
index 64629da..0000000
--- a/orc/src/java/org/apache/orc/MetadataReaderFactory.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package org.apache.orc;
-
-import org.apache.orc.impl.MetadataReader;
-import org.apache.orc.impl.MetadataReaderProperties;
-
-import java.io.IOException;
-
-public interface MetadataReaderFactory {
-
-  MetadataReader create(MetadataReaderProperties properties) throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/OrcUtils.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/OrcUtils.java b/orc/src/java/org/apache/orc/OrcUtils.java
index 2e93254..5845ba6 100644
--- a/orc/src/java/org/apache/orc/OrcUtils.java
+++ b/orc/src/java/org/apache/orc/OrcUtils.java
@@ -449,4 +449,82 @@ public class OrcUtils {
     return columnId;
   }
 
+  /**
+   * Translate the given rootColumn from the list of types to a TypeDescription.
+   * @param types all of the types
+   * @param rootColumn translate this type
+   * @return a new TypeDescription that matches the given rootColumn
+   */
+  public static
+        TypeDescription convertTypeFromProtobuf(List<OrcProto.Type> types,
+                                                int rootColumn) {
+    OrcProto.Type type = types.get(rootColumn);
+    switch (type.getKind()) {
+      case BOOLEAN:
+        return TypeDescription.createBoolean();
+      case BYTE:
+        return TypeDescription.createByte();
+      case SHORT:
+        return TypeDescription.createShort();
+      case INT:
+        return TypeDescription.createInt();
+      case LONG:
+        return TypeDescription.createLong();
+      case FLOAT:
+        return TypeDescription.createFloat();
+      case DOUBLE:
+        return TypeDescription.createDouble();
+      case STRING:
+        return TypeDescription.createString();
+      case CHAR:
+      case VARCHAR: {
+        TypeDescription result = type.getKind() == OrcProto.Type.Kind.CHAR ?
+            TypeDescription.createChar() : TypeDescription.createVarchar();
+        if (type.hasMaximumLength()) {
+          result.withMaxLength(type.getMaximumLength());
+        }
+        return result;
+      }
+      case BINARY:
+        return TypeDescription.createBinary();
+      case TIMESTAMP:
+        return TypeDescription.createTimestamp();
+      case DATE:
+        return TypeDescription.createDate();
+      case DECIMAL: {
+        TypeDescription result = TypeDescription.createDecimal();
+        if (type.hasScale()) {
+          result.withScale(type.getScale());
+        }
+        if (type.hasPrecision()) {
+          result.withPrecision(type.getPrecision());
+        }
+        return result;
+      }
+      case LIST:
+        return TypeDescription.createList(
+            convertTypeFromProtobuf(types, type.getSubtypes(0)));
+      case MAP:
+        return TypeDescription.createMap(
+            convertTypeFromProtobuf(types, type.getSubtypes(0)),
+            convertTypeFromProtobuf(types, type.getSubtypes(1)));
+      case STRUCT: {
+        TypeDescription result = TypeDescription.createStruct();
+        for(int f=0; f < type.getSubtypesCount(); ++f) {
+          result.addField(type.getFieldNames(f),
+              convertTypeFromProtobuf(types, type.getSubtypes(f)));
+        }
+        return result;
+      }
+      case UNION: {
+        TypeDescription result = TypeDescription.createUnion();
+        for(int f=0; f < type.getSubtypesCount(); ++f) {
+          result.addUnionChild(
+              convertTypeFromProtobuf(types, type.getSubtypes(f)));
+        }
+        return result;
+      }
+    }
+    throw new IllegalArgumentException("Unknown ORC type " + type.getKind());
+  }
 }
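
A usage sketch of the new helper (assuming `footer` is an OrcProto.Footer that has already
been parsed); column 0 is always the root of the flattened type list:

    List<OrcProto.Type> types = footer.getTypesList();
    TypeDescription schema = OrcUtils.convertTypeFromProtobuf(types, 0);
    // schema now mirrors the file's type tree, e.g. struct<name:string,age:int>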

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/Reader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/Reader.java b/orc/src/java/org/apache/orc/Reader.java
index be722b5..39de763 100644
--- a/orc/src/java/org/apache/orc/Reader.java
+++ b/orc/src/java/org/apache/orc/Reader.java
@@ -23,7 +23,6 @@ import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
-import org.apache.orc.impl.MetadataReader;
 
 /**
  * The interface for reading ORC files.
@@ -116,9 +115,15 @@ public interface Reader {
   ColumnStatistics[] getStatistics();
 
   /**
+   * Get the type of rows in this ORC file.
+   */
+  TypeDescription getSchema();
+
+  /**
    * Get the list of types contained in the file. The root type is the first
    * type in the list.
    * @return the list of flattened types
+   * @deprecated use getSchema instead
    */
   List<OrcProto.Type> getTypes();
 
@@ -144,6 +149,7 @@ public interface Reader {
     private Boolean useZeroCopy = null;
     private Boolean skipCorruptRecords = null;
     private TypeDescription schema = null;
+    private DataReader dataReader = null;
 
     /**
      * Set the list of columns to read.
@@ -197,6 +203,11 @@ public interface Reader {
       return this;
     }
 
+    public Options dataReader(DataReader value) {
+      this.dataReader = value;
+      return this;
+    }
+
     /**
      * Set whether to skip corrupt records.
      * @param value the new skip corrupt records flag
@@ -247,6 +258,10 @@ public interface Reader {
       return skipCorruptRecords;
     }
 
+    public DataReader getDataReader() {
+      return dataReader;
+    }
+
     public Options clone() {
       Options result = new Options();
       result.include = include;
@@ -257,6 +272,7 @@ public interface Reader {
       result.columnNames = columnNames;
       result.useZeroCopy = useZeroCopy;
       result.skipCorruptRecords = skipCorruptRecords;
+      result.dataReader = dataReader == null ? null : dataReader.clone();
       return result;
     }
 
@@ -321,11 +337,6 @@ public interface Reader {
   RecordReader rowsOptions(Options options) throws IOException;
 
   /**
-   * @return Metadata reader used to read file metadata.
-   */
-  MetadataReader metadata() throws IOException;
-
-  /**
    * @return List of integers representing version of the file, in order from major to minor.
    */
   List<Integer> getVersionList();
@@ -351,12 +362,6 @@ public interface Reader {
   List<OrcProto.ColumnStatistics> getOrcProtoFileStatistics();
 
   /**
-   * @param useZeroCopy Whether zero-copy read should be used.
-   * @return The default data reader that ORC is using to read bytes from disk.
-   */
-  DataReader createDefaultDataReader(boolean useZeroCopy);
-
-  /**
    * @return Serialized file metadata read from disk for the purposes of caching, etc.
    */
   ByteBuffer getSerializedFileFooter();
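
A caller-side sketch of the new dataReader option (the names `includedColumns` and
`cacheAwareReader` are illustrative; `reader` is an ORC Reader):

    Reader.Options options = new Reader.Options()
        .include(includedColumns)
        .dataReader(cacheAwareReader);   // cloned along with the Options, per clone() above
    RecordReader rows = reader.rowsOptions(options);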

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/RecordReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/RecordReader.java b/orc/src/java/org/apache/orc/RecordReader.java
index 7229dda..09ba0f0 100644
--- a/orc/src/java/org/apache/orc/RecordReader.java
+++ b/orc/src/java/org/apache/orc/RecordReader.java
@@ -30,13 +30,11 @@ public interface RecordReader {
    * controlled by the callers. Caller need to look at
    * VectorizedRowBatch.size of the retunred object to know the batch
    * size read.
-   * @param previousBatch a row batch object that can be reused by the reader
-   * @return the row batch that was read. The batch will have a non-zero row
-   *         count if the pointer isn't at the end of the file
+   * @param batch a row batch object to read into
+   * @return were more rows available to read?
    * @throws java.io.IOException
    */
-  VectorizedRowBatch nextBatch(VectorizedRowBatch previousBatch
-                              ) throws IOException;
+  boolean nextBatch(VectorizedRowBatch batch) throws IOException;
 
   /**
    * Get the row number of the row that will be returned by the following
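
With this change a read loop looks like the following sketch (assumes `rows` is a
RecordReader and `batch` is a VectorizedRowBatch sized for the file schema):

    while (rows.nextBatch(batch)) {
      for (int r = 0; r < batch.size; ++r) {
        // consume row r from the column vectors in batch.cols
      }
    }
    rows.close();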

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/TypeDescription.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/TypeDescription.java b/orc/src/java/org/apache/orc/TypeDescription.java
index bd900ac..b8e057e 100644
--- a/orc/src/java/org/apache/orc/TypeDescription.java
+++ b/orc/src/java/org/apache/orc/TypeDescription.java
@@ -61,7 +61,7 @@ public class TypeDescription {
     LIST("array", false),
     MAP("map", false),
     STRUCT("struct", false),
-    UNION("union", false);
+    UNION("uniontype", false);
 
     Category(String name, boolean isPrimitive) {
       this.name = name;
@@ -258,6 +258,66 @@ public class TypeDescription {
     return id;
   }
 
+  public TypeDescription clone() {
+    TypeDescription result = new TypeDescription(category);
+    result.maxLength = maxLength;
+    result.precision = precision;
+    result.scale = scale;
+    if (fieldNames != null) {
+      result.fieldNames.addAll(fieldNames);
+    }
+    if (children != null) {
+      for(TypeDescription child: children) {
+        TypeDescription clone = child.clone();
+        clone.parent = result;
+        result.children.add(clone);
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public int hashCode() {
+    return getId();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null || other.getClass() != TypeDescription.class) {
+      return false;
+    }
+    if (other == this) {
+      return true;
+    }
+    TypeDescription castOther = (TypeDescription) other;
+    if (category != castOther.category ||
+        getId() != castOther.getId() ||
+        getMaximumId() != castOther.getMaximumId() ||
+        maxLength != castOther.maxLength ||
+        scale != castOther.scale ||
+        precision != castOther.precision) {
+      return false;
+    }
+    if (children != null) {
+      if (children.size() != castOther.children.size()) {
+        return false;
+      }
+      for (int i = 0; i < children.size(); ++i) {
+        if (!children.get(i).equals(castOther.children.get(i))) {
+          return false;
+        }
+      }
+    }
+    if (category == Category.STRUCT) {
+      for(int i=0; i < fieldNames.size(); ++i) {
+        if (!fieldNames.get(i).equals(castOther.fieldNames.get(i))) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
   /**
    * Get the maximum id assigned to this type or its children.
    * The first call will cause all of the the ids in tree to be assigned, so
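
A small illustration of what the new clone()/equals() pair provides, structural equality
of schemas rather than object identity (sketch, not part of the patch):

    TypeDescription original = TypeDescription.createStruct()
        .addField("name", TypeDescription.createString())
        .addField("age", TypeDescription.createInt());
    TypeDescription copy = original.clone();
    // equal by category, ids, max length, precision/scale, children and field names
    assert copy.equals(original);
    // but a distinct tree, so the copy can be modified or re-parented safely
    assert copy != original;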

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/impl/BitFieldReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/BitFieldReader.java b/orc/src/java/org/apache/orc/impl/BitFieldReader.java
index 8d9d3cb..dda7355 100644
--- a/orc/src/java/org/apache/orc/impl/BitFieldReader.java
+++ b/orc/src/java/org/apache/orc/impl/BitFieldReader.java
@@ -137,7 +137,7 @@ public class BitFieldReader {
                          long previousLen) throws IOException {
     previous.isRepeating = true;
     for (int i = 0; i < previousLen; i++) {
-      if (!previous.isNull[i]) {
+      if (previous.noNulls || !previous.isNull[i]) {
         previous.vector[i] = next();
       } else {
         // The default value of null for int types in vectorized
@@ -150,7 +150,8 @@ public class BitFieldReader {
       // when determining the isRepeating flag.
       if (previous.isRepeating
           && i > 0
-          && ((previous.vector[i - 1] != previous.vector[i]) || (previous.isNull[i - 1] != previous.isNull[i]))) {
+          && ((previous.vector[0] != previous.vector[i]) ||
+          (previous.isNull[0] != previous.isNull[i]))) {
         previous.isRepeating = false;
       }
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/impl/DataReaderProperties.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/DataReaderProperties.java b/orc/src/java/org/apache/orc/impl/DataReaderProperties.java
index 49f47d6..bb73d53 100644
--- a/orc/src/java/org/apache/orc/impl/DataReaderProperties.java
+++ b/orc/src/java/org/apache/orc/impl/DataReaderProperties.java
@@ -3,7 +3,7 @@ package org.apache.orc.impl;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.orc.CompressionCodec;
+import org.apache.orc.CompressionKind;
 
 import javax.annotation.Nullable;
 
@@ -11,14 +11,18 @@ public final class DataReaderProperties {
 
   private final FileSystem fileSystem;
   private final Path path;
-  private final CompressionCodec codec;
+  private final CompressionKind compression;
   private final boolean zeroCopy;
+  private final int typeCount;
+  private final int bufferSize;
 
   private DataReaderProperties(Builder builder) {
     this.fileSystem = builder.fileSystem;
     this.path = builder.path;
-    this.codec = builder.codec;
+    this.compression = builder.compression;
     this.zeroCopy = builder.zeroCopy;
+    this.typeCount = builder.typeCount;
+    this.bufferSize = builder.bufferSize;
   }
 
   public FileSystem getFileSystem() {
@@ -29,15 +33,22 @@ public final class DataReaderProperties {
     return path;
   }
 
-  @Nullable
-  public CompressionCodec getCodec() {
-    return codec;
+  public CompressionKind getCompression() {
+    return compression;
   }
 
   public boolean getZeroCopy() {
     return zeroCopy;
   }
 
+  public int getTypeCount() {
+    return typeCount;
+  }
+
+  public int getBufferSize() {
+    return bufferSize;
+  }
+
   public static Builder builder() {
     return new Builder();
   }
@@ -46,8 +57,10 @@ public final class DataReaderProperties {
 
     private FileSystem fileSystem;
     private Path path;
-    private CompressionCodec codec;
+    private CompressionKind compression;
     private boolean zeroCopy;
+    private int typeCount;
+    private int bufferSize;
 
     private Builder() {
 
@@ -63,8 +76,8 @@ public final class DataReaderProperties {
       return this;
     }
 
-    public Builder withCodec(CompressionCodec codec) {
-      this.codec = codec;
+    public Builder withCompression(CompressionKind value) {
+      this.compression = value;
       return this;
     }
 
@@ -73,6 +86,16 @@ public final class DataReaderProperties {
       return this;
     }
 
+    public Builder withTypeCount(int value) {
+      this.typeCount = value;
+      return this;
+    }
+
+    public Builder withBufferSize(int value) {
+      this.bufferSize = value;
+      return this;
+    }
+
     public DataReaderProperties build() {
       Preconditions.checkNotNull(fileSystem);
       Preconditions.checkNotNull(path);
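
A sketch of populating the expanded properties (values are illustrative; `fs`, `path` and
`reader` are assumed to exist), mirroring the LLAP usage earlier in this patch:

    DataReaderProperties props = DataReaderProperties.builder()
        .withFileSystem(fs)
        .withPath(path)
        .withCompression(CompressionKind.ZLIB)
        .withZeroCopy(false)
        .withTypeCount(reader.getSchema().getMaximumId() + 1)
        .withBufferSize(reader.getCompressionSize())
        .build();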

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/impl/DefaultMetadataReaderFactory.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/DefaultMetadataReaderFactory.java b/orc/src/java/org/apache/orc/impl/DefaultMetadataReaderFactory.java
deleted file mode 100644
index fc0d141..0000000
--- a/orc/src/java/org/apache/orc/impl/DefaultMetadataReaderFactory.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package org.apache.orc.impl;
-
-import org.apache.orc.MetadataReaderFactory;
-
-import java.io.IOException;
-
-public final class DefaultMetadataReaderFactory implements MetadataReaderFactory {
-
-  @Override
-  public MetadataReader create(MetadataReaderProperties properties) throws IOException {
-    return new MetadataReaderImpl(properties);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/impl/InStream.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/InStream.java b/orc/src/java/org/apache/orc/impl/InStream.java
index 1893afe..851f645 100644
--- a/orc/src/java/org/apache/orc/impl/InStream.java
+++ b/orc/src/java/org/apache/orc/impl/InStream.java
@@ -53,6 +53,9 @@ public abstract class InStream extends InputStream {
     return length;
   }
 
+  @Override
+  public abstract void close();
+
   public static class UncompressedStream extends InStream {
     private List<DiskRange> bytes;
     private long length;
@@ -423,7 +426,6 @@ public abstract class InStream extends InputStream {
 
   /**
    * Create an input stream from a list of buffers.
-   * @param fileName name of the file
    * @param streamName the name of the stream
    * @param buffers the list of ranges of bytes for the stream
    * @param offsets a list of offsets (the same length as input) that must

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/impl/IntegerReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/IntegerReader.java b/orc/src/java/org/apache/orc/impl/IntegerReader.java
index 7dfd289..8bef0f1 100644
--- a/orc/src/java/org/apache/orc/impl/IntegerReader.java
+++ b/orc/src/java/org/apache/orc/impl/IntegerReader.java
@@ -20,7 +20,7 @@ package org.apache.orc.impl;
 
 import java.io.IOException;
 
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 
 /**
  * Interface for reading integers.
@@ -57,9 +57,25 @@ public interface IntegerReader {
 
   /**
    * Return the next available vector for values.
-   * @return
+   * @param column the column being read
+   * @param data the vector to read into
+   * @param length the number of numbers to read
+   * @throws IOException
+   */
+   void nextVector(ColumnVector column,
+                   long[] data,
+                   int length
+                   ) throws IOException;
+
+  /**
+   * Return the next available vector for values. Does not change the
+   * value of column.isRepeating.
+   * @param column the column being read
+   * @param data the vector to read into
+   * @param length the number of numbers to read
    * @throws IOException
    */
-   void nextVector(LongColumnVector previous, final int previousLen)
-      throws IOException;
-}
+  void nextVector(ColumnVector column,
+                  int[] data,
+                  int length
+                  ) throws IOException;}

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/impl/MetadataReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/MetadataReader.java b/orc/src/java/org/apache/orc/impl/MetadataReader.java
deleted file mode 100644
index 500239d..0000000
--- a/orc/src/java/org/apache/orc/impl/MetadataReader.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.orc.impl;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-import org.apache.orc.OrcProto;
-import org.apache.orc.StripeInformation;
-
-public interface MetadataReader extends Closeable {
-  OrcIndex readRowIndex(StripeInformation stripe,
-                                      OrcProto.StripeFooter footer,
-      boolean[] included, OrcProto.RowIndex[] indexes, boolean[] sargColumns,
-      OrcProto.BloomFilterIndex[] bloomFilterIndices) throws IOException;
-
-  OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/impl/MetadataReaderImpl.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/MetadataReaderImpl.java b/orc/src/java/org/apache/orc/impl/MetadataReaderImpl.java
deleted file mode 100644
index c3ea74f..0000000
--- a/orc/src/java/org/apache/orc/impl/MetadataReaderImpl.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.orc.impl;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.io.DiskRange;
-import org.apache.orc.CompressionCodec;
-import org.apache.orc.OrcProto;
-import org.apache.orc.StripeInformation;
-
-import com.google.common.collect.Lists;
-
-public class MetadataReaderImpl implements MetadataReader {
-  private final FSDataInputStream file;
-  private final CompressionCodec codec;
-  private final int bufferSize;
-  private final int typeCount;
-
-  MetadataReaderImpl(MetadataReaderProperties properties) throws IOException {
-    this.file = properties.getFileSystem().open(properties.getPath());
-    this.codec = properties.getCodec();
-    this.bufferSize = properties.getBufferSize();
-    this.typeCount = properties.getTypeCount();
-  }
-
-  @Override
-  public OrcIndex readRowIndex(StripeInformation stripe,
-      OrcProto.StripeFooter footer, boolean[] included, OrcProto.RowIndex[] indexes,
-      boolean[] sargColumns, OrcProto.BloomFilterIndex[] bloomFilterIndices) throws IOException {
-    if (footer == null) {
-      footer = readStripeFooter(stripe);
-    }
-    if (indexes == null) {
-      indexes = new OrcProto.RowIndex[typeCount];
-    }
-    if (bloomFilterIndices == null) {
-      bloomFilterIndices = new OrcProto.BloomFilterIndex[typeCount];
-    }
-    long offset = stripe.getOffset();
-    List<OrcProto.Stream> streams = footer.getStreamsList();
-    for (int i = 0; i < streams.size(); i++) {
-      OrcProto.Stream stream = streams.get(i);
-      OrcProto.Stream nextStream = null;
-      if (i < streams.size() - 1) {
-        nextStream = streams.get(i+1);
-      }
-      int col = stream.getColumn();
-      int len = (int) stream.getLength();
-      // row index stream and bloom filter are interlaced, check if the sarg column contains bloom
-      // filter and combine the io to read row index and bloom filters for that column together
-      if (stream.hasKind() && (stream.getKind() == OrcProto.Stream.Kind.ROW_INDEX)) {
-        boolean readBloomFilter = false;
-        if (sargColumns != null && sargColumns[col] &&
-            nextStream.getKind() == OrcProto.Stream.Kind.BLOOM_FILTER) {
-          len += nextStream.getLength();
-          i += 1;
-          readBloomFilter = true;
-        }
-        if ((included == null || included[col]) && indexes[col] == null) {
-          byte[] buffer = new byte[len];
-          file.readFully(offset, buffer, 0, buffer.length);
-          ByteBuffer bb = ByteBuffer.wrap(buffer);
-          indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create("index",
-              Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), stream.getLength(),
-              codec, bufferSize));
-          if (readBloomFilter) {
-            bb.position((int) stream.getLength());
-            bloomFilterIndices[col] = OrcProto.BloomFilterIndex.parseFrom(InStream.create(
-                "bloom_filter", Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)),
-                nextStream.getLength(), codec, bufferSize));
-          }
-        }
-      }
-      offset += len;
-    }
-
-    OrcIndex index = new OrcIndex(indexes, bloomFilterIndices);
-    return index;
-  }
-
-  @Override
-  public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException {
-    long offset = stripe.getOffset() + stripe.getIndexLength() + stripe.getDataLength();
-    int tailLength = (int) stripe.getFooterLength();
-
-    // read the footer
-    ByteBuffer tailBuf = ByteBuffer.allocate(tailLength);
-    file.readFully(offset, tailBuf.array(), tailBuf.arrayOffset(), tailLength);
-    return OrcProto.StripeFooter.parseFrom(InStream.createCodedInputStream("footer",
-        Lists.<DiskRange>newArrayList(new BufferChunk(tailBuf, 0)),
-        tailLength, codec, bufferSize));
-  }
-
-  @Override
-  public void close() throws IOException {
-    file.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/impl/MetadataReaderProperties.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/MetadataReaderProperties.java b/orc/src/java/org/apache/orc/impl/MetadataReaderProperties.java
deleted file mode 100644
index 321931c..0000000
--- a/orc/src/java/org/apache/orc/impl/MetadataReaderProperties.java
+++ /dev/null
@@ -1,96 +0,0 @@
-package org.apache.orc.impl;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.orc.CompressionCodec;
-
-import javax.annotation.Nullable;
-
-public final class MetadataReaderProperties {
-
-  private final FileSystem fileSystem;
-  private final Path path;
-  private final CompressionCodec codec;
-  private final int bufferSize;
-  private final int typeCount;
-
-  private MetadataReaderProperties(Builder builder) {
-    this.fileSystem = builder.fileSystem;
-    this.path = builder.path;
-    this.codec = builder.codec;
-    this.bufferSize = builder.bufferSize;
-    this.typeCount = builder.typeCount;
-  }
-
-  public FileSystem getFileSystem() {
-    return fileSystem;
-  }
-
-  public Path getPath() {
-    return path;
-  }
-
-  @Nullable
-  public CompressionCodec getCodec() {
-    return codec;
-  }
-
-  public int getBufferSize() {
-    return bufferSize;
-  }
-
-  public int getTypeCount() {
-    return typeCount;
-  }
-
-  public static Builder builder() {
-    return new Builder();
-  }
-
-  public static class Builder {
-
-    private FileSystem fileSystem;
-    private Path path;
-    private CompressionCodec codec;
-    private int bufferSize;
-    private int typeCount;
-
-    private Builder() {
-
-    }
-
-    public Builder withFileSystem(FileSystem fileSystem) {
-      this.fileSystem = fileSystem;
-      return this;
-    }
-
-    public Builder withPath(Path path) {
-      this.path = path;
-      return this;
-    }
-
-    public Builder withCodec(CompressionCodec codec) {
-      this.codec = codec;
-      return this;
-    }
-
-    public Builder withBufferSize(int bufferSize) {
-      this.bufferSize = bufferSize;
-      return this;
-    }
-
-    public Builder withTypeCount(int typeCount) {
-      this.typeCount = typeCount;
-      return this;
-    }
-
-    public MetadataReaderProperties build() {
-      Preconditions.checkNotNull(fileSystem);
-      Preconditions.checkNotNull(path);
-
-      return new MetadataReaderProperties(this);
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java b/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java
index 380f3391..24bd051 100644
--- a/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java
+++ b/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java
@@ -20,7 +20,7 @@ package org.apache.orc.impl;
 import java.io.EOFException;
 import java.io.IOException;
 
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 
 /**
  * A reader that reads a sequence of bytes. A control byte is read before
@@ -92,16 +92,16 @@ public class RunLengthByteReader {
     return result;
   }
 
-  public void nextVector(LongColumnVector previous, long previousLen)
+  public void nextVector(ColumnVector previous, long[] data, long size)
       throws IOException {
     previous.isRepeating = true;
-    for (int i = 0; i < previousLen; i++) {
+    for (int i = 0; i < size; i++) {
       if (!previous.isNull[i]) {
-        previous.vector[i] = next();
+        data[i] = next();
       } else {
         // The default value of null for int types in vectorized
         // processing is 1, so set that if the value is null
-        previous.vector[i] = 1;
+        data[i] = 1;
       }
 
       // The default value for nulls in Vectorization for int types is 1
@@ -109,12 +109,36 @@ public class RunLengthByteReader {
       // when determining the isRepeating flag.
       if (previous.isRepeating
           && i > 0
-          && ((previous.vector[i - 1] != previous.vector[i]) || (previous.isNull[i - 1] != previous.isNull[i]))) {
+          && ((data[0] != data[i]) ||
+              (previous.isNull[0] != previous.isNull[i]))) {
         previous.isRepeating = false;
       }
     }
   }
 
+  /**
+   * Read the next size bytes into the data array, skipping over any slots
+   * where isNull is true.
+   * @param isNull if non-null, skip any rows where isNull[r] is true
+   * @param data the array to read into
+   * @param size the number of elements to read
+   * @throws IOException
+   */
+  public void nextVector(boolean[] isNull, int[] data,
+                         long size) throws IOException {
+    if (isNull == null) {
+      for(int i=0; i < size; ++i) {
+        data[i] = next();
+      }
+    } else {
+      for(int i=0; i < size; ++i) {
+        if (!isNull[i]) {
+          data[i] = next();
+        }
+      }
+    }
+  }
+
   public void seek(PositionProvider index) throws IOException {
     input.seek(index);
     int consumed = (int) index.getNext();

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java
index 0c90cde..b91a263 100644
--- a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java
+++ b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java
@@ -20,7 +20,7 @@ package org.apache.orc.impl;
 import java.io.EOFException;
 import java.io.IOException;
 
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 
 /**
  * A reader that reads a sequence of integers.
@@ -99,15 +99,17 @@ public class RunLengthIntegerReader implements IntegerReader {
   }
 
   @Override
-  public void nextVector(LongColumnVector previous, final int previousLen) throws IOException {
+  public void nextVector(ColumnVector previous,
+                         long[] data,
+                         int previousLen) throws IOException {
     previous.isRepeating = true;
     for (int i = 0; i < previousLen; i++) {
       if (!previous.isNull[i]) {
-        previous.vector[i] = next();
+        data[i] = next();
       } else {
         // The default value of null for int type in vectorized
         // processing is 1, so set that if the value is null
-        previous.vector[i] = 1;
+        data[i] = 1;
       }
 
       // The default value for nulls in Vectorization for int types is 1
@@ -115,13 +117,32 @@ public class RunLengthIntegerReader implements IntegerReader {
       // when determining the isRepeating flag.
       if (previous.isRepeating
           && i > 0
-          && (previous.vector[i - 1] != previous.vector[i] || previous.isNull[i - 1] != previous.isNull[i])) {
+          && (data[0] != data[i] || previous.isNull[0] != previous.isNull[i])) {
         previous.isRepeating = false;
       }
     }
   }
 
   @Override
+  public void nextVector(ColumnVector vector,
+                         int[] data,
+                         int size) throws IOException {
+    if (vector.noNulls) {
+      for(int r=0; r < data.length && r < size; ++r) {
+        data[r] = (int) next();
+      }
+    } else if (!(vector.isRepeating && vector.isNull[0])) {
+      for(int r=0; r < data.length && r < size; ++r) {
+        if (!vector.isNull[r]) {
+          data[r] = (int) next();
+        } else {
+          data[r] = 1;
+        }
+      }
+    }
+  }
+
+  @Override
   public void seek(PositionProvider index) throws IOException {
     input.seek(index);
     int consumed = (int) index.getNext();

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java
index c6d685a..610d9b5 100644
--- a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java
+++ b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java
@@ -21,9 +21,9 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.util.Arrays;
 
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 
 /**
  * A reader that reads a sequence of light weight compressed integers. Refer
@@ -360,15 +360,17 @@ public class RunLengthIntegerReaderV2 implements IntegerReader {
   }
 
   @Override
-  public void nextVector(LongColumnVector previous, final int previousLen) throws IOException {
+  public void nextVector(ColumnVector previous,
+                         long[] data,
+                         int previousLen) throws IOException {
     previous.isRepeating = true;
     for (int i = 0; i < previousLen; i++) {
       if (!previous.isNull[i]) {
-        previous.vector[i] = next();
+        data[i] = next();
       } else {
         // The default value of null for int type in vectorized
         // processing is 1, so set that if the value is null
-        previous.vector[i] = 1;
+        data[i] = 1;
       }
 
       // The default value for nulls in Vectorization for int types is 1
@@ -376,10 +378,29 @@ public class RunLengthIntegerReaderV2 implements IntegerReader {
       // when determining the isRepeating flag.
       if (previous.isRepeating
           && i > 0
-          && (previous.vector[i - 1] != previous.vector[i] ||
-          previous.isNull[i - 1] != previous.isNull[i])) {
+          && (data[0] != data[i] ||
+          previous.isNull[0] != previous.isNull[i])) {
         previous.isRepeating = false;
       }
     }
   }
+
+  @Override
+  public void nextVector(ColumnVector vector,
+                         int[] data,
+                         int size) throws IOException {
+    if (vector.noNulls) {
+      for(int r=0; r < data.length && r < size; ++r) {
+        data[r] = (int) next();
+      }
+    } else if (!(vector.isRepeating && vector.isNull[0])) {
+      for(int r=0; r < data.length && r < size; ++r) {
+        if (!vector.isNull[r]) {
+          data[r] = (int) next();
+        } else {
+          data[r] = 1;
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/impl/WriterImpl.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/WriterImpl.java b/orc/src/java/org/apache/orc/impl/WriterImpl.java
index f8afe06..b2966e0 100644
--- a/orc/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/orc/src/java/org/apache/orc/impl/WriterImpl.java
@@ -1693,9 +1693,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
   }
 
+  public static long MILLIS_PER_DAY = 24 * 60 * 60 * 1000;
+  public static long NANOS_PER_MILLI = 1000000;
   public static final int MILLIS_PER_SECOND = 1000;
   static final int NANOS_PER_SECOND = 1000000000;
-  static final int MILLIS_PER_NANO  = 1000000;
   public static final String BASE_TIMESTAMP_STRING = "2015-01-01 00:00:00";
 
   private static class TimestampTreeWriter extends TreeWriter {
@@ -2261,32 +2262,36 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
         }
       } else {
         // write the records in runs of the same tag
-        byte prevTag = 0;
-        int currentRun = 0;
-        boolean started = false;
+        int[] currentStart = new int[vec.fields.length];
+        int[] currentLength = new int[vec.fields.length];
         for(int i=0; i < length; ++i) {
-          if (!vec.isNull[i + offset]) {
+          // only need to deal with the non-nulls, since the nulls were dealt
+          // with in the super method.
+          if (vec.noNulls || !vec.isNull[i + offset]) {
             byte tag = (byte) vec.tags[offset + i];
             tags.write(tag);
-            if (!started) {
-              started = true;
-              currentRun = i;
-              prevTag = tag;
-            } else if (tag != prevTag) {
-              childrenWriters[prevTag].writeBatch(vec.fields[prevTag],
-                  offset + currentRun, i - currentRun);
-              currentRun = i;
-              prevTag = tag;
+            if (currentLength[tag] == 0) {
+              // start a new sequence
+              currentStart[tag] = i + offset;
+              currentLength[tag] = 1;
+            } else if (currentStart[tag] + currentLength[tag] == i + offset) {
+              // ok, we are extending the current run for that tag.
+              currentLength[tag] += 1;
+            } else {
+              // otherwise, we need to close off the old run and start a new one
+              childrenWriters[tag].writeBatch(vec.fields[tag],
+                  currentStart[tag], currentLength[tag]);
+              currentStart[tag] = i + offset;
+              currentLength[tag] = 1;
             }
-          } else if (started) {
-            started = false;
-            childrenWriters[prevTag].writeBatch(vec.fields[prevTag],
-                offset + currentRun, i - currentRun);
           }
         }
-        if (started) {
-          childrenWriters[prevTag].writeBatch(vec.fields[prevTag],
-              offset + currentRun, length - currentRun);
+        // write out any left over sequences
+        for(int tag=0; tag < currentStart.length; ++tag) {
+          if (currentLength[tag] != 0) {
+            childrenWriters[tag].writeBatch(vec.fields[tag], currentStart[tag],
+                currentLength[tag]);
+          }
         }
       }
     }
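
For readers of the union-writer hunk above: the rewrite replaces the single previous-tag run with one open run per tag, so interleaved tags no longer force a writeBatch per row. A small sketch of that batching idea, with a hypothetical BatchWriter callback standing in for childrenWriters[tag].writeBatch and null handling omitted; it is not part of the patch.

// Sketch only (not part of the patch): per-tag run batching as used by the new
// union tree writer loop; BatchWriter is a stand-in for the child writers.
final class TagRunSketch {
  interface BatchWriter { void writeBatch(int tag, int start, int length); }

  static void writeRuns(byte[] tags, int offset, int length, int numTags,
                        BatchWriter out) {
    int[] currentStart = new int[numTags];
    int[] currentLength = new int[numTags];
    for (int i = 0; i < length; i++) {
      int tag = tags[offset + i];
      if (currentLength[tag] == 0) {
        currentStart[tag] = i + offset;            // start a new run
        currentLength[tag] = 1;
      } else if (currentStart[tag] + currentLength[tag] == i + offset) {
        currentLength[tag] += 1;                   // extend the contiguous run
      } else {
        out.writeBatch(tag, currentStart[tag], currentLength[tag]);
        currentStart[tag] = i + offset;            // close old run, open new one
        currentLength[tag] = 1;
      }
    }
    for (int tag = 0; tag < numTags; tag++) {      // flush leftover runs
      if (currentLength[tag] != 0) {
        out.writeBatch(tag, currentStart[tag], currentLength[tag]);
      }
    }
  }

  public static void main(String[] args) {
    byte[] tags = {0, 0, 1, 0, 1, 1};
    writeRuns(tags, 0, tags.length, 2, (t, s, l) ->
        System.out.println("tag " + t + " start " + s + " length " + l));
  }
}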

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/test/org/apache/orc/impl/TestDataReaderProperties.java
----------------------------------------------------------------------
diff --git a/orc/src/test/org/apache/orc/impl/TestDataReaderProperties.java b/orc/src/test/org/apache/orc/impl/TestDataReaderProperties.java
index 9ec08f3..b9918f2 100644
--- a/orc/src/test/org/apache/orc/impl/TestDataReaderProperties.java
+++ b/orc/src/test/org/apache/orc/impl/TestDataReaderProperties.java
@@ -3,6 +3,7 @@ package org.apache.orc.impl;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.orc.CompressionCodec;
+import org.apache.orc.CompressionKind;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -14,7 +15,6 @@ public class TestDataReaderProperties {
 
   private FileSystem mockedFileSystem = mock(FileSystem.class);
   private Path mockedPath = mock(Path.class);
-  private CompressionCodec mockedCodec = mock(CompressionCodec.class);
   private boolean mockedZeroCopy = false;
 
   @Test
@@ -22,12 +22,12 @@ public class TestDataReaderProperties {
     DataReaderProperties properties = DataReaderProperties.builder()
       .withFileSystem(mockedFileSystem)
       .withPath(mockedPath)
-      .withCodec(mockedCodec)
+      .withCompression(CompressionKind.ZLIB)
       .withZeroCopy(mockedZeroCopy)
       .build();
     assertEquals(mockedFileSystem, properties.getFileSystem());
     assertEquals(mockedPath, properties.getPath());
-    assertEquals(mockedCodec, properties.getCodec());
+    assertEquals(CompressionKind.ZLIB, properties.getCompression());
     assertEquals(mockedZeroCopy, properties.getZeroCopy());
   }
 
@@ -39,7 +39,7 @@ public class TestDataReaderProperties {
       .build();
     assertEquals(mockedFileSystem, properties.getFileSystem());
     assertEquals(mockedPath, properties.getPath());
-    assertNull(properties.getCodec());
+    assertNull(properties.getCompression());
     assertFalse(properties.getZeroCopy());
   }
 
@@ -52,7 +52,7 @@ public class TestDataReaderProperties {
   public void testMissingPath() {
     DataReaderProperties.builder()
       .withFileSystem(mockedFileSystem)
-      .withCodec(mockedCodec)
+      .withCompression(CompressionKind.NONE)
       .withZeroCopy(mockedZeroCopy)
       .build();
   }
@@ -61,7 +61,7 @@ public class TestDataReaderProperties {
   public void testMissingFileSystem() {
     DataReaderProperties.builder()
       .withPath(mockedPath)
-      .withCodec(mockedCodec)
+      .withCompression(CompressionKind.NONE)
       .withZeroCopy(mockedZeroCopy)
       .build();
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/test/org/apache/orc/impl/TestMetadataReaderProperties.java
----------------------------------------------------------------------
diff --git a/orc/src/test/org/apache/orc/impl/TestMetadataReaderProperties.java b/orc/src/test/org/apache/orc/impl/TestMetadataReaderProperties.java
deleted file mode 100644
index 12e8eb4..0000000
--- a/orc/src/test/org/apache/orc/impl/TestMetadataReaderProperties.java
+++ /dev/null
@@ -1,72 +0,0 @@
-package org.apache.orc.impl;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.orc.CompressionCodec;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.mockito.Mockito.mock;
-
-public class TestMetadataReaderProperties {
-
-  private FileSystem mockedFileSystem = mock(FileSystem.class);
-  private Path mockedPath = mock(Path.class);
-  private CompressionCodec mockedCodec = mock(CompressionCodec.class);
-  private int mockedBufferSize = 0;
-  private int mockedTypeCount = 0;
-
-  @Test
-  public void testCompleteBuild() {
-    MetadataReaderProperties properties = MetadataReaderProperties.builder()
-      .withFileSystem(mockedFileSystem)
-      .withPath(mockedPath)
-      .withCodec(mockedCodec)
-      .withBufferSize(mockedBufferSize)
-      .withTypeCount(mockedTypeCount)
-      .build();
-    assertEquals(mockedFileSystem, properties.getFileSystem());
-    assertEquals(mockedPath, properties.getPath());
-    assertEquals(mockedCodec, properties.getCodec());
-    assertEquals(mockedBufferSize, properties.getBufferSize());
-    assertEquals(mockedTypeCount, properties.getTypeCount());
-  }
-
-  @Test
-  public void testMissingNonRequiredArgs() {
-    MetadataReaderProperties properties = MetadataReaderProperties.builder()
-      .withFileSystem(mockedFileSystem)
-      .withPath(mockedPath)
-      .build();
-    assertEquals(mockedFileSystem, properties.getFileSystem());
-    assertEquals(mockedPath, properties.getPath());
-    assertNull(properties.getCodec());
-    assertEquals(0, properties.getBufferSize());
-    assertEquals(0, properties.getTypeCount());
-  }
-
-  @Test(expected = java.lang.NullPointerException.class)
-  public void testEmptyBuild() {
-    MetadataReaderProperties.builder().build();
-  }
-
-  @Test(expected = java.lang.NullPointerException.class)
-  public void testMissingPath() {
-    MetadataReaderProperties.builder()
-      .withFileSystem(mockedFileSystem)
-      .withCodec(mockedCodec)
-      .withBufferSize(mockedBufferSize)
-      .build();
-  }
-
-  @Test(expected = java.lang.NullPointerException.class)
-  public void testMissingFileSystem() {
-    MetadataReaderProperties.builder()
-      .withPath(mockedPath)
-      .withCodec(mockedCodec)
-      .withBufferSize(mockedBufferSize)
-      .build();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
index 0724191..82a97e0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
@@ -215,12 +215,9 @@ public class VectorizedRowBatchCtx {
     LOG.info("createVectorizedRowBatch columnsToIncludeTruncated " + Arrays.toString(columnsToIncludeTruncated));
     int totalColumnCount = rowColumnTypeInfos.length + scratchColumnTypeNames.length;
     VectorizedRowBatch result = new VectorizedRowBatch(totalColumnCount);
-
-    for (int i = 0; i < columnsToIncludeTruncated.length; i++) {
-      if (columnsToIncludeTruncated[i]) {
-        TypeInfo typeInfo = rowColumnTypeInfos[i];
-        result.cols[i] = VectorizedBatchUtil.createColumnVector(typeInfo);
-      }
+    for (int i = 0; i < dataColumnCount; i++) {
+      TypeInfo typeInfo = rowColumnTypeInfos[i];
+      result.cols[i] = VectorizedBatchUtil.createColumnVector(typeInfo);
     }
 
     for (int i = dataColumnCount; i < dataColumnCount + partitionColumnCount; i++) {
@@ -476,8 +473,8 @@ public class VectorizedRowBatchCtx {
             bcv.isNull[0] = true;
             bcv.isRepeating = true;
           } else {
-            bcv.fill(sVal.getBytes());
-            bcv.isNull[0] = false;
+            bcv.setVal(0, sVal.getBytes());
+            bcv.isRepeating = true;
           }
         }
         break;
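
In the partition-column hunk above, the constant value is now written once at row 0 and the column is marked isRepeating instead of being filled into every row. A short sketch of that idiom follows; the surrounding class and method names are hypothetical, and it assumes BytesColumnVector's initBuffer() and setVal(int, byte[]) helpers.

// Sketch only (not part of the patch): fill a string partition column once and
// mark it repeating, mirroring the updated VectorizedRowBatchCtx behavior.
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;

final class RepeatingPartitionColumnSketch {
  static void setPartitionValue(BytesColumnVector bcv, String value) {
    if (value == null) {
      bcv.noNulls = false;
      bcv.isNull[0] = true;
      bcv.isRepeating = true;            // row 0 stands for every row
    } else {
      bcv.initBuffer();                  // assumed helper; allocates the byte buffer
      bcv.setVal(0, value.getBytes(StandardCharsets.UTF_8));
      bcv.isRepeating = true;            // single value shared by the whole batch
    }
  }
}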

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DefaultDataReaderFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DefaultDataReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DefaultDataReaderFactory.java
deleted file mode 100644
index de3471c..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DefaultDataReaderFactory.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package org.apache.hadoop.hive.ql.io.orc;
-
-import org.apache.orc.DataReader;
-import org.apache.orc.DataReaderFactory;
-import org.apache.orc.impl.DataReaderProperties;
-
-public final class DefaultDataReaderFactory implements DataReaderFactory {
-
-  @Override
-  public DataReader create(DataReaderProperties properties) {
-    return RecordReaderUtils.createDefaultDataReader(properties);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index fe0be7b..fcb8ca4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -301,7 +301,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     /**
      * Do we have schema on read in the configuration variables?
      */
-    TypeDescription schema = getDesiredRowTypeDescr(conf, /* isAcidRead */ false);
+    TypeDescription schema = getDesiredRowTypeDescr(conf, false, Integer.MAX_VALUE);
 
     Reader.Options options = new Reader.Options().range(offset, length);
     options.schema(schema);
@@ -1743,7 +1743,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     /**
      * Do we have schema on read in the configuration variables?
      */
-    TypeDescription schema = getDesiredRowTypeDescr(conf, /* isAcidRead */ true);
+    TypeDescription schema = getDesiredRowTypeDescr(conf, true, Integer.MAX_VALUE);
 
     final Reader reader;
     final int bucket;
@@ -1994,10 +1994,13 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
   /**
    * Convert a Hive type property string that contains separated type names into a list of
    * TypeDescription objects.
+   * @param hiveTypeProperty the desired types from hive
+   * @param maxColumns the maximum number of desired columns
    * @return the list of TypeDescription objects.
    */
-  public static ArrayList<TypeDescription> typeDescriptionsFromHiveTypeProperty(
-      String hiveTypeProperty) {
+  public static ArrayList<TypeDescription>
+      typeDescriptionsFromHiveTypeProperty(String hiveTypeProperty,
+                                           int maxColumns) {
 
     // CONSIDER: We need a type name parser for TypeDescription.
 
@@ -2005,6 +2008,9 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     ArrayList<TypeDescription> typeDescrList =new ArrayList<TypeDescription>(typeInfoList.size());
     for (TypeInfo typeInfo : typeInfoList) {
       typeDescrList.add(convertTypeInfo(typeInfo));
+      if (typeDescrList.size() >= maxColumns) {
+        break;
+      }
     }
     return typeDescrList;
   }
@@ -2091,8 +2097,18 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     }
   }
 
-  public static TypeDescription getDesiredRowTypeDescr(Configuration conf, boolean isAcidRead)
-      throws IOException {
+  /**
+   * Generate the desired schema for reading the file.
+   * @param conf the configuration
+   * @param isAcidRead is this an acid format?
+   * @param dataColumns the desired number of data columns for vectorized read
+   * @return the desired schema or null if schema evolution isn't enabled
+   * @throws IOException
+   */
+  public static TypeDescription getDesiredRowTypeDescr(Configuration conf,
+                                                       boolean isAcidRead,
+                                                       int dataColumns
+                                                       ) throws IOException {
 
     String columnNameProperty = null;
     String columnTypeProperty = null;
@@ -2115,8 +2131,10 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
           haveSchemaEvolutionProperties = false;
         } else {
           schemaEvolutionTypeDescrs =
-              typeDescriptionsFromHiveTypeProperty(columnTypeProperty);
-          if (schemaEvolutionTypeDescrs.size() != schemaEvolutionColumnNames.size()) {
+              typeDescriptionsFromHiveTypeProperty(columnTypeProperty,
+                  dataColumns);
+          if (schemaEvolutionTypeDescrs.size() !=
+              Math.min(dataColumns, schemaEvolutionColumnNames.size())) {
             haveSchemaEvolutionProperties = false;
           }
         }
@@ -2147,8 +2165,9 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         return null;
       }
       schemaEvolutionTypeDescrs =
-          typeDescriptionsFromHiveTypeProperty(columnTypeProperty);
-      if (schemaEvolutionTypeDescrs.size() != schemaEvolutionColumnNames.size()) {
+          typeDescriptionsFromHiveTypeProperty(columnTypeProperty, dataColumns);
+      if (schemaEvolutionTypeDescrs.size() !=
+          Math.min(dataColumns, schemaEvolutionColumnNames.size())) {
         return null;
       }
 
@@ -2162,7 +2181,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         }
         columnNum++;
       }
-      if (virtualColumnClipNum != -1) {
+      if (virtualColumnClipNum != -1 && virtualColumnClipNum < dataColumns) {
         schemaEvolutionColumnNames =
             Lists.newArrayList(schemaEvolutionColumnNames.subList(0, virtualColumnClipNum));
         schemaEvolutionTypeDescrs = Lists.newArrayList(schemaEvolutionTypeDescrs.subList(0, virtualColumnClipNum));
@@ -2179,7 +2198,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
 
     // Desired schema does not include virtual columns or partition columns.
     TypeDescription result = TypeDescription.createStruct();
-    for (int i = 0; i < schemaEvolutionColumnNames.size(); i++) {
+    for (int i = 0; i < schemaEvolutionTypeDescrs.size(); i++) {
       result.addField(schemaEvolutionColumnNames.get(i), schemaEvolutionTypeDescrs.get(i));
     }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 1fce282..0dd58b7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -447,7 +447,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     this.length = options.getLength();
     this.validTxnList = validTxnList;
 
-    TypeDescription typeDescr = OrcInputFormat.getDesiredRowTypeDescr(conf, /* isAcidRead */ true);
+    TypeDescription typeDescr =
+        OrcInputFormat.getDesiredRowTypeDescr(conf, true, Integer.MAX_VALUE);
 
     objectInspector = OrcRecordUpdater.createEventSchema
         (OrcStruct.createObjectInspector(0, OrcUtils.getOrcTypes(typeDescr)));
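
The new dataColumns argument on getDesiredRowTypeDescr (introduced in the OrcInputFormat hunks above) lets vectorized readers cap the evolved schema at the number of columns the batch will actually carry, while callers such as the merger above keep the old behavior by passing Integer.MAX_VALUE. A hedged usage sketch follows; the class name and the dataColumnCount value are hypothetical.

// Sketch only (not part of the patch): requesting the desired read schema with
// and without a column cap.
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.orc.TypeDescription;

final class DesiredSchemaSketch {
  static TypeDescription vectorizedSchema(Configuration conf, int dataColumnCount)
      throws IOException {
    // only keep as many top-level columns as the vectorized batch will hold
    return OrcInputFormat.getDesiredRowTypeDescr(conf, false, dataColumnCount);
  }

  static TypeDescription acidSchema(Configuration conf) throws IOException {
    // acid merger path: no cap, read every declared column
    return OrcInputFormat.getDesiredRowTypeDescr(conf, true, Integer.MAX_VALUE);
  }
}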

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
index 822ef14..b7437be 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
@@ -23,23 +23,20 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.orc.DataReaderFactory;
-import org.apache.orc.MetadataReaderFactory;
+import com.google.common.collect.Lists;
+import org.apache.orc.OrcUtils;
+import org.apache.orc.TypeDescription;
 import org.apache.orc.impl.BufferChunk;
 import org.apache.orc.ColumnStatistics;
 import org.apache.orc.impl.ColumnStatisticsImpl;
 import org.apache.orc.CompressionCodec;
-import org.apache.orc.DataReader;
 import org.apache.orc.FileMetaInfo;
 import org.apache.orc.FileMetadata;
-import org.apache.orc.impl.DataReaderProperties;
-import org.apache.orc.impl.DefaultMetadataReaderFactory;
 import org.apache.orc.impl.InStream;
-import org.apache.orc.impl.MetadataReader;
-import org.apache.orc.impl.MetadataReaderProperties;
 import org.apache.orc.StripeInformation;
 import org.apache.orc.StripeStatistics;
 import org.slf4j.Logger;
@@ -56,8 +53,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.io.Text;
 import org.apache.orc.OrcProto;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 import com.google.protobuf.CodedInputStream;
 
 public class ReaderImpl implements Reader {
@@ -75,13 +70,12 @@ public class ReaderImpl implements Reader {
   private final List<OrcProto.StripeStatistics> stripeStats;
   private final int metadataSize;
   protected final List<OrcProto.Type> types;
+  private final TypeDescription schema;
   private final List<OrcProto.UserMetadataItem> userMetadata;
   private final List<OrcProto.ColumnStatistics> fileStats;
   private final List<StripeInformation> stripes;
   protected final int rowIndexStride;
   private final long contentLength, numberOfRows;
-  private final MetadataReaderFactory metadataReaderFactory = new DefaultMetadataReaderFactory();
-  private final DataReaderFactory dataReaderFactory = new DefaultDataReaderFactory();
 
   private final ObjectInspector inspector;
   private long deserializedSize = -1;
@@ -248,6 +242,11 @@ public class ReaderImpl implements Reader {
     return result;
   }
 
+  @Override
+  public TypeDescription getSchema() {
+    return schema;
+  }
+
   /**
    * Ensure this is an ORC file to prevent users from trying to read text
    * files or RC files as ORC files.
@@ -391,11 +390,13 @@ public class ReaderImpl implements Reader {
       this.writerVersion = footerMetaData.writerVersion;
       this.stripes = convertProtoStripesToStripes(rInfo.footer.getStripesList());
     }
+    this.schema = OrcUtils.convertTypeFromProtobuf(this.types, 0);
   }
+
   /**
    * Get the WriterVersion based on the ORC file postscript.
    * @param writerVersion the integer writer version
-   * @return
+   * @return the writer version of the file
    */
   static OrcFile.WriterVersion getWriterVersion(int writerVersion) {
     for(OrcFile.WriterVersion version: OrcFile.WriterVersion.values()) {
@@ -672,20 +673,7 @@ public class ReaderImpl implements Reader {
       Arrays.fill(include, true);
       options.include(include);
     }
-
-    return RecordReaderImpl.builder()
-        .withMetadataReaderFactory(metadataReaderFactory)
-        .withDataReaderFactory(dataReaderFactory)
-        .withStripes(this.getStripes())
-        .withFileSystem(fileSystem)
-        .withPath(path)
-        .withOptions(options)
-        .withTypes(types)
-        .withCodec(codec)
-        .withBufferSize(bufferSize)
-        .withStrideRate(rowIndexStride)
-        .withConf(conf)
-        .build();
+    return new RecordReaderImpl(this, options);
   }
 
 
@@ -837,7 +825,7 @@ public class ReaderImpl implements Reader {
   }
 
   private int getLastIdx() {
-    Set<Integer> indices = Sets.newHashSet();
+    Set<Integer> indices = new HashSet<>();
     for (OrcProto.Type type : types) {
       indices.addAll(type.getSubtypesList());
     }
@@ -856,7 +844,7 @@ public class ReaderImpl implements Reader {
 
   @Override
   public List<StripeStatistics> getStripeStatistics() {
-    List<StripeStatistics> result = Lists.newArrayList();
+    List<StripeStatistics> result = new ArrayList<>();
     for (OrcProto.StripeStatistics ss : stripeStats) {
       result.add(new StripeStatistics(ss.getColStatsList()));
     }
@@ -868,17 +856,6 @@ public class ReaderImpl implements Reader {
   }
 
   @Override
-  public MetadataReader metadata() throws IOException {
-    return metadataReaderFactory.create(MetadataReaderProperties.builder()
-      .withBufferSize(bufferSize)
-      .withCodec(codec)
-      .withFileSystem(fileSystem)
-      .withPath(path)
-      .withTypeCount(types.size())
-      .build());
-  }
-
-  @Override
   public List<Integer> getVersionList() {
     return versionList;
   }
@@ -889,16 +866,6 @@ public class ReaderImpl implements Reader {
   }
 
   @Override
-  public DataReader createDefaultDataReader(boolean useZeroCopy) {
-    return dataReaderFactory.create(DataReaderProperties.builder()
-      .withFileSystem(fileSystem)
-      .withPath(path)
-      .withCodec(codec)
-      .withZeroCopy(useZeroCopy)
-      .build());
-  }
-
-  @Override
   public String toString() {
     StringBuilder buffer = new StringBuilder();
     buffer.append("ORC Reader(");
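
ReaderImpl now materializes a TypeDescription up front and exposes it through getSchema(), and rows(Options) simply constructs a RecordReaderImpl. A minimal sketch of consuming that accessor; the file path is a placeholder and the class name is hypothetical.

// Sketch only (not part of the patch): reading the file schema via the
// Reader.getSchema() accessor added by this change. The path is a placeholder.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.orc.TypeDescription;

final class SchemaAccessSketch {
  public static void main(String[] args) throws java.io.IOException {
    Reader reader = OrcFile.createReader(new Path("/tmp/example.orc"),
        OrcFile.readerOptions(new Configuration()));
    TypeDescription schema = reader.getSchema();   // root struct of the file
    System.out.println(schema);                    // e.g. struct<x:int,y:string>
  }
}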
