HIVE-18738 : LLAP IO ACID - includes handling is broken (Sergey Shelukhin, reviewed by Teddy Choi)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1a3090f8
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1a3090f8
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1a3090f8

Branch: refs/heads/master
Commit: 1a3090f858ce3ac18d1eb799d87a45b3ee0defce
Parents: a4198f5
Author: sergey <ser...@apache.org>
Authored: Fri Mar 2 18:49:59 2018 -0800
Committer: sergey <ser...@apache.org>
Committed: Fri Mar 2 18:49:59 2018 -0800

----------------------------------------------------------------------
 .../test/resources/testconfiguration.properties |   1 +
 .../llap/io/api/impl/ColumnVectorBatch.java     |  42 ++
 .../hive/llap/io/api/impl/LlapInputFormat.java  |   9 +-
 .../hive/llap/io/api/impl/LlapRecordReader.java | 286 +++++++++-----
 .../llap/io/decode/ColumnVectorProducer.java    |  19 +-
 .../io/decode/GenericColumnVectorProducer.java  |  17 +-
 .../llap/io/decode/OrcColumnVectorProducer.java |  17 +-
 .../llap/io/decode/OrcEncodedDataConsumer.java  |  55 +--
 .../hive/llap/io/decode/ReadPipeline.java       |   1 -
 .../llap/io/encoded/OrcEncodedDataReader.java   |  60 +--
 .../llap/io/encoded/SerDeEncodedDataReader.java |  18 +-
 .../hive/ql/exec/vector/VectorExtractRow.java   |   6 +-
 .../ql/exec/vector/VectorSelectOperator.java    |   1 +
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   |  53 ++-
 .../io/orc/VectorizedOrcAcidRowBatchReader.java | 191 ++++-----
 .../hive/ql/io/orc/encoded/EncodedReader.java   |   4 +-
 .../ql/io/orc/encoded/EncodedReaderImpl.java    |  32 +-
 .../orc/encoded/EncodedTreeReaderFactory.java   |  44 +--
 .../hadoop/hive/ql/io/orc/encoded/Reader.java   |  16 +-
 ql/src/test/queries/clientpositive/llap_acid2.q |  84 ++++
 .../clientpositive/llap/llap_acid2.q.out        | 392 +++++++++++++++++++
 .../common/io/encoded/EncodedColumnBatch.java   |   9 +-
 22 files changed, 1013 insertions(+), 344 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/1a3090f8/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties 
b/itests/src/test/resources/testconfiguration.properties
index 2776fe9..544c836 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -576,6 +576,7 @@ minillaplocal.query.files=\
   lineage2.q,\
   lineage3.q,\
   list_bucket_dml_10.q,\
+  llap_acid2.q,\
   llap_partitioned.q,\
   llap_vector_nohybridgrace.q,\
   load_data_acid_rename.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/1a3090f8/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/ColumnVectorBatch.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/ColumnVectorBatch.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/ColumnVectorBatch.java
index 9262bf0..19b0b55 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/ColumnVectorBatch.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/ColumnVectorBatch.java
@@ -43,4 +43,46 @@ public class ColumnVectorBatch {
     other[otherIx] = cols[ix];
     cols[ix] = old;
   }
+
+  
+  @Override
+  public String toString() {
+    if (size == 0) {
+      return "";
+    }
+    StringBuilder b = new StringBuilder();
+    b.append("Column vector types: ");
+    for (int k = 0; k < cols.length; k++) {
+      ColumnVector cv = cols[k];
+      b.append(k);
+      b.append(":");
+      b.append(cv == null ? "null" : 
cv.getClass().getSimpleName().replace("ColumnVector", ""));
+    }
+    b.append('\n');
+
+
+    for (int i = 0; i < size; i++) {
+      b.append('[');
+      for (int k = 0; k < cols.length; k++) {
+        ColumnVector cv = cols[k];
+        if (k > 0) {
+          b.append(", ");
+        }
+        if (cv == null) continue;
+        try {
+          cv.stringifyValue(b, i);
+        } catch (Exception ex) {
+          b.append("invalid");
+        }
+      }
+      b.append(']');
+      if (i < size - 1) {
+        b.append('\n');
+      }
+    }
+
+    return b.toString();
+  }
 }
\ No newline at end of file
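
The toString() above relies on ColumnVector.stringifyValue() to render each
cell. A minimal standalone sketch of that call, assuming the storage-api
column vector classes; the values are made up for illustration:

    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;

    public class StringifySketch {
      public static void main(String[] args) {
        // A tiny column of four slots; only the first two rows are populated.
        LongColumnVector cv = new LongColumnVector(4);
        cv.vector[0] = 7;
        cv.vector[1] = 8;

        StringBuilder b = new StringBuilder();
        cv.stringifyValue(b, 0); // appends "7", the same per-cell call used by toString() above
        System.out.println(b);
      }
    }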

http://git-wip-us.apache.org/repos/asf/hive/blob/1a3090f8/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
index bb319f0..6d29163 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
@@ -100,9 +100,12 @@ public class LlapInputFormat implements 
InputFormat<NullWritable, VectorizedRowB
     FileSplit fileSplit = (FileSplit) split;
     reporter.setStatus(fileSplit.toString());
     try {
-      List<Integer> includedCols = ColumnProjectionUtils.isReadAllColumns(job)
+      // At this entry point, we assume that these are logical table columns. Perhaps we should
+      // go through the code and make this more explicit; for now, we start with this single
+      // assumption and maintain clear semantics from here on.
+      List<Integer> tableIncludedCols = ColumnProjectionUtils.isReadAllColumns(job)
           ? null : ColumnProjectionUtils.getReadColumnIDs(job);
-      LlapRecordReader rr = LlapRecordReader.create(job, fileSplit, includedCols, hostName,
+      LlapRecordReader rr = LlapRecordReader.create(job, fileSplit, tableIncludedCols, hostName,
           cvp, executor, sourceInputFormat, sourceSerDe, reporter, daemonConf);
       if (rr == null) {
         // Reader-specific incompatibility like SMB or schema evolution.
@@ -111,7 +114,7 @@ public class LlapInputFormat implements 
InputFormat<NullWritable, VectorizedRowB
       // For non-vectorized operator case, wrap the reader if possible.
       RecordReader<NullWritable, VectorizedRowBatch> result = rr;
       if (!Utilities.getIsVectorized(job)) {
-        result = wrapLlapReader(includedCols, rr, split);
+        result = wrapLlapReader(tableIncludedCols, rr, split);
         if (result == null) {
           // Cannot wrap a reader for non-vectorized pipeline.
           return sourceInputFormat.getRecordReader(split, job, reporter);
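
The tableIncludedCols list above is read back from the projection that the
planner wrote into the job configuration. A minimal sketch of that round
trip, assuming ColumnProjectionUtils.appendReadColumns() is how the
projection was set; the projected column IDs are hypothetical:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
    import org.apache.hadoop.mapred.JobConf;

    public class ProjectionSketch {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        // Hypothetically project logical table columns 0 and 2 only.
        ColumnProjectionUtils.appendReadColumns(job, Arrays.asList(0, 2));

        // Mirrors what getRecordReader() above does before LlapRecordReader.create().
        List<Integer> tableIncludedCols = ColumnProjectionUtils.isReadAllColumns(job)
            ? null : ColumnProjectionUtils.getReadColumnIDs(job);
        System.out.println("Logical table includes: " + tableIncludedCols); // [0, 2]
      }
    }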

http://git-wip-us.apache.org/repos/asf/hive/blob/1a3090f8/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
index a69c9a0..3a2c19a 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
@@ -36,6 +36,8 @@ import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
 import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
 import org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool;
 import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer;
+import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer.Includes;
+import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer.SchemaEvolutionFactory;
 import org.apache.hadoop.hive.llap.io.decode.ReadPipeline;
 import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -44,15 +46,16 @@ import 
org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater;
 import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
 import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
+import org.apache.hadoop.hive.ql.io.orc.encoded.Reader;
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
@@ -70,17 +73,18 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 class LlapRecordReader
     implements RecordReader<NullWritable, VectorizedRowBatch>, 
Consumer<ColumnVectorBatch> {
+
   private static final Logger LOG = 
LoggerFactory.getLogger(LlapRecordReader.class);
   private static final Object DONE_OBJECT = new Object();
 
   private final FileSplit split;
-  private List<Integer> columnIds;
+  private final IncludesImpl includes;
   private final SearchArgument sarg;
-  private final String[] columnNames;
   private final VectorizedRowBatchCtx rbCtx;
   private final Object[] partitionValues;
 
@@ -100,21 +104,19 @@ class LlapRecordReader
   private final JobConf jobConf;
   private final ReadPipeline rp;
   private final ExecutorService executor;
-  private final int columnCount;
   private final boolean isAcidScan;
 
   /**
    * Creates the record reader and checks the input-specific compatibility.
    * @return The reader if the split can be read, null otherwise.
    */
-  public static LlapRecordReader create(JobConf job, FileSplit split, 
List<Integer> includedCols,
-      String hostName, ColumnVectorProducer cvp, ExecutorService executor,
-      InputFormat<?, ?> sourceInputFormat, Deserializer sourceSerDe, Reporter 
reporter,
-      Configuration daemonConf)
-          throws IOException, HiveException {
+  public static LlapRecordReader create(JobConf job, FileSplit split,
+      List<Integer> tableIncludedCols, String hostName, ColumnVectorProducer 
cvp,
+      ExecutorService executor, InputFormat<?, ?> sourceInputFormat, 
Deserializer sourceSerDe,
+      Reporter reporter, Configuration daemonConf) throws IOException, 
HiveException {
     MapWork mapWork = findMapWork(job);
     if (mapWork == null) return null; // No compatible MapWork.
-    LlapRecordReader rr = new LlapRecordReader(mapWork, job, split, 
includedCols, hostName,
+    LlapRecordReader rr = new LlapRecordReader(mapWork, job, split, 
tableIncludedCols, hostName,
         cvp, executor, sourceInputFormat, sourceSerDe, reporter, daemonConf);
     if (!rr.checkOrcSchemaEvolution()) {
       rr.close();
@@ -124,7 +126,7 @@ class LlapRecordReader
   }
 
   private LlapRecordReader(MapWork mapWork, JobConf job, FileSplit split,
-      List<Integer> includedCols, String hostName, ColumnVectorProducer cvp,
+      List<Integer> tableIncludedCols, String hostName, ColumnVectorProducer 
cvp,
       ExecutorService executor, InputFormat<?, ?> sourceInputFormat, 
Deserializer sourceSerDe,
       Reporter reporter, Configuration daemonConf) throws IOException, 
HiveException {
     this.executor = executor;
@@ -132,7 +134,6 @@ class LlapRecordReader
     this.split = split;
 
     this.sarg = ConvertAstToSearchArg.createFromConf(job);
-    this.columnNames = ColumnProjectionUtils.getReadColumnNames(job);
     final String fragmentId = LlapTezUtils.getFragmentId(job);
     final String dagId = LlapTezUtils.getDagId(job);
     final String queryId = HiveConf.getVar(job, HiveConf.ConfVars.HIVEQUERYID);
@@ -152,32 +153,11 @@ class LlapRecordReader
     VectorizedRowBatchCtx ctx = mapWork.getVectorizedRowBatchCtx();
     rbCtx = ctx != null ? ctx : LlapInputFormat.createFakeVrbCtx(mapWork);
 
-    // Note: columnIds below makes additional changes for ACID. Don't use this 
var directly.
-    if (includedCols == null) {
-      // Assume including everything means the VRB will have everything.
-      includedCols = new ArrayList<>(rbCtx.getRowColumnTypeInfos().length);
-      for (int i = 0; i < rbCtx.getRowColumnTypeInfos().length; ++i) {
-        includedCols.add(i);
-      }
-    }
-
     isAcidScan = AcidUtils.isFullAcidScan(jobConf);
     TypeDescription schema = OrcInputFormat.getDesiredRowTypeDescr(
         job, isAcidScan, Integer.MAX_VALUE);
-    if (isAcidScan) {
-      this.columnIds = new ArrayList<>();
-      final int ACID_FIELDS = OrcInputFormat.getRootColumn(false);
-      for (int i = 0; i < ACID_FIELDS; i++) {
-        columnIds.add(i);
-      }
-      for (int i = 0; i < includedCols.size(); i++) {
-        columnIds.add(i + ACID_FIELDS);
-      }
-      this.columnCount = columnIds.size();
-    } else {
-      this.columnIds = includedCols;
-      this.columnCount = columnIds.size();
-    }
+
+    this.includes = new IncludesImpl(tableIncludedCols, isAcidScan, rbCtx, schema, job);
 
     int queueLimitBase = getQueueVar(ConfVars.LLAP_IO_VRB_QUEUE_LIMIT_BASE, 
job, daemonConf);
     int queueLimitMin =  getQueueVar(ConfVars.LLAP_IO_VRB_QUEUE_LIMIT_MIN, 
job, daemonConf);
@@ -195,9 +175,8 @@ class LlapRecordReader
     }
 
     // Create the consumer of encoded data; it will coordinate decoding to CVBs.
-    feedback = rp = cvp.createReadPipeline(this, split, columnIds, sarg, columnNames,
-        counters, schema, sourceInputFormat, sourceSerDe, reporter, job,
-        mapWork.getPathToPartitionInfo());
+    feedback = rp = cvp.createReadPipeline(this, split, includes, sarg, counters, includes,
+        sourceInputFormat, sourceSerDe, reporter, job, mapWork.getPathToPartitionInfo());
   }
 
   private static int getQueueVar(ConfVars var, JobConf jobConf, Configuration 
daemonConf) {
@@ -287,11 +266,11 @@ class LlapRecordReader
 
   private boolean checkOrcSchemaEvolution() {
     SchemaEvolution evolution = rp.getSchemaEvolution();
-    for (int i = 0; i < columnCount; ++i) {
-      int projectedColId = columnIds == null ? i : columnIds.get(i);
+    // TODO: should this just use physical IDs?
+    for (int i = 0; i < includes.getReaderLogicalColumnIds().size(); ++i) {
+      int projectedColId = includes.getReaderLogicalColumnIds().get(i);
       // Adjust file column index for ORC struct.
-      // LLAP IO does not support ACID. When it supports, this would be auto adjusted.
-      int fileColId =  OrcInputFormat.getRootColumn(!isAcidScan) + projectedColId + 1;
+      int fileColId = OrcInputFormat.getRootColumn(!isAcidScan) + projectedColId + 1;
       if (!evolution.isPPDSafeConversion(fileColId)) {
         LlapIoImpl.LOG.warn("Unsupported schema evolution! Disabling Llap IO 
for {}", split);
         return false;
@@ -301,8 +280,8 @@ class LlapRecordReader
   }
 
   @Override
-  public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
-    assert value != null;
+  public boolean next(NullWritable key, VectorizedRowBatch vrb) throws IOException {
+    assert vrb != null;
     if (isClosed) {
       throw new AssertionError("next called after close");
     }
@@ -310,7 +289,7 @@ class LlapRecordReader
     boolean wasFirst = isFirst;
     if (isFirst) {
       if (partitionValues != null) {
-        rbCtx.addPartitionColsToBatch(value, partitionValues);
+        rbCtx.addPartitionColsToBatch(vrb, partitionValues);
       }
       isFirst = false;
     }
@@ -332,59 +311,47 @@ class LlapRecordReader
     }
     final boolean isVectorized = HiveConf.getBoolVar(jobConf,
         HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED);
-
     if (isAcidScan) {
-      value.selectedInUse = true;
+      vrb.selectedInUse = true;
       if (isVectorized) {
-        final VectorizedRowBatch acidVrb = new 
VectorizedRowBatch(cvb.cols.length);
-        acidVrb.cols = cvb.cols;
-        acidVrb.size = cvb.size;
-        final VectorizedOrcAcidRowBatchReader acidReader =
-            new VectorizedOrcAcidRowBatchReader((OrcSplit)split, jobConf, 
Reporter.NULL,
-                new RecordReader<NullWritable, VectorizedRowBatch>() {
-                  @Override
-                  public boolean next(NullWritable key, VectorizedRowBatch 
value) throws IOException {
-                    return true;
-                  }
-
-                  @Override
-                  public NullWritable createKey() {
-                    return NullWritable.get();
-                  }
-
-                  @Override
-                  public VectorizedRowBatch createValue() {
-                    return acidVrb;
-                  }
-
-                  @Override
-                  public long getPos() throws IOException {
-                    return 0;
-                  }
-
-                  @Override
-                  public void close() throws IOException {
-                  }
-
-                  @Override
-                  public float getProgress() throws IOException {
-                    return 0;
-                  }
-                }, rbCtx);
-        acidReader.next(NullWritable.get(), value);
+        // TODO: relying everywhere on magic constants, and on the ACID columns being adjacent,
+        //       means the ACID columns will be very hard to change in a backward-compatible
+        //       manner; refactoring all this magic for prefix schema changes will be painful.
+        // Exclude the row column.
+        int acidColCount = OrcInputFormat.getRootColumn(false) - 1;
+        VectorizedRowBatch inputVrb = new VectorizedRowBatch(
+            acidColCount + 1 + vrb.getDataColumnCount());
+        // By assumption, ACID columns are currently always at the beginning of the arrays.
+        System.arraycopy(cvb.cols, 0, inputVrb.cols, 0, acidColCount);
+        for (int ixInReadSet = acidColCount; ixInReadSet < cvb.cols.length; ++ixInReadSet) {
+          int ixInVrb = includes.getPhysicalColumnIds().get(ixInReadSet);
+          // TODO: should we create the batch from the VRB context and reuse the vectors,
+          //       like the non-ACID path below? Future work.
+          inputVrb.cols[ixInVrb] = cvb.cols[ixInReadSet];
+        }
+        inputVrb.size = cvb.size;
+        // TODO: reuse between calls
+        @SuppressWarnings("resource")
+        VectorizedOrcAcidRowBatchReader acidReader = new VectorizedOrcAcidRowBatchReader(
+            (OrcSplit) split, jobConf, Reporter.NULL, new AcidWrapper(inputVrb), rbCtx, true);
+        acidReader.next(NullWritable.get(), vrb);
+      } else {
+        // TODO: the old code appears to have silently ignored this case.
+        throw new AssertionError("Unsupported mode");
       }
     } else {
-      if (columnCount != cvb.cols.length) {
-        throw new RuntimeException("Unexpected number of columns, VRB has " + columnCount
-            + " included, but the reader returned " + cvb.cols.length);
+      if (includes.getPhysicalColumnIds().size() != cvb.cols.length) {
+        throw new RuntimeException("Unexpected number of columns, VRB has "
+            + includes.getPhysicalColumnIds().size() + " included, but the reader returned "
+            + cvb.cols.length);
       }
-      // VRB was created from VrbCtx, so we already have pre-allocated column vectors
-      for (int i = 0; i < cvb.cols.length; ++i) {
-        // Return old CVs (if any) to caller. We assume these things all have the same schema.
-        cvb.swapColumnVector(i, value.cols, columnIds.get(i));
+      // VRB was created from VrbCtx, so we already have pre-allocated column vectors.
+      // Return old CVs (if any) to caller. We assume these things all have the same schema.
+      for (int ixInReadSet = 0; ixInReadSet < cvb.cols.length; ++ixInReadSet) {
+        int ixInVrb = includes.getPhysicalColumnIds().get(ixInReadSet);
+        cvb.swapColumnVector(ixInReadSet, vrb.cols, ixInVrb);
       }
-      value.selectedInUse = false;
-      value.size = cvb.size;
+      vrb.selectedInUse = false;
+      vrb.size = cvb.size;
     }
 
     if (wasFirst) {
@@ -397,6 +364,44 @@ class LlapRecordReader
     return rbCtx;
   }
 
+  private static final class AcidWrapper
+      implements RecordReader<NullWritable, VectorizedRowBatch> {
+    private final VectorizedRowBatch acidVrb;
+
+    private AcidWrapper(VectorizedRowBatch acidVrb) {
+      this.acidVrb = acidVrb;
+    }
+
+    @Override
+    public boolean next(NullWritable key, VectorizedRowBatch value) throws 
IOException {
+      return true;
+    }
+
+    @Override
+    public NullWritable createKey() {
+      return NullWritable.get();
+    }
+
+    @Override
+    public VectorizedRowBatch createValue() {
+      return acidVrb;
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return 0;
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      return 0;
+    }
+  }
+
   private final class IOUncaughtExceptionHandler implements 
Thread.UncaughtExceptionHandler {
     @Override
     public void uncaughtException(final Thread t, final Throwable e) {
@@ -528,4 +533,99 @@ class LlapRecordReader
     // TODO: plumb progress info thru the reader if we can get metadata from 
loader first.
     return 0.0f;
   }
-}
+
+  
+  /** This class encapsulates include-related logic for LLAP readers. It is not actually
+   *  specific to LLAP IO, but for LLAP IO in particular we want to encapsulate all of this
+   *  logic in one place, at least until we have something better like Schema Evolution v2.
+   *  It could also hypothetically encapsulate field pruning inside structs and the like. */
+  private static class IncludesImpl implements SchemaEvolutionFactory, Includes {
+    private List<Integer> readerLogicalColumnIds;
+    private List<Integer> filePhysicalColumnIds;
+    private Integer acidStructColumnId = null;
+
+    // For current schema evolution.
+    private TypeDescription readerSchema;
+    private JobConf jobConf;
+
+    public IncludesImpl(List<Integer> tableIncludedCols, boolean isAcidScan,
+        VectorizedRowBatchCtx rbCtx, TypeDescription readerSchema, JobConf jobConf) {
+      // Note: tableIncludedCols is further adjusted for ACID below; don't use it directly.
+      this.readerSchema = readerSchema;
+      this.jobConf = jobConf;
+      if (tableIncludedCols == null) {
+        // Assume that including everything means the VRB will have everything.
+        // TODO: this is rather brittle, especially in view of schema evolution (in the
+        //       abstract sense, not as currently implemented in Hive). The compiler should
+        //       supply the columns it expects to see, which is not "all, of any schema".
+        //       Whether VRB row CVs are the right mechanism for that is an open question;
+        //       perhaps resolve it in schema evolution v2.
+        tableIncludedCols = new ArrayList<>(rbCtx.getRowColumnTypeInfos().length);
+        for (int i = 0; i < rbCtx.getRowColumnTypeInfos().length; ++i) {
+          tableIncludedCols.add(i);
+        }
+      }
+      LOG.debug("Logical table includes: {}", tableIncludedCols);
+      this.readerLogicalColumnIds = tableIncludedCols;
+      // Note: schema evolution currently does not support column index changes.
+      //       So, the indices should line up... to be fixed in SE v2?
+      List<Integer> filePhysicalColumnIds = readerLogicalColumnIds;
+      if (isAcidScan) {
+        int rootCol = OrcInputFormat.getRootColumn(false);
+        filePhysicalColumnIds = new ArrayList<Integer>(filePhysicalColumnIds.size() + rootCol);
+        this.acidStructColumnId = rootCol - 1; // OrcRecordUpdater.ROW. This is somewhat fragile...
+        // Note: this guarantees that physical column IDs are in order.
+        for (int i = 0; i < rootCol; ++i) {
+          // We don't want to include the root struct in the ACID case; it would cause the
+          // whole struct to get read without projection.
+          if (acidStructColumnId == i) continue;
+          filePhysicalColumnIds.add(i);
+        }
+        for (int tableColumnId : readerLogicalColumnIds) {
+          filePhysicalColumnIds.add(rootCol + tableColumnId);
+        }
+      }
+
+      this.filePhysicalColumnIds = filePhysicalColumnIds;
+    }
+
+    @Override
+    public String toString() {
+      return "logical columns " + readerLogicalColumnIds
+          + ", physical columns " + filePhysicalColumnIds;
+    }
+
+    @Override
+    public SchemaEvolution createSchemaEvolution(TypeDescription fileSchema) {
+      if (readerSchema == null) {
+        readerSchema = fileSchema;
+      }
+      // TODO: will this work correctly with ACID?
+      boolean[] readerIncludes = OrcInputFormat.genIncludedColumns(
+          readerSchema, readerLogicalColumnIds);
+      Reader.Options options = new Reader.Options(jobConf).include(readerIncludes);
+      return new SchemaEvolution(fileSchema, readerSchema, options);
+    }
+
+    @Override
+    public boolean[] generateFileIncludes(TypeDescription fileSchema) {
+      return OrcInputFormat.genIncludedColumns(
+          fileSchema, filePhysicalColumnIds, acidStructColumnId);
+    }
+
+    @Override
+    public List<Integer> getPhysicalColumnIds() {
+      return filePhysicalColumnIds;
+    }
+
+    @Override
+    public List<Integer> getReaderLogicalColumnIds() {
+      return readerLogicalColumnIds;
+    }
+
+    @Override
+    public TypeDescription[] getBatchReaderTypes(TypeDescription fileSchema) {
+      return OrcInputFormat.genIncludedTypes(
+          fileSchema, filePhysicalColumnIds, acidStructColumnId);
+    }
+  }
+} 
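
To make the new mapping concrete: a self-contained sketch of what IncludesImpl
computes for a full ACID scan, assuming the current layout of five ACID
metadata columns followed by the row struct (OrcRecordUpdater.ROW == 5, so
OrcInputFormat.getRootColumn(false) == 6); the table and its projection of
columns 0 and 2 are hypothetical:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class AcidIncludesSketch {
      public static void main(String[] args) {
        // Assumed ACID layout: operation, originalTransaction, bucket, rowId,
        // currentTransaction, row:struct<...>; the row struct is top-level column 5.
        final int rootCol = 6;                      // assumed getRootColumn(false)
        final int acidStructColumnId = rootCol - 1; // the "row" struct

        List<Integer> readerLogicalColumnIds = Arrays.asList(0, 2); // projected table columns

        List<Integer> filePhysicalColumnIds = new ArrayList<>();
        for (int i = 0; i < rootCol; ++i) {
          if (i == acidStructColumnId) continue; // skip the row struct itself
          filePhysicalColumnIds.add(i);          // ACID metadata columns 0..4
        }
        for (int tableColumnId : readerLogicalColumnIds) {
          filePhysicalColumnIds.add(rootCol + tableColumnId); // table columns shifted past ACID
        }

        // Prints: logical columns [0, 2], physical columns [0, 1, 2, 3, 4, 6, 8]
        System.out.println("logical columns " + readerLogicalColumnIds
            + ", physical columns " + filePhysicalColumnIds);
      }
    }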

http://git-wip-us.apache.org/repos/asf/hive/blob/1a3090f8/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
index 2a2be56..a830c07 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
@@ -34,14 +34,25 @@ import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.SchemaEvolution;
 
 /**
  * Entry point used by LlapInputFormat to create read pipeline to get data.
  */
 public interface ColumnVectorProducer {
+  public interface SchemaEvolutionFactory {
+    SchemaEvolution createSchemaEvolution(TypeDescription fileSchema);
+  }
+
+  public interface Includes {
+    boolean[] generateFileIncludes(TypeDescription fileSchema);
+    List<Integer> getPhysicalColumnIds();
+    List<Integer> getReaderLogicalColumnIds();
+    TypeDescription[] getBatchReaderTypes(TypeDescription fileSchema);
+  }
+
   ReadPipeline createReadPipeline(Consumer<ColumnVectorBatch> consumer, 
FileSplit split,
-      List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
-      QueryFragmentCounters counters, TypeDescription readerSchema,
-      InputFormat<?, ?> sourceInputFormat, Deserializer sourceSerDe, Reporter 
reporter,
-      JobConf job, Map<Path, PartitionDesc> parts) throws IOException;
+      Includes includes, SearchArgument sarg, QueryFragmentCounters counters,
+      SchemaEvolutionFactory sef, InputFormat<?, ?> sourceInputFormat, 
Deserializer sourceSerDe,
+      Reporter reporter, JobConf job, Map<Path, PartitionDesc> parts) throws 
IOException;
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/1a3090f8/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java
index d66e2f2..7af1b05 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java
@@ -31,6 +31,7 @@ import 
org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl;
 import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
 import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer.Includes;
 import org.apache.hadoop.hive.llap.io.encoded.SerDeEncodedDataReader;
 import org.apache.hadoop.hive.llap.io.metadata.ConsumerFileMetadata;
 import org.apache.hadoop.hive.llap.io.metadata.ConsumerStripeMetadata;
@@ -81,13 +82,12 @@ public class GenericColumnVectorProducer implements 
ColumnVectorProducer {
 
   @Override
   public ReadPipeline createReadPipeline(Consumer<ColumnVectorBatch> consumer, 
FileSplit split,
-      List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
-      QueryFragmentCounters counters, TypeDescription schema, InputFormat<?, 
?> sourceInputFormat,
-      Deserializer sourceSerDe, Reporter reporter, JobConf job, Map<Path, 
PartitionDesc> parts)
-          throws IOException {
+      Includes includes, SearchArgument sarg, QueryFragmentCounters counters,
+      SchemaEvolutionFactory sef, InputFormat<?, ?> sourceInputFormat, 
Deserializer sourceSerDe,
+      Reporter reporter, JobConf job, Map<Path, PartitionDesc> parts) throws 
IOException {
     cacheMetrics.incrCacheReadRequests();
     OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(
-        consumer, columnIds.size(), false, counters, ioMetrics);
+        consumer, includes, false, counters, ioMetrics);
     SerDeFileMetadata fm;
     try {
       fm = new SerDeFileMetadata(sourceSerDe);
@@ -97,13 +97,10 @@ public class GenericColumnVectorProducer implements 
ColumnVectorProducer {
     edc.setFileMetadata(fm);
     // Note that we pass job config to the record reader, but use global 
config for LLAP IO.
     // TODO: add tracing to serde reader
-    SerDeEncodedDataReader reader = new SerDeEncodedDataReader(cache,
-        bufferManager, conf, split, columnIds, edc, job, reporter, 
sourceInputFormat,
+    SerDeEncodedDataReader reader = new SerDeEncodedDataReader(cache, 
bufferManager, conf,
+        split, includes.getPhysicalColumnIds(), edc, job, reporter, 
sourceInputFormat,
         sourceSerDe, counters, fm.getSchema(), parts);
     edc.init(reader, reader, new IoTrace(0, false));
-    if (LlapIoImpl.LOG.isDebugEnabled()) {
-      LlapIoImpl.LOG.debug("Ignoring schema: " + schema);
-    }
     return edc;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/1a3090f8/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
index 3a7b192..2a0c5ca 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
@@ -85,16 +85,15 @@ public class OrcColumnVectorProducer implements 
ColumnVectorProducer {
 
   @Override
   public ReadPipeline createReadPipeline(
-      Consumer<ColumnVectorBatch> consumer, FileSplit split, List<Integer> 
columnIds,
-      SearchArgument sarg, String[] columnNames, QueryFragmentCounters 
counters,
-      TypeDescription readerSchema, InputFormat<?, ?> unused0, Deserializer 
unused1,
-      Reporter reporter, JobConf job, Map<Path, PartitionDesc> unused2) throws 
IOException {
+      Consumer<ColumnVectorBatch> consumer, FileSplit split, Includes includes,
+      SearchArgument sarg, QueryFragmentCounters counters, 
SchemaEvolutionFactory sef,
+      InputFormat<?, ?> unused0, Deserializer unused1, Reporter reporter, 
JobConf job,
+      Map<Path, PartitionDesc> unused2) throws IOException {
     cacheMetrics.incrCacheReadRequests();
-    OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(consumer, 
columnIds.size(),
-        _skipCorrupt, counters, ioMetrics);
-    OrcEncodedDataReader reader = new OrcEncodedDataReader(
-        lowLevelCache, bufferManager, metadataCache, conf, job, split, 
columnIds, sarg,
-        columnNames, edc, counters, readerSchema, tracePool);
+    OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(
+        consumer, includes, _skipCorrupt, counters, ioMetrics);
+    OrcEncodedDataReader reader = new OrcEncodedDataReader(lowLevelCache, 
bufferManager,
+        metadataCache, conf, job, split, includes, sarg, edc, counters, sef, 
tracePool);
     edc.init(reader, reader, reader.getTrace());
     return edc;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/1a3090f8/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
index 36810d9..9e8ae10 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
 import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
 import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer.Includes;
 import org.apache.hadoop.hive.llap.io.metadata.ConsumerFileMetadata;
 import org.apache.hadoop.hive.llap.io.metadata.ConsumerStripeMetadata;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics;
@@ -62,21 +63,22 @@ import org.apache.orc.OrcProto;
 public class OrcEncodedDataConsumer
   extends EncodedDataConsumer<OrcBatchKey, OrcEncodedColumnBatch> {
   private TreeReaderFactory.TreeReader[] columnReaders;
-  private int[] columnMapping; // Mapping from columnReaders (by index) to 
columns in file schema.
   private int previousStripeIndex = -1;
   private ConsumerFileMetadata fileMetadata; // We assume one request is only 
for one file.
   private CompressionCodec codec;
   private List<ConsumerStripeMetadata> stripes;
   private final boolean skipCorrupt; // TODO: get rid of this
   private final QueryFragmentCounters counters;
-  private boolean[] includedColumns;
   private SchemaEvolution evolution;
   private IoTrace trace;
+  private final Includes includes;
+  private TypeDescription[] batchSchemas;
 
   public OrcEncodedDataConsumer(
-      Consumer<ColumnVectorBatch> consumer, int colCount, boolean skipCorrupt,
+      Consumer<ColumnVectorBatch> consumer, Includes includes, boolean skipCorrupt,
       QueryFragmentCounters counters, LlapDaemonIOMetrics ioMetrics) {
-    super(consumer, colCount, ioMetrics);
+    super(consumer, includes.getPhysicalColumnIds().size(), ioMetrics);
+    this.includes = includes;
     // TODO: get rid of this
     this.skipCorrupt = skipCorrupt;
     this.counters = counters;
@@ -120,20 +122,10 @@ public class OrcEncodedDataConsumer
       }
       int maxBatchesRG = (int) ((nonNullRowCount / 
VectorizedRowBatch.DEFAULT_SIZE) + 1);
       int batchSize = VectorizedRowBatch.DEFAULT_SIZE;
-      TypeDescription schema = fileMetadata.getSchema();
+      TypeDescription fileSchema = fileMetadata.getSchema();
 
       if (columnReaders == null || !sameStripe) {
-        int[] columnMapping = new int[schema.getChildren().size()];
-        TreeReaderFactory.Context context =
-            new TreeReaderFactory.ReaderContext()
-                .setSchemaEvolution(evolution)
-                .writerTimeZone(stripeMetadata.getWriterTimezone())
-                .skipCorrupt(skipCorrupt);
-        StructTreeReader treeReader = 
EncodedTreeReaderFactory.createRootTreeReader(
-            schema, stripeMetadata.getEncodings(), batch, codec, context, 
columnMapping);
-        this.columnReaders = treeReader.getChildReaders();
-        this.columnMapping = Arrays.copyOf(columnMapping, 
columnReaders.length);
-        positionInStreams(columnReaders, batch.getBatchKey(), stripeMetadata);
+        createColumnReaders(batch, stripeMetadata, fileSchema);
       } else {
         repositionInStreams(this.columnReaders, batch, sameStripe, 
stripeMetadata);
       }
@@ -154,8 +146,7 @@ public class OrcEncodedDataConsumer
           if (cvb.cols[idx] == null) {
             // Orc store rows inside a root struct (hive writes it this way).
             // When we populate column vectors we skip over the root struct.
-            cvb.cols[idx] = 
createColumn(schema.getChildren().get(columnMapping[idx]),
-                VectorizedRowBatch.DEFAULT_SIZE);
+            cvb.cols[idx] = createColumn(batchSchemas[idx], 
VectorizedRowBatch.DEFAULT_SIZE);
           }
           trace.logTreeReaderNextVector(idx);
 
@@ -214,6 +205,25 @@ public class OrcEncodedDataConsumer
     }
   }
 
+  private void createColumnReaders(OrcEncodedColumnBatch batch,
+      ConsumerStripeMetadata stripeMetadata, TypeDescription fileSchema) throws IOException {
+    TreeReaderFactory.Context context = new TreeReaderFactory.ReaderContext()
+        .setSchemaEvolution(evolution).skipCorrupt(skipCorrupt)
+        .writerTimeZone(stripeMetadata.getWriterTimezone());
+    this.batchSchemas = includes.getBatchReaderTypes(fileSchema);
+    StructTreeReader treeReader = EncodedTreeReaderFactory.createRootTreeReader(
+        batchSchemas, stripeMetadata.getEncodings(), batch, codec, context);
+    this.columnReaders = treeReader.getChildReaders();
+
+    if (LlapIoImpl.LOG.isDebugEnabled()) {
+      for (int i = 0; i < columnReaders.length; ++i) {
+        LlapIoImpl.LOG.debug("Created a reader at " + i + ": " + columnReaders[i]
+            + " from schema " + batchSchemas[i]);
+      }
+    }
+    positionInStreams(columnReaders, batch.getBatchKey(), stripeMetadata);
+  }
+
   private ColumnVector createColumn(TypeDescription type, int batchSize) {
     switch (type.getCategory()) {
       case BOOLEAN:
@@ -344,15 +354,6 @@ public class OrcEncodedDataConsumer
     return rowIndexEntry.getStatistics().getNumberOfValues();
   }
 
-  @Override
-  public boolean[] getIncludedColumns() {
-    return includedColumns;
-  }
-
-  public void setIncludedColumns(final boolean[] includedColumns) {
-    this.includedColumns = includedColumns;
-  }
-
   public void setSchemaEvolution(SchemaEvolution evolution) {
     this.evolution = evolution;
   }
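
For context, createColumn() (unchanged by this patch) allocates one
ColumnVector per entry of the batchSchemas array returned by
getBatchReaderTypes(). A reduced sketch of that mapping for a few primitive
categories, assuming the storage-api vector classes; the real method covers
every ORC category:

    import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    import org.apache.orc.TypeDescription;

    final class ColumnAllocSketch {
      static ColumnVector createColumn(TypeDescription type, int batchSize) {
        switch (type.getCategory()) {
          case BOOLEAN: case BYTE: case SHORT: case INT: case LONG: case DATE:
            return new LongColumnVector(batchSize);
          case FLOAT: case DOUBLE:
            return new DoubleColumnVector(batchSize);
          case STRING: case CHAR: case VARCHAR: case BINARY:
            return new BytesColumnVector(batchSize);
          default:
            throw new IllegalArgumentException("Not covered by this sketch: " + type);
        }
      }
    }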

http://git-wip-us.apache.org/repos/asf/hive/blob/1a3090f8/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java 
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java
index 06708d3..05b1429 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java
@@ -27,5 +27,4 @@ import org.apache.orc.impl.SchemaEvolution;
 public interface ReadPipeline extends ConsumerFeedback<ColumnVectorBatch> {
   public Callable<Void> getReadCallable();
   SchemaEvolution getSchemaEvolution();
-  boolean[] getIncludedColumns();
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/1a3090f8/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 9219d28..65b2d34 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -67,6 +67,8 @@ import org.apache.hadoop.hive.llap.cache.LowLevelCache;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
 import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer.Includes;
+import 
org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer.SchemaEvolutionFactory;
 import org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer;
 import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
 import org.apache.hadoop.hive.llap.io.metadata.MetadataCache;
@@ -79,6 +81,7 @@ import org.apache.orc.CompressionKind;
 import org.apache.orc.DataReader;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile.ReaderOptions;
+import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater;
 import org.apache.orc.OrcConf;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
@@ -154,9 +157,7 @@ public class OrcEncodedDataReader extends 
CallableWithNdc<Void>
   private final BufferUsageManager bufferManager;
   private final Configuration daemonConf, jobConf;
   private final FileSplit split;
-  private List<Integer> includedColumnIds;
   private final SearchArgument sarg;
-  private final String[] columnNames;
   private final OrcEncodedDataConsumer consumer;
   private final QueryFragmentCounters counters;
   private final UserGroupInformation ugi;
@@ -184,27 +185,21 @@ public class OrcEncodedDataReader extends 
CallableWithNdc<Void>
   @SuppressWarnings("unused")
   private volatile boolean isPaused = false;
 
-  boolean[] readerIncludes = null, sargColumns = null, fileIncludes = null;
+  boolean[] sargColumns = null, fileIncludes = null;
   private final IoTrace trace;
   private Pool<IoTrace> tracePool;
 
   public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager 
bufferManager,
       MetadataCache metadataCache, Configuration daemonConf, Configuration 
jobConf,
-      FileSplit split, List<Integer> columnIds, SearchArgument sarg, String[] 
columnNames,
-      OrcEncodedDataConsumer consumer, QueryFragmentCounters counters,
-      TypeDescription readerSchema, Pool<IoTrace> tracePool)
+      FileSplit split, Includes includes, SearchArgument sarg, 
OrcEncodedDataConsumer consumer,
+      QueryFragmentCounters counters, SchemaEvolutionFactory sef, 
Pool<IoTrace> tracePool)
           throws IOException {
     this.lowLevelCache = lowLevelCache;
     this.metadataCache = metadataCache;
     this.bufferManager = bufferManager;
     this.daemonConf = daemonConf;
     this.split = split;
-    this.includedColumnIds = columnIds;
-    if (this.includedColumnIds != null) {
-      Collections.sort(this.includedColumnIds);
-    }
     this.sarg = sarg;
-    this.columnNames = columnNames;
     this.consumer = consumer;
     this.counters = counters;
     this.trace = tracePool.take();
@@ -216,8 +211,7 @@ public class OrcEncodedDataReader extends 
CallableWithNdc<Void>
     }
     this.useCodecPool = HiveConf.getBoolVar(daemonConf, 
ConfVars.HIVE_ORC_CODEC_POOL);
 
-    // moved this part of code from performDataRead as LlapInputFormat need to know the file schema
-    // to decide if schema evolution is supported or not.
+    // LlapInputFormat needs to know the file schema to decide if schema evolution is supported.
     orcReader = null;
     // 1. Get file metadata from cache, or create the reader and read it.
     // Don't cache the filesystem object for now; Tez closes it and FS cache 
will fix all that
@@ -227,15 +221,12 @@ public class OrcEncodedDataReader extends 
CallableWithNdc<Void>
         HiveConf.getBoolVar(daemonConf, 
ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID));
     fileMetadata = getFileFooterFromCacheOrDisk();
     final TypeDescription fileSchema = fileMetadata.getSchema();
-    if (readerSchema == null) {
-      readerSchema = fileMetadata.getSchema();
-    }
-    readerIncludes = OrcInputFormat.genIncludedColumns(readerSchema, 
includedColumnIds);
-    if (AcidUtils.isFullAcidScan(jobConf)) {
-      fileIncludes = OrcInputFormat.shiftReaderIncludedForAcid(readerIncludes);
-    } else {
-      fileIncludes = OrcInputFormat.genIncludedColumns(fileSchema, 
includedColumnIds);
+
+    fileIncludes = includes.generateFileIncludes(fileSchema);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("From {}, the file includes are {}", includes, 
DebugUtils.toString(fileIncludes));
     }
+
     // Do not allow users to override zero-copy setting. The rest can be taken 
from user config.
     boolean useZeroCopy = OrcConf.USE_ZEROCOPY.getBoolean(daemonConf);
     if (useZeroCopy != OrcConf.USE_ZEROCOPY.getBoolean(jobConf)) {
@@ -243,10 +234,9 @@ public class OrcEncodedDataReader extends 
CallableWithNdc<Void>
       jobConf.setBoolean(OrcConf.USE_ZEROCOPY.getAttribute(), useZeroCopy);
     }
     this.jobConf = jobConf;
-    Reader.Options options = new 
Reader.Options(jobConf).include(readerIncludes);
-    evolution = new SchemaEvolution(fileMetadata.getSchema(), readerSchema, 
options);
+    // TODO: setFileMetadata could just create the schema. Called in two places; clean up later.
+    this.evolution = sef.createSchemaEvolution(fileMetadata.getSchema());
     consumer.setFileMetadata(fileMetadata);
-    consumer.setIncludedColumns(readerIncludes);
     consumer.setSchemaEvolution(evolution);
   }
 
@@ -290,9 +280,6 @@ public class OrcEncodedDataReader extends 
CallableWithNdc<Void>
         + (fileKey == null ? "" : " (" + fileKey + ")"));
     try {
       validateFileMetadata();
-      if (includedColumnIds == null) {
-        includedColumnIds = getAllColumnIds(fileMetadata);
-      }
 
       // 2. Determine which stripes to read based on the split.
       determineStripesToRead();
@@ -316,12 +303,14 @@ public class OrcEncodedDataReader extends 
CallableWithNdc<Void>
     try {
       if (sarg != null && stride != 0) {
         // TODO: move this to a common method
+        // Note: this gets IDs by name, so we assume indices don't need to be adjusted for ACID.
         int[] filterColumns = 
RecordReaderImpl.mapSargColumnsToOrcInternalColIdx(
           sarg.getLeaves(), evolution);
         // included will not be null, row options will fill the array with 
trues if null
         sargColumns = new boolean[evolution.getFileSchema().getMaximumId() + 
1];
         for (int i : filterColumns) {
           // filter columns may have -1 as index which could be partition 
column in SARG.
+          // TODO: should this then be >=?
           if (i > 0) {
             sargColumns[i] = true;
           }
@@ -332,7 +321,7 @@ public class OrcEncodedDataReader extends 
CallableWithNdc<Void>
       }
 
       // Now, apply SARG if any; w/o sarg, this will just initialize stripeRgs.
-      boolean hasData = determineRgsToRead(fileIncludes, stride, 
stripeMetadatas);
+      boolean hasData = determineRgsToRead(stride, stripeMetadatas);
       if (!hasData) {
         consumer.setDone();
         recordReaderTime(startTime);
@@ -530,19 +519,6 @@ public class OrcEncodedDataReader extends 
CallableWithNdc<Void>
   }
 
   /**
-   * Puts all column indexes from metadata to make a column list to read all 
column.
-   */
-  private static List<Integer> getAllColumnIds(OrcFileMetadata metadata) {
-    int rootColumn = OrcInputFormat.getRootColumn(true);
-    List<Integer> types = 
metadata.getTypes().get(rootColumn).getSubtypesList();
-    List<Integer> columnIds = new ArrayList<Integer>(types.size());
-    for (int i = 0; i < types.size(); ++i) {
-      columnIds.add(i);
-    }
-    return columnIds;
-  }
-
-  /**
    * Closes the stripe readers (on error).
    */
   private void cleanupReaders() {
@@ -822,7 +798,7 @@ public class OrcEncodedDataReader extends 
CallableWithNdc<Void>
    * Determines which RGs need to be read, after stripes have been determined.
    * SARG is applied, and readState is populated for each stripe accordingly.
    */
-  private boolean determineRgsToRead(boolean[] globalIncludes, int 
rowIndexStride,
+  private boolean determineRgsToRead(int rowIndexStride,
       ArrayList<OrcStripeMetadata> metadata) throws IOException {
     RecordReaderImpl.SargApplier sargApp = null;
     if (sarg != null && rowIndexStride != 0) {

http://git-wip-us.apache.org/repos/asf/hive/blob/1a3090f8/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
index 166abf7..0cb1828 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
@@ -948,7 +948,8 @@ public class SerDeEncodedDataReader extends 
CallableWithNdc<Void>
 
     OrcEncodedColumnBatch ecb = ECB_POOL.take();
     ecb.init(fileKey, metadata.getStripeIx(), OrcEncodedColumnBatch.ALL_RGS, 
writerIncludes.length);
-    for (int colIx = 0; colIx < writerIncludes.length; ++colIx) {
+    // Skip the 0th column that is the root structure.
+    for (int colIx = 1; colIx < writerIncludes.length; ++colIx) {
       if (!writerIncludes[colIx]) continue;
       ecb.initColumn(colIx, OrcEncodedColumnBatch.MAX_DATA_STREAMS);
       if (!hasAllData && splitIncludes[colIx]) {
@@ -1035,18 +1036,15 @@ public class SerDeEncodedDataReader extends 
CallableWithNdc<Void>
     ecb.init(fileKey, metadata.getStripeIx(), OrcEncodedColumnBatch.ALL_RGS, 
writerIncludes.length);
     int vectorsIx = 0;
     for (int colIx = 0; colIx < writerIncludes.length; ++colIx) {
+      // Skip the 0-th column, since it won't have a vector after reading the text source.
+      if (colIx == 0) continue;
       if (!writerIncludes[colIx]) continue;
       if (splitIncludes[colIx]) {
-        // Skip the 0-th column, since it won't have a vector after reading 
the text source.
-        if (colIx != 0 ) {
-          List<ColumnVector> vectors = diskData.getVectors(vectorsIx++);
-          if (LlapIoImpl.LOG.isTraceEnabled()) {
-            LlapIoImpl.LOG.trace("Processing vectors for column " + colIx + ": 
" + vectors);
-          }
-          ecb.initColumnWithVectors(colIx, vectors);
-        } else {
-          ecb.initColumn(0, OrcEncodedColumnBatch.MAX_DATA_STREAMS);
+        List<ColumnVector> vectors = diskData.getVectors(vectorsIx++);
+        if (LlapIoImpl.LOG.isTraceEnabled()) {
+          LlapIoImpl.LOG.trace("Processing vectors for column " + colIx + ": " + vectors);
         }
+        ecb.initColumnWithVectors(colIx, vectors);
       } else {
         ecb.initColumn(colIx, OrcEncodedColumnBatch.MAX_DATA_STREAMS);
         processColumnCacheData(cacheBuffers, ecb, colIx);

http://git-wip-us.apache.org/repos/asf/hive/blob/1a3090f8/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
index 681d9ca..f429308 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.io.Writable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Charsets;
 
 /**
@@ -112,7 +113,6 @@ public class VectorExtractRow {
    */
   public void init(StructObjectInspector structObjectInspector, List<Integer> 
projectedColumns)
       throws HiveException {
-
     List<? extends StructField> fields = 
structObjectInspector.getAllStructFieldRefs();
     final int count = fields.size();
     allocateArrays(count);
@@ -125,7 +125,6 @@ public class VectorExtractRow {
       ObjectInspector fieldInspector = field.getFieldObjectInspector();
       TypeInfo typeInfo =
           
TypeInfoUtils.getTypeInfoFromTypeString(fieldInspector.getTypeName());
-
       initEntry(i, projectionColumnNum, typeInfo);
     }
   }
@@ -148,7 +147,8 @@ public class VectorExtractRow {
    * Initialize using data type names.
    * No projection -- the column range 0 .. types.size()-1
    */
-  public void init(List<String> typeNames) throws HiveException {
+  @VisibleForTesting
+  void init(List<String> typeNames) throws HiveException {
 
     final int count = typeNames.size();
     allocateArrays(count);

http://git-wip-us.apache.org/repos/asf/hive/blob/1a3090f8/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
index 59b3ae9..22d2f34 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.exec.vector;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;

http://git-wip-us.apache.org/repos/asf/hive/blob/1a3090f8/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index e956485..cf0d013 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hive.ql.io.orc;
 
 import org.apache.hadoop.hive.ql.plan.DynamicValue.NoDynamicValuesException;
-
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 
@@ -30,6 +29,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -342,10 +342,15 @@ public class OrcInputFormat implements 
InputFormat<NullWritable, OrcStruct>,
     return false;
   }
 
-
+  
   public static boolean[] genIncludedColumns(TypeDescription readerSchema,
                                              List<Integer> included) {
+    return genIncludedColumns(readerSchema, included, null);
+  }
 
+  public static boolean[] genIncludedColumns(TypeDescription readerSchema,
+                                             List<Integer> included,
+                                             Integer recursiveStruct) {
     boolean[] result = new boolean[readerSchema.getMaximumId() + 1];
     if (included == null) {
       Arrays.fill(result, true);
@@ -355,15 +360,53 @@ public class OrcInputFormat implements 
InputFormat<NullWritable, OrcStruct>,
     List<TypeDescription> children = readerSchema.getChildren();
     for (int columnNumber = 0; columnNumber < children.size(); ++columnNumber) 
{
       if (included.contains(columnNumber)) {
-        TypeDescription child = children.get(columnNumber);
-        for(int col = child.getId(); col <= child.getMaximumId(); ++col) {
-          result[col] = true;
+        addColumnToIncludes(children.get(columnNumber), result);
+      } else if (recursiveStruct != null && recursiveStruct == columnNumber) {
+        // This assumes the struct's columns are numbered immediately after the struct column.
+        List<TypeDescription> nestedChildren = 
children.get(columnNumber).getChildren();
+        for (int columnNumberDelta = 0; columnNumberDelta < 
nestedChildren.size(); ++columnNumberDelta) {
+          int columnNumberNested = columnNumber + 1 + columnNumberDelta;
+          if (included.contains(columnNumberNested)) {
+            addColumnToIncludes(nestedChildren.get(columnNumberDelta), result);
+          }
+        }
+      }
+    }
+
+    return result;
+  }
+
+  // Mostly a duplicate of genIncludedColumns.
+  public static TypeDescription[] genIncludedTypes(TypeDescription fileSchema,
+      List<Integer> included, Integer recursiveStruct) {
+    TypeDescription[] result = new TypeDescription[included.size()];
+    List<TypeDescription> children = fileSchema.getChildren();
+    for (int columnNumber = 0; columnNumber < children.size(); ++columnNumber) 
{
+      int indexInBatchCols = included.indexOf(columnNumber);
+      if (indexInBatchCols >= 0) {
+        result[indexInBatchCols] = children.get(columnNumber);
+      } else if (recursiveStruct != null && recursiveStruct == columnNumber) {
+        // This assumes the struct's columns are numbered immediately after the struct column.
+        List<TypeDescription> nestedChildren = 
children.get(columnNumber).getChildren();
+        for (int columnNumberDelta = 0; columnNumberDelta < 
nestedChildren.size(); ++columnNumberDelta) {
+          int columnNumberNested = columnNumber + 1 + columnNumberDelta;
+          int nestedIxInBatchCols = included.indexOf(columnNumberNested);
+          if (nestedIxInBatchCols >= 0) {
+            result[nestedIxInBatchCols] = 
nestedChildren.get(columnNumberDelta);
+          }
         }
       }
     }
     return result;
   }
 
+
+  private static void addColumnToIncludes(TypeDescription child, boolean[] 
result) {
+    for(int col = child.getId(); col <= child.getMaximumId(); ++col) {
+      result[col] = true;
+    }
+  }
+
   /**
    * Reverses genIncludedColumns; produces the table columns indexes from ORC 
included columns.
    * @param readerSchema The ORC reader schema for the table.

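For illustration, a minimal standalone sketch of how the new three-argument genIncludedColumns is meant to be called. The schema and the logical column list below are made up for the example (an ACID-like layout with a payload struct), not taken from the patch; only the method signature comes from the diff above.

import java.util.Arrays;
import org.apache.orc.TypeDescription;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;

public class IncludedColumnsSketch {
  public static void main(String[] args) {
    // Illustrative file schema: one metadata column followed by a "row" payload struct.
    TypeDescription payload = TypeDescription.createStruct()
        .addField("cint", TypeDescription.createInt())
        .addField("cstring1", TypeDescription.createString());
    TypeDescription fileSchema = TypeDescription.createStruct()
        .addField("operation", TypeDescription.createInt())
        .addField("row", payload);

    // Logical includes: column 0 (operation) and column 2 (cint). Passing recursiveStruct = 1
    // tells genIncludedColumns that logical columns past index 1 actually live inside the
    // struct at top-level index 1, numbered as if they immediately followed it.
    boolean[] physicalIncludes = OrcInputFormat.genIncludedColumns(
        fileSchema, Arrays.asList(0, 2), 1);
    // Expected (type ids: root=0, operation=1, row=2, cint=3, cstring1=4):
    // [false, true, false, true, false]
    System.out.println(Arrays.toString(physicalIncludes));
  }
}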
http://git-wip-us.apache.org/repos/asf/hive/blob/1a3090f8/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index e296351..77736ee 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -72,6 +72,7 @@ public class VectorizedOrcAcidRowBatchReader
   protected float progress = 0.0f;
   protected Object[] partitionValues;
   private boolean addPartitionCols = true;
+  private final boolean isFlatPayload;
   private final ValidWriteIdList validWriteIdList;
   private final DeleteEventRegistry deleteEventRegistry;
   /**
@@ -104,7 +105,8 @@ public class VectorizedOrcAcidRowBatchReader
   @VisibleForTesting
   VectorizedOrcAcidRowBatchReader(OrcSplit inputSplit, JobConf conf,
         Reporter reporter, VectorizedRowBatchCtx rbCtx) throws IOException {
-    this(conf, inputSplit, reporter, rbCtx == null ? 
Utilities.getVectorizedRowBatchCtx(conf) : rbCtx);
+    this(conf, inputSplit, reporter,
+        rbCtx == null ? Utilities.getVectorizedRowBatchCtx(conf) : rbCtx, 
false);
 
     final Reader reader = OrcInputFormat.createOrcReaderForSplit(conf, 
(OrcSplit) inputSplit);
     // Careful with the range here now, we do not want to read the whole base 
file like deltas.
@@ -143,20 +145,22 @@ public class VectorizedOrcAcidRowBatchReader
     };
     this.vectorizedRowBatchBase = ((RecordReaderImpl) 
innerReader).createRowBatch();
   }
+
   /**
    * LLAP IO c'tor
    */
   public VectorizedOrcAcidRowBatchReader(OrcSplit inputSplit, JobConf conf, 
Reporter reporter,
       org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> 
baseReader,
-      VectorizedRowBatchCtx rbCtx) throws IOException {
-    this(conf, inputSplit, reporter, rbCtx);
+      VectorizedRowBatchCtx rbCtx, boolean isFlatPayload) throws IOException {
+    this(conf, inputSplit, reporter, rbCtx, isFlatPayload);
     this.baseReader = baseReader;
     this.innerReader = null;
     this.vectorizedRowBatchBase = baseReader.createValue();
   }
 
   private VectorizedOrcAcidRowBatchReader(JobConf conf, OrcSplit orcSplit, 
Reporter reporter,
-      VectorizedRowBatchCtx rowBatchCtx) throws IOException {
+      VectorizedRowBatchCtx rowBatchCtx, boolean isFlatPayload) throws 
IOException {
+    this.isFlatPayload = isFlatPayload;
     this.rbCtx = rowBatchCtx;
     final boolean isAcidRead = AcidUtils.isFullAcidScan(conf);
     final AcidUtils.AcidOperationalProperties acidOperationalProperties
@@ -206,13 +210,13 @@ public class VectorizedOrcAcidRowBatchReader
     }
     this.deleteEventRegistry = der;
     isOriginal = orcSplit.isOriginal();
-    if(isOriginal) {
+    if (isOriginal) {
       recordIdColumnVector = new 
StructColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
         new LongColumnVector(), new LongColumnVector(), new 
LongColumnVector());
-    }
-    else {
-      //will swap in the Vectors from underlying row batch
-      recordIdColumnVector = new 
StructColumnVector(VectorizedRowBatch.DEFAULT_SIZE, null, null, null);
+    } else {
+      // Will swap in the Vectors from underlying row batch.
+      recordIdColumnVector = new StructColumnVector(
+          VectorizedRowBatch.DEFAULT_SIZE, null, null, null);
     }
     rowIdProjected = areRowIdsProjected(rbCtx);
     rootPath = orcSplit.getRootDir();
@@ -412,73 +416,10 @@ public class VectorizedOrcAcidRowBatchReader
       selectedBitSet.set(0, vectorizedRowBatchBase.size, true);
     }
     ColumnVector[] innerRecordIdColumnVector = vectorizedRowBatchBase.cols;
-    if(isOriginal) {
-      /*
-       * If there are deletes and reading original file, we must produce 
synthetic ROW_IDs in order
-       * to see if any deletes apply
-       */
-      boolean needSyntheticRowId =
-          needSyntheticRowIds(true, !deleteEventRegistry.isEmpty(), 
rowIdProjected);
-      if(needSyntheticRowId) {
-        assert syntheticProps != null && syntheticProps.rowIdOffset >= 0 : "" 
+ syntheticProps;
-        assert syntheticProps != null && syntheticProps.bucketProperty >= 0 : 
"" + syntheticProps;
-        if(innerReader == null) {
-          throw new IllegalStateException(getClass().getName() + " requires " +
-            org.apache.orc.RecordReader.class +
-            " to handle original files that require ROW__IDs: " + rootPath);
-        }
-        /**
-         * {@link RecordIdentifier#getWriteId()}
-         */
-        recordIdColumnVector.fields[0].noNulls = true;
-        recordIdColumnVector.fields[0].isRepeating = true;
-        ((LongColumnVector)recordIdColumnVector.fields[0]).vector[0] = 
syntheticProps.syntheticTxnId;
-        /**
-         * This is {@link RecordIdentifier#getBucketProperty()}
-         * Also see {@link BucketCodec}
-         */
-        recordIdColumnVector.fields[1].noNulls = true;
-        recordIdColumnVector.fields[1].isRepeating = true;
-        ((LongColumnVector)recordIdColumnVector.fields[1]).vector[0] = 
syntheticProps.bucketProperty;
-        /**
-         * {@link RecordIdentifier#getRowId()}
-         */
-        recordIdColumnVector.fields[2].noNulls = true;
-        recordIdColumnVector.fields[2].isRepeating = false;
-        long[] rowIdVector = 
((LongColumnVector)recordIdColumnVector.fields[2]).vector;
-        for(int i = 0; i < vectorizedRowBatchBase.size; i++) {
-          //baseReader.getRowNumber() seems to point at the start of the batch 
todo: validate
-          rowIdVector[i] = syntheticProps.rowIdOffset + 
innerReader.getRowNumber() + i;
-        }
-        //Now populate a structure to use to apply delete events
-        innerRecordIdColumnVector = new ColumnVector[OrcRecordUpdater.FIELDS];
-        innerRecordIdColumnVector[OrcRecordUpdater.ORIGINAL_WRITEID] = 
recordIdColumnVector.fields[0];
-        innerRecordIdColumnVector[OrcRecordUpdater.BUCKET] = 
recordIdColumnVector.fields[1];
-        innerRecordIdColumnVector[OrcRecordUpdater.ROW_ID] = 
recordIdColumnVector.fields[2];
-        //these are insert events so (original txn == current) txn for all rows
-        innerRecordIdColumnVector[OrcRecordUpdater.CURRENT_WRITEID] = 
recordIdColumnVector.fields[0];
-      }
-      if(syntheticProps.syntheticTxnId > 0) {
-        //"originals" (written before table was converted to acid) is 
considered written by
-        // txnid:0 which is always committed so there is no need to check wrt 
invalid transactions
-        //But originals written by Load Data for example can be in base_x or 
delta_x_x so we must
-        //check if 'x' is committed or not evn if ROW_ID is not needed in the 
Operator pipeline.
-        if (needSyntheticRowId) {
-          findRecordsWithInvalidTransactionIds(innerRecordIdColumnVector,
-              vectorizedRowBatchBase.size, selectedBitSet);
-        } else {
-          /*since ROW_IDs are not needed we didn't create the ColumnVectors to 
hold them but we
-          * still have to check if the data being read is committed as far as 
current
-          * reader (transactions) is concerned.  Since here we are reading 
'original' schema file,
-          * all rows in it have been created by the same txn, namely 
'syntheticProps.syntheticTxnId'
-          */
-          if (!validWriteIdList.isWriteIdValid(syntheticProps.syntheticTxnId)) 
{
-            selectedBitSet.clear(0, vectorizedRowBatchBase.size);
-          }
-        }
-      }
-    }
-    else {
+    if (isOriginal) {
+      // Handle synthetic row IDs for the original files.
+      innerRecordIdColumnVector = handleOriginalFile(selectedBitSet, 
innerRecordIdColumnVector);
+    } else {
       // Case 1- find rows which belong to transactions that are not valid.
       findRecordsWithInvalidTransactionIds(vectorizedRowBatchBase, 
selectedBitSet);
     }
@@ -505,29 +446,103 @@ public class VectorizedOrcAcidRowBatchReader
       }
     }
 
-    if(isOriginal) {
-     /*Just copy the payload.  {@link recordIdColumnVector} has already been 
populated*/
-      System.arraycopy(vectorizedRowBatchBase.cols, 0, value.cols, 0,
-        value.getDataColumnCount());
-    }
-    else {
-      // Finally, link up the columnVector from the base VectorizedRowBatch to 
outgoing batch.
-      StructColumnVector payloadStruct = (StructColumnVector) 
vectorizedRowBatchBase.cols[OrcRecordUpdater.ROW];
-      // Transfer columnVector objects from base batch to outgoing batch.
-      System.arraycopy(payloadStruct.fields, 0, value.cols, 0, 
value.getDataColumnCount());
-      if(rowIdProjected) {
+    if (isOriginal) {
+     /* Just copy the payload.  {@link recordIdColumnVector} has already been 
populated */
+      System.arraycopy(vectorizedRowBatchBase.cols, 0, value.cols, 0, 
value.getDataColumnCount());
+    } else {
+      int payloadCol = OrcRecordUpdater.ROW;
+      if (isFlatPayload) {
+        // Ignore the struct column and just copy all the following data 
columns.
+        System.arraycopy(vectorizedRowBatchBase.cols, payloadCol + 1, 
value.cols, 0,
+            vectorizedRowBatchBase.cols.length - payloadCol - 1);
+      } else {
+        StructColumnVector payloadStruct =
+            (StructColumnVector) vectorizedRowBatchBase.cols[payloadCol];
+        // Transfer columnVector objects from base batch to outgoing batch.
+        System.arraycopy(payloadStruct.fields, 0, value.cols, 0, 
value.getDataColumnCount());
+      }
+      if (rowIdProjected) {
         recordIdColumnVector.fields[0] = 
vectorizedRowBatchBase.cols[OrcRecordUpdater.ORIGINAL_WRITEID];
         recordIdColumnVector.fields[1] = 
vectorizedRowBatchBase.cols[OrcRecordUpdater.BUCKET];
         recordIdColumnVector.fields[2] = 
vectorizedRowBatchBase.cols[OrcRecordUpdater.ROW_ID];
       }
     }
-    if(rowIdProjected) {
+    if (rowIdProjected) {
       rbCtx.setRecordIdColumnVector(recordIdColumnVector);
     }
     progress = baseReader.getProgress();
     return true;
   }
 
+  private ColumnVector[] handleOriginalFile(
+      BitSet selectedBitSet, ColumnVector[] innerRecordIdColumnVector) throws 
IOException {
+    /*
+     * If there are deletes and we are reading an original file, we must produce synthetic
+     * ROW__IDs in order to see whether any deletes apply.
+     */
+    boolean needSyntheticRowId =
+        needSyntheticRowIds(true, !deleteEventRegistry.isEmpty(), 
rowIdProjected);
+    if(needSyntheticRowId) {
+      assert syntheticProps != null && syntheticProps.rowIdOffset >= 0 : "" + 
syntheticProps;
+      assert syntheticProps != null && syntheticProps.bucketProperty >= 0 : "" 
+ syntheticProps;
+      if(innerReader == null) {
+        throw new IllegalStateException(getClass().getName() + " requires " +
+          org.apache.orc.RecordReader.class +
+          " to handle original files that require ROW__IDs: " + rootPath);
+      }
+      /**
+       * {@link RecordIdentifier#getWriteId()}
+       */
+      recordIdColumnVector.fields[0].noNulls = true;
+      recordIdColumnVector.fields[0].isRepeating = true;
+      ((LongColumnVector)recordIdColumnVector.fields[0]).vector[0] = 
syntheticProps.syntheticTxnId;
+      /**
+       * This is {@link RecordIdentifier#getBucketProperty()}
+       * Also see {@link BucketCodec}
+       */
+      recordIdColumnVector.fields[1].noNulls = true;
+      recordIdColumnVector.fields[1].isRepeating = true;
+      ((LongColumnVector)recordIdColumnVector.fields[1]).vector[0] = 
syntheticProps.bucketProperty;
+      /**
+       * {@link RecordIdentifier#getRowId()}
+       */
+      recordIdColumnVector.fields[2].noNulls = true;
+      recordIdColumnVector.fields[2].isRepeating = false;
+      long[] rowIdVector = 
((LongColumnVector)recordIdColumnVector.fields[2]).vector;
+      for(int i = 0; i < vectorizedRowBatchBase.size; i++) {
+        // baseReader.getRowNumber() seems to point at the start of the batch. TODO: validate.
+        rowIdVector[i] = syntheticProps.rowIdOffset + 
innerReader.getRowNumber() + i;
+      }
+      //Now populate a structure to use to apply delete events
+      innerRecordIdColumnVector = new ColumnVector[OrcRecordUpdater.FIELDS];
+      innerRecordIdColumnVector[OrcRecordUpdater.ORIGINAL_WRITEID] = 
recordIdColumnVector.fields[0];
+      innerRecordIdColumnVector[OrcRecordUpdater.BUCKET] = 
recordIdColumnVector.fields[1];
+      innerRecordIdColumnVector[OrcRecordUpdater.ROW_ID] = 
recordIdColumnVector.fields[2];
+      //these are insert events so (original txn == current) txn for all rows
+      innerRecordIdColumnVector[OrcRecordUpdater.CURRENT_WRITEID] = 
recordIdColumnVector.fields[0];
+    }
+    if(syntheticProps.syntheticTxnId > 0) {
+      //"originals" (written before table was converted to acid) is considered 
written by
+      // txnid:0 which is always committed so there is no need to check wrt 
invalid transactions
+      //But originals written by Load Data for example can be in base_x or 
delta_x_x so we must
+      //check if 'x' is committed or not evn if ROW_ID is not needed in the 
Operator pipeline.
+      if (needSyntheticRowId) {
+        findRecordsWithInvalidTransactionIds(innerRecordIdColumnVector,
+            vectorizedRowBatchBase.size, selectedBitSet);
+      } else {
+        /* Since ROW__IDs are not needed, we didn't create the ColumnVectors to hold them, but
+         * we still have to check whether the data being read is committed as far as the current
+         * reader (transactions) is concerned. Since here we are reading an 'original' schema
+         * file, all rows in it have been created by the same txn, namely
+         * 'syntheticProps.syntheticTxnId'.
+         */
+        if (!validWriteIdList.isWriteIdValid(syntheticProps.syntheticTxnId)) {
+          selectedBitSet.clear(0, vectorizedRowBatchBase.size);
+        }
+      }
+    }
+    return innerRecordIdColumnVector;
+  }
+
   private void findRecordsWithInvalidTransactionIds(VectorizedRowBatch batch, 
BitSet selectedBitSet) {
     findRecordsWithInvalidTransactionIds(batch.cols, batch.size, 
selectedBitSet);
   }

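As a rough illustration of the new isFlatPayload branch in next(): when LLAP hands back a flat payload, the data columns sit in the base batch right after the ACID metadata columns and are copied as-is; otherwise they are unwrapped from the ROW struct. The helper below is a condensed standalone sketch, not code from the patch; 'payloadCol' stands in for OrcRecordUpdater.ROW.

import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;

public class PayloadCopySketch {
  // Copies data columns from the base (ACID) batch layout into the outgoing batch.
  static void copyPayload(ColumnVector[] baseCols, ColumnVector[] outCols,
      int payloadCol, int dataColumnCount, boolean isFlatPayload) {
    if (isFlatPayload) {
      // LLAP already flattened the payload: data columns follow the struct column directly.
      System.arraycopy(baseCols, payloadCol + 1, outCols, 0, baseCols.length - payloadCol - 1);
    } else {
      // Non-flat path: the payload is still wrapped in a struct; unwrap its fields.
      StructColumnVector payloadStruct = (StructColumnVector) baseCols[payloadCol];
      System.arraycopy(payloadStruct.fields, 0, outCols, 0, dataColumnCount);
    }
  }
}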
http://git-wip-us.apache.org/repos/asf/hive/blob/1a3090f8/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java
index c1e55c7..f6b949e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java
@@ -34,14 +34,14 @@ public interface EncodedReader {
    * @param index Externally provided metadata (from metadata reader or 
external cache).
    * @param encodings Externally provided metadata (from metadata reader or 
external cache).
    * @param streams Externally provided metadata (from metadata reader or 
external cache).
-   * @param included The array of booleans indicating whether each column 
should be read.
+   * @param physicalFileIncludes The array of booleans indicating whether each 
column should be read.
    * @param colRgs Arrays of rgs, per column set to true in included, that are 
to be read.
    *               null in each respective position means all rgs for this 
column need to be read.
    * @param consumer The sink for data that has been read.
    */
   void readEncodedColumns(int stripeIx, StripeInformation stripe,
       OrcProto.RowIndex[] index, List<OrcProto.ColumnEncoding> encodings,
-      List<OrcProto.Stream> streams, boolean[] included, boolean[] rgs,
+      List<OrcProto.Stream> streams, boolean[] physicalFileIncludes, boolean[] 
rgs,
       Consumer<OrcEncodedColumnBatch> consumer) throws IOException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/1a3090f8/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 893a2bb..9ee1229 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -100,6 +100,8 @@ import sun.misc.Cleaner;
  * 6) Given that RG end boundaries in ORC are estimates, we can request data 
from cache and then
  *    not use it; thus, at the end we go thru all the MBs, and release those 
not released by (5).
  */
+// Note: this class should know nothing about ACID or schema. It reads physical columns by
+//       index; schema evolution/ACID schema considerations should be handled at a higher level.
 class EncodedReaderImpl implements EncodedReader {
   public static final Logger LOG = 
LoggerFactory.getLogger(EncodedReaderImpl.class);
   private static Field cleanerField;
@@ -282,7 +284,7 @@ class EncodedReaderImpl implements EncodedReader {
   @Override
   public void readEncodedColumns(int stripeIx, StripeInformation stripe,
       OrcProto.RowIndex[] indexes, List<OrcProto.ColumnEncoding> encodings,
-      List<OrcProto.Stream> streamList, boolean[] included, boolean[] rgs,
+      List<OrcProto.Stream> streamList, boolean[] physicalFileIncludes, 
boolean[] rgs,
       Consumer<OrcEncodedColumnBatch> consumer) throws IOException {
     // Note: for now we don't have to setError here, caller will setError if 
we throw.
     // We are also not supposed to call setDone, since we are only part of the 
operation.
@@ -298,11 +300,11 @@ class EncodedReaderImpl implements EncodedReader {
     // We assume stream list is sorted by column and that non-data
     // streams do not interleave data streams for the same column.
     // 1.2. With that in mind, determine disk ranges to read/get from cache 
(not by stream).
-    ColumnReadContext[] colCtxs = new ColumnReadContext[included.length];
+    ColumnReadContext[] colCtxs = new 
ColumnReadContext[physicalFileIncludes.length];
     int colRgIx = -1;
     // Don't create context for the 0-s column.
-    for (int i = 1; i < included.length; ++i) {
-      if (!included[i]) continue;
+    for (int i = 1; i < physicalFileIncludes.length; ++i) {
+      if (!physicalFileIncludes[i]) continue;
       ColumnEncoding enc = encodings.get(i);
       colCtxs[i] = new ColumnReadContext(i, enc, indexes[i], ++colRgIx);
       if (isTracingEnabled) {
@@ -316,10 +318,10 @@ class EncodedReaderImpl implements EncodedReader {
       long length = stream.getLength();
       int colIx = stream.getColumn();
       OrcProto.Stream.Kind streamKind = stream.getKind();
-      if (!included[colIx] || StreamName.getArea(streamKind) != 
StreamName.Area.DATA) {
+      if (!physicalFileIncludes[colIx] || StreamName.getArea(streamKind) != 
StreamName.Area.DATA) {
         // We have a stream for included column, but in future it might have 
no data streams.
         // It's more like "has at least one column included that has an index 
stream".
-        hasIndexOnlyCols = hasIndexOnlyCols || included[colIx];
+        hasIndexOnlyCols = hasIndexOnlyCols || physicalFileIncludes[colIx];
         if (isTracingEnabled) {
           LOG.trace("Skipping stream for column " + colIx + ": "
               + streamKind + " at " + offset + ", " + length);
@@ -358,7 +360,7 @@ class EncodedReaderImpl implements EncodedReader {
       // TODO: there may be a bug here. Could there be partial RG filtering on 
index-only column?
       if (hasIndexOnlyCols && (rgs == null)) {
         OrcEncodedColumnBatch ecb = POOLS.ecbPool.take();
-        ecb.init(fileKey, stripeIx, OrcEncodedColumnBatch.ALL_RGS, 
included.length);
+        ecb.init(fileKey, stripeIx, OrcEncodedColumnBatch.ALL_RGS, 
physicalFileIncludes.length);
         try {
           consumer.consumeData(ecb);
         } catch (InterruptedException e) {
@@ -400,7 +402,7 @@ class EncodedReaderImpl implements EncodedReader {
         trace.logStartRg(rgIx);
         boolean hasErrorForEcb = true;
         try {
-          ecb.init(fileKey, stripeIx, rgIx, included.length);
+          ecb.init(fileKey, stripeIx, rgIx, physicalFileIncludes.length);
           for (int colIx = 0; colIx < colCtxs.length; ++colIx) {
             ColumnReadContext ctx = colCtxs[colIx];
             if (ctx == null) continue; // This column is not included.
@@ -1829,21 +1831,21 @@ class EncodedReaderImpl implements EncodedReader {
 
   @Override
   public void readIndexStreams(OrcIndex index, StripeInformation stripe,
-      List<OrcProto.Stream> streams, boolean[] included, boolean[] sargColumns)
+      List<OrcProto.Stream> streams, boolean[] physicalFileIncludes, boolean[] 
sargColumns)
           throws IOException {
     long stripeOffset = stripe.getOffset();
-    DiskRangeList indexRanges = planIndexReading(
-        fileSchema, streams, true, included, sargColumns, version, 
index.getBloomFilterKinds());
+    DiskRangeList indexRanges = planIndexReading(fileSchema, streams, true, 
physicalFileIncludes,
+        sargColumns, version, index.getBloomFilterKinds());
     if (indexRanges == null) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Nothing to read for stripe [" + stripe + "]");
       }
       return;
     }
-    ReadContext[] colCtxs = new ReadContext[included.length];
+    ReadContext[] colCtxs = new ReadContext[physicalFileIncludes.length];
     int colRgIx = -1;
-    for (int i = 0; i < included.length; ++i) {
-      if (!included[i] && (sargColumns == null || !sargColumns[i])) continue;
+    for (int i = 0; i < physicalFileIncludes.length; ++i) {
+      if (!physicalFileIncludes[i] && (sargColumns == null || 
!sargColumns[i])) continue;
       colCtxs[i] = new ReadContext(i, ++colRgIx);
       if (isTracingEnabled) {
         LOG.trace("Creating context: " + colCtxs[i].toString());
@@ -1858,7 +1860,7 @@ class EncodedReaderImpl implements EncodedReader {
       // See planIndexReading - only read non-row-index streams if involved in 
SARGs.
       if ((StreamName.getArea(streamKind) == StreamName.Area.INDEX)
           && ((sargColumns != null && sargColumns[colIx])
-              || (included[colIx] && streamKind == Kind.ROW_INDEX))) {
+              || (physicalFileIncludes[colIx] && streamKind == 
Kind.ROW_INDEX))) {
         trace.logAddStream(colIx, streamKind, offset, length, -1, true);
         colCtxs[colIx].addStream(offset, stream, -1);
         if (isTracingEnabled) {

http://git-wip-us.apache.org/repos/asf/hive/blob/1a3090f8/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
index 1e7708e..42532f9 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
@@ -20,8 +20,10 @@ package org.apache.hadoop.hive.ql.io.orc.encoded;
 import org.apache.orc.impl.RunLengthByteReader;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 
+import org.apache.curator.shaded.com.google.common.base.Preconditions;
 import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch;
 import 
org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
@@ -2099,35 +2101,27 @@ public class EncodedTreeReaderFactory extends 
TreeReaderFactory {
     }
   }
 
-  public static StructTreeReader createRootTreeReader(TypeDescription schema,
+  public static StructTreeReader createRootTreeReader(TypeDescription[] 
batchSchemas,
        List<OrcProto.ColumnEncoding> encodings, OrcEncodedColumnBatch batch,
-       CompressionCodec codec, TreeReaderFactory.Context context, int[] 
columnMapping)
-           throws IOException {
-    if (schema.getCategory() != Category.STRUCT) {
-      throw new AssertionError("Schema is not a struct: " + schema);
-    }
-    // Some child types may be excluded. Note that this can only happen at 
root level.
-    List<TypeDescription> children = schema.getChildren();
-    int childCount = children.size(), includedCount = 0;
-    for (int childIx = 0; childIx < childCount; ++childIx) {
-      int batchColIx = children.get(childIx).getId();
+       CompressionCodec codec, TreeReaderFactory.Context context) throws 
IOException {
+    // Note: we only look at the schema here to deal with complex types. Somebody has set up the
+    //       reader with whatever schema they intended, and we just trust the reader to produce
+    //       the CVBs that were asked for. However, we only need to look at top-level columns.
+    int includedCount = batch.getColumnsWithDataCount();
+    if (batchSchemas.length > includedCount) {
+      throw new AssertionError("For " + Arrays.toString(batchSchemas) + ", 
only received "
+          + includedCount + " columns");
+    }
+    TreeReader[] childReaders = new TreeReader[batchSchemas.length];
+    for (int i = 0; i < batchSchemas.length; ++i) {
+      int batchColIx = batchSchemas[i].getId();
       if (!batch.hasData(batchColIx) && !batch.hasVectors(batchColIx)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Column at " + childIx + " " + 
children.get(childIx).getId()
-              + ":" + children.get(childIx).toString() + " has no data");
-        }
-        continue;
+        throw new AssertionError("No data for column " + batchColIx + ": " + 
batchSchemas[i]);
       }
-      ++includedCount;
-    }
-    TreeReader[] childReaders = new TreeReader[includedCount];
-    for (int schemaChildIx = 0, inclChildIx = -1; schemaChildIx < childCount; 
++schemaChildIx) {
-      int batchColIx = children.get(schemaChildIx).getId();
-      if (!batch.hasData(batchColIx) && !batch.hasVectors(batchColIx)) 
continue;
-      childReaders[++inclChildIx] = createEncodedTreeReader(
-          schema.getChildren().get(schemaChildIx), encodings, batch, codec, 
context);
-      columnMapping[inclChildIx] = schemaChildIx;
+      childReaders[i] = createEncodedTreeReader(batchSchemas[i], encodings, 
batch, codec, context);
     }
+
+    // TODO: do we actually need this reader? The caller just extracts the child readers.
     return StructStreamReader.builder()
         .setColumnIndex(0)
         .setCompressionCodec(codec)

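The contract change here can be summarized as: the caller now passes in exactly the top-level column types it expects back, and the factory fails fast if the batch is missing any of them instead of silently skipping columns. Below is a condensed, standalone version of that check; the hasDataOrVectors predicate is a stand-in for batch.hasData()/hasVectors() and is not an API from the patch.

import java.util.Arrays;
import java.util.function.IntPredicate;
import org.apache.orc.TypeDescription;

public class RootReaderCheckSketch {
  // Mirrors the validation in createRootTreeReader: every requested top-level column must be
  // present in the batch, and the batch must not carry fewer columns than were asked for.
  static void validate(TypeDescription[] batchSchemas, int columnsWithData,
      IntPredicate hasDataOrVectors) {
    if (batchSchemas.length > columnsWithData) {
      throw new AssertionError("For " + Arrays.toString(batchSchemas)
          + ", only received " + columnsWithData + " columns");
    }
    for (TypeDescription schema : batchSchemas) {
      if (!hasDataOrVectors.test(schema.getId())) {
        throw new AssertionError("No data for column " + schema.getId() + ": " + schema);
      }
    }
  }
}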
http://git-wip-us.apache.org/repos/asf/hive/blob/1a3090f8/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
index 50d10e3..025600b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
@@ -105,11 +105,10 @@ public interface Reader extends 
org.apache.hadoop.hive.ql.io.orc.Reader {
       if (columnVectors != null && columnCount == columnVectors.length) {
         Arrays.fill(columnVectors, null);
         return;
-      } if (columnVectors != null) {
-        columnVectors = new List[columnCount];
-      } else {
-        columnVectors = null;
       }
+      if (columnVectors != null) {
+        columnVectors = new List[columnCount];
+      } // else just keep it null
     }
 
     public boolean hasVectors(int colIx) {
@@ -120,5 +119,14 @@ public interface Reader extends 
org.apache.hadoop.hive.ql.io.orc.Reader {
       if (!hasVectors(colIx)) throw new AssertionError("No data for column " + 
colIx);
       return columnVectors[colIx];
     }
+
+    public int getColumnsWithDataCount() {
+      int childCount = hasData.length, result = 0;
+      for (int childIx = 0; childIx < childCount; ++childIx) {
+        if (!hasData(childIx) && !hasVectors(childIx)) continue;
+        ++result;
+      }
+      return result;
+    }
   }
 }

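The new helper simply counts top-level columns for which the batch holds either encoded data or pre-made vectors, which is what the assertion in createRootTreeReader compares against. A trivial standalone equivalent, with hasData/hasVectors modeled as boolean arrays purely for illustration:

public class ColumnsWithDataSketch {
  // Counts columns that carry either encoded streams or pre-decoded vectors,
  // mirroring OrcEncodedColumnBatch.getColumnsWithDataCount().
  static int columnsWithDataCount(boolean[] hasData, boolean[] hasVectors) {
    int result = 0;
    for (int i = 0; i < hasData.length; ++i) {
      if (hasData[i] || hasVectors[i]) {
        ++result;
      }
    }
    return result;
  }
}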
http://git-wip-us.apache.org/repos/asf/hive/blob/1a3090f8/ql/src/test/queries/clientpositive/llap_acid2.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/llap_acid2.q 
b/ql/src/test/queries/clientpositive/llap_acid2.q
new file mode 100644
index 0000000..76f6203
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/llap_acid2.q
@@ -0,0 +1,84 @@
+set hive.mapred.mode=nonstrict;
+SET hive.vectorized.execution.enabled=true;
+
+SET hive.llap.io.enabled=false;
+
+SET hive.exec.orc.default.buffer.size=32768;
+SET hive.exec.orc.default.row.index.stride=1000;
+SET hive.optimize.index.filter=true;
+set hive.fetch.task.conversion=none;
+
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+
+DROP TABLE orc_llap;
+
+CREATE TABLE orc_llap (
+    cint INT,
+    cbigint BIGINT,
+    cfloat FLOAT,
+    cdouble DOUBLE,
+    cint0 INT,
+    cbigint0 BIGINT,
+    cfloat0 FLOAT,
+    cdouble0 DOUBLE,
+    cint1 INT,
+    cbigint1 BIGINT,
+    cfloat1 FLOAT,
+    cdouble1 DOUBLE,
+    cstring1 string,
+    cfloat2 float
+)  stored as orc TBLPROPERTIES ('transactional'='true');
+
+
+insert into table orc_llap
+select cint, cbigint, cfloat, cdouble,
+ cint as c1, cbigint as c2, cfloat as c3, cdouble as c4,
+ cint as c8, cbigint as c7, cfloat as c6, cdouble as c5,
+ cstring1, cfloat as c9 from alltypesorc order by cdouble asc  limit 30;
+
+
+
+
+
+CREATE TABLE orc_llap2 (
+    cint INT,
+    cbigint BIGINT,
+    cfloat FLOAT,
+    cdouble DOUBLE,
+    cint0 INT,
+    cbigint0 BIGINT,
+    cfloat0 FLOAT,
+    cdouble0 DOUBLE,
+    cint1 INT,
+    cbigint1 BIGINT,
+    cfloat1 FLOAT,
+    cdouble1 DOUBLE,
+    cstring1 string,
+    cfloat2 float
+)  stored as orc TBLPROPERTIES ('transactional'='false');
+
+insert into table orc_llap2
+select cint, cbigint, cfloat, cdouble,
+ cint as c1, cbigint as c2, cfloat as c3, cdouble as c4,
+ cint as c8, cbigint as c7, cfloat as c6, cdouble as c5,
+ cstring1, cfloat as c9 from alltypesorc order by cdouble asc  limit 30;
+
+alter table orc_llap2 set TBLPROPERTIES ('transactional'='true');
+
+update orc_llap2 set cstring1 = 'testvalue' where cstring1 = 'N016jPED08o';
+
+
+SET hive.llap.io.enabled=true;
+
+select cstring1 from orc_llap;
+select cfloat2, cint from orc_llap;
+select * from orc_llap;
+
+select cstring1 from orc_llap2;
+select cfloat2, cint from orc_llap2;
+select * from orc_llap2;
+
+
+DROP TABLE orc_llap;
