HIVE-12159: Create vectorized readers for the complex types (Owen O'Malley, reviewed by Matt McCline and Prasanth)
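
The central API change below is in orc/src/java/org/apache/orc/RecordReader.java: nextBatch now fills a caller-supplied VectorizedRowBatch and returns a boolean, instead of allocating and returning the batch itself. A minimal sketch of the resulting read loop, assuming "rows" is an open org.apache.orc.RecordReader and "batch" is a VectorizedRowBatch already sized for the file's schema (batch construction is not part of this diff):

    // Loop until nextBatch() reports that no more rows were available.
    while (rows.nextBatch(batch)) {
      // batch.size says how many rows were actually read into this batch.
      for (int r = 0; r < batch.size; ++r) {
        // process row r via batch.cols[...]
      }
    }
    rows.close();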
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0ac424f0
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0ac424f0
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0ac424f0

Branch: refs/heads/llap
Commit: 0ac424f0a17b341efe299da167791112e4a953e9
Parents: bf44f3e
Author: Owen O'Malley <omal...@apache.org>
Authored: Tue Apr 26 08:15:00 2016 -0700
Committer: Owen O'Malley <omal...@apache.org>
Committed: Tue Apr 26 08:17:24 2016 -0700

----------------------------------------------------------------------
 itests/qtest/pom.xml                            |    2 +-
 .../llap/io/decode/OrcEncodedDataConsumer.java  |   46 +-
 .../llap/io/encoded/OrcEncodedDataReader.java   |   62 +-
 .../llap/io/metadata/OrcStripeMetadata.java     |    6 +-
 .../TestIncrementalObjectSizeEstimator.java     |   31 +-
 orc/src/java/org/apache/orc/DataReader.java     |   24 +-
 .../java/org/apache/orc/DataReaderFactory.java  |    9 -
 .../org/apache/orc/MetadataReaderFactory.java   |   12 -
 orc/src/java/org/apache/orc/OrcUtils.java       |   78 +
 orc/src/java/org/apache/orc/Reader.java         |   29 +-
 orc/src/java/org/apache/orc/RecordReader.java   |    8 +-
 .../java/org/apache/orc/TypeDescription.java    |   62 +-
 .../org/apache/orc/impl/BitFieldReader.java     |    5 +-
 .../apache/orc/impl/DataReaderProperties.java   |   41 +-
 .../orc/impl/DefaultMetadataReaderFactory.java  |   14 -
 orc/src/java/org/apache/orc/impl/InStream.java  |    4 +-
 .../java/org/apache/orc/impl/IntegerReader.java |   26 +-
 .../org/apache/orc/impl/MetadataReader.java     |   33 -
 .../org/apache/orc/impl/MetadataReaderImpl.java |  120 --
 .../orc/impl/MetadataReaderProperties.java      |   96 -
 .../apache/orc/impl/RunLengthByteReader.java    |   36 +-
 .../apache/orc/impl/RunLengthIntegerReader.java |   31 +-
 .../orc/impl/RunLengthIntegerReaderV2.java      |   33 +-
 .../java/org/apache/orc/impl/WriterImpl.java    |   47 +-
 .../orc/impl/TestDataReaderProperties.java      |   12 +-
 .../orc/impl/TestMetadataReaderProperties.java  |   72 -
 .../ql/exec/vector/VectorizedRowBatchCtx.java   |   13 +-
 .../ql/io/orc/DefaultDataReaderFactory.java     |   14 -
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   |   43 +-
 .../hive/ql/io/orc/OrcRawRecordMerger.java      |    3 +-
 .../hadoop/hive/ql/io/orc/ReaderImpl.java       |   65 +-
 .../hadoop/hive/ql/io/orc/RecordReaderImpl.java |  231 +--
 .../hive/ql/io/orc/RecordReaderUtils.java       |  127 +-
 .../hadoop/hive/ql/io/orc/SchemaEvolution.java  |  234 +--
 .../hive/ql/io/orc/TreeReaderFactory.java       |  838 +++++---
 .../ql/io/orc/VectorizedOrcInputFormat.java     |   32 +-
 .../hadoop/hive/ql/io/orc/WriterImpl.java       |    2 -
 .../hive/ql/io/orc/TestRecordReaderImpl.java    |   55 +-
 .../hive/ql/io/orc/TestTypeDescription.java     |    4 +-
 .../hive/ql/io/orc/TestVectorOrcFile.java       | 1647 +++++++++---------
 .../hive/ql/io/orc/TestVectorizedORCReader.java |    7 +-
 .../hive/ql/exec/vector/BytesColumnVector.java  |   13 +-
 .../ql/exec/vector/DecimalColumnVector.java     |    2 +-
 .../hive/ql/exec/vector/DoubleColumnVector.java |    2 +-
 .../hive/ql/exec/vector/LongColumnVector.java   |    2 +-
 .../ql/exec/vector/MultiValuedColumnVector.java |    2 +-
 .../ql/exec/vector/TimestampColumnVector.java   |    2 +-
 .../hive/ql/exec/vector/UnionColumnVector.java  |    2 -
 48 files changed, 2133 insertions(+), 2146 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/itests/qtest/pom.xml
----------------------------------------------------------------------
diff --git a/itests/qtest/pom.xml b/itests/qtest/pom.xml
index b042774..a479557 100644
--- a/itests/qtest/pom.xml
+++ b/itests/qtest/pom.xml
@@ -431,7 +431,7
@@ templatePath="${basedir}/${hive.path.to.root}/ql/src/test/templates/" template="TestCliDriver.vm" queryDirectory="${basedir}/${hive.path.to.root}/ql/src/test/queries/clientpositive/" queryFile="${qfile}" - excludeQueryFile="${minillap.query.files},${minimr.query.files},${minitez.query.files},${encrypted.query.files},${spark.only.query.files},${disabled.query.files},regexp_extract.q" + excludeQueryFile="${minillap.query.files},${minimr.query.files},${minitez.query.files},${encrypted.query.files},${spark.only.query.files},${disabled.query.files}" queryFileRegex="${qfile_regex}" clusterMode="${clustermode}" runDisabled="${run_disabled}" http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java index 0651557..a689f10 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.llap.io.decode; import java.io.IOException; +import java.util.List; import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch; import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData; @@ -27,7 +28,12 @@ import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch; import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata; import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata; import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.orc.CompressionCodec; import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer; @@ -71,6 +77,35 @@ public class OrcEncodedDataConsumer stripes[m.getStripeIx()] = m; } + private static ColumnVector createColumn(OrcProto.Type type, + int batchSize) { + switch (type.getKind()) { + case BOOLEAN: + case BYTE: + case SHORT: + case INT: + case LONG: + case DATE: + return new LongColumnVector(batchSize); + case FLOAT: + case DOUBLE: + return new DoubleColumnVector(batchSize); + case BINARY: + case STRING: + case CHAR: + case VARCHAR: + return new BytesColumnVector(batchSize); + case TIMESTAMP: + return new TimestampColumnVector(batchSize); + case DECIMAL: + return new DecimalColumnVector(batchSize, type.getPrecision(), + type.getScale()); + default: + throw new IllegalArgumentException("LLAP does not support " + + type.getKind()); + } + } + @Override protected void decodeBatch(OrcEncodedColumnBatch batch, Consumer<ColumnVectorBatch> downstreamConsumer) { @@ -112,9 +147,16 @@ public class OrcEncodedDataConsumer ColumnVectorBatch cvb = cvbPool.take(); assert cvb.cols.length == batch.getColumnIxs().length; // Must be constant per split. 
cvb.size = batchSize; - + List<OrcProto.Type> types = fileMetadata.getTypes(); + int[] columnMapping = batch.getColumnIxs(); for (int idx = 0; idx < batch.getColumnIxs().length; idx++) { - cvb.cols[idx] = (ColumnVector)columnReaders[idx].nextVector(cvb.cols[idx], batchSize); + if (cvb.cols[idx] == null) { + // skip over the top level struct, but otherwise assume no complex + // types + cvb.cols[idx] = createColumn(types.get(columnMapping[idx]), batchSize); + } + cvb.cols[idx].ensureSize(batchSize, false); + columnReaders[idx].nextVector(cvb.cols[idx], null, batchSize); } // we are done reading a batch, send it to consumer for processing http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java index 406f8f6..83011fb 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java @@ -27,6 +27,8 @@ import java.util.List; import org.apache.hadoop.hive.llap.counters.LlapIOCounters; import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics; +import org.apache.orc.impl.DataReaderProperties; +import org.apache.orc.impl.OrcIndex; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -58,7 +60,6 @@ import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.HdfsUtils; import org.apache.orc.CompressionKind; import org.apache.orc.DataReader; -import org.apache.orc.impl.MetadataReader; import org.apache.hadoop.hive.ql.io.orc.OrcFile; import org.apache.hadoop.hive.ql.io.orc.OrcFile.ReaderOptions; import org.apache.orc.OrcConf; @@ -144,8 +145,9 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> // Read state. private int stripeIxFrom; private OrcFileMetadata fileMetadata; + private Path path; private Reader orcReader; - private MetadataReader metadataReader; + private DataReader metadataReader; private EncodedReader stripeReader; private Object fileKey; private FileSystem fs; @@ -555,13 +557,6 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> * Closes the stripe readers (on error). */ private void cleanupReaders() { - if (metadataReader != null) { - try { - metadataReader.close(); - } catch (IOException ex) { - // Ignore. 
- } - } if (stripeReader != null) { try { stripeReader.close(); @@ -576,7 +571,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> */ private void ensureOrcReader() throws IOException { if (orcReader != null) return; - Path path = split.getPath(); + path = split.getPath(); if (fileKey instanceof Long && HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_USE_FILEID_PATH)) { path = HdfsUtils.getFileIdPath(fs, path, (long)fileKey); } @@ -661,7 +656,16 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> ensureOrcReader(); if (metadataReader != null) return; long startTime = counters.startTimeCounter(); - metadataReader = orcReader.metadata(); + boolean useZeroCopy = (conf != null) && OrcConf.USE_ZEROCOPY.getBoolean(conf); + metadataReader = RecordReaderUtils.createDefaultDataReader( + DataReaderProperties.builder() + .withBufferSize(orcReader.getCompressionSize()) + .withCompression(orcReader.getCompressionKind()) + .withFileSystem(fs) + .withPath(path) + .withTypeCount(orcReader.getSchema().getMaximumId() + 1) + .withZeroCopy(useZeroCopy) + .build()); counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime); } @@ -805,13 +809,13 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> private class DataWrapperForOrc implements DataReader, DataCache { private final DataReader orcDataReader; - public DataWrapperForOrc() { - boolean useZeroCopy = (conf != null) && OrcConf.USE_ZEROCOPY.getBoolean(conf); - if (useZeroCopy && !getAllocator().isDirectAlloc()) { - throw new UnsupportedOperationException("Cannot use zero-copy reader with non-direct cache " - + "buffers; either disable zero-copy or enable direct cache allocation"); - } - this.orcDataReader = orcReader.createDefaultDataReader(useZeroCopy); + private DataWrapperForOrc(DataWrapperForOrc other) { + orcDataReader = other.orcDataReader.clone(); + } + + public DataWrapperForOrc() throws IOException { + ensureMetadataReader(); + this.orcDataReader = metadataReader.clone(); } @Override @@ -881,10 +885,32 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> } @Override + public DataWrapperForOrc clone() { + return new DataWrapperForOrc(this); + } + + @Override public void open() throws IOException { long startTime = counters.startTimeCounter(); orcDataReader.open(); counters.recordHdfsTime(startTime); } + + @Override + public OrcIndex readRowIndex(StripeInformation stripe, + OrcProto.StripeFooter footer, + boolean[] included, + OrcProto.RowIndex[] indexes, + boolean[] sargColumns, + OrcProto.BloomFilterIndex[] bloomFilterIndices + ) throws IOException { + return orcDataReader.readRowIndex(stripe, footer, included, indexes, + sargColumns, bloomFilterIndices); + } + + @Override + public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException { + return orcDataReader.readStripeFooter(stripe); + } } } http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java index 82187bd..6874586 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java @@ -28,9 +28,9 @@ import 
org.apache.hadoop.hive.llap.cache.EvictionDispatcher; import org.apache.hadoop.hive.llap.cache.LlapCacheableBuffer; import org.apache.hadoop.hive.ql.io.SyntheticFileId; import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey; +import org.apache.orc.DataReader; import org.apache.orc.OrcProto; import org.apache.orc.StripeInformation; -import org.apache.orc.impl.MetadataReader; import org.apache.orc.impl.OrcIndex; public class OrcStripeMetadata extends LlapCacheableBuffer { @@ -54,7 +54,7 @@ public class OrcStripeMetadata extends LlapCacheableBuffer { SIZE_ESTIMATOR = SIZE_ESTIMATORS.get(OrcStripeMetadata.class); } - public OrcStripeMetadata(OrcBatchKey stripeKey, MetadataReader mr, StripeInformation stripe, + public OrcStripeMetadata(OrcBatchKey stripeKey, DataReader mr, StripeInformation stripe, boolean[] includes, boolean[] sargColumns) throws IOException { this.stripeKey = stripeKey; OrcProto.StripeFooter footer = mr.readStripeFooter(stripe); @@ -95,7 +95,7 @@ public class OrcStripeMetadata extends LlapCacheableBuffer { return true; } - public void loadMissingIndexes(MetadataReader mr, StripeInformation stripe, boolean[] includes, + public void loadMissingIndexes(DataReader mr, StripeInformation stripe, boolean[] includes, boolean[] sargColumns) throws IOException { // TODO: should we save footer to avoid a read here? rowIndex = mr.readRowIndex(stripe, null, includes, rowIndex.getRowGroupIndex(), http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestIncrementalObjectSizeEstimator.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestIncrementalObjectSizeEstimator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestIncrementalObjectSizeEstimator.java index a078f73..183fb1b 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestIncrementalObjectSizeEstimator.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestIncrementalObjectSizeEstimator.java @@ -21,18 +21,20 @@ import static org.junit.Assert.*; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.HashMap; import java.util.List; import java.util.ArrayList; import java.util.LinkedHashSet; +import org.apache.hadoop.hive.common.io.DiskRangeList; +import org.apache.orc.DataReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator; import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator.ObjectEstimator; import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata; import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata; -import org.apache.orc.impl.MetadataReader; import org.apache.orc.impl.OrcIndex; import org.apache.orc.StripeInformation; import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey; @@ -46,11 +48,16 @@ import com.google.protobuf.CodedOutputStream; public class TestIncrementalObjectSizeEstimator { private static final Logger LOG = LoggerFactory.getLogger(TestIncrementalObjectSizeEstimator.class); - private static class DummyMetadataReader implements MetadataReader { + private static class DummyMetadataReader implements DataReader { public boolean doStreamStep = false; public boolean isEmpty; @Override + public void open() throws IOException { + + } + + @Override public OrcIndex readRowIndex(StripeInformation stripe, OrcProto.StripeFooter footer, boolean[] included, 
OrcProto.RowIndex[] indexes, boolean[] sargColumns, @@ -118,6 +125,26 @@ public class TestIncrementalObjectSizeEstimator { } @Override + public DiskRangeList readFileData(DiskRangeList range, long baseOffset, boolean doForceDirect) throws IOException { + return null; + } + + @Override + public boolean isTrackingDiskRanges() { + return false; + } + + @Override + public void releaseBuffer(ByteBuffer toRelease) { + + } + + @Override + public DataReader clone() { + return null; + } + + @Override public void close() throws IOException { } } http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/DataReader.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/DataReader.java b/orc/src/java/org/apache/orc/DataReader.java index b70f26b..a5dbb76 100644 --- a/orc/src/java/org/apache/orc/DataReader.java +++ b/orc/src/java/org/apache/orc/DataReader.java @@ -18,18 +18,27 @@ package org.apache.orc; -import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; import org.apache.hadoop.hive.common.io.DiskRangeList; +import org.apache.orc.impl.OrcIndex; /** An abstract data reader that IO formats can use to read bytes from underlying storage. */ -public interface DataReader extends Closeable { +public interface DataReader extends AutoCloseable { /** Opens the DataReader, making it ready to use. */ void open() throws IOException; + OrcIndex readRowIndex(StripeInformation stripe, + OrcProto.StripeFooter footer, + boolean[] included, OrcProto.RowIndex[] indexes, + boolean[] sargColumns, + OrcProto.BloomFilterIndex[] bloomFilterIndices + ) throws IOException; + + OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException; + /** Reads the data. * * Note that for the cases such as zero-copy read, caller must release the disk ranges @@ -53,4 +62,15 @@ public interface DataReader extends Closeable { * @param toRelease The buffer to release. */ void releaseBuffer(ByteBuffer toRelease); + + /** + * Clone the entire state of the DataReader with the assumption that the + * clone will be closed at a different time. Thus, any file handles in the + * implementation need to be cloned. 
+ * @return a new instance + */ + DataReader clone(); + + @Override + public void close() throws IOException; } http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/DataReaderFactory.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/DataReaderFactory.java b/orc/src/java/org/apache/orc/DataReaderFactory.java deleted file mode 100644 index ec3a0e9..0000000 --- a/orc/src/java/org/apache/orc/DataReaderFactory.java +++ /dev/null @@ -1,9 +0,0 @@ -package org.apache.orc; - -import org.apache.orc.impl.DataReaderProperties; - -public interface DataReaderFactory { - - DataReader create(DataReaderProperties properties); - -} http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/MetadataReaderFactory.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/MetadataReaderFactory.java b/orc/src/java/org/apache/orc/MetadataReaderFactory.java deleted file mode 100644 index 64629da..0000000 --- a/orc/src/java/org/apache/orc/MetadataReaderFactory.java +++ /dev/null @@ -1,12 +0,0 @@ -package org.apache.orc; - -import org.apache.orc.impl.MetadataReader; -import org.apache.orc.impl.MetadataReaderProperties; - -import java.io.IOException; - -public interface MetadataReaderFactory { - - MetadataReader create(MetadataReaderProperties properties) throws IOException; - -} http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/OrcUtils.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/OrcUtils.java b/orc/src/java/org/apache/orc/OrcUtils.java index 2e93254..5845ba6 100644 --- a/orc/src/java/org/apache/orc/OrcUtils.java +++ b/orc/src/java/org/apache/orc/OrcUtils.java @@ -449,4 +449,82 @@ public class OrcUtils { return columnId; } + /** + * Translate the given rootColumn from the list of types to a TypeDescription. + * @param types all of the types + * @param rootColumn translate this type + * @return a new TypeDescription that matches the given rootColumn + */ + public static + TypeDescription convertTypeFromProtobuf(List<OrcProto.Type> types, + int rootColumn) { + OrcProto.Type type = types.get(rootColumn); + switch (type.getKind()) { + case BOOLEAN: + return TypeDescription.createBoolean(); + case BYTE: + return TypeDescription.createByte(); + case SHORT: + return TypeDescription.createShort(); + case INT: + return TypeDescription.createInt(); + case LONG: + return TypeDescription.createLong(); + case FLOAT: + return TypeDescription.createFloat(); + case DOUBLE: + return TypeDescription.createDouble(); + case STRING: + return TypeDescription.createString(); + case CHAR: + case VARCHAR: { + TypeDescription result = type.getKind() == OrcProto.Type.Kind.CHAR ? 
+ TypeDescription.createChar() : TypeDescription.createVarchar(); + if (type.hasMaximumLength()) { + result.withMaxLength(type.getMaximumLength()); + } + return result; + } + case BINARY: + return TypeDescription.createBinary(); + case TIMESTAMP: + return TypeDescription.createTimestamp(); + case DATE: + return TypeDescription.createDate(); + case DECIMAL: { + TypeDescription result = TypeDescription.createDecimal(); + if (type.hasScale()) { + result.withScale(type.getScale()); + } + if (type.hasPrecision()) { + result.withPrecision(type.getPrecision()); + } + return result; + } + case LIST: + return TypeDescription.createList( + convertTypeFromProtobuf(types, type.getSubtypes(0))); + case MAP: + return TypeDescription.createMap( + convertTypeFromProtobuf(types, type.getSubtypes(0)), + convertTypeFromProtobuf(types, type.getSubtypes(1))); + case STRUCT: { + TypeDescription result = TypeDescription.createStruct(); + for(int f=0; f < type.getSubtypesCount(); ++f) { + result.addField(type.getFieldNames(f), + convertTypeFromProtobuf(types, type.getSubtypes(f))); + } + return result; + } + case UNION: { + TypeDescription result = TypeDescription.createUnion(); + for(int f=0; f < type.getSubtypesCount(); ++f) { + result.addUnionChild( + convertTypeFromProtobuf(types, type.getSubtypes(f))); + } + return result; + } + } + throw new IllegalArgumentException("Unknown ORC type " + type.getKind()); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/Reader.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/Reader.java b/orc/src/java/org/apache/orc/Reader.java index be722b5..39de763 100644 --- a/orc/src/java/org/apache/orc/Reader.java +++ b/orc/src/java/org/apache/orc/Reader.java @@ -23,7 +23,6 @@ import java.nio.ByteBuffer; import java.util.List; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; -import org.apache.orc.impl.MetadataReader; /** * The interface for reading ORC files. @@ -116,9 +115,15 @@ public interface Reader { ColumnStatistics[] getStatistics(); /** + * Get the type of rows in this ORC file. + */ + TypeDescription getSchema(); + + /** * Get the list of types contained in the file. The root type is the first * type in the list. * @return the list of flattened types + * @deprecated use getSchema instead */ List<OrcProto.Type> getTypes(); @@ -144,6 +149,7 @@ public interface Reader { private Boolean useZeroCopy = null; private Boolean skipCorruptRecords = null; private TypeDescription schema = null; + private DataReader dataReader = null; /** * Set the list of columns to read. @@ -197,6 +203,11 @@ public interface Reader { return this; } + public Options dataReader(DataReader value) { + this.dataReader = value; + return this; + } + /** * Set whether to skip corrupt records. * @param value the new skip corrupt records flag @@ -247,6 +258,10 @@ public interface Reader { return skipCorruptRecords; } + public DataReader getDataReader() { + return dataReader; + } + public Options clone() { Options result = new Options(); result.include = include; @@ -257,6 +272,7 @@ public interface Reader { result.columnNames = columnNames; result.useZeroCopy = useZeroCopy; result.skipCorruptRecords = skipCorruptRecords; + result.dataReader = dataReader == null ? null : dataReader.clone(); return result; } @@ -321,11 +337,6 @@ public interface Reader { RecordReader rowsOptions(Options options) throws IOException; /** - * @return Metadata reader used to read file metadata. 
-   */
-  MetadataReader metadata() throws IOException;
-
-  /**
    * @return List of integers representing version of the file, in order from major to minor.
    */
   List<Integer> getVersionList();
@@ -351,12 +362,6 @@ public interface Reader {
   List<OrcProto.ColumnStatistics> getOrcProtoFileStatistics();

   /**
-   * @param useZeroCopy Whether zero-copy read should be used.
-   * @return The default data reader that ORC is using to read bytes from disk.
-   */
-  DataReader createDefaultDataReader(boolean useZeroCopy);
-
-  /**
    * @return Serialized file metadata read from disk for the purposes of caching, etc.
    */
   ByteBuffer getSerializedFileFooter();

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/RecordReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/RecordReader.java b/orc/src/java/org/apache/orc/RecordReader.java
index 7229dda..09ba0f0 100644
--- a/orc/src/java/org/apache/orc/RecordReader.java
+++ b/orc/src/java/org/apache/orc/RecordReader.java
@@ -30,13 +30,11 @@ public interface RecordReader {
    * controlled by the callers. Callers need to look at
    * VectorizedRowBatch.size of the returned object to know the batch
    * size read.
-   * @param previousBatch a row batch object that can be reused by the reader
-   * @return the row batch that was read. The batch will have a non-zero row
-   *   count if the pointer isn't at the end of the file
+   * @param batch a row batch object to read into
+   * @return were more rows available to read?
    * @throws java.io.IOException
    */
-  VectorizedRowBatch nextBatch(VectorizedRowBatch previousBatch
-                               ) throws IOException;
+  boolean nextBatch(VectorizedRowBatch batch) throws IOException;

   /**
    * Get the row number of the row that will be returned by the following

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/TypeDescription.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/TypeDescription.java b/orc/src/java/org/apache/orc/TypeDescription.java
index bd900ac..b8e057e 100644
--- a/orc/src/java/org/apache/orc/TypeDescription.java
+++ b/orc/src/java/org/apache/orc/TypeDescription.java
@@ -61,7 +61,7 @@ public class TypeDescription {
     LIST("array", false),
     MAP("map", false),
     STRUCT("struct", false),
-    UNION("union", false);
+    UNION("uniontype", false);

     Category(String name, boolean isPrimitive) {
       this.name = name;
@@ -258,6 +258,66 @@ public class TypeDescription {
     return id;
   }

+  public TypeDescription clone() {
+    TypeDescription result = new TypeDescription(category);
+    result.maxLength = maxLength;
+    result.precision = precision;
+    result.scale = scale;
+    if (fieldNames != null) {
+      result.fieldNames.addAll(fieldNames);
+    }
+    if (children != null) {
+      for(TypeDescription child: children) {
+        TypeDescription clone = child.clone();
+        clone.parent = result;
+        result.children.add(clone);
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public int hashCode() {
+    return getId();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null || other.getClass() != TypeDescription.class) {
+      return false;
+    }
+    if (other == this) {
+      return true;
+    }
+    TypeDescription castOther = (TypeDescription) other;
+    if (category != castOther.category ||
+        getId() != castOther.getId() ||
+        getMaximumId() != castOther.getMaximumId() ||
+        maxLength != castOther.maxLength ||
+        scale != castOther.scale ||
+        precision != castOther.precision) {
+      return false;
+    }
+    if (children != null) {
+      if (children.size() != castOther.children.size()) {
+        return false;
+      }
+      for (int i = 0; i < children.size(); ++i) {
+        if (!children.get(i).equals(castOther.children.get(i))) {
+          return false;
+        }
+      }
+    }
+    if (category == Category.STRUCT) {
+      for(int i=0; i < fieldNames.size(); ++i) {
+        if (!fieldNames.get(i).equals(castOther.fieldNames.get(i))) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
   /**
    * Get the maximum id assigned to this type or its children.
    * The first call will cause all of the ids in the tree to be assigned, so

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/impl/BitFieldReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/BitFieldReader.java b/orc/src/java/org/apache/orc/impl/BitFieldReader.java
index 8d9d3cb..dda7355 100644
--- a/orc/src/java/org/apache/orc/impl/BitFieldReader.java
+++ b/orc/src/java/org/apache/orc/impl/BitFieldReader.java
@@ -137,7 +137,7 @@ public class BitFieldReader {
                          long previousLen) throws IOException {
     previous.isRepeating = true;
     for (int i = 0; i < previousLen; i++) {
-      if (!previous.isNull[i]) {
+      if (previous.noNulls || !previous.isNull[i]) {
         previous.vector[i] = next();
       } else {
         // The default value of null for int types in vectorized
@@ -150,7 +150,8 @@ public class BitFieldReader {
       // when determining the isRepeating flag.
       if (previous.isRepeating
           && i > 0
-          && ((previous.vector[i - 1] != previous.vector[i]) || (previous.isNull[i - 1] != previous.isNull[i]))) {
+          && ((previous.vector[0] != previous.vector[i]) ||
+              (previous.isNull[0] != previous.isNull[i]))) {
         previous.isRepeating = false;
       }
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/impl/DataReaderProperties.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/DataReaderProperties.java b/orc/src/java/org/apache/orc/impl/DataReaderProperties.java
index 49f47d6..bb73d53 100644
--- a/orc/src/java/org/apache/orc/impl/DataReaderProperties.java
+++ b/orc/src/java/org/apache/orc/impl/DataReaderProperties.java
@@ -3,7 +3,7 @@ package org.apache.orc.impl;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.orc.CompressionCodec;
+import org.apache.orc.CompressionKind;

 import javax.annotation.Nullable;

@@ -11,14 +11,18 @@ public final class DataReaderProperties {

   private final FileSystem fileSystem;
   private final Path path;
-  private final CompressionCodec codec;
+  private final CompressionKind compression;
   private final boolean zeroCopy;
+  private final int typeCount;
+  private final int bufferSize;

   private DataReaderProperties(Builder builder) {
     this.fileSystem = builder.fileSystem;
     this.path = builder.path;
-    this.codec = builder.codec;
+    this.compression = builder.compression;
     this.zeroCopy = builder.zeroCopy;
+    this.typeCount = builder.typeCount;
+    this.bufferSize = builder.bufferSize;
   }

   public FileSystem getFileSystem() {
@@ -29,15 +33,22 @@ public final class DataReaderProperties {
     return path;
   }

-  @Nullable
-  public CompressionCodec getCodec() {
-    return codec;
+  public CompressionKind getCompression() {
+    return compression;
   }

   public boolean getZeroCopy() {
     return zeroCopy;
   }

+  public int getTypeCount() {
+    return typeCount;
+  }
+
+  public int getBufferSize() {
+    return bufferSize;
+  }
+
   public static Builder builder() {
     return new Builder();
   }
@@ -46,8 +57,10 @@ public final
class DataReaderProperties { private FileSystem fileSystem; private Path path; - private CompressionCodec codec; + private CompressionKind compression; private boolean zeroCopy; + private int typeCount; + private int bufferSize; private Builder() { @@ -63,8 +76,8 @@ public final class DataReaderProperties { return this; } - public Builder withCodec(CompressionCodec codec) { - this.codec = codec; + public Builder withCompression(CompressionKind value) { + this.compression = value; return this; } @@ -73,6 +86,16 @@ public final class DataReaderProperties { return this; } + public Builder withTypeCount(int value) { + this.typeCount = value; + return this; + } + + public Builder withBufferSize(int value) { + this.bufferSize = value; + return this; + } + public DataReaderProperties build() { Preconditions.checkNotNull(fileSystem); Preconditions.checkNotNull(path); http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/impl/DefaultMetadataReaderFactory.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/DefaultMetadataReaderFactory.java b/orc/src/java/org/apache/orc/impl/DefaultMetadataReaderFactory.java deleted file mode 100644 index fc0d141..0000000 --- a/orc/src/java/org/apache/orc/impl/DefaultMetadataReaderFactory.java +++ /dev/null @@ -1,14 +0,0 @@ -package org.apache.orc.impl; - -import org.apache.orc.MetadataReaderFactory; - -import java.io.IOException; - -public final class DefaultMetadataReaderFactory implements MetadataReaderFactory { - - @Override - public MetadataReader create(MetadataReaderProperties properties) throws IOException { - return new MetadataReaderImpl(properties); - } - -} http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/impl/InStream.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/InStream.java b/orc/src/java/org/apache/orc/impl/InStream.java index 1893afe..851f645 100644 --- a/orc/src/java/org/apache/orc/impl/InStream.java +++ b/orc/src/java/org/apache/orc/impl/InStream.java @@ -53,6 +53,9 @@ public abstract class InStream extends InputStream { return length; } + @Override + public abstract void close(); + public static class UncompressedStream extends InStream { private List<DiskRange> bytes; private long length; @@ -423,7 +426,6 @@ public abstract class InStream extends InputStream { /** * Create an input stream from a list of buffers. - * @param fileName name of the file * @param streamName the name of the stream * @param buffers the list of ranges of bytes for the stream * @param offsets a list of offsets (the same length as input) that must http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/impl/IntegerReader.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/IntegerReader.java b/orc/src/java/org/apache/orc/impl/IntegerReader.java index 7dfd289..8bef0f1 100644 --- a/orc/src/java/org/apache/orc/impl/IntegerReader.java +++ b/orc/src/java/org/apache/orc/impl/IntegerReader.java @@ -20,7 +20,7 @@ package org.apache.orc.impl; import java.io.IOException; -import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; /** * Interface for reading integers. @@ -57,9 +57,25 @@ public interface IntegerReader { /** * Return the next available vector for values. 
- * @return + * @param column the column being read + * @param data the vector to read into + * @param length the number of numbers to read + * @throws IOException + */ + void nextVector(ColumnVector column, + long[] data, + int length + ) throws IOException; + + /** + * Return the next available vector for values. Does not change the + * value of column.isRepeating. + * @param column the column being read + * @param data the vector to read into + * @param length the number of numbers to read * @throws IOException */ - void nextVector(LongColumnVector previous, final int previousLen) - throws IOException; -} + void nextVector(ColumnVector column, + int[] data, + int length + ) throws IOException;} http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/impl/MetadataReader.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/MetadataReader.java b/orc/src/java/org/apache/orc/impl/MetadataReader.java deleted file mode 100644 index 500239d..0000000 --- a/orc/src/java/org/apache/orc/impl/MetadataReader.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.orc.impl; - -import java.io.Closeable; -import java.io.IOException; - -import org.apache.orc.OrcProto; -import org.apache.orc.StripeInformation; - -public interface MetadataReader extends Closeable { - OrcIndex readRowIndex(StripeInformation stripe, - OrcProto.StripeFooter footer, - boolean[] included, OrcProto.RowIndex[] indexes, boolean[] sargColumns, - OrcProto.BloomFilterIndex[] bloomFilterIndices) throws IOException; - - OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException; -} http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/impl/MetadataReaderImpl.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/MetadataReaderImpl.java b/orc/src/java/org/apache/orc/impl/MetadataReaderImpl.java deleted file mode 100644 index c3ea74f..0000000 --- a/orc/src/java/org/apache/orc/impl/MetadataReaderImpl.java +++ /dev/null @@ -1,120 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.orc.impl; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.List; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.io.DiskRange; -import org.apache.orc.CompressionCodec; -import org.apache.orc.OrcProto; -import org.apache.orc.StripeInformation; - -import com.google.common.collect.Lists; - -public class MetadataReaderImpl implements MetadataReader { - private final FSDataInputStream file; - private final CompressionCodec codec; - private final int bufferSize; - private final int typeCount; - - MetadataReaderImpl(MetadataReaderProperties properties) throws IOException { - this.file = properties.getFileSystem().open(properties.getPath()); - this.codec = properties.getCodec(); - this.bufferSize = properties.getBufferSize(); - this.typeCount = properties.getTypeCount(); - } - - @Override - public OrcIndex readRowIndex(StripeInformation stripe, - OrcProto.StripeFooter footer, boolean[] included, OrcProto.RowIndex[] indexes, - boolean[] sargColumns, OrcProto.BloomFilterIndex[] bloomFilterIndices) throws IOException { - if (footer == null) { - footer = readStripeFooter(stripe); - } - if (indexes == null) { - indexes = new OrcProto.RowIndex[typeCount]; - } - if (bloomFilterIndices == null) { - bloomFilterIndices = new OrcProto.BloomFilterIndex[typeCount]; - } - long offset = stripe.getOffset(); - List<OrcProto.Stream> streams = footer.getStreamsList(); - for (int i = 0; i < streams.size(); i++) { - OrcProto.Stream stream = streams.get(i); - OrcProto.Stream nextStream = null; - if (i < streams.size() - 1) { - nextStream = streams.get(i+1); - } - int col = stream.getColumn(); - int len = (int) stream.getLength(); - // row index stream and bloom filter are interlaced, check if the sarg column contains bloom - // filter and combine the io to read row index and bloom filters for that column together - if (stream.hasKind() && (stream.getKind() == OrcProto.Stream.Kind.ROW_INDEX)) { - boolean readBloomFilter = false; - if (sargColumns != null && sargColumns[col] && - nextStream.getKind() == OrcProto.Stream.Kind.BLOOM_FILTER) { - len += nextStream.getLength(); - i += 1; - readBloomFilter = true; - } - if ((included == null || included[col]) && indexes[col] == null) { - byte[] buffer = new byte[len]; - file.readFully(offset, buffer, 0, buffer.length); - ByteBuffer bb = ByteBuffer.wrap(buffer); - indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create("index", - Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), stream.getLength(), - codec, bufferSize)); - if (readBloomFilter) { - bb.position((int) stream.getLength()); - bloomFilterIndices[col] = OrcProto.BloomFilterIndex.parseFrom(InStream.create( - "bloom_filter", Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), - nextStream.getLength(), codec, bufferSize)); - } - } - } - offset += len; - } - - OrcIndex index = new OrcIndex(indexes, bloomFilterIndices); - return index; - } - - @Override - public OrcProto.StripeFooter 
readStripeFooter(StripeInformation stripe) throws IOException { - long offset = stripe.getOffset() + stripe.getIndexLength() + stripe.getDataLength(); - int tailLength = (int) stripe.getFooterLength(); - - // read the footer - ByteBuffer tailBuf = ByteBuffer.allocate(tailLength); - file.readFully(offset, tailBuf.array(), tailBuf.arrayOffset(), tailLength); - return OrcProto.StripeFooter.parseFrom(InStream.createCodedInputStream("footer", - Lists.<DiskRange>newArrayList(new BufferChunk(tailBuf, 0)), - tailLength, codec, bufferSize)); - } - - @Override - public void close() throws IOException { - file.close(); - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/impl/MetadataReaderProperties.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/MetadataReaderProperties.java b/orc/src/java/org/apache/orc/impl/MetadataReaderProperties.java deleted file mode 100644 index 321931c..0000000 --- a/orc/src/java/org/apache/orc/impl/MetadataReaderProperties.java +++ /dev/null @@ -1,96 +0,0 @@ -package org.apache.orc.impl; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.orc.CompressionCodec; - -import javax.annotation.Nullable; - -public final class MetadataReaderProperties { - - private final FileSystem fileSystem; - private final Path path; - private final CompressionCodec codec; - private final int bufferSize; - private final int typeCount; - - private MetadataReaderProperties(Builder builder) { - this.fileSystem = builder.fileSystem; - this.path = builder.path; - this.codec = builder.codec; - this.bufferSize = builder.bufferSize; - this.typeCount = builder.typeCount; - } - - public FileSystem getFileSystem() { - return fileSystem; - } - - public Path getPath() { - return path; - } - - @Nullable - public CompressionCodec getCodec() { - return codec; - } - - public int getBufferSize() { - return bufferSize; - } - - public int getTypeCount() { - return typeCount; - } - - public static Builder builder() { - return new Builder(); - } - - public static class Builder { - - private FileSystem fileSystem; - private Path path; - private CompressionCodec codec; - private int bufferSize; - private int typeCount; - - private Builder() { - - } - - public Builder withFileSystem(FileSystem fileSystem) { - this.fileSystem = fileSystem; - return this; - } - - public Builder withPath(Path path) { - this.path = path; - return this; - } - - public Builder withCodec(CompressionCodec codec) { - this.codec = codec; - return this; - } - - public Builder withBufferSize(int bufferSize) { - this.bufferSize = bufferSize; - return this; - } - - public Builder withTypeCount(int typeCount) { - this.typeCount = typeCount; - return this; - } - - public MetadataReaderProperties build() { - Preconditions.checkNotNull(fileSystem); - Preconditions.checkNotNull(path); - - return new MetadataReaderProperties(this); - } - - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java b/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java index 380f3391..24bd051 100644 --- a/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java +++ b/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java @@ -20,7 +20,7 @@ package 
org.apache.orc.impl; import java.io.EOFException; import java.io.IOException; -import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; /** * A reader that reads a sequence of bytes. A control byte is read before @@ -92,16 +92,16 @@ public class RunLengthByteReader { return result; } - public void nextVector(LongColumnVector previous, long previousLen) + public void nextVector(ColumnVector previous, long[] data, long size) throws IOException { previous.isRepeating = true; - for (int i = 0; i < previousLen; i++) { + for (int i = 0; i < size; i++) { if (!previous.isNull[i]) { - previous.vector[i] = next(); + data[i] = next(); } else { // The default value of null for int types in vectorized // processing is 1, so set that if the value is null - previous.vector[i] = 1; + data[i] = 1; } // The default value for nulls in Vectorization for int types is 1 @@ -109,12 +109,36 @@ public class RunLengthByteReader { // when determining the isRepeating flag. if (previous.isRepeating && i > 0 - && ((previous.vector[i - 1] != previous.vector[i]) || (previous.isNull[i - 1] != previous.isNull[i]))) { + && ((data[0] != data[i]) || + (previous.isNull[0] != previous.isNull[i]))) { previous.isRepeating = false; } } } + /** + * Read the next size bytes into the data array, skipping over any slots + * where isNull is true. + * @param isNull if non-null, skip any rows where isNull[r] is true + * @param data the array to read into + * @param size the number of elements to read + * @throws IOException + */ + public void nextVector(boolean[] isNull, int[] data, + long size) throws IOException { + if (isNull == null) { + for(int i=0; i < size; ++i) { + data[i] = next(); + } + } else { + for(int i=0; i < size; ++i) { + if (!isNull[i]) { + data[i] = next(); + } + } + } + } + public void seek(PositionProvider index) throws IOException { input.seek(index); int consumed = (int) index.getNext(); http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java index 0c90cde..b91a263 100644 --- a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java +++ b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java @@ -20,7 +20,7 @@ package org.apache.orc.impl; import java.io.EOFException; import java.io.IOException; -import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; /** * A reader that reads a sequence of integers. @@ -99,15 +99,17 @@ public class RunLengthIntegerReader implements IntegerReader { } @Override - public void nextVector(LongColumnVector previous, final int previousLen) throws IOException { + public void nextVector(ColumnVector previous, + long[] data, + int previousLen) throws IOException { previous.isRepeating = true; for (int i = 0; i < previousLen; i++) { if (!previous.isNull[i]) { - previous.vector[i] = next(); + data[i] = next(); } else { // The default value of null for int type in vectorized // processing is 1, so set that if the value is null - previous.vector[i] = 1; + data[i] = 1; } // The default value for nulls in Vectorization for int types is 1 @@ -115,13 +117,32 @@ public class RunLengthIntegerReader implements IntegerReader { // when determining the isRepeating flag. 
if (previous.isRepeating && i > 0 - && (previous.vector[i - 1] != previous.vector[i] || previous.isNull[i - 1] != previous.isNull[i])) { + && (data[0] != data[i] || previous.isNull[0] != previous.isNull[i])) { previous.isRepeating = false; } } } @Override + public void nextVector(ColumnVector vector, + int[] data, + int size) throws IOException { + if (vector.noNulls) { + for(int r=0; r < data.length && r < size; ++r) { + data[r] = (int) next(); + } + } else if (!(vector.isRepeating && vector.isNull[0])) { + for(int r=0; r < data.length && r < size; ++r) { + if (!vector.isNull[r]) { + data[r] = (int) next(); + } else { + data[r] = 1; + } + } + } + } + + @Override public void seek(PositionProvider index) throws IOException { input.seek(index); int consumed = (int) index.getNext(); http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java index c6d685a..610d9b5 100644 --- a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java +++ b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java @@ -21,9 +21,9 @@ import java.io.EOFException; import java.io.IOException; import java.util.Arrays; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; /** * A reader that reads a sequence of light weight compressed integers. Refer @@ -360,15 +360,17 @@ public class RunLengthIntegerReaderV2 implements IntegerReader { } @Override - public void nextVector(LongColumnVector previous, final int previousLen) throws IOException { + public void nextVector(ColumnVector previous, + long[] data, + int previousLen) throws IOException { previous.isRepeating = true; for (int i = 0; i < previousLen; i++) { if (!previous.isNull[i]) { - previous.vector[i] = next(); + data[i] = next(); } else { // The default value of null for int type in vectorized // processing is 1, so set that if the value is null - previous.vector[i] = 1; + data[i] = 1; } // The default value for nulls in Vectorization for int types is 1 @@ -376,10 +378,29 @@ public class RunLengthIntegerReaderV2 implements IntegerReader { // when determining the isRepeating flag. 
if (previous.isRepeating && i > 0 - && (previous.vector[i - 1] != previous.vector[i] || - previous.isNull[i - 1] != previous.isNull[i])) { + && (data[0] != data[i] || + previous.isNull[0] != previous.isNull[i])) { previous.isRepeating = false; } } } + + @Override + public void nextVector(ColumnVector vector, + int[] data, + int size) throws IOException { + if (vector.noNulls) { + for(int r=0; r < data.length && r < size; ++r) { + data[r] = (int) next(); + } + } else if (!(vector.isRepeating && vector.isNull[0])) { + for(int r=0; r < data.length && r < size; ++r) { + if (!vector.isNull[r]) { + data[r] = (int) next(); + } else { + data[r] = 1; + } + } + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/java/org/apache/orc/impl/WriterImpl.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/WriterImpl.java b/orc/src/java/org/apache/orc/impl/WriterImpl.java index f8afe06..b2966e0 100644 --- a/orc/src/java/org/apache/orc/impl/WriterImpl.java +++ b/orc/src/java/org/apache/orc/impl/WriterImpl.java @@ -1693,9 +1693,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback { } } + public static long MILLIS_PER_DAY = 24 * 60 * 60 * 1000; + public static long NANOS_PER_MILLI = 1000000; public static final int MILLIS_PER_SECOND = 1000; static final int NANOS_PER_SECOND = 1000000000; - static final int MILLIS_PER_NANO = 1000000; public static final String BASE_TIMESTAMP_STRING = "2015-01-01 00:00:00"; private static class TimestampTreeWriter extends TreeWriter { @@ -2261,32 +2262,36 @@ public class WriterImpl implements Writer, MemoryManager.Callback { } } else { // write the records in runs of the same tag - byte prevTag = 0; - int currentRun = 0; - boolean started = false; + int[] currentStart = new int[vec.fields.length]; + int[] currentLength = new int[vec.fields.length]; for(int i=0; i < length; ++i) { - if (!vec.isNull[i + offset]) { + // only need to deal with the non-nulls, since the nulls were dealt + // with in the super method. + if (vec.noNulls || !vec.isNull[i + offset]) { byte tag = (byte) vec.tags[offset + i]; tags.write(tag); - if (!started) { - started = true; - currentRun = i; - prevTag = tag; - } else if (tag != prevTag) { - childrenWriters[prevTag].writeBatch(vec.fields[prevTag], - offset + currentRun, i - currentRun); - currentRun = i; - prevTag = tag; + if (currentLength[tag] == 0) { + // start a new sequence + currentStart[tag] = i + offset; + currentLength[tag] = 1; + } else if (currentStart[tag] + currentLength[tag] == i + offset) { + // ok, we are extending the current run for that tag. 
+ currentLength[tag] += 1; + } else { + // otherwise, we need to close off the old run and start a new one + childrenWriters[tag].writeBatch(vec.fields[tag], + currentStart[tag], currentLength[tag]); + currentStart[tag] = i + offset; + currentLength[tag] = 1; } - } else if (started) { - started = false; - childrenWriters[prevTag].writeBatch(vec.fields[prevTag], - offset + currentRun, i - currentRun); } } - if (started) { - childrenWriters[prevTag].writeBatch(vec.fields[prevTag], - offset + currentRun, length - currentRun); + // write out any left over sequences + for(int tag=0; tag < currentStart.length; ++tag) { + if (currentLength[tag] != 0) { + childrenWriters[tag].writeBatch(vec.fields[tag], currentStart[tag], + currentLength[tag]); + } } } } http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/test/org/apache/orc/impl/TestDataReaderProperties.java ---------------------------------------------------------------------- diff --git a/orc/src/test/org/apache/orc/impl/TestDataReaderProperties.java b/orc/src/test/org/apache/orc/impl/TestDataReaderProperties.java index 9ec08f3..b9918f2 100644 --- a/orc/src/test/org/apache/orc/impl/TestDataReaderProperties.java +++ b/orc/src/test/org/apache/orc/impl/TestDataReaderProperties.java @@ -3,6 +3,7 @@ package org.apache.orc.impl; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.orc.CompressionCodec; +import org.apache.orc.CompressionKind; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -14,7 +15,6 @@ public class TestDataReaderProperties { private FileSystem mockedFileSystem = mock(FileSystem.class); private Path mockedPath = mock(Path.class); - private CompressionCodec mockedCodec = mock(CompressionCodec.class); private boolean mockedZeroCopy = false; @Test @@ -22,12 +22,12 @@ public class TestDataReaderProperties { DataReaderProperties properties = DataReaderProperties.builder() .withFileSystem(mockedFileSystem) .withPath(mockedPath) - .withCodec(mockedCodec) + .withCompression(CompressionKind.ZLIB) .withZeroCopy(mockedZeroCopy) .build(); assertEquals(mockedFileSystem, properties.getFileSystem()); assertEquals(mockedPath, properties.getPath()); - assertEquals(mockedCodec, properties.getCodec()); + assertEquals(CompressionKind.ZLIB, properties.getCompression()); assertEquals(mockedZeroCopy, properties.getZeroCopy()); } @@ -39,7 +39,7 @@ public class TestDataReaderProperties { .build(); assertEquals(mockedFileSystem, properties.getFileSystem()); assertEquals(mockedPath, properties.getPath()); - assertNull(properties.getCodec()); + assertNull(properties.getCompression()); assertFalse(properties.getZeroCopy()); } @@ -52,7 +52,7 @@ public class TestDataReaderProperties { public void testMissingPath() { DataReaderProperties.builder() .withFileSystem(mockedFileSystem) - .withCodec(mockedCodec) + .withCompression(CompressionKind.NONE) .withZeroCopy(mockedZeroCopy) .build(); } @@ -61,7 +61,7 @@ public class TestDataReaderProperties { public void testMissingFileSystem() { DataReaderProperties.builder() .withPath(mockedPath) - .withCodec(mockedCodec) + .withCompression(CompressionKind.NONE) .withZeroCopy(mockedZeroCopy) .build(); } http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/orc/src/test/org/apache/orc/impl/TestMetadataReaderProperties.java ---------------------------------------------------------------------- diff --git a/orc/src/test/org/apache/orc/impl/TestMetadataReaderProperties.java b/orc/src/test/org/apache/orc/impl/TestMetadataReaderProperties.java deleted 
file mode 100644 index 12e8eb4..0000000 --- a/orc/src/test/org/apache/orc/impl/TestMetadataReaderProperties.java +++ /dev/null @@ -1,72 +0,0 @@ -package org.apache.orc.impl; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.orc.CompressionCodec; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.mockito.Mockito.mock; - -public class TestMetadataReaderProperties { - - private FileSystem mockedFileSystem = mock(FileSystem.class); - private Path mockedPath = mock(Path.class); - private CompressionCodec mockedCodec = mock(CompressionCodec.class); - private int mockedBufferSize = 0; - private int mockedTypeCount = 0; - - @Test - public void testCompleteBuild() { - MetadataReaderProperties properties = MetadataReaderProperties.builder() - .withFileSystem(mockedFileSystem) - .withPath(mockedPath) - .withCodec(mockedCodec) - .withBufferSize(mockedBufferSize) - .withTypeCount(mockedTypeCount) - .build(); - assertEquals(mockedFileSystem, properties.getFileSystem()); - assertEquals(mockedPath, properties.getPath()); - assertEquals(mockedCodec, properties.getCodec()); - assertEquals(mockedBufferSize, properties.getBufferSize()); - assertEquals(mockedTypeCount, properties.getTypeCount()); - } - - @Test - public void testMissingNonRequiredArgs() { - MetadataReaderProperties properties = MetadataReaderProperties.builder() - .withFileSystem(mockedFileSystem) - .withPath(mockedPath) - .build(); - assertEquals(mockedFileSystem, properties.getFileSystem()); - assertEquals(mockedPath, properties.getPath()); - assertNull(properties.getCodec()); - assertEquals(0, properties.getBufferSize()); - assertEquals(0, properties.getTypeCount()); - } - - @Test(expected = java.lang.NullPointerException.class) - public void testEmptyBuild() { - MetadataReaderProperties.builder().build(); - } - - @Test(expected = java.lang.NullPointerException.class) - public void testMissingPath() { - MetadataReaderProperties.builder() - .withFileSystem(mockedFileSystem) - .withCodec(mockedCodec) - .withBufferSize(mockedBufferSize) - .build(); - } - - @Test(expected = java.lang.NullPointerException.class) - public void testMissingFileSystem() { - MetadataReaderProperties.builder() - .withPath(mockedPath) - .withCodec(mockedCodec) - .withBufferSize(mockedBufferSize) - .build(); - } - -} http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java index 0724191..82a97e0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java @@ -215,12 +215,9 @@ public class VectorizedRowBatchCtx { LOG.info("createVectorizedRowBatch columnsToIncludeTruncated " + Arrays.toString(columnsToIncludeTruncated)); int totalColumnCount = rowColumnTypeInfos.length + scratchColumnTypeNames.length; VectorizedRowBatch result = new VectorizedRowBatch(totalColumnCount); - - for (int i = 0; i < columnsToIncludeTruncated.length; i++) { - if (columnsToIncludeTruncated[i]) { - TypeInfo typeInfo = rowColumnTypeInfos[i]; - result.cols[i] = VectorizedBatchUtil.createColumnVector(typeInfo); - } + for (int i = 0; i < 
dataColumnCount; i++) { + TypeInfo typeInfo = rowColumnTypeInfos[i]; + result.cols[i] = VectorizedBatchUtil.createColumnVector(typeInfo); } for (int i = dataColumnCount; i < dataColumnCount + partitionColumnCount; i++) { @@ -476,8 +473,8 @@ public class VectorizedRowBatchCtx { bcv.isNull[0] = true; bcv.isRepeating = true; } else { - bcv.fill(sVal.getBytes()); - bcv.isNull[0] = false; + bcv.setVal(0, sVal.getBytes()); + bcv.isRepeating = true; } } break; http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DefaultDataReaderFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DefaultDataReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DefaultDataReaderFactory.java deleted file mode 100644 index de3471c..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DefaultDataReaderFactory.java +++ /dev/null @@ -1,14 +0,0 @@ -package org.apache.hadoop.hive.ql.io.orc; - -import org.apache.orc.DataReader; -import org.apache.orc.DataReaderFactory; -import org.apache.orc.impl.DataReaderProperties; - -public final class DefaultDataReaderFactory implements DataReaderFactory { - - @Override - public DataReader create(DataReaderProperties properties) { - return RecordReaderUtils.createDefaultDataReader(properties); - } - -} http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index fe0be7b..fcb8ca4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -301,7 +301,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, /** * Do we have schema on read in the configuration variables? */ - TypeDescription schema = getDesiredRowTypeDescr(conf, /* isAcidRead */ false); + TypeDescription schema = getDesiredRowTypeDescr(conf, false, Integer.MAX_VALUE); Reader.Options options = new Reader.Options().range(offset, length); options.schema(schema); @@ -1743,7 +1743,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, /** * Do we have schema on read in the configuration variables? */ - TypeDescription schema = getDesiredRowTypeDescr(conf, /* isAcidRead */ true); + TypeDescription schema = getDesiredRowTypeDescr(conf, true, Integer.MAX_VALUE); final Reader reader; final int bucket; @@ -1994,10 +1994,13 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, /** * Convert a Hive type property string that contains separated type names into a list of * TypeDescription objects. + * @param hiveTypeProperty the desired types from hive + * @param maxColumns the maximum number of desired columns * @return the list of TypeDescription objects. */ - public static ArrayList<TypeDescription> typeDescriptionsFromHiveTypeProperty( - String hiveTypeProperty) { + public static ArrayList<TypeDescription> + typeDescriptionsFromHiveTypeProperty(String hiveTypeProperty, + int maxColumns) { // CONSIDER: We need a type name parser for TypeDescription.
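A note on the getDesiredRowTypeDescr() change above: the method now takes a dataColumns limit so vectorized reads can request only the leading data columns, and typeDescriptionsFromHiveTypeProperty() stops collecting types once that many have been parsed (the early break appears in the hunk that follows). The sketch below is illustrative only and is not Hive API; truncate() and namesAndTypesAgree() are hypothetical names that model the truncation contract on plain Java lists.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ColumnLimitSketch {

  // Keep at most maxColumns entries, mirroring the early break added to
  // typeDescriptionsFromHiveTypeProperty(String, int).
  static <T> List<T> truncate(List<T> types, int maxColumns) {
    List<T> result = new ArrayList<>();
    for (T type : types) {
      result.add(type);
      if (result.size() >= maxColumns) {
        break;  // stop once the desired column count is reached
      }
    }
    return result;
  }

  // Mirrors the new consistency check in getDesiredRowTypeDescr(): after
  // truncation the type list must match min(dataColumns, names.size()),
  // otherwise the schema evolution properties are not usable.
  static boolean namesAndTypesAgree(List<String> names, List<?> types,
                                    int dataColumns) {
    return types.size() == Math.min(dataColumns, names.size());
  }

  public static void main(String[] args) {
    List<String> names = Arrays.asList("a", "b", "c");
    List<String> types = truncate(Arrays.asList("int", "string", "double"), 2);
    System.out.println(types);                                // [int, string]
    System.out.println(namesAndTypesAgree(names, types, 2));  // true
    // Callers that want every column pass Integer.MAX_VALUE.
    System.out.println(truncate(names, Integer.MAX_VALUE));   // [a, b, c]
  }
}

Passing Integer.MAX_VALUE, as the split-generation and acid call sites in the hunks above do, keeps the pre-patch take-everything behaviour for non-vectorized paths.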
@@ -2005,6 +2008,9 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, ArrayList<TypeDescription> typeDescrList =new ArrayList<TypeDescription>(typeInfoList.size()); for (TypeInfo typeInfo : typeInfoList) { typeDescrList.add(convertTypeInfo(typeInfo)); + if (typeDescrList.size() >= maxColumns) { + break; + } } return typeDescrList; } @@ -2091,8 +2097,18 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, } } - public static TypeDescription getDesiredRowTypeDescr(Configuration conf, boolean isAcidRead) - throws IOException { + /** + * Generate the desired schema for reading the file. + * @param conf the configuration + * @param isAcidRead is this an acid format? + * @param dataColumns the desired number of data columns for vectorized read + * @return the desired schema or null if schema evolution isn't enabled + * @throws IOException + */ + public static TypeDescription getDesiredRowTypeDescr(Configuration conf, + boolean isAcidRead, + int dataColumns + ) throws IOException { String columnNameProperty = null; String columnTypeProperty = null; @@ -2115,8 +2131,10 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, haveSchemaEvolutionProperties = false; } else { schemaEvolutionTypeDescrs = - typeDescriptionsFromHiveTypeProperty(columnTypeProperty); - if (schemaEvolutionTypeDescrs.size() != schemaEvolutionColumnNames.size()) { + typeDescriptionsFromHiveTypeProperty(columnTypeProperty, + dataColumns); + if (schemaEvolutionTypeDescrs.size() != + Math.min(dataColumns, schemaEvolutionColumnNames.size())) { haveSchemaEvolutionProperties = false; } } @@ -2147,8 +2165,9 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, return null; } schemaEvolutionTypeDescrs = - typeDescriptionsFromHiveTypeProperty(columnTypeProperty); - if (schemaEvolutionTypeDescrs.size() != schemaEvolutionColumnNames.size()) { + typeDescriptionsFromHiveTypeProperty(columnTypeProperty, dataColumns); + if (schemaEvolutionTypeDescrs.size() != + Math.min(dataColumns, schemaEvolutionColumnNames.size())) { return null; } @@ -2162,7 +2181,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, } columnNum++; } - if (virtualColumnClipNum != -1) { + if (virtualColumnClipNum != -1 && virtualColumnClipNum < dataColumns) { schemaEvolutionColumnNames = Lists.newArrayList(schemaEvolutionColumnNames.subList(0, virtualColumnClipNum)); schemaEvolutionTypeDescrs = Lists.newArrayList(schemaEvolutionTypeDescrs.subList(0, virtualColumnClipNum)); @@ -2179,7 +2198,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, // Desired schema does not include virtual columns or partition columns. 
TypeDescription result = TypeDescription.createStruct(); - for (int i = 0; i < schemaEvolutionColumnNames.size(); i++) { + for (int i = 0; i < schemaEvolutionTypeDescrs.size(); i++) { result.addField(schemaEvolutionColumnNames.get(i), schemaEvolutionTypeDescrs.get(i)); } http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index 1fce282..0dd58b7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -447,7 +447,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ this.length = options.getLength(); this.validTxnList = validTxnList; - TypeDescription typeDescr = OrcInputFormat.getDesiredRowTypeDescr(conf, /* isAcidRead */ true); + TypeDescription typeDescr = + OrcInputFormat.getDesiredRowTypeDescr(conf, true, Integer.MAX_VALUE); objectInspector = OrcRecordUpdater.createEventSchema (OrcStruct.createObjectInspector(0, OrcUtils.getOrcTypes(typeDescr))); http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java index 822ef14..b7437be 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java @@ -23,23 +23,20 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Set; -import org.apache.orc.DataReaderFactory; -import org.apache.orc.MetadataReaderFactory; +import com.google.common.collect.Lists; +import org.apache.orc.OrcUtils; +import org.apache.orc.TypeDescription; import org.apache.orc.impl.BufferChunk; import org.apache.orc.ColumnStatistics; import org.apache.orc.impl.ColumnStatisticsImpl; import org.apache.orc.CompressionCodec; -import org.apache.orc.DataReader; import org.apache.orc.FileMetaInfo; import org.apache.orc.FileMetadata; -import org.apache.orc.impl.DataReaderProperties; -import org.apache.orc.impl.DefaultMetadataReaderFactory; import org.apache.orc.impl.InStream; -import org.apache.orc.impl.MetadataReader; -import org.apache.orc.impl.MetadataReaderProperties; import org.apache.orc.StripeInformation; import org.apache.orc.StripeStatistics; import org.slf4j.Logger; @@ -56,8 +53,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.io.Text; import org.apache.orc.OrcProto; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; import com.google.protobuf.CodedInputStream; public class ReaderImpl implements Reader { @@ -75,13 +70,12 @@ public class ReaderImpl implements Reader { private final List<OrcProto.StripeStatistics> stripeStats; private final int metadataSize; protected final List<OrcProto.Type> types; + private final TypeDescription schema; private final List<OrcProto.UserMetadataItem> userMetadata; private final List<OrcProto.ColumnStatistics> fileStats; private final List<StripeInformation> stripes; protected 
final int rowIndexStride; private final long contentLength, numberOfRows; - private final MetadataReaderFactory metadataReaderFactory = new DefaultMetadataReaderFactory(); - private final DataReaderFactory dataReaderFactory = new DefaultDataReaderFactory(); private final ObjectInspector inspector; private long deserializedSize = -1; @@ -248,6 +242,11 @@ public class ReaderImpl implements Reader { return result; } + @Override + public TypeDescription getSchema() { + return schema; + } + /** * Ensure this is an ORC file to prevent users from trying to read text * files or RC files as ORC files. @@ -391,11 +390,13 @@ public class ReaderImpl implements Reader { this.writerVersion = footerMetaData.writerVersion; this.stripes = convertProtoStripesToStripes(rInfo.footer.getStripesList()); } + this.schema = OrcUtils.convertTypeFromProtobuf(this.types, 0); } + /** * Get the WriterVersion based on the ORC file postscript. * @param writerVersion the integer writer version - * @return + * @return the writer version of the file */ static OrcFile.WriterVersion getWriterVersion(int writerVersion) { for(OrcFile.WriterVersion version: OrcFile.WriterVersion.values()) { @@ -672,20 +673,7 @@ public class ReaderImpl implements Reader { Arrays.fill(include, true); options.include(include); } - - return RecordReaderImpl.builder() - .withMetadataReaderFactory(metadataReaderFactory) - .withDataReaderFactory(dataReaderFactory) - .withStripes(this.getStripes()) - .withFileSystem(fileSystem) - .withPath(path) - .withOptions(options) - .withTypes(types) - .withCodec(codec) - .withBufferSize(bufferSize) - .withStrideRate(rowIndexStride) - .withConf(conf) - .build(); + return new RecordReaderImpl(this, options); } @@ -837,7 +825,7 @@ public class ReaderImpl implements Reader { } private int getLastIdx() { - Set<Integer> indices = Sets.newHashSet(); + Set<Integer> indices = new HashSet<>(); for (OrcProto.Type type : types) { indices.addAll(type.getSubtypesList()); } @@ -856,7 +844,7 @@ public class ReaderImpl implements Reader { @Override public List<StripeStatistics> getStripeStatistics() { - List<StripeStatistics> result = Lists.newArrayList(); + List<StripeStatistics> result = new ArrayList<>(); for (OrcProto.StripeStatistics ss : stripeStats) { result.add(new StripeStatistics(ss.getColStatsList())); } @@ -868,17 +856,6 @@ public class ReaderImpl implements Reader { } @Override - public MetadataReader metadata() throws IOException { - return metadataReaderFactory.create(MetadataReaderProperties.builder() - .withBufferSize(bufferSize) - .withCodec(codec) - .withFileSystem(fileSystem) - .withPath(path) - .withTypeCount(types.size()) - .build()); - } - - @Override public List<Integer> getVersionList() { return versionList; } @@ -889,16 +866,6 @@ public class ReaderImpl implements Reader { } @Override - public DataReader createDefaultDataReader(boolean useZeroCopy) { - return dataReaderFactory.create(DataReaderProperties.builder() - .withFileSystem(fileSystem) - .withPath(path) - .withCodec(codec) - .withZeroCopy(useZeroCopy) - .build()); - } - - @Override public String toString() { StringBuilder buffer = new StringBuilder(); buffer.append("ORC Reader(");
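For reference, the UnionTreeWriter rewrite in WriterImpl.java earlier in this patch replaces the single prevTag/currentRun tracker with a per-tag (start, length) pair: each open run is extended while occurrences of that tag stay contiguous, flushed to the child writer when a gap appears, and any runs still open are flushed once per tag after the loop. The standalone sketch below is illustrative only: ChildSink is a hypothetical stand-in for TreeWriter.writeBatch(), and nulls are ignored since the real writer handles them in the superclass before this path runs.

import java.util.ArrayList;
import java.util.List;

public class UnionRunSketch {

  // Hypothetical stand-in for TreeWriter.writeBatch(ColumnVector, int, int).
  interface ChildSink {
    void writeBatch(int start, int length);
  }

  static void writeUnionTags(int[] tags, int offset, int length,
                             ChildSink[] children) {
    int[] currentStart = new int[children.length];
    int[] currentLength = new int[children.length];
    for (int i = 0; i < length; ++i) {
      int tag = tags[offset + i];
      if (currentLength[tag] == 0) {
        // first occurrence of this tag: open a new run
        currentStart[tag] = i + offset;
        currentLength[tag] = 1;
      } else if (currentStart[tag] + currentLength[tag] == i + offset) {
        // contiguous with the open run: extend it
        currentLength[tag] += 1;
      } else {
        // gap since the last occurrence: flush the old run, open a new one
        children[tag].writeBatch(currentStart[tag], currentLength[tag]);
        currentStart[tag] = i + offset;
        currentLength[tag] = 1;
      }
    }
    // write out any left-over runs, one batch per tag
    for (int tag = 0; tag < children.length; ++tag) {
      if (currentLength[tag] != 0) {
        children[tag].writeBatch(currentStart[tag], currentLength[tag]);
      }
    }
  }

  public static void main(String[] args) {
    final List<String> calls = new ArrayList<>();
    ChildSink[] sinks = new ChildSink[]{
        (start, len) -> calls.add("child0[" + start + "," + len + "]"),
        (start, len) -> calls.add("child1[" + start + "," + len + "]")
    };
    writeUnionTags(new int[]{0, 0, 1, 1, 0}, 0, 5, sinks);
    // The two leading 0-tags coalesce into one batch; the trailing 0 opens a
    // second run that the final loop flushes:
    // [child0[0,2], child0[4,1], child1[2,2]]
    System.out.println(calls);
  }
}

The per-tag bookkeeping keeps the batching behaviour of the old code while also covering the noNulls case that the old prevTag version mishandled by reading isNull unconditionally.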