HIVE-18738 : LLAP IO ACID - includes handling is broken (Sergey Shelukhin, reviewed by Teddy Choi)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1a3090f8 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1a3090f8 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1a3090f8 Branch: refs/heads/master Commit: 1a3090f858ce3ac18d1eb799d87a45b3ee0defce Parents: a4198f5 Author: sergey <ser...@apache.org> Authored: Fri Mar 2 18:49:59 2018 -0800 Committer: sergey <ser...@apache.org> Committed: Fri Mar 2 18:49:59 2018 -0800 ---------------------------------------------------------------------- .../test/resources/testconfiguration.properties | 1 + .../llap/io/api/impl/ColumnVectorBatch.java | 42 ++ .../hive/llap/io/api/impl/LlapInputFormat.java | 9 +- .../hive/llap/io/api/impl/LlapRecordReader.java | 286 +++++++++----- .../llap/io/decode/ColumnVectorProducer.java | 19 +- .../io/decode/GenericColumnVectorProducer.java | 17 +- .../llap/io/decode/OrcColumnVectorProducer.java | 17 +- .../llap/io/decode/OrcEncodedDataConsumer.java | 55 +-- .../hive/llap/io/decode/ReadPipeline.java | 1 - .../llap/io/encoded/OrcEncodedDataReader.java | 60 +-- .../llap/io/encoded/SerDeEncodedDataReader.java | 18 +- .../hive/ql/exec/vector/VectorExtractRow.java | 6 +- .../ql/exec/vector/VectorSelectOperator.java | 1 + .../hadoop/hive/ql/io/orc/OrcInputFormat.java | 53 ++- .../io/orc/VectorizedOrcAcidRowBatchReader.java | 191 ++++----- .../hive/ql/io/orc/encoded/EncodedReader.java | 4 +- .../ql/io/orc/encoded/EncodedReaderImpl.java | 32 +- .../orc/encoded/EncodedTreeReaderFactory.java | 44 +-- .../hadoop/hive/ql/io/orc/encoded/Reader.java | 16 +- ql/src/test/queries/clientpositive/llap_acid2.q | 84 ++++ .../clientpositive/llap/llap_acid2.q.out | 392 +++++++++++++++++++ .../common/io/encoded/EncodedColumnBatch.java | 9 +- 22 files changed, 1013 insertions(+), 344 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/hive/blob/1a3090f8/itests/src/test/resources/testconfiguration.properties ---------------------------------------------------------------------- diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index 2776fe9..544c836 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -576,6 +576,7 @@ minillaplocal.query.files=\ lineage2.q,\ lineage3.q,\ list_bucket_dml_10.q,\ + llap_acid2.q,\ llap_partitioned.q,\ llap_vector_nohybridgrace.q,\ load_data_acid_rename.q,\ http://git-wip-us.apache.org/repos/asf/hive/blob/1a3090f8/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/ColumnVectorBatch.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/ColumnVectorBatch.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/ColumnVectorBatch.java index 9262bf0..19b0b55 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/ColumnVectorBatch.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/ColumnVectorBatch.java @@ -43,4 +43,46 @@ public class ColumnVectorBatch { other[otherIx] = cols[ix]; cols[ix] = old; } + + + @Override + public String toString() { + if (size == 0) { + return ""; + } + StringBuilder b = new StringBuilder(); + b.append("Column vector types: "); + for (int k = 0; k < cols.length; k++) { + ColumnVector cv = cols[k]; + b.append(k); + b.append(":"); + b.append(cv == null ? 
"null" : cv.getClass().getSimpleName().replace("ColumnVector", "")); + } + b.append('\n'); + + + for (int i = 0; i < size; i++) { + b.append('['); + for (int k = 0; k < cols.length; k++) { + ColumnVector cv = cols[k]; + if (k > 0) { + b.append(", "); + } + if (cv == null) continue; + if (cv != null) { + try { + cv.stringifyValue(b, i); + } catch (Exception ex) { + b.append("invalid"); + } + } + } + b.append(']'); + if (i < size - 1) { + b.append('\n'); + } + } + + return b.toString(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/1a3090f8/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java index bb319f0..6d29163 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java @@ -100,9 +100,12 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB FileSplit fileSplit = (FileSplit) split; reporter.setStatus(fileSplit.toString()); try { - List<Integer> includedCols = ColumnProjectionUtils.isReadAllColumns(job) + // At this entry point, we are going to assume that these are logical table columns. + // Perhaps we should go thru the code and clean this up to be more explicit; for now, we + // will start with this single assumption and maintain clear semantics from here. + List<Integer> tableIncludedCols = ColumnProjectionUtils.isReadAllColumns(job) ? 
null : ColumnProjectionUtils.getReadColumnIDs(job); - LlapRecordReader rr = LlapRecordReader.create(job, fileSplit, includedCols, hostName, + LlapRecordReader rr = LlapRecordReader.create(job, fileSplit, tableIncludedCols, hostName, cvp, executor, sourceInputFormat, sourceSerDe, reporter, daemonConf); if (rr == null) { // Reader-specific incompatibility like SMB or schema evolution. @@ -111,7 +114,7 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB // For non-vectorized operator case, wrap the reader if possible. RecordReader<NullWritable, VectorizedRowBatch> result = rr; if (!Utilities.getIsVectorized(job)) { - result = wrapLlapReader(includedCols, rr, split); + result = wrapLlapReader(tableIncludedCols, rr, split); if (result == null) { // Cannot wrap a reader for non-vectorized pipeline. return sourceInputFormat.getRecordReader(split, job, reporter); http://git-wip-us.apache.org/repos/asf/hive/blob/1a3090f8/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java index a69c9a0..3a2c19a 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java @@ -36,6 +36,8 @@ import org.apache.hadoop.hive.llap.counters.LlapIOCounters; import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters; import org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool; import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer; +import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer.Includes; +import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer.SchemaEvolutionFactory; import 
org.apache.hadoop.hive.llap.io.decode.ReadPipeline; import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -44,15 +46,16 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater; import org.apache.hadoop.hive.ql.io.orc.OrcSplit; import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader; import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer; +import org.apache.hadoop.hive.ql.io.orc.encoded.Reader; import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.MapWork; -import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; @@ -70,17 +73,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; class LlapRecordReader implements RecordReader<NullWritable, VectorizedRowBatch>, Consumer<ColumnVectorBatch> { + private static final Logger LOG = LoggerFactory.getLogger(LlapRecordReader.class); private static final Object DONE_OBJECT = new Object(); private final FileSplit split; - private List<Integer> columnIds; + private final IncludesImpl includes; private final SearchArgument sarg; - private final String[] columnNames; private final VectorizedRowBatchCtx rbCtx; private final Object[] partitionValues; @@ -100,21 +104,19 @@ class LlapRecordReader private 
final JobConf jobConf; private final ReadPipeline rp; private final ExecutorService executor; - private final int columnCount; private final boolean isAcidScan; /** * Creates the record reader and checks the input-specific compatibility. * @return The reader if the split can be read, null otherwise. */ - public static LlapRecordReader create(JobConf job, FileSplit split, List<Integer> includedCols, - String hostName, ColumnVectorProducer cvp, ExecutorService executor, - InputFormat<?, ?> sourceInputFormat, Deserializer sourceSerDe, Reporter reporter, - Configuration daemonConf) - throws IOException, HiveException { + public static LlapRecordReader create(JobConf job, FileSplit split, + List<Integer> tableIncludedCols, String hostName, ColumnVectorProducer cvp, + ExecutorService executor, InputFormat<?, ?> sourceInputFormat, Deserializer sourceSerDe, + Reporter reporter, Configuration daemonConf) throws IOException, HiveException { MapWork mapWork = findMapWork(job); if (mapWork == null) return null; // No compatible MapWork. 
- LlapRecordReader rr = new LlapRecordReader(mapWork, job, split, includedCols, hostName, + LlapRecordReader rr = new LlapRecordReader(mapWork, job, split, tableIncludedCols, hostName, cvp, executor, sourceInputFormat, sourceSerDe, reporter, daemonConf); if (!rr.checkOrcSchemaEvolution()) { rr.close(); @@ -124,7 +126,7 @@ class LlapRecordReader } private LlapRecordReader(MapWork mapWork, JobConf job, FileSplit split, - List<Integer> includedCols, String hostName, ColumnVectorProducer cvp, + List<Integer> tableIncludedCols, String hostName, ColumnVectorProducer cvp, ExecutorService executor, InputFormat<?, ?> sourceInputFormat, Deserializer sourceSerDe, Reporter reporter, Configuration daemonConf) throws IOException, HiveException { this.executor = executor; @@ -132,7 +134,6 @@ class LlapRecordReader this.split = split; this.sarg = ConvertAstToSearchArg.createFromConf(job); - this.columnNames = ColumnProjectionUtils.getReadColumnNames(job); final String fragmentId = LlapTezUtils.getFragmentId(job); final String dagId = LlapTezUtils.getDagId(job); final String queryId = HiveConf.getVar(job, HiveConf.ConfVars.HIVEQUERYID); @@ -152,32 +153,11 @@ class LlapRecordReader VectorizedRowBatchCtx ctx = mapWork.getVectorizedRowBatchCtx(); rbCtx = ctx != null ? ctx : LlapInputFormat.createFakeVrbCtx(mapWork); - // Note: columnIds below makes additional changes for ACID. Don't use this var directly. - if (includedCols == null) { - // Assume including everything means the VRB will have everything. 
- includedCols = new ArrayList<>(rbCtx.getRowColumnTypeInfos().length); - for (int i = 0; i < rbCtx.getRowColumnTypeInfos().length; ++i) { - includedCols.add(i); - } - } - isAcidScan = AcidUtils.isFullAcidScan(jobConf); TypeDescription schema = OrcInputFormat.getDesiredRowTypeDescr( job, isAcidScan, Integer.MAX_VALUE); - if (isAcidScan) { - this.columnIds = new ArrayList<>(); - final int ACID_FIELDS = OrcInputFormat.getRootColumn(false); - for (int i = 0; i < ACID_FIELDS; i++) { - columnIds.add(i); - } - for (int i = 0; i < includedCols.size(); i++) { - columnIds.add(i + ACID_FIELDS); - } - this.columnCount = columnIds.size(); - } else { - this.columnIds = includedCols; - this.columnCount = columnIds.size(); - } + + this.includes = new IncludesImpl(tableIncludedCols, isAcidScan, rbCtx, schema, job); int queueLimitBase = getQueueVar(ConfVars.LLAP_IO_VRB_QUEUE_LIMIT_BASE, job, daemonConf); int queueLimitMin = getQueueVar(ConfVars.LLAP_IO_VRB_QUEUE_LIMIT_MIN, job, daemonConf); @@ -195,9 +175,8 @@ class LlapRecordReader } // Create the consumer of encoded data; it will coordinate decoding to CVBs. - feedback = rp = cvp.createReadPipeline(this, split, columnIds, sarg, columnNames, - counters, schema, sourceInputFormat, sourceSerDe, reporter, job, - mapWork.getPathToPartitionInfo()); + feedback = rp = cvp.createReadPipeline(this, split, includes, sarg, counters, includes, + sourceInputFormat, sourceSerDe, reporter, job, mapWork.getPathToPartitionInfo()); } private static int getQueueVar(ConfVars var, JobConf jobConf, Configuration daemonConf) { @@ -287,11 +266,11 @@ class LlapRecordReader private boolean checkOrcSchemaEvolution() { SchemaEvolution evolution = rp.getSchemaEvolution(); - for (int i = 0; i < columnCount; ++i) { - int projectedColId = columnIds == null ? i : columnIds.get(i); + // TODO: should this just use physical IDs? 
+ for (int i = 0; i < includes.getReaderLogicalColumnIds().size(); ++i) { + int projectedColId = includes.getReaderLogicalColumnIds().get(i); // Adjust file column index for ORC struct. - // LLAP IO does not support ACID. When it supports, this would be auto adjusted. - int fileColId = OrcInputFormat.getRootColumn(!isAcidScan) + projectedColId + 1; + int fileColId = OrcInputFormat.getRootColumn(!isAcidScan) + projectedColId + 1; if (!evolution.isPPDSafeConversion(fileColId)) { LlapIoImpl.LOG.warn("Unsupported schema evolution! Disabling Llap IO for {}", split); return false; @@ -301,8 +280,8 @@ class LlapRecordReader } @Override - public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException { - assert value != null; + public boolean next(NullWritable key, VectorizedRowBatch vrb) throws IOException { + assert vrb != null; if (isClosed) { throw new AssertionError("next called after close"); } @@ -310,7 +289,7 @@ class LlapRecordReader boolean wasFirst = isFirst; if (isFirst) { if (partitionValues != null) { - rbCtx.addPartitionColsToBatch(value, partitionValues); + rbCtx.addPartitionColsToBatch(vrb, partitionValues); } isFirst = false; } @@ -332,59 +311,47 @@ class LlapRecordReader } final boolean isVectorized = HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED); - if (isAcidScan) { - value.selectedInUse = true; + vrb.selectedInUse = true; if (isVectorized) { - final VectorizedRowBatch acidVrb = new VectorizedRowBatch(cvb.cols.length); - acidVrb.cols = cvb.cols; - acidVrb.size = cvb.size; - final VectorizedOrcAcidRowBatchReader acidReader = - new VectorizedOrcAcidRowBatchReader((OrcSplit)split, jobConf, Reporter.NULL, - new RecordReader<NullWritable, VectorizedRowBatch>() { - @Override - public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException { - return true; - } - - @Override - public NullWritable createKey() { - return NullWritable.get(); - } - - @Override - public VectorizedRowBatch 
createValue() { - return acidVrb; - } - - @Override - public long getPos() throws IOException { - return 0; - } - - @Override - public void close() throws IOException { - } - - @Override - public float getProgress() throws IOException { - return 0; - } - }, rbCtx); - acidReader.next(NullWritable.get(), value); + // TODO: relying everywhere on the magical constants and columns being together means ACID + // columns are going to be super hard to change in a backward compat manner. I can + // foresee someone cursing while refactoring all the magic for prefix schema changes. + // Exclude the row column. + int acidColCount = OrcInputFormat.getRootColumn(false) - 1; + VectorizedRowBatch inputVrb = new VectorizedRowBatch( + acidColCount + 1 + vrb.getDataColumnCount() ); + // By assumption, ACID columns are currently always in the beginning of the arrays. + System.arraycopy(cvb.cols, 0, inputVrb.cols, 0, acidColCount); + for (int ixInReadSet = acidColCount; ixInReadSet < cvb.cols.length; ++ixInReadSet) { + int ixInVrb = includes.getPhysicalColumnIds().get(ixInReadSet); + // TODO: should we create the batch from vrbctx, and reuse the vectors, like below? Future work. + inputVrb.cols[ixInVrb] = cvb.cols[ixInReadSet]; + } + inputVrb.size = cvb.size; + // TODO: reuse between calls + @SuppressWarnings("resource") + VectorizedOrcAcidRowBatchReader acidReader = new VectorizedOrcAcidRowBatchReader( + (OrcSplit)split, jobConf, Reporter.NULL, new AcidWrapper(inputVrb), rbCtx, true); + acidReader.next(NullWritable.get(), vrb); + } else { + // TODO: WTF? The old code seems to just drop the ball here. 
+ throw new AssertionError("Unsupported mode"); } } else { - if (columnCount != cvb.cols.length) { - throw new RuntimeException("Unexpected number of columns, VRB has " + columnCount - + " included, but the reader returned " + cvb.cols.length); + if (includes.getPhysicalColumnIds().size() != cvb.cols.length) { + throw new RuntimeException("Unexpected number of columns, VRB has " + + includes.getPhysicalColumnIds().size() + " included, but the reader returned " + + cvb.cols.length); } - // VRB was created from VrbCtx, so we already have pre-allocated column vectors - for (int i = 0; i < cvb.cols.length; ++i) { - // Return old CVs (if any) to caller. We assume these things all have the same schema. - cvb.swapColumnVector(i, value.cols, columnIds.get(i)); + // VRB was created from VrbCtx, so we already have pre-allocated column vectors. + // Return old CVs (if any) to caller. We assume these things all have the same schema. + for (int ixInReadSet = 0; ixInReadSet < cvb.cols.length; ++ixInReadSet) { + int ixInVrb = includes.getPhysicalColumnIds().get(ixInReadSet); + cvb.swapColumnVector(ixInReadSet, vrb.cols, ixInVrb); } - value.selectedInUse = false; - value.size = cvb.size; + vrb.selectedInUse = false; + vrb.size = cvb.size; } if (wasFirst) { @@ -397,6 +364,44 @@ class LlapRecordReader return rbCtx; } + private static final class AcidWrapper + implements RecordReader<NullWritable, VectorizedRowBatch> { + private final VectorizedRowBatch acidVrb; + + private AcidWrapper(VectorizedRowBatch acidVrb) { + this.acidVrb = acidVrb; + } + + @Override + public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException { + return true; + } + + @Override + public NullWritable createKey() { + return NullWritable.get(); + } + + @Override + public VectorizedRowBatch createValue() { + return acidVrb; + } + + @Override + public long getPos() throws IOException { + return 0; + } + + @Override + public void close() throws IOException { + } + + @Override + public float 
getProgress() throws IOException { + return 0; + } + } + private final class IOUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler { @Override public void uncaughtException(final Thread t, final Throwable e) { @@ -528,4 +533,99 @@ class LlapRecordReader // TODO: plumb progress info thru the reader if we can get metadata from loader first. return 0.0f; } -} + + + /** This class encapsulates include-related logic for LLAP readers. It is not actually specific + * to LLAP IO but in LLAP IO in particular, I want to encapsulate all this mess for now until + * we have smth better like Schema Evolution v2. This can also hypothetically encapsulate + * field pruning inside structs and stuff like that. */ + private static class IncludesImpl implements SchemaEvolutionFactory, Includes { + private List<Integer> readerLogicalColumnIds; + private List<Integer> filePhysicalColumnIds; + private Integer acidStructColumnId = null; + + // For current schema evolution. + private TypeDescription readerSchema; + private JobConf jobConf; + + public IncludesImpl(List<Integer> tableIncludedCols, boolean isAcidScan, + VectorizedRowBatchCtx rbCtx, TypeDescription readerSchema, JobConf jobConf) { + // Note: columnIds below makes additional changes for ACID. Don't use this var directly. + this.readerSchema = readerSchema; + this.jobConf = jobConf; + if (tableIncludedCols == null) { + // Assume including everything means the VRB will have everything. + // TODO: this is rather brittle, esp. in view of schema evolution (in abstract, not as + // currently implemented in Hive). The compile should supply the columns it expects + // to see, which is not "all, of any schema". Is VRB row CVs the right mechanism + // for that? Who knows. Perhaps resolve in schema evolution v2. 
+ tableIncludedCols = new ArrayList<>(rbCtx.getRowColumnTypeInfos().length); + for (int i = 0; i < rbCtx.getRowColumnTypeInfos().length; ++i) { + tableIncludedCols.add(i); + } + } + LOG.debug("Logical table includes: {}", tableIncludedCols); + this.readerLogicalColumnIds = tableIncludedCols; + // Note: schema evolution currently does not support column index changes. + // So, the indices should line up... to be fixed in SE v2? + List<Integer> filePhysicalColumnIds = readerLogicalColumnIds; + if (isAcidScan) { + int rootCol = OrcInputFormat.getRootColumn(false); + filePhysicalColumnIds = new ArrayList<Integer>(filePhysicalColumnIds.size() + rootCol); + this.acidStructColumnId = rootCol - 1; // OrcRecordUpdater.ROW. This is somewhat fragile... + // Note: this guarantees that physical column IDs are in order. + for (int i = 0; i < rootCol; ++i) { + // We don't want to include the root struct in ACID case; it would cause the whole + // struct to get read without projection. + if (acidStructColumnId == i) continue; + filePhysicalColumnIds.add(i); + } + for (int tableColumnId : readerLogicalColumnIds) { + filePhysicalColumnIds.add(rootCol + tableColumnId); + } + } + + this.filePhysicalColumnIds = filePhysicalColumnIds; + } + + @Override + public String toString() { + return "logical columns " + readerLogicalColumnIds + + ", physical columns " + filePhysicalColumnIds; + } + + @Override + public SchemaEvolution createSchemaEvolution(TypeDescription fileSchema) { + if (readerSchema == null) { + readerSchema = fileSchema; + } + // TODO: will this work correctly with ACID? 
+ boolean[] readerIncludes = OrcInputFormat.genIncludedColumns( + readerSchema, readerLogicalColumnIds); + Reader.Options options = new Reader.Options(jobConf).include(readerIncludes); + return new SchemaEvolution(fileSchema, readerSchema, options); + } + + @Override + public boolean[] generateFileIncludes(TypeDescription fileSchema) { + return OrcInputFormat.genIncludedColumns( + fileSchema, filePhysicalColumnIds, acidStructColumnId); + } + + @Override + public List<Integer> getPhysicalColumnIds() { + return filePhysicalColumnIds; + } + + @Override + public List<Integer> getReaderLogicalColumnIds() { + return readerLogicalColumnIds; + } + + @Override + public TypeDescription[] getBatchReaderTypes(TypeDescription fileSchema) { + return OrcInputFormat.genIncludedTypes( + fileSchema, filePhysicalColumnIds, acidStructColumnId); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/1a3090f8/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java index 2a2be56..a830c07 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java @@ -34,14 +34,25 @@ import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; import org.apache.orc.TypeDescription; +import org.apache.orc.impl.SchemaEvolution; /** * Entry point used by LlapInputFormat to create read pipeline to get data. 
*/ public interface ColumnVectorProducer { + public interface SchemaEvolutionFactory { + SchemaEvolution createSchemaEvolution(TypeDescription fileSchema); + } + + public interface Includes { + boolean[] generateFileIncludes(TypeDescription fileSchema); + List<Integer> getPhysicalColumnIds(); + List<Integer> getReaderLogicalColumnIds(); + TypeDescription[] getBatchReaderTypes(TypeDescription fileSchema); + } + ReadPipeline createReadPipeline(Consumer<ColumnVectorBatch> consumer, FileSplit split, - List<Integer> columnIds, SearchArgument sarg, String[] columnNames, - QueryFragmentCounters counters, TypeDescription readerSchema, - InputFormat<?, ?> sourceInputFormat, Deserializer sourceSerDe, Reporter reporter, - JobConf job, Map<Path, PartitionDesc> parts) throws IOException; + Includes includes, SearchArgument sarg, QueryFragmentCounters counters, + SchemaEvolutionFactory sef, InputFormat<?, ?> sourceInputFormat, Deserializer sourceSerDe, + Reporter reporter, JobConf job, Map<Path, PartitionDesc> parts) throws IOException; } http://git-wip-us.apache.org/repos/asf/hive/blob/1a3090f8/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java index d66e2f2..7af1b05 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl; import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters; import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch; import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; +import 
org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer.Includes; import org.apache.hadoop.hive.llap.io.encoded.SerDeEncodedDataReader; import org.apache.hadoop.hive.llap.io.metadata.ConsumerFileMetadata; import org.apache.hadoop.hive.llap.io.metadata.ConsumerStripeMetadata; @@ -81,13 +82,12 @@ public class GenericColumnVectorProducer implements ColumnVectorProducer { @Override public ReadPipeline createReadPipeline(Consumer<ColumnVectorBatch> consumer, FileSplit split, - List<Integer> columnIds, SearchArgument sarg, String[] columnNames, - QueryFragmentCounters counters, TypeDescription schema, InputFormat<?, ?> sourceInputFormat, - Deserializer sourceSerDe, Reporter reporter, JobConf job, Map<Path, PartitionDesc> parts) - throws IOException { + Includes includes, SearchArgument sarg, QueryFragmentCounters counters, + SchemaEvolutionFactory sef, InputFormat<?, ?> sourceInputFormat, Deserializer sourceSerDe, + Reporter reporter, JobConf job, Map<Path, PartitionDesc> parts) throws IOException { cacheMetrics.incrCacheReadRequests(); OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer( - consumer, columnIds.size(), false, counters, ioMetrics); + consumer, includes, false, counters, ioMetrics); SerDeFileMetadata fm; try { fm = new SerDeFileMetadata(sourceSerDe); @@ -97,13 +97,10 @@ public class GenericColumnVectorProducer implements ColumnVectorProducer { edc.setFileMetadata(fm); // Note that we pass job config to the record reader, but use global config for LLAP IO. 
// TODO: add tracing to serde reader - SerDeEncodedDataReader reader = new SerDeEncodedDataReader(cache, - bufferManager, conf, split, columnIds, edc, job, reporter, sourceInputFormat, + SerDeEncodedDataReader reader = new SerDeEncodedDataReader(cache, bufferManager, conf, + split, includes.getPhysicalColumnIds(), edc, job, reporter, sourceInputFormat, sourceSerDe, counters, fm.getSchema(), parts); edc.init(reader, reader, new IoTrace(0, false)); - if (LlapIoImpl.LOG.isDebugEnabled()) { - LlapIoImpl.LOG.debug("Ignoring schema: " + schema); - } return edc; } http://git-wip-us.apache.org/repos/asf/hive/blob/1a3090f8/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java index 3a7b192..2a0c5ca 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java @@ -85,16 +85,15 @@ public class OrcColumnVectorProducer implements ColumnVectorProducer { @Override public ReadPipeline createReadPipeline( - Consumer<ColumnVectorBatch> consumer, FileSplit split, List<Integer> columnIds, - SearchArgument sarg, String[] columnNames, QueryFragmentCounters counters, - TypeDescription readerSchema, InputFormat<?, ?> unused0, Deserializer unused1, - Reporter reporter, JobConf job, Map<Path, PartitionDesc> unused2) throws IOException { + Consumer<ColumnVectorBatch> consumer, FileSplit split, Includes includes, + SearchArgument sarg, QueryFragmentCounters counters, SchemaEvolutionFactory sef, + InputFormat<?, ?> unused0, Deserializer unused1, Reporter reporter, JobConf job, + Map<Path, PartitionDesc> unused2) throws IOException { cacheMetrics.incrCacheReadRequests(); - 
OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(consumer, columnIds.size(), - _skipCorrupt, counters, ioMetrics); - OrcEncodedDataReader reader = new OrcEncodedDataReader( - lowLevelCache, bufferManager, metadataCache, conf, job, split, columnIds, sarg, - columnNames, edc, counters, readerSchema, tracePool); + OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer( + consumer, includes, _skipCorrupt, counters, ioMetrics); + OrcEncodedDataReader reader = new OrcEncodedDataReader(lowLevelCache, bufferManager, + metadataCache, conf, job, split, includes, sarg, edc, counters, sef, tracePool); edc.init(reader, reader, reader.getTrace()); return edc; } http://git-wip-us.apache.org/repos/asf/hive/blob/1a3090f8/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java index 36810d9..9e8ae10 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hive.llap.counters.LlapIOCounters; import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters; import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch; import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; +import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer.Includes; import org.apache.hadoop.hive.llap.io.metadata.ConsumerFileMetadata; import org.apache.hadoop.hive.llap.io.metadata.ConsumerStripeMetadata; import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics; @@ -62,21 +63,22 @@ import org.apache.orc.OrcProto; public class OrcEncodedDataConsumer extends EncodedDataConsumer<OrcBatchKey, OrcEncodedColumnBatch> { private 
TreeReaderFactory.TreeReader[] columnReaders; - private int[] columnMapping; // Mapping from columnReaders (by index) to columns in file schema. private int previousStripeIndex = -1; private ConsumerFileMetadata fileMetadata; // We assume one request is only for one file. private CompressionCodec codec; private List<ConsumerStripeMetadata> stripes; private final boolean skipCorrupt; // TODO: get rid of this private final QueryFragmentCounters counters; - private boolean[] includedColumns; private SchemaEvolution evolution; private IoTrace trace; + private final Includes includes; + private TypeDescription[] batchSchemas; public OrcEncodedDataConsumer( - Consumer<ColumnVectorBatch> consumer, int colCount, boolean skipCorrupt, + Consumer<ColumnVectorBatch> consumer, Includes includes, boolean skipCorrupt, QueryFragmentCounters counters, LlapDaemonIOMetrics ioMetrics) { - super(consumer, colCount, ioMetrics); + super(consumer, includes.getPhysicalColumnIds().size(), ioMetrics); + this.includes = includes; // TODO: get rid of this this.skipCorrupt = skipCorrupt; this.counters = counters; @@ -120,20 +122,10 @@ public class OrcEncodedDataConsumer } int maxBatchesRG = (int) ((nonNullRowCount / VectorizedRowBatch.DEFAULT_SIZE) + 1); int batchSize = VectorizedRowBatch.DEFAULT_SIZE; - TypeDescription schema = fileMetadata.getSchema(); + TypeDescription fileSchema = fileMetadata.getSchema(); if (columnReaders == null || !sameStripe) { - int[] columnMapping = new int[schema.getChildren().size()]; - TreeReaderFactory.Context context = - new TreeReaderFactory.ReaderContext() - .setSchemaEvolution(evolution) - .writerTimeZone(stripeMetadata.getWriterTimezone()) - .skipCorrupt(skipCorrupt); - StructTreeReader treeReader = EncodedTreeReaderFactory.createRootTreeReader( - schema, stripeMetadata.getEncodings(), batch, codec, context, columnMapping); - this.columnReaders = treeReader.getChildReaders(); - this.columnMapping = Arrays.copyOf(columnMapping, columnReaders.length); - 
positionInStreams(columnReaders, batch.getBatchKey(), stripeMetadata); + createColumnReaders(batch, stripeMetadata, fileSchema); } else { repositionInStreams(this.columnReaders, batch, sameStripe, stripeMetadata); } @@ -154,8 +146,7 @@ public class OrcEncodedDataConsumer if (cvb.cols[idx] == null) { // Orc store rows inside a root struct (hive writes it this way). // When we populate column vectors we skip over the root struct. - cvb.cols[idx] = createColumn(schema.getChildren().get(columnMapping[idx]), - VectorizedRowBatch.DEFAULT_SIZE); + cvb.cols[idx] = createColumn(batchSchemas[idx], VectorizedRowBatch.DEFAULT_SIZE); } trace.logTreeReaderNextVector(idx); @@ -214,6 +205,25 @@ public class OrcEncodedDataConsumer } } + private void createColumnReaders(OrcEncodedColumnBatch batch, + ConsumerStripeMetadata stripeMetadata, TypeDescription fileSchema) throws IOException { + TreeReaderFactory.Context context = new TreeReaderFactory.ReaderContext() + .setSchemaEvolution(evolution).skipCorrupt(skipCorrupt) + .writerTimeZone(stripeMetadata.getWriterTimezone()); + this.batchSchemas = includes.getBatchReaderTypes(fileSchema); + StructTreeReader treeReader = EncodedTreeReaderFactory.createRootTreeReader( + batchSchemas, stripeMetadata.getEncodings(), batch, codec, context); + this.columnReaders = treeReader.getChildReaders(); + + if (LlapIoImpl.LOG.isDebugEnabled()) { + for (int i = 0; i < columnReaders.length; ++i) { + LlapIoImpl.LOG.debug("Created a reader at " + i + ": " + columnReaders[i] + + " from schema " + batchSchemas[i]); + } + } + positionInStreams(columnReaders, batch.getBatchKey(), stripeMetadata); + } + private ColumnVector createColumn(TypeDescription type, int batchSize) { switch (type.getCategory()) { case BOOLEAN: @@ -344,15 +354,6 @@ public class OrcEncodedDataConsumer return rowIndexEntry.getStatistics().getNumberOfValues(); } - @Override - public boolean[] getIncludedColumns() { - return includedColumns; - } - - public void setIncludedColumns(final 
boolean[] includedColumns) { - this.includedColumns = includedColumns; - } - public void setSchemaEvolution(SchemaEvolution evolution) { this.evolution = evolution; } http://git-wip-us.apache.org/repos/asf/hive/blob/1a3090f8/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java index 06708d3..05b1429 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java @@ -27,5 +27,4 @@ import org.apache.orc.impl.SchemaEvolution; public interface ReadPipeline extends ConsumerFeedback<ColumnVectorBatch> { public Callable<Void> getReadCallable(); SchemaEvolution getSchemaEvolution(); - boolean[] getIncludedColumns(); } http://git-wip-us.apache.org/repos/asf/hive/blob/1a3090f8/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java index 9219d28..65b2d34 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java @@ -67,6 +67,8 @@ import org.apache.hadoop.hive.llap.cache.LowLevelCache; import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority; import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters; import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; +import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer.Includes; +import 
org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer.SchemaEvolutionFactory; import org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer; import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata; import org.apache.hadoop.hive.llap.io.metadata.MetadataCache; @@ -79,6 +81,7 @@ import org.apache.orc.CompressionKind; import org.apache.orc.DataReader; import org.apache.hadoop.hive.ql.io.orc.OrcFile; import org.apache.hadoop.hive.ql.io.orc.OrcFile.ReaderOptions; +import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater; import org.apache.orc.OrcConf; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; import org.apache.hadoop.hive.ql.io.orc.OrcSplit; @@ -154,9 +157,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> private final BufferUsageManager bufferManager; private final Configuration daemonConf, jobConf; private final FileSplit split; - private List<Integer> includedColumnIds; private final SearchArgument sarg; - private final String[] columnNames; private final OrcEncodedDataConsumer consumer; private final QueryFragmentCounters counters; private final UserGroupInformation ugi; @@ -184,27 +185,21 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> @SuppressWarnings("unused") private volatile boolean isPaused = false; - boolean[] readerIncludes = null, sargColumns = null, fileIncludes = null; + boolean[] sargColumns = null, fileIncludes = null; private final IoTrace trace; private Pool<IoTrace> tracePool; public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager bufferManager, MetadataCache metadataCache, Configuration daemonConf, Configuration jobConf, - FileSplit split, List<Integer> columnIds, SearchArgument sarg, String[] columnNames, - OrcEncodedDataConsumer consumer, QueryFragmentCounters counters, - TypeDescription readerSchema, Pool<IoTrace> tracePool) + FileSplit split, Includes includes, SearchArgument sarg, OrcEncodedDataConsumer consumer, + QueryFragmentCounters counters, 
SchemaEvolutionFactory sef, Pool<IoTrace> tracePool) throws IOException { this.lowLevelCache = lowLevelCache; this.metadataCache = metadataCache; this.bufferManager = bufferManager; this.daemonConf = daemonConf; this.split = split; - this.includedColumnIds = columnIds; - if (this.includedColumnIds != null) { - Collections.sort(this.includedColumnIds); - } this.sarg = sarg; - this.columnNames = columnNames; this.consumer = consumer; this.counters = counters; this.trace = tracePool.take(); @@ -216,8 +211,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> } this.useCodecPool = HiveConf.getBoolVar(daemonConf, ConfVars.HIVE_ORC_CODEC_POOL); - // moved this part of code from performDataRead as LlapInputFormat need to know the file schema - // to decide if schema evolution is supported or not. + // LlapInputFormat needs to know the file schema to decide if schema evolution is supported. orcReader = null; // 1. Get file metadata from cache, or create the reader and read it. // Don't cache the filesystem object for now; Tez closes it and FS cache will fix all that @@ -227,15 +221,12 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID)); fileMetadata = getFileFooterFromCacheOrDisk(); final TypeDescription fileSchema = fileMetadata.getSchema(); - if (readerSchema == null) { - readerSchema = fileMetadata.getSchema(); - } - readerIncludes = OrcInputFormat.genIncludedColumns(readerSchema, includedColumnIds); - if (AcidUtils.isFullAcidScan(jobConf)) { - fileIncludes = OrcInputFormat.shiftReaderIncludedForAcid(readerIncludes); - } else { - fileIncludes = OrcInputFormat.genIncludedColumns(fileSchema, includedColumnIds); + + fileIncludes = includes.generateFileIncludes(fileSchema); + if (LOG.isDebugEnabled()) { + LOG.debug("From {}, the file includes are {}", includes, DebugUtils.toString(fileIncludes)); } + // Do not allow users to override zero-copy setting. 
The rest can be taken from user config. boolean useZeroCopy = OrcConf.USE_ZEROCOPY.getBoolean(daemonConf); if (useZeroCopy != OrcConf.USE_ZEROCOPY.getBoolean(jobConf)) { @@ -243,10 +234,9 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> jobConf.setBoolean(OrcConf.USE_ZEROCOPY.getAttribute(), useZeroCopy); } this.jobConf = jobConf; - Reader.Options options = new Reader.Options(jobConf).include(readerIncludes); - evolution = new SchemaEvolution(fileMetadata.getSchema(), readerSchema, options); + // TODO: setFileMetadata could just create schema. Called in two places; clean up later. + this.evolution = sef.createSchemaEvolution(fileMetadata.getSchema()); consumer.setFileMetadata(fileMetadata); - consumer.setIncludedColumns(readerIncludes); consumer.setSchemaEvolution(evolution); } @@ -290,9 +280,6 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> + (fileKey == null ? "" : " (" + fileKey + ")")); try { validateFileMetadata(); - if (includedColumnIds == null) { - includedColumnIds = getAllColumnIds(fileMetadata); - } // 2. Determine which stripes to read based on the split. determineStripesToRead(); @@ -316,12 +303,14 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> try { if (sarg != null && stride != 0) { // TODO: move this to a common method + // Note: this gets IDs by name, so we assume indices don't need to be adjusted for ACID. int[] filterColumns = RecordReaderImpl.mapSargColumnsToOrcInternalColIdx( sarg.getLeaves(), evolution); // included will not be null, row options will fill the array with trues if null sargColumns = new boolean[evolution.getFileSchema().getMaximumId() + 1]; for (int i : filterColumns) { // filter columns may have -1 as index which could be partition column in SARG. + // TODO: should this then be >=? 
if (i > 0) { sargColumns[i] = true; } @@ -332,7 +321,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> } // Now, apply SARG if any; w/o sarg, this will just initialize stripeRgs. - boolean hasData = determineRgsToRead(fileIncludes, stride, stripeMetadatas); + boolean hasData = determineRgsToRead(stride, stripeMetadatas); if (!hasData) { consumer.setDone(); recordReaderTime(startTime); @@ -530,19 +519,6 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> } /** - * Puts all column indexes from metadata to make a column list to read all column. - */ - private static List<Integer> getAllColumnIds(OrcFileMetadata metadata) { - int rootColumn = OrcInputFormat.getRootColumn(true); - List<Integer> types = metadata.getTypes().get(rootColumn).getSubtypesList(); - List<Integer> columnIds = new ArrayList<Integer>(types.size()); - for (int i = 0; i < types.size(); ++i) { - columnIds.add(i); - } - return columnIds; - } - - /** * Closes the stripe readers (on error). */ private void cleanupReaders() { @@ -822,7 +798,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> * Determines which RGs need to be read, after stripes have been determined. * SARG is applied, and readState is populated for each stripe accordingly. 
*/ - private boolean determineRgsToRead(boolean[] globalIncludes, int rowIndexStride, + private boolean determineRgsToRead(int rowIndexStride, ArrayList<OrcStripeMetadata> metadata) throws IOException { RecordReaderImpl.SargApplier sargApp = null; if (sarg != null && rowIndexStride != 0) { http://git-wip-us.apache.org/repos/asf/hive/blob/1a3090f8/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java index 166abf7..0cb1828 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java @@ -948,7 +948,8 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> OrcEncodedColumnBatch ecb = ECB_POOL.take(); ecb.init(fileKey, metadata.getStripeIx(), OrcEncodedColumnBatch.ALL_RGS, writerIncludes.length); - for (int colIx = 0; colIx < writerIncludes.length; ++colIx) { + // Skip the 0th column that is the root structure. + for (int colIx = 1; colIx < writerIncludes.length; ++colIx) { if (!writerIncludes[colIx]) continue; ecb.initColumn(colIx, OrcEncodedColumnBatch.MAX_DATA_STREAMS); if (!hasAllData && splitIncludes[colIx]) { @@ -1035,18 +1036,15 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> ecb.init(fileKey, metadata.getStripeIx(), OrcEncodedColumnBatch.ALL_RGS, writerIncludes.length); int vectorsIx = 0; for (int colIx = 0; colIx < writerIncludes.length; ++colIx) { + // Skip the 0-th column, since it won't have a vector after reading the text source. 
+ if (colIx == 0) continue; if (!writerIncludes[colIx]) continue; if (splitIncludes[colIx]) { - // Skip the 0-th column, since it won't have a vector after reading the text source. - if (colIx != 0 ) { - List<ColumnVector> vectors = diskData.getVectors(vectorsIx++); - if (LlapIoImpl.LOG.isTraceEnabled()) { - LlapIoImpl.LOG.trace("Processing vectors for column " + colIx + ": " + vectors); - } - ecb.initColumnWithVectors(colIx, vectors); - } else { - ecb.initColumn(0, OrcEncodedColumnBatch.MAX_DATA_STREAMS); + List<ColumnVector> vectors = diskData.getVectors(vectorsIx++); + if (LlapIoImpl.LOG.isTraceEnabled()) { + LlapIoImpl.LOG.trace("Processing vectors for column " + colIx + ": " + vectors); } + ecb.initColumnWithVectors(colIx, vectors); } else { ecb.initColumn(colIx, OrcEncodedColumnBatch.MAX_DATA_STREAMS); processColumnCacheData(cacheBuffers, ecb, colIx); http://git-wip-us.apache.org/repos/asf/hive/blob/1a3090f8/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java index 681d9ca..f429308 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java @@ -65,6 +65,7 @@ import org.apache.hadoop.io.Writable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Charsets; /** @@ -112,7 +113,6 @@ public class VectorExtractRow { */ public void init(StructObjectInspector structObjectInspector, List<Integer> projectedColumns) throws HiveException { - List<? 
extends StructField> fields = structObjectInspector.getAllStructFieldRefs(); final int count = fields.size(); allocateArrays(count); @@ -125,7 +125,6 @@ public class VectorExtractRow { ObjectInspector fieldInspector = field.getFieldObjectInspector(); TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(fieldInspector.getTypeName()); - initEntry(i, projectionColumnNum, typeInfo); } } @@ -148,7 +147,8 @@ public class VectorExtractRow { * Initialize using data type names. * No projection -- the column range 0 .. types.size()-1 */ - public void init(List<String> typeNames) throws HiveException { + @VisibleForTesting + void init(List<String> typeNames) throws HiveException { final int count = typeNames.size(); allocateArrays(count); http://git-wip-us.apache.org/repos/asf/hive/blob/1a3090f8/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java index 59b3ae9..22d2f34 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.vector; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.apache.hadoop.conf.Configuration; http://git-wip-us.apache.org/repos/asf/hive/blob/1a3090f8/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index e956485..cf0d013 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hive.ql.io.orc; import org.apache.hadoop.hive.ql.plan.DynamicValue.NoDynamicValuesException; - import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -30,6 +29,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -342,10 +342,15 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, return false; } - + public static boolean[] genIncludedColumns(TypeDescription readerSchema, List<Integer> included) { + return genIncludedColumns(readerSchema, included, null); + } + public static boolean[] genIncludedColumns(TypeDescription readerSchema, + List<Integer> included, + Integer recursiveStruct) { boolean[] result = new boolean[readerSchema.getMaximumId() + 1]; if (included == null) { Arrays.fill(result, true); @@ -355,15 +360,53 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, List<TypeDescription> children = readerSchema.getChildren(); for (int columnNumber = 0; columnNumber < children.size(); ++columnNumber) { if (included.contains(columnNumber)) { - TypeDescription child = children.get(columnNumber); - for(int col = child.getId(); col <= child.getMaximumId(); ++col) { - result[col] = true; + addColumnToIncludes(children.get(columnNumber), result); + } else if (recursiveStruct != null && recursiveStruct == columnNumber) { + // This assumes all struct cols immediately follow struct + List<TypeDescription> nestedChildren = children.get(columnNumber).getChildren(); + for (int columnNumberDelta = 0; columnNumberDelta < nestedChildren.size(); ++columnNumberDelta) { + int columnNumberNested = columnNumber + 1 + columnNumberDelta; + if (included.contains(columnNumberNested)) { + 
addColumnToIncludes(nestedChildren.get(columnNumberDelta), result); + } + } + } + } + + return result; + } + + // Mostly dup of genIncludedColumns + public static TypeDescription[] genIncludedTypes(TypeDescription fileSchema, + List<Integer> included, Integer recursiveStruct) { + TypeDescription[] result = new TypeDescription[included.size()]; + List<TypeDescription> children = fileSchema.getChildren(); + for (int columnNumber = 0; columnNumber < children.size(); ++columnNumber) { + int indexInBatchCols = included.indexOf(columnNumber); + if (indexInBatchCols >= 0) { + result[indexInBatchCols] = children.get(columnNumber); + } else if (recursiveStruct != null && recursiveStruct == columnNumber) { + // This assumes all struct cols immediately follow struct + List<TypeDescription> nestedChildren = children.get(columnNumber).getChildren(); + for (int columnNumberDelta = 0; columnNumberDelta < nestedChildren.size(); ++columnNumberDelta) { + int columnNumberNested = columnNumber + 1 + columnNumberDelta; + int nestedIxInBatchCols = included.indexOf(columnNumberNested); + if (nestedIxInBatchCols >= 0) { + result[nestedIxInBatchCols] = nestedChildren.get(columnNumberDelta); + } } } } return result; } + + private static void addColumnToIncludes(TypeDescription child, boolean[] result) { + for(int col = child.getId(); col <= child.getMaximumId(); ++col) { + result[col] = true; + } + } + /** * Reverses genIncludedColumns; produces the table columns indexes from ORC included columns. * @param readerSchema The ORC reader schema for the table. 
http://git-wip-us.apache.org/repos/asf/hive/blob/1a3090f8/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java index e296351..77736ee 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java @@ -72,6 +72,7 @@ public class VectorizedOrcAcidRowBatchReader protected float progress = 0.0f; protected Object[] partitionValues; private boolean addPartitionCols = true; + private final boolean isFlatPayload; private final ValidWriteIdList validWriteIdList; private final DeleteEventRegistry deleteEventRegistry; /** @@ -104,7 +105,8 @@ public class VectorizedOrcAcidRowBatchReader @VisibleForTesting VectorizedOrcAcidRowBatchReader(OrcSplit inputSplit, JobConf conf, Reporter reporter, VectorizedRowBatchCtx rbCtx) throws IOException { - this(conf, inputSplit, reporter, rbCtx == null ? Utilities.getVectorizedRowBatchCtx(conf) : rbCtx); + this(conf, inputSplit, reporter, + rbCtx == null ? Utilities.getVectorizedRowBatchCtx(conf) : rbCtx, false); final Reader reader = OrcInputFormat.createOrcReaderForSplit(conf, (OrcSplit) inputSplit); // Careful with the range here now, we do not want to read the whole base file like deltas. 
@@ -143,20 +145,22 @@ public class VectorizedOrcAcidRowBatchReader }; this.vectorizedRowBatchBase = ((RecordReaderImpl) innerReader).createRowBatch(); } + /** * LLAP IO c'tor */ public VectorizedOrcAcidRowBatchReader(OrcSplit inputSplit, JobConf conf, Reporter reporter, org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> baseReader, - VectorizedRowBatchCtx rbCtx) throws IOException { - this(conf, inputSplit, reporter, rbCtx); + VectorizedRowBatchCtx rbCtx, boolean isFlatPayload) throws IOException { + this(conf, inputSplit, reporter, rbCtx, isFlatPayload); this.baseReader = baseReader; this.innerReader = null; this.vectorizedRowBatchBase = baseReader.createValue(); } private VectorizedOrcAcidRowBatchReader(JobConf conf, OrcSplit orcSplit, Reporter reporter, - VectorizedRowBatchCtx rowBatchCtx) throws IOException { + VectorizedRowBatchCtx rowBatchCtx, boolean isFlatPayload) throws IOException { + this.isFlatPayload = isFlatPayload; this.rbCtx = rowBatchCtx; final boolean isAcidRead = AcidUtils.isFullAcidScan(conf); final AcidUtils.AcidOperationalProperties acidOperationalProperties @@ -206,13 +210,13 @@ public class VectorizedOrcAcidRowBatchReader } this.deleteEventRegistry = der; isOriginal = orcSplit.isOriginal(); - if(isOriginal) { + if (isOriginal) { recordIdColumnVector = new StructColumnVector(VectorizedRowBatch.DEFAULT_SIZE, new LongColumnVector(), new LongColumnVector(), new LongColumnVector()); - } - else { - //will swap in the Vectors from underlying row batch - recordIdColumnVector = new StructColumnVector(VectorizedRowBatch.DEFAULT_SIZE, null, null, null); + } else { + // Will swap in the Vectors from underlying row batch. 
+ recordIdColumnVector = new StructColumnVector( + VectorizedRowBatch.DEFAULT_SIZE, null, null, null); } rowIdProjected = areRowIdsProjected(rbCtx); rootPath = orcSplit.getRootDir(); @@ -412,73 +416,10 @@ public class VectorizedOrcAcidRowBatchReader selectedBitSet.set(0, vectorizedRowBatchBase.size, true); } ColumnVector[] innerRecordIdColumnVector = vectorizedRowBatchBase.cols; - if(isOriginal) { - /* - * If there are deletes and reading original file, we must produce synthetic ROW_IDs in order - * to see if any deletes apply - */ - boolean needSyntheticRowId = - needSyntheticRowIds(true, !deleteEventRegistry.isEmpty(), rowIdProjected); - if(needSyntheticRowId) { - assert syntheticProps != null && syntheticProps.rowIdOffset >= 0 : "" + syntheticProps; - assert syntheticProps != null && syntheticProps.bucketProperty >= 0 : "" + syntheticProps; - if(innerReader == null) { - throw new IllegalStateException(getClass().getName() + " requires " + - org.apache.orc.RecordReader.class + - " to handle original files that require ROW__IDs: " + rootPath); - } - /** - * {@link RecordIdentifier#getWriteId()} - */ - recordIdColumnVector.fields[0].noNulls = true; - recordIdColumnVector.fields[0].isRepeating = true; - ((LongColumnVector)recordIdColumnVector.fields[0]).vector[0] = syntheticProps.syntheticTxnId; - /** - * This is {@link RecordIdentifier#getBucketProperty()} - * Also see {@link BucketCodec} - */ - recordIdColumnVector.fields[1].noNulls = true; - recordIdColumnVector.fields[1].isRepeating = true; - ((LongColumnVector)recordIdColumnVector.fields[1]).vector[0] = syntheticProps.bucketProperty; - /** - * {@link RecordIdentifier#getRowId()} - */ - recordIdColumnVector.fields[2].noNulls = true; - recordIdColumnVector.fields[2].isRepeating = false; - long[] rowIdVector = ((LongColumnVector)recordIdColumnVector.fields[2]).vector; - for(int i = 0; i < vectorizedRowBatchBase.size; i++) { - //baseReader.getRowNumber() seems to point at the start of the batch todo: validate - 
rowIdVector[i] = syntheticProps.rowIdOffset + innerReader.getRowNumber() + i; - } - //Now populate a structure to use to apply delete events - innerRecordIdColumnVector = new ColumnVector[OrcRecordUpdater.FIELDS]; - innerRecordIdColumnVector[OrcRecordUpdater.ORIGINAL_WRITEID] = recordIdColumnVector.fields[0]; - innerRecordIdColumnVector[OrcRecordUpdater.BUCKET] = recordIdColumnVector.fields[1]; - innerRecordIdColumnVector[OrcRecordUpdater.ROW_ID] = recordIdColumnVector.fields[2]; - //these are insert events so (original txn == current) txn for all rows - innerRecordIdColumnVector[OrcRecordUpdater.CURRENT_WRITEID] = recordIdColumnVector.fields[0]; - } - if(syntheticProps.syntheticTxnId > 0) { - //"originals" (written before table was converted to acid) is considered written by - // txnid:0 which is always committed so there is no need to check wrt invalid transactions - //But originals written by Load Data for example can be in base_x or delta_x_x so we must - //check if 'x' is committed or not evn if ROW_ID is not needed in the Operator pipeline. - if (needSyntheticRowId) { - findRecordsWithInvalidTransactionIds(innerRecordIdColumnVector, - vectorizedRowBatchBase.size, selectedBitSet); - } else { - /*since ROW_IDs are not needed we didn't create the ColumnVectors to hold them but we - * still have to check if the data being read is committed as far as current - * reader (transactions) is concerned. Since here we are reading 'original' schema file, - * all rows in it have been created by the same txn, namely 'syntheticProps.syntheticTxnId' - */ - if (!validWriteIdList.isWriteIdValid(syntheticProps.syntheticTxnId)) { - selectedBitSet.clear(0, vectorizedRowBatchBase.size); - } - } - } - } - else { + if (isOriginal) { + // Handle synthetic row IDs for the original files. + innerRecordIdColumnVector = handleOriginalFile(selectedBitSet, innerRecordIdColumnVector); + } else { // Case 1- find rows which belong to transactions that are not valid. 
findRecordsWithInvalidTransactionIds(vectorizedRowBatchBase, selectedBitSet); } @@ -505,29 +446,103 @@ public class VectorizedOrcAcidRowBatchReader } } - if(isOriginal) { - /*Just copy the payload. {@link recordIdColumnVector} has already been populated*/ - System.arraycopy(vectorizedRowBatchBase.cols, 0, value.cols, 0, - value.getDataColumnCount()); - } - else { - // Finally, link up the columnVector from the base VectorizedRowBatch to outgoing batch. - StructColumnVector payloadStruct = (StructColumnVector) vectorizedRowBatchBase.cols[OrcRecordUpdater.ROW]; - // Transfer columnVector objects from base batch to outgoing batch. - System.arraycopy(payloadStruct.fields, 0, value.cols, 0, value.getDataColumnCount()); - if(rowIdProjected) { + if (isOriginal) { + /* Just copy the payload. {@link recordIdColumnVector} has already been populated */ + System.arraycopy(vectorizedRowBatchBase.cols, 0, value.cols, 0, value.getDataColumnCount()); + } else { + int payloadCol = OrcRecordUpdater.ROW; + if (isFlatPayload) { + // Ignore the struct column and just copy all the following data columns. + System.arraycopy(vectorizedRowBatchBase.cols, payloadCol + 1, value.cols, 0, + vectorizedRowBatchBase.cols.length - payloadCol - 1); + } else { + StructColumnVector payloadStruct = + (StructColumnVector) vectorizedRowBatchBase.cols[payloadCol]; + // Transfer columnVector objects from base batch to outgoing batch. 
+ System.arraycopy(payloadStruct.fields, 0, value.cols, 0, value.getDataColumnCount()); + } + if (rowIdProjected) { recordIdColumnVector.fields[0] = vectorizedRowBatchBase.cols[OrcRecordUpdater.ORIGINAL_WRITEID]; recordIdColumnVector.fields[1] = vectorizedRowBatchBase.cols[OrcRecordUpdater.BUCKET]; recordIdColumnVector.fields[2] = vectorizedRowBatchBase.cols[OrcRecordUpdater.ROW_ID]; } } - if(rowIdProjected) { + if (rowIdProjected) { rbCtx.setRecordIdColumnVector(recordIdColumnVector); } progress = baseReader.getProgress(); return true; } + private ColumnVector[] handleOriginalFile( + BitSet selectedBitSet, ColumnVector[] innerRecordIdColumnVector) throws IOException { + /* + * If there are deletes and reading original file, we must produce synthetic ROW_IDs in order + * to see if any deletes apply + */ + boolean needSyntheticRowId = + needSyntheticRowIds(true, !deleteEventRegistry.isEmpty(), rowIdProjected); + if(needSyntheticRowId) { + assert syntheticProps != null && syntheticProps.rowIdOffset >= 0 : "" + syntheticProps; + assert syntheticProps != null && syntheticProps.bucketProperty >= 0 : "" + syntheticProps; + if(innerReader == null) { + throw new IllegalStateException(getClass().getName() + " requires " + + org.apache.orc.RecordReader.class + + " to handle original files that require ROW__IDs: " + rootPath); + } + /** + * {@link RecordIdentifier#getWriteId()} + */ + recordIdColumnVector.fields[0].noNulls = true; + recordIdColumnVector.fields[0].isRepeating = true; + ((LongColumnVector)recordIdColumnVector.fields[0]).vector[0] = syntheticProps.syntheticTxnId; + /** + * This is {@link RecordIdentifier#getBucketProperty()} + * Also see {@link BucketCodec} + */ + recordIdColumnVector.fields[1].noNulls = true; + recordIdColumnVector.fields[1].isRepeating = true; + ((LongColumnVector)recordIdColumnVector.fields[1]).vector[0] = syntheticProps.bucketProperty; + /** + * {@link RecordIdentifier#getRowId()} + */ + recordIdColumnVector.fields[2].noNulls = true; + 
recordIdColumnVector.fields[2].isRepeating = false; + long[] rowIdVector = ((LongColumnVector)recordIdColumnVector.fields[2]).vector; + for(int i = 0; i < vectorizedRowBatchBase.size; i++) { + //baseReader.getRowNumber() seems to point at the start of the batch todo: validate + rowIdVector[i] = syntheticProps.rowIdOffset + innerReader.getRowNumber() + i; + } + //Now populate a structure to use to apply delete events + innerRecordIdColumnVector = new ColumnVector[OrcRecordUpdater.FIELDS]; + innerRecordIdColumnVector[OrcRecordUpdater.ORIGINAL_WRITEID] = recordIdColumnVector.fields[0]; + innerRecordIdColumnVector[OrcRecordUpdater.BUCKET] = recordIdColumnVector.fields[1]; + innerRecordIdColumnVector[OrcRecordUpdater.ROW_ID] = recordIdColumnVector.fields[2]; + //these are insert events so (original txn == current) txn for all rows + innerRecordIdColumnVector[OrcRecordUpdater.CURRENT_WRITEID] = recordIdColumnVector.fields[0]; + } + if(syntheticProps.syntheticTxnId > 0) { + //"originals" (written before table was converted to acid) is considered written by + // txnid:0 which is always committed so there is no need to check wrt invalid transactions + //But originals written by Load Data for example can be in base_x or delta_x_x so we must + //check if 'x' is committed or not even if ROW_ID is not needed in the Operator pipeline. + if (needSyntheticRowId) { + findRecordsWithInvalidTransactionIds(innerRecordIdColumnVector, + vectorizedRowBatchBase.size, selectedBitSet); + } else { + /*since ROW_IDs are not needed we didn't create the ColumnVectors to hold them but we + * still have to check if the data being read is committed as far as current + * reader (transactions) is concerned.
Since here we are reading 'original' schema file, + * all rows in it have been created by the same txn, namely 'syntheticProps.syntheticTxnId' + */ + if (!validWriteIdList.isWriteIdValid(syntheticProps.syntheticTxnId)) { + selectedBitSet.clear(0, vectorizedRowBatchBase.size); + } + } + } + return innerRecordIdColumnVector; + } + private void findRecordsWithInvalidTransactionIds(VectorizedRowBatch batch, BitSet selectedBitSet) { findRecordsWithInvalidTransactionIds(batch.cols, batch.size, selectedBitSet); } http://git-wip-us.apache.org/repos/asf/hive/blob/1a3090f8/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java index c1e55c7..f6b949e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java @@ -34,14 +34,14 @@ public interface EncodedReader { * @param index Externally provided metadata (from metadata reader or external cache). * @param encodings Externally provided metadata (from metadata reader or external cache). * @param streams Externally provided metadata (from metadata reader or external cache). - * @param included The array of booleans indicating whether each column should be read. + * @param physicalFileIncludes The array of booleans indicating whether each column should be read. * @param colRgs Arrays of rgs, per column set to true in included, that are to be read. * null in each respective position means all rgs for this column need to be read. * @param consumer The sink for data that has been read. 
*/ void readEncodedColumns(int stripeIx, StripeInformation stripe, OrcProto.RowIndex[] index, List<OrcProto.ColumnEncoding> encodings, - List<OrcProto.Stream> streams, boolean[] included, boolean[] rgs, + List<OrcProto.Stream> streams, boolean[] physicalFileIncludes, boolean[] rgs, Consumer<OrcEncodedColumnBatch> consumer) throws IOException; /** http://git-wip-us.apache.org/repos/asf/hive/blob/1a3090f8/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java index 893a2bb..9ee1229 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java @@ -100,6 +100,8 @@ import sun.misc.Cleaner; * 6) Given that RG end boundaries in ORC are estimates, we can request data from cache and then * not use it; thus, at the end we go thru all the MBs, and release those not released by (5). */ +// Note: this thing should know nothing about ACID or schema. It reads physical columns by index; +// schema evolution/ACID schema considerations should be on higher level. 
class EncodedReaderImpl implements EncodedReader { public static final Logger LOG = LoggerFactory.getLogger(EncodedReaderImpl.class); private static Field cleanerField; @@ -282,7 +284,7 @@ class EncodedReaderImpl implements EncodedReader { @Override public void readEncodedColumns(int stripeIx, StripeInformation stripe, OrcProto.RowIndex[] indexes, List<OrcProto.ColumnEncoding> encodings, - List<OrcProto.Stream> streamList, boolean[] included, boolean[] rgs, + List<OrcProto.Stream> streamList, boolean[] physicalFileIncludes, boolean[] rgs, Consumer<OrcEncodedColumnBatch> consumer) throws IOException { // Note: for now we don't have to setError here, caller will setError if we throw. // We are also not supposed to call setDone, since we are only part of the operation. @@ -298,11 +300,11 @@ class EncodedReaderImpl implements EncodedReader { // We assume stream list is sorted by column and that non-data // streams do not interleave data streams for the same column. // 1.2. With that in mind, determine disk ranges to read/get from cache (not by stream). - ColumnReadContext[] colCtxs = new ColumnReadContext[included.length]; + ColumnReadContext[] colCtxs = new ColumnReadContext[physicalFileIncludes.length]; int colRgIx = -1; // Don't create context for the 0-s column. 
- for (int i = 1; i < included.length; ++i) { - if (!included[i]) continue; + for (int i = 1; i < physicalFileIncludes.length; ++i) { + if (!physicalFileIncludes[i]) continue; ColumnEncoding enc = encodings.get(i); colCtxs[i] = new ColumnReadContext(i, enc, indexes[i], ++colRgIx); if (isTracingEnabled) { @@ -316,10 +318,10 @@ class EncodedReaderImpl implements EncodedReader { long length = stream.getLength(); int colIx = stream.getColumn(); OrcProto.Stream.Kind streamKind = stream.getKind(); - if (!included[colIx] || StreamName.getArea(streamKind) != StreamName.Area.DATA) { + if (!physicalFileIncludes[colIx] || StreamName.getArea(streamKind) != StreamName.Area.DATA) { // We have a stream for included column, but in future it might have no data streams. // It's more like "has at least one column included that has an index stream". - hasIndexOnlyCols = hasIndexOnlyCols || included[colIx]; + hasIndexOnlyCols = hasIndexOnlyCols || physicalFileIncludes[colIx]; if (isTracingEnabled) { LOG.trace("Skipping stream for column " + colIx + ": " + streamKind + " at " + offset + ", " + length); @@ -358,7 +360,7 @@ class EncodedReaderImpl implements EncodedReader { // TODO: there may be a bug here. Could there be partial RG filtering on index-only column? if (hasIndexOnlyCols && (rgs == null)) { OrcEncodedColumnBatch ecb = POOLS.ecbPool.take(); - ecb.init(fileKey, stripeIx, OrcEncodedColumnBatch.ALL_RGS, included.length); + ecb.init(fileKey, stripeIx, OrcEncodedColumnBatch.ALL_RGS, physicalFileIncludes.length); try { consumer.consumeData(ecb); } catch (InterruptedException e) { @@ -400,7 +402,7 @@ class EncodedReaderImpl implements EncodedReader { trace.logStartRg(rgIx); boolean hasErrorForEcb = true; try { - ecb.init(fileKey, stripeIx, rgIx, included.length); + ecb.init(fileKey, stripeIx, rgIx, physicalFileIncludes.length); for (int colIx = 0; colIx < colCtxs.length; ++colIx) { ColumnReadContext ctx = colCtxs[colIx]; if (ctx == null) continue; // This column is not included. 
@@ -1829,21 +1831,21 @@ class EncodedReaderImpl implements EncodedReader { @Override public void readIndexStreams(OrcIndex index, StripeInformation stripe, - List<OrcProto.Stream> streams, boolean[] included, boolean[] sargColumns) + List<OrcProto.Stream> streams, boolean[] physicalFileIncludes, boolean[] sargColumns) throws IOException { long stripeOffset = stripe.getOffset(); - DiskRangeList indexRanges = planIndexReading( - fileSchema, streams, true, included, sargColumns, version, index.getBloomFilterKinds()); + DiskRangeList indexRanges = planIndexReading(fileSchema, streams, true, physicalFileIncludes, + sargColumns, version, index.getBloomFilterKinds()); if (indexRanges == null) { if (LOG.isDebugEnabled()) { LOG.debug("Nothing to read for stripe [" + stripe + "]"); } return; } - ReadContext[] colCtxs = new ReadContext[included.length]; + ReadContext[] colCtxs = new ReadContext[physicalFileIncludes.length]; int colRgIx = -1; - for (int i = 0; i < included.length; ++i) { - if (!included[i] && (sargColumns == null || !sargColumns[i])) continue; + for (int i = 0; i < physicalFileIncludes.length; ++i) { + if (!physicalFileIncludes[i] && (sargColumns == null || !sargColumns[i])) continue; colCtxs[i] = new ReadContext(i, ++colRgIx); if (isTracingEnabled) { LOG.trace("Creating context: " + colCtxs[i].toString()); @@ -1858,7 +1860,7 @@ class EncodedReaderImpl implements EncodedReader { // See planIndexReading - only read non-row-index streams if involved in SARGs. 
if ((StreamName.getArea(streamKind) == StreamName.Area.INDEX) && ((sargColumns != null && sargColumns[colIx]) - || (included[colIx] && streamKind == Kind.ROW_INDEX))) { + || (physicalFileIncludes[colIx] && streamKind == Kind.ROW_INDEX))) { trace.logAddStream(colIx, streamKind, offset, length, -1, true); colCtxs[colIx].addStream(offset, stream, -1); if (isTracingEnabled) { http://git-wip-us.apache.org/repos/asf/hive/blob/1a3090f8/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java index 1e7708e..42532f9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java @@ -20,8 +20,10 @@ package org.apache.hadoop.hive.ql.io.orc.encoded; import org.apache.orc.impl.RunLengthByteReader; import java.io.IOException; +import java.util.Arrays; import java.util.List; +import org.apache.curator.shaded.com.google.common.base.Preconditions; import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch; import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; @@ -2099,35 +2101,27 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory { } } - public static StructTreeReader createRootTreeReader(TypeDescription schema, + public static StructTreeReader createRootTreeReader(TypeDescription[] batchSchemas, List<OrcProto.ColumnEncoding> encodings, OrcEncodedColumnBatch batch, - CompressionCodec codec, TreeReaderFactory.Context context, int[] columnMapping) - throws IOException { - if (schema.getCategory() != Category.STRUCT) { - throw new AssertionError("Schema is not a struct: " + 
schema); - } - // Some child types may be excluded. Note that this can only happen at root level. - List<TypeDescription> children = schema.getChildren(); - int childCount = children.size(), includedCount = 0; - for (int childIx = 0; childIx < childCount; ++childIx) { - int batchColIx = children.get(childIx).getId(); + CompressionCodec codec, TreeReaderFactory.Context context) throws IOException { + // Note: we only look at the schema here to deal with complex types. Somebody has set up the + // reader with whatever ideas they had about the schema and we just trust the reader to + // produce the CVBs that were asked for. However, we only need to look at top level columns. + int includedCount = batch.getColumnsWithDataCount(); + if (batchSchemas.length > includedCount) { + throw new AssertionError("For " + Arrays.toString(batchSchemas) + ", only received " + + includedCount + " columns"); + } + TreeReader[] childReaders = new TreeReader[batchSchemas.length]; + for (int i = 0; i < batchSchemas.length; ++i) { + int batchColIx = batchSchemas[i].getId(); if (!batch.hasData(batchColIx) && !batch.hasVectors(batchColIx)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Column at " + childIx + " " + children.get(childIx).getId() - + ":" + children.get(childIx).toString() + " has no data"); - } - continue; + throw new AssertionError("No data for column " + batchColIx + ": " + batchSchemas[i]); } - ++includedCount; - } - TreeReader[] childReaders = new TreeReader[includedCount]; - for (int schemaChildIx = 0, inclChildIx = -1; schemaChildIx < childCount; ++schemaChildIx) { - int batchColIx = children.get(schemaChildIx).getId(); - if (!batch.hasData(batchColIx) && !batch.hasVectors(batchColIx)) continue; - childReaders[++inclChildIx] = createEncodedTreeReader( - schema.getChildren().get(schemaChildIx), encodings, batch, codec, context); - columnMapping[inclChildIx] = schemaChildIx; + childReaders[i] = createEncodedTreeReader(batchSchemas[i], encodings, batch, codec, context); } + + //
TODO: do we actually need this reader? the caller just extracts child readers. return StructStreamReader.builder() .setColumnIndex(0) .setCompressionCodec(codec) http://git-wip-us.apache.org/repos/asf/hive/blob/1a3090f8/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java index 50d10e3..025600b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java @@ -105,11 +105,10 @@ public interface Reader extends org.apache.hadoop.hive.ql.io.orc.Reader { if (columnVectors != null && columnCount == columnVectors.length) { Arrays.fill(columnVectors, null); return; - } if (columnVectors != null) { - columnVectors = new List[columnCount]; - } else { - columnVectors = null; } + if (columnVectors != null) { + columnVectors = new List[columnCount]; + } // else just keep it null } public boolean hasVectors(int colIx) { @@ -120,5 +119,14 @@ public interface Reader extends org.apache.hadoop.hive.ql.io.orc.Reader { if (!hasVectors(colIx)) throw new AssertionError("No data for column " + colIx); return columnVectors[colIx]; } + + public int getColumnsWithDataCount() { + int childCount = hasData.length, result = 0; + for (int childIx = 0; childIx < childCount; ++childIx) { + if (!hasData(childIx) && !hasVectors(childIx)) continue; + ++result; + } + return result; + } } } http://git-wip-us.apache.org/repos/asf/hive/blob/1a3090f8/ql/src/test/queries/clientpositive/llap_acid2.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/llap_acid2.q b/ql/src/test/queries/clientpositive/llap_acid2.q new file mode 100644 index 0000000..76f6203 --- /dev/null +++ b/ql/src/test/queries/clientpositive/llap_acid2.q @@ -0,0 
+1,84 @@ +set hive.mapred.mode=nonstrict; +SET hive.vectorized.execution.enabled=true; + +SET hive.llap.io.enabled=false; + +SET hive.exec.orc.default.buffer.size=32768; +SET hive.exec.orc.default.row.index.stride=1000; +SET hive.optimize.index.filter=true; +set hive.fetch.task.conversion=none; + +set hive.exec.dynamic.partition.mode=nonstrict; +set hive.support.concurrency=true; +set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; + +DROP TABLE orc_llap; + +CREATE TABLE orc_llap ( + cint INT, + cbigint BIGINT, + cfloat FLOAT, + cdouble DOUBLE, + cint0 INT, + cbigint0 BIGINT, + cfloat0 FLOAT, + cdouble0 DOUBLE, + cint1 INT, + cbigint1 BIGINT, + cfloat1 FLOAT, + cdouble1 DOUBLE, + cstring1 string, + cfloat2 float +) stored as orc TBLPROPERTIES ('transactional'='true'); + + +insert into table orc_llap +select cint, cbigint, cfloat, cdouble, + cint as c1, cbigint as c2, cfloat as c3, cdouble as c4, + cint as c8, cbigint as c7, cfloat as c6, cdouble as c5, + cstring1, cfloat as c9 from alltypesorc order by cdouble asc limit 30; + + + + + +CREATE TABLE orc_llap2 ( + cint INT, + cbigint BIGINT, + cfloat FLOAT, + cdouble DOUBLE, + cint0 INT, + cbigint0 BIGINT, + cfloat0 FLOAT, + cdouble0 DOUBLE, + cint1 INT, + cbigint1 BIGINT, + cfloat1 FLOAT, + cdouble1 DOUBLE, + cstring1 string, + cfloat2 float +) stored as orc TBLPROPERTIES ('transactional'='false'); + +insert into table orc_llap2 +select cint, cbigint, cfloat, cdouble, + cint as c1, cbigint as c2, cfloat as c3, cdouble as c4, + cint as c8, cbigint as c7, cfloat as c6, cdouble as c5, + cstring1, cfloat as c9 from alltypesorc order by cdouble asc limit 30; + +alter table orc_llap2 set TBLPROPERTIES ('transactional'='true'); + +update orc_llap2 set cstring1 = 'testvalue' where cstring1 = 'N016jPED08o'; + + +SET hive.llap.io.enabled=true; + +select cstring1 from orc_llap; +select cfloat2, cint from orc_llap; +select * from orc_llap; + +select cstring1 from orc_llap2; +select cfloat2, cint from orc_llap2; 
+select * from orc_llap2; + + +DROP TABLE orc_llap;