Repository: orc Updated Branches: refs/heads/master 65c5eeff1 -> 59c359dfc
ORC-311: [C++] Fix null pointers when StripeFooter corrupts Fixes #223 Signed-off-by: Deepak Majeti <mdee...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/orc/repo Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/59c359df Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/59c359df Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/59c359df Branch: refs/heads/master Commit: 59c359dfceeafe26898dfc635b5fac679e51d277 Parents: 65c5eef Author: stiga-huang <huangquanl...@gmail.com> Authored: Fri Mar 2 06:53:44 2018 -0800 Committer: Deepak Majeti <mdee...@apache.org> Committed: Wed Mar 7 21:21:51 2018 -0500 ---------------------------------------------------------------------- c++/src/ColumnReader.cc | 136 ++++++++++++++++++++++++------------------- 1 file changed, 77 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/orc/blob/59c359df/c++/src/ColumnReader.cc ---------------------------------------------------------------------- diff --git a/c++/src/ColumnReader.cc b/c++/src/ColumnReader.cc index 24f0820..ee2d80d 100644 --- a/c++/src/ColumnReader.cc +++ b/c++/src/ColumnReader.cc @@ -146,9 +146,11 @@ namespace orc { BooleanColumnReader::BooleanColumnReader(const Type& type, StripeStreams& stripe ): ColumnReader(type, stripe){ - rle = createBooleanRleDecoder(stripe.getStream(columnId, - proto::Stream_Kind_DATA, - true)); + std::unique_ptr<SeekableInputStream> stream = + stripe.getStream(columnId, proto::Stream_Kind_DATA, true); + if (stream == nullptr) + throw ParseError("DATA stream not found in Boolean column"); + rle = createBooleanRleDecoder(std::move(stream)); } BooleanColumnReader::~BooleanColumnReader() { @@ -191,9 +193,11 @@ namespace orc { ByteColumnReader::ByteColumnReader(const Type& type, StripeStreams& stripe ): ColumnReader(type, stripe){ - rle = createByteRleDecoder(stripe.getStream(columnId, - proto::Stream_Kind_DATA, - true)); + 
std::unique_ptr<SeekableInputStream> stream = + stripe.getStream(columnId, proto::Stream_Kind_DATA, true); + if (stream == nullptr) + throw ParseError("DATA stream not found in Byte column"); + rle = createByteRleDecoder(std::move(stream)); } ByteColumnReader::~ByteColumnReader() { @@ -237,10 +241,11 @@ namespace orc { StripeStreams& stripe ): ColumnReader(type, stripe) { RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind()); - rle = createRleDecoder(stripe.getStream(columnId, - proto::Stream_Kind_DATA, - true), - true, vers, memoryPool); + std::unique_ptr<SeekableInputStream> stream = + stripe.getStream(columnId, proto::Stream_Kind_DATA, true); + if (stream == nullptr) + throw ParseError("DATA stream not found in Integer column"); + rle = createRleDecoder(std::move(stream), true, vers, memoryPool); } IntegerColumnReader::~IntegerColumnReader() { @@ -286,14 +291,15 @@ namespace orc { writerTimezone(stripe.getWriterTimezone()), epochOffset(writerTimezone.getEpoch()) { RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind()); - secondsRle = createRleDecoder(stripe.getStream(columnId, - proto::Stream_Kind_DATA, - true), - true, vers, memoryPool); - nanoRle = createRleDecoder(stripe.getStream(columnId, - proto::Stream_Kind_SECONDARY, - true), - false, vers, memoryPool); + std::unique_ptr<SeekableInputStream> stream = + stripe.getStream(columnId, proto::Stream_Kind_DATA, true); + if (stream == nullptr) + throw ParseError("DATA stream not found in Timestamp column"); + secondsRle = createRleDecoder(std::move(stream), true, vers, memoryPool); + stream = stripe.getStream(columnId, proto::Stream_Kind_SECONDARY, true); + if (stream == nullptr) + throw ParseError("SECONDARY stream not found in Timestamp column"); + nanoRle = createRleDecoder(std::move(stream), false, vers, memoryPool); } TimestampColumnReader::~TimestampColumnReader() { @@ -391,17 +397,14 @@ namespace orc { DoubleColumnReader::DoubleColumnReader(const Type& type, 
StripeStreams& stripe ): ColumnReader(type, stripe), - inputStream - (stripe.getStream - (columnId, - proto::Stream_Kind_DATA, - true)), columnKind(type.getKind()), bytesPerValue((type.getKind() == FLOAT) ? 4 : 8), bufferPointer(nullptr), bufferEnd(nullptr) { - // PASS + inputStream = stripe.getStream(columnId, proto::Stream_Kind_DATA, true); + if (inputStream == nullptr) + throw ParseError("DATA stream not found in Double column"); } DoubleColumnReader::~DoubleColumnReader() { @@ -413,11 +416,10 @@ namespace orc { if (static_cast<size_t>(bufferEnd - bufferPointer) >= bytesPerValue * numValues) { - bufferPointer+= bytesPerValue*numValues; + bufferPointer += bytesPerValue * numValues; } else { - inputStream->Skip(static_cast<int>(bytesPerValue*numValues - - static_cast<size_t>(bufferEnd - - bufferPointer))); + inputStream->Skip(static_cast<int>(bytesPerValue * numValues - + static_cast<size_t>(bufferEnd - bufferPointer))); bufferEnd = nullptr; bufferPointer = nullptr; } @@ -503,28 +505,34 @@ namespace orc { RleVersion rleVersion = convertRleVersion(stripe.getEncoding(columnId) .kind()); dictionaryCount = stripe.getEncoding(columnId).dictionarysize(); - rle = createRleDecoder(stripe.getStream(columnId, - proto::Stream_Kind_DATA, - true), - false, rleVersion, memoryPool); + std::unique_ptr<SeekableInputStream> stream = + stripe.getStream(columnId, proto::Stream_Kind_DATA, true); + if (stream == nullptr) + throw ParseError("DATA stream not found in StringDictionaryColumn"); + rle = createRleDecoder(std::move(stream), false, rleVersion, memoryPool); + stream = stripe.getStream(columnId, proto::Stream_Kind_LENGTH, false); + if (dictionaryCount > 0 && stream == nullptr) { + throw ParseError("LENGTH stream not found in StringDictionaryColumn"); + } std::unique_ptr<RleDecoder> lengthDecoder = - createRleDecoder(stripe.getStream(columnId, - proto::Stream_Kind_LENGTH, - false), - false, rleVersion, memoryPool); + createRleDecoder(std::move(stream), false, rleVersion, 
memoryPool); dictionaryOffset.resize(dictionaryCount+1); int64_t* lengthArray = dictionaryOffset.data(); lengthDecoder->next(lengthArray + 1, dictionaryCount, nullptr); lengthArray[0] = 0; - for(uint64_t i=1; i < dictionaryCount + 1; ++i) { + for (uint64_t i = 1; i < dictionaryCount + 1; ++i) { if (lengthArray[i] < 0) throw ParseError("Negative dictionary entry length"); - lengthArray[i] += lengthArray[i-1]; + lengthArray[i] += lengthArray[i - 1]; } int64_t blobSize = lengthArray[dictionaryCount]; dictionaryBlob.resize(static_cast<uint64_t>(blobSize)); std::unique_ptr<SeekableInputStream> blobStream = - stripe.getStream(columnId, proto::Stream_Kind_DICTIONARY_DATA, false); + stripe.getStream(columnId, proto::Stream_Kind_DICTIONARY_DATA, false); + if (blobSize > 0 && blobStream == nullptr) { + throw ParseError( + "DICTIONARY_DATA stream not found in StringDictionaryColumn"); + } readFully(dictionaryBlob.data(), blobSize, blobStream.get()); } @@ -611,11 +619,15 @@ namespace orc { blobBuffer(stripe.getMemoryPool()) { RleVersion rleVersion = convertRleVersion(stripe.getEncoding(columnId) .kind()); - lengthRle = createRleDecoder(stripe.getStream(columnId, - proto::Stream_Kind_LENGTH, - true), - false, rleVersion, memoryPool); + std::unique_ptr<SeekableInputStream> stream = + stripe.getStream(columnId, proto::Stream_Kind_LENGTH, true); + if (stream == nullptr) + throw ParseError("LENGTH stream not found in StringDirectColumn"); + lengthRle = createRleDecoder( + std::move(stream), false, rleVersion, memoryPool); blobStream = stripe.getStream(columnId, proto::Stream_Kind_DATA, true); + if (blobStream == nullptr) + throw ParseError("DATA stream not found in StringDirectColumn"); lastBuffer = nullptr; lastBufferLength = 0; } @@ -847,10 +859,11 @@ namespace orc { // count the number of selected sub-columns const std::vector<bool> selectedColumns = stripe.getSelectedColumns(); RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind()); - rle = 
createRleDecoder(stripe.getStream(columnId, - proto::Stream_Kind_LENGTH, - true), - false, vers, memoryPool); + std::unique_ptr<SeekableInputStream> stream = + stripe.getStream(columnId, proto::Stream_Kind_LENGTH, true); + if (stream == nullptr) + throw ParseError("LENGTH stream not found in List column"); + rle = createRleDecoder(std::move(stream), false, vers, memoryPool); const Type& childType = *type.getSubtype(0); if (selectedColumns[static_cast<uint64_t>(childType.getColumnId())]) { child = buildReader(childType, stripe); @@ -940,10 +953,11 @@ namespace orc { // Determine if the key and/or value columns are selected const std::vector<bool> selectedColumns = stripe.getSelectedColumns(); RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind()); - rle = createRleDecoder(stripe.getStream(columnId, - proto::Stream_Kind_LENGTH, - true), - false, vers, memoryPool); + std::unique_ptr<SeekableInputStream> stream = + stripe.getStream(columnId, proto::Stream_Kind_LENGTH, true); + if (stream == nullptr) + throw ParseError("LENGTH stream not found in Map column"); + rle = createRleDecoder(std::move(stream), false, vers, memoryPool); const Type& keyType = *type.getSubtype(0); if (selectedColumns[static_cast<uint64_t>(keyType.getColumnId())]) { keyReader = buildReader(keyType, stripe); @@ -1049,9 +1063,11 @@ namespace orc { childrenReader.resize(numChildren); childrenCounts.resize(numChildren); - rle = createByteRleDecoder(stripe.getStream(columnId, - proto::Stream_Kind_DATA, - true)); + std::unique_ptr<SeekableInputStream> stream = + stripe.getStream(columnId, proto::Stream_Kind_DATA, true); + if (stream == nullptr) + throw ParseError("DATA stream not found in Union column"); + rle = createByteRleDecoder(std::move(stream)); // figure out which types are selected const std::vector<bool> selectedColumns = stripe.getSelectedColumns(); for(unsigned int i=0; i < numChildren; ++i) { @@ -1232,14 +1248,16 @@ namespace orc { scale = 
static_cast<int32_t>(type.getScale()); precision = static_cast<int32_t>(type.getPrecision()); valueStream = stripe.getStream(columnId, proto::Stream_Kind_DATA, true); + if (valueStream == nullptr) + throw ParseError("DATA stream not found in Decimal64Column"); buffer = nullptr; bufferEnd = nullptr; RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind()); - scaleDecoder = createRleDecoder(stripe.getStream - (columnId, - proto::Stream_Kind_SECONDARY, - true), - true, vers, memoryPool); + std::unique_ptr<SeekableInputStream> stream = + stripe.getStream(columnId, proto::Stream_Kind_SECONDARY, true); + if (stream == nullptr) + throw ParseError("SECONDARY stream not found in Decimal64Column"); + scaleDecoder = createRleDecoder(std::move(stream), true, vers, memoryPool); } Decimal64ColumnReader::~Decimal64ColumnReader() {