ORC-343: Enable C++ writer to support RleV2. Fixes #273
Signed-off-by: Gang Wu <gan...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/orc/repo Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/f31c80bd Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/f31c80bd Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/f31c80bd Branch: refs/heads/master Commit: f31c80bd8d786695a4e1e3f3146e42703ec1b207 Parents: 533e0c4 Author: Yurui Zhou <yurui....@alibaba-inc.com> Authored: Fri Mar 9 13:23:37 2018 +0800 Committer: Gang Wu <gan...@alibaba-inc.com> Committed: Tue Jun 12 15:59:10 2018 -0700 ---------------------------------------------------------------------- c++/include/orc/Writer.hh | 16 + c++/src/CMakeLists.txt | 6 +- c++/src/ColumnWriter.cc | 64 +- c++/src/RLE.cc | 56 +- c++/src/RLE.hh | 45 +- c++/src/RLEV2Util.hh | 145 ++ c++/src/RLEv1.cc | 64 +- c++/src/RLEv1.hh | 43 +- c++/src/RLEv2.cc | 484 ------- c++/src/RLEv2.hh | 81 +- c++/src/RleDecoderV2.cc | 426 ++++++ c++/src/RleEncoderV2.cc | 767 +++++++++++ c++/src/Writer.cc | 18 +- c++/test/CMakeLists.txt | 4 +- c++/test/TestRLEv1Encoder.cc | 246 ---- c++/test/TestRle.cc | 2656 ------------------------------------- c++/test/TestRleDecoder.cc | 2656 +++++++++++++++++++++++++++++++++++++ c++/test/TestRleEncoder.cc | 286 ++++ c++/test/TestWriter.cc | 107 +- 19 files changed, 4605 insertions(+), 3565 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/orc/blob/f31c80bd/c++/include/orc/Writer.hh ---------------------------------------------------------------------- diff --git a/c++/include/orc/Writer.hh b/c++/include/orc/Writer.hh index c91399a..cdda922 100644 --- a/c++/include/orc/Writer.hh +++ b/c++/include/orc/Writer.hh @@ -38,6 +38,11 @@ namespace orc { CompressionStrategy_COMPRESSION }; + enum RleVersion { + RleVersion_1 = 0, + RleVersion_2 = 1 + }; + class Timezone; /** @@ -132,6 +137,12 @@ namespace orc { CompressionStrategy getCompressionStrategy() const; /** + * Get if the 
bitpacking should be aligned. + * @return true if should be aligned, return false otherwise + */ + bool getAlignedBitpacking() const; + + /** * Set the padding tolerance. */ WriterOptions& setPaddingTolerance(double tolerance); @@ -165,6 +176,11 @@ namespace orc { std::ostream * getErrorStream() const; /** + * Get the RLE version. + */ + RleVersion getRleVersion() const; + + /** * Get whether or not to write row group index * @return if not set, the default is false */ http://git-wip-us.apache.org/repos/asf/orc/blob/f31c80bd/c++/src/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/c++/src/CMakeLists.txt b/c++/src/CMakeLists.txt index 695a0ae..c3d5993 100644 --- a/c++/src/CMakeLists.txt +++ b/c++/src/CMakeLists.txt @@ -179,15 +179,15 @@ set(SOURCE_FILES OrcFile.cc Reader.cc RLEv1.cc - RLEv2.cc + RleDecoderV2.cc + RleEncoderV2.cc RLE.cc Statistics.cc StripeStream.cc Timezone.cc TypeImpl.cc Vector.cc - Writer.cc - ) + Writer.cc) if(ORC_CXX_HAS_THREAD_LOCAL AND BUILD_LIBHDFSPP) set(SOURCE_FILES ${SOURCE_FILES} OrcHdfsFile.cc) http://git-wip-us.apache.org/repos/asf/orc/blob/f31c80bd/c++/src/ColumnWriter.cc ---------------------------------------------------------------------- diff --git a/c++/src/ColumnWriter.cc b/c++/src/ColumnWriter.cc index 8b82783..eb2fc40 100644 --- a/c++/src/ColumnWriter.cc +++ b/c++/src/ColumnWriter.cc @@ -72,6 +72,19 @@ namespace orc { // PASS } + proto::ColumnEncoding_Kind RleVersionMapper(RleVersion rleVersion) + { + switch (rleVersion) + { + case RleVersion_1: + return proto::ColumnEncoding_Kind_DIRECT; + case RleVersion_2: + return proto::ColumnEncoding_Kind_DIRECT_V2; + default: + throw InvalidArgument("Invalid param"); + } + } + ColumnWriter::ColumnWriter( const Type& type, const StreamsFactory& factory, @@ -399,14 +412,15 @@ namespace orc { const StreamsFactory& factory, const WriterOptions& options) : ColumnWriter(type, factory, options), - rleVersion(RleVersion_1) { + 
rleVersion(options.getRleVersion()) { std::unique_ptr<BufferedOutputStream> dataStream = factory.createStream(proto::Stream_Kind_DATA); rleEncoder = createRleEncoder( std::move(dataStream), true, rleVersion, - memPool); + memPool, + options.getAlignedBitpacking()); if (enableIndex) { recordPosition(); @@ -469,9 +483,7 @@ namespace orc { void IntegerColumnWriter::getColumnEncoding( std::vector<proto::ColumnEncoding>& encodings) const { proto::ColumnEncoding encoding; - encoding.set_kind(rleVersion == RleVersion_1 ? - proto::ColumnEncoding_Kind_DIRECT : - proto::ColumnEncoding_Kind_DIRECT_V2); + encoding.set_kind(RleVersionMapper(rleVersion)); encoding.set_dictionarysize(0); encodings.push_back(encoding); } @@ -840,13 +852,14 @@ namespace orc { const StreamsFactory& factory, const WriterOptions& options) : ColumnWriter(type, factory, options), - rleVersion(RleVersion_1) { + rleVersion(options.getRleVersion()) { std::unique_ptr<BufferedOutputStream> lengthStream = factory.createStream(proto::Stream_Kind_LENGTH); lengthEncoder = createRleEncoder(std::move(lengthStream), false, rleVersion, - memPool); + memPool, + options.getAlignedBitpacking()); dataStream.reset(new AppendOnlyBufferedStream( factory.createStream(proto::Stream_Kind_DATA))); @@ -916,9 +929,7 @@ namespace orc { void StringColumnWriter::getColumnEncoding( std::vector<proto::ColumnEncoding>& encodings) const { proto::ColumnEncoding encoding; - encoding.set_kind(rleVersion == RleVersion_1 ? 
- proto::ColumnEncoding_Kind_DIRECT : - proto::ColumnEncoding_Kind_DIRECT_V2); + encoding.set_kind(RleVersionMapper(rleVersion)); encoding.set_dictionarysize(0); encodings.push_back(encoding); } @@ -1131,7 +1142,7 @@ namespace orc { const StreamsFactory& factory, const WriterOptions& options) : ColumnWriter(type, factory, options), - rleVersion(RleVersion_1), + rleVersion(options.getRleVersion()), timezone(getTimezoneByName("GMT")){ std::unique_ptr<BufferedOutputStream> dataStream = factory.createStream(proto::Stream_Kind_DATA); @@ -1140,11 +1151,13 @@ namespace orc { secRleEncoder = createRleEncoder(std::move(dataStream), true, rleVersion, - memPool); + memPool, + options.getAlignedBitpacking()); nanoRleEncoder = createRleEncoder(std::move(secondaryStream), false, rleVersion, - memPool); + memPool, + options.getAlignedBitpacking()); if (enableIndex) { recordPosition(); @@ -1241,9 +1254,7 @@ namespace orc { void TimestampColumnWriter::getColumnEncoding( std::vector<proto::ColumnEncoding>& encodings) const { proto::ColumnEncoding encoding; - encoding.set_kind(rleVersion == RleVersion_1 ? 
- proto::ColumnEncoding_Kind_DIRECT : - proto::ColumnEncoding_Kind_DIRECT_V2); + encoding.set_kind(RleVersionMapper(rleVersion)); encoding.set_dictionarysize(0); encodings.push_back(encoding); } @@ -1344,7 +1355,7 @@ namespace orc { const StreamsFactory& factory, const WriterOptions& options) : ColumnWriter(type, factory, options), - rleVersion(RleVersion_1), + rleVersion(options.getRleVersion()), precision(type.getPrecision()), scale(type.getScale()) { valueStream.reset(new AppendOnlyBufferedStream( @@ -1354,7 +1365,8 @@ namespace orc { scaleEncoder = createRleEncoder(std::move(scaleStream), true, rleVersion, - memPool); + memPool, + options.getAlignedBitpacking()); if (enableIndex) { recordPosition(); @@ -1435,7 +1447,7 @@ namespace orc { void Decimal64ColumnWriter::getColumnEncoding( std::vector<proto::ColumnEncoding>& encodings) const { proto::ColumnEncoding encoding; - encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT); + encoding.set_kind(RleVersionMapper(rleVersion)); encoding.set_dictionarysize(0); encodings.push_back(encoding); } @@ -1575,14 +1587,15 @@ namespace orc { const StreamsFactory& factory, const WriterOptions& options) : ColumnWriter(type, factory, options), - rleVersion(RleVersion_1){ + rleVersion(options.getRleVersion()){ std::unique_ptr<BufferedOutputStream> lengthStream = factory.createStream(proto::Stream_Kind_LENGTH); lengthEncoder = createRleEncoder(std::move(lengthStream), false, rleVersion, - memPool); + memPool, + options.getAlignedBitpacking()); if (type.getSubtypeCount() == 1) { child = buildWriter(*type.getSubtype(0), factory, options); @@ -1675,7 +1688,7 @@ namespace orc { void ListColumnWriter::getColumnEncoding( std::vector<proto::ColumnEncoding>& encodings) const { proto::ColumnEncoding encoding; - encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT); + encoding.set_kind(RleVersionMapper(rleVersion)); encoding.set_dictionarysize(0); encodings.push_back(encoding); if (child.get()) { @@ -1771,13 +1784,14 @@ namespace orc { const 
StreamsFactory& factory, const WriterOptions& options) : ColumnWriter(type, factory, options), - rleVersion(RleVersion_1){ + rleVersion(options.getRleVersion()){ std::unique_ptr<BufferedOutputStream> lengthStream = factory.createStream(proto::Stream_Kind_LENGTH); lengthEncoder = createRleEncoder(std::move(lengthStream), false, rleVersion, - memPool); + memPool, + options.getAlignedBitpacking()); if (type.getSubtypeCount() > 0) { keyWriter = buildWriter(*type.getSubtype(0), factory, options); @@ -1888,7 +1902,7 @@ namespace orc { void MapColumnWriter::getColumnEncoding( std::vector<proto::ColumnEncoding>& encodings) const { proto::ColumnEncoding encoding; - encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT); + encoding.set_kind(RleVersionMapper(rleVersion)); encoding.set_dictionarysize(0); encodings.push_back(encoding); if (keyWriter.get()) { http://git-wip-us.apache.org/repos/asf/orc/blob/f31c80bd/c++/src/RLE.cc ---------------------------------------------------------------------- diff --git a/c++/src/RLE.cc b/c++/src/RLE.cc index f03c3ac..21f9082 100644 --- a/c++/src/RLE.cc +++ b/c++/src/RLE.cc @@ -34,13 +34,16 @@ namespace orc { (std::unique_ptr<BufferedOutputStream> output, bool isSigned, RleVersion version, - MemoryPool&) { + MemoryPool&, + bool alignedBitpacking) { switch (static_cast<int64_t>(version)) { case RleVersion_1: // We don't have std::make_unique() yet. 
return std::unique_ptr<RleEncoder>(new RleEncoderV1(std::move(output), isSigned)); case RleVersion_2: + return std::unique_ptr<RleEncoder>(new RleEncoderV2(std::move(output), + isSigned, alignedBitpacking)); default: throw NotImplementedYet("Not implemented yet"); } @@ -64,4 +67,55 @@ namespace orc { } } + void RleEncoder::add(const int64_t* data, uint64_t numValues, + const char* notNull) { + for (uint64_t i = 0; i < numValues; ++i) { + if (!notNull || notNull[i]) { + write(data[i]); + } + } + } + + void RleEncoder::writeVslong(int64_t val) { + writeVulong((val << 1) ^ (val >> 63)); + } + + void RleEncoder::writeVulong(int64_t val) { + while (true) { + if ((val & ~0x7f) == 0) { + writeByte(static_cast<char>(val)); + return; + } else { + writeByte(static_cast<char>(0x80 | (val & 0x7f))); + // cast val to unsigned so as to force 0-fill right shift + val = (static_cast<uint64_t>(val) >> 7); + } + } + } + + void RleEncoder::writeByte(char c) { + if (bufferPosition == bufferLength) { + int addedSize = 0; + if (!outputStream->Next(reinterpret_cast<void **>(&buffer), &addedSize)) { + throw std::bad_alloc(); + } + bufferPosition = 0; + bufferLength = static_cast<size_t>(addedSize); + } + buffer[bufferPosition++] = c; + } + + void RleEncoder::recordPosition(PositionRecorder* recorder) const { + uint64_t flushedSize = outputStream->getSize(); + uint64_t unflushedSize = static_cast<uint64_t>(bufferPosition); + if (outputStream->isCompressed()) { + recorder->add(flushedSize); + recorder->add(unflushedSize); + } else { + flushedSize -= static_cast<uint64_t>(bufferLength); + recorder->add(flushedSize + unflushedSize); + } + recorder->add(static_cast<uint64_t>(numLiterals)); + } + } // namespace orc http://git-wip-us.apache.org/repos/asf/orc/blob/f31c80bd/c++/src/RLE.hh ---------------------------------------------------------------------- diff --git a/c++/src/RLE.hh b/c++/src/RLE.hh index b1d654c..3eb3489 100644 --- a/c++/src/RLE.hh +++ b/c++/src/RLE.hh @@ -26,11 +26,6 @@ 
namespace orc { - enum RleVersion { - RleVersion_1, - RleVersion_2 - }; - inline int64_t zigZag(int64_t value) { return (value << 1) ^ (value >> 63); } @@ -44,6 +39,18 @@ namespace orc { // must be non-inline! virtual ~RleEncoder(); + RleEncoder( + std::unique_ptr<BufferedOutputStream> outStream, + bool hasSigned): + outputStream(std::move(outStream)), + bufferPosition(0), + bufferLength(0), + numLiterals(0), + isSigned(hasSigned), + buffer(nullptr){ + //pass + } + /** * Encode the next batch of values. * @param data the array to read from @@ -52,12 +59,14 @@ namespace orc { * pointer is not null, positions that are false are skipped. */ virtual void add(const int64_t* data, uint64_t numValues, - const char* notNull) = 0; + const char* notNull); /** * Get size of buffer used so far. */ - virtual uint64_t getBufferSize() const = 0; + uint64_t getBufferSize() const { + return outputStream->getSize(); + } /** * Flushing underlying BufferedOutputStream @@ -68,7 +77,24 @@ namespace orc { * record current position * @param recorder use the recorder to record current positions */ - virtual void recordPosition(PositionRecorder* recorder) const = 0; + virtual void recordPosition(PositionRecorder* recorder) const; + + protected: + std::unique_ptr<BufferedOutputStream> outputStream; + size_t bufferPosition; + size_t bufferLength; + size_t numLiterals; + int64_t* literals; + bool isSigned; + char* buffer; + + virtual void write(int64_t val) = 0; + + virtual void writeByte(char c); + + virtual void writeVulong(int64_t val); + + virtual void writeVslong(int64_t val); }; class RleDecoder { @@ -108,7 +134,8 @@ namespace orc { (std::unique_ptr<BufferedOutputStream> output, bool isSigned, RleVersion version, - MemoryPool& pool); + MemoryPool& pool, + bool alignedBitpacking); /** * Create an RLE decoder. 
http://git-wip-us.apache.org/repos/asf/orc/blob/f31c80bd/c++/src/RLEV2Util.hh ---------------------------------------------------------------------- diff --git a/c++/src/RLEV2Util.hh b/c++/src/RLEV2Util.hh new file mode 100644 index 0000000..7928c17 --- /dev/null +++ b/c++/src/RLEV2Util.hh @@ -0,0 +1,145 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +#ifndef ORC_RLEV2UTIL_HH +#define ORC_RLEV2UTIL_HH + +#include "RLEv2.hh" + +namespace orc { + inline uint32_t decodeBitWidth(uint32_t n) { + if (n <= FixedBitSizes::TWENTYFOUR) { + return n + 1; + } else if (n == FixedBitSizes::TWENTYSIX) { + return 26; + } else if (n == FixedBitSizes::TWENTYEIGHT) { + return 28; + } else if (n == FixedBitSizes::THIRTY) { + return 30; + } else if (n == FixedBitSizes::THIRTYTWO) { + return 32; + } else if (n == FixedBitSizes::FORTY) { + return 40; + } else if (n == FixedBitSizes::FORTYEIGHT) { + return 48; + } else if (n == FixedBitSizes::FIFTYSIX) { + return 56; + } else { + return 64; + } + } + + inline uint32_t getClosestFixedBits(uint32_t n) { + if (n == 0) { + return 1; + } + + if (n >= 1 && n <= 24) { + return n; + } else if (n <= 26) { + return 26; + } else if (n <= 28) { + return 28; + } else if (n <= 30) { + return 30; + } else if (n <= 32) { + return 32; + } else if (n <= 40) { + return 40; + } else if (n <= 48) { + return 48; + } else if (n <= 56) { + return 56; + } else { + return 64; + } + } + + inline uint32_t getClosestAlignedFixedBits(uint32_t n) { + if (n == 0 || n == 1) { + return 1; + } else if (n <= 2) { + return 2; + } else if (n <= 4) { + return 4; + } else if (n <= 8) { + return 8; + } else if (n <= 16) { + return 16; + } else if (n <= 24) { + return 24; + } else if (n <= 32) { + return 32; + } else if (n <= 40) { + return 40; + } else if (n <= 48) { + return 48; + } else if (n <= 56) { + return 56; + } else { + return 64; + } + } + + inline uint32_t encodeBitWidth(uint32_t n) { + n = getClosestFixedBits(n); + + if (n >= 1 && n <= 24) { + return n - 1; + } else if (n <= 26) { + return FixedBitSizes::TWENTYSIX; + } else if (n <= 28) { + return FixedBitSizes::TWENTYEIGHT; + } else if (n <= 30) { + return FixedBitSizes::THIRTY; + } else if (n <= 32) { + return FixedBitSizes::THIRTYTWO; + } else if (n <= 40) { + return FixedBitSizes::FORTY; + } else if (n <= 48) { + return FixedBitSizes::FORTYEIGHT; + } 
else if (n <= 56) { + return FixedBitSizes::FIFTYSIX; + } else { + return FixedBitSizes::SIXTYFOUR; + } + } + + inline uint32_t findClosestNumBits(int64_t value) { + if (value < 0) { + return getClosestFixedBits(64); + } + + uint32_t count = 0; + while (value != 0) { + count++; + value = value >> 1; + } + return getClosestFixedBits(count); + } + + inline bool isSafeSubtract(int64_t left, int64_t right) { + return ((left ^ right) >= 0) | ((left ^ (left - right)) >= 0); + } + + inline uint32_t RleEncoderV2::getOpCode(EncodingType encoding) { + return static_cast<uint32_t >(encoding << 6); + } +} + +#endif //ORC_RLEV2UTIL_HH http://git-wip-us.apache.org/repos/asf/orc/blob/f31c80bd/c++/src/RLEv1.cc ---------------------------------------------------------------------- diff --git a/c++/src/RLEv1.cc b/c++/src/RLEv1.cc index e588e6a..d8b5541 100644 --- a/c++/src/RLEv1.cc +++ b/c++/src/RLEv1.cc @@ -37,43 +37,17 @@ const int MAX_LITERAL_SIZE = 128; RleEncoderV1::RleEncoderV1( std::unique_ptr<BufferedOutputStream> outStream, bool hasSigned): - outputStream(std::move(outStream)) { - isSigned = hasSigned; + RleEncoder(std::move(outStream), hasSigned) { literals = new int64_t[MAX_LITERAL_SIZE]; - numLiterals = 0; delta = 0; repeat = false; tailRunLength = 0; - bufferPosition = 0; - bufferLength = 0; - buffer = nullptr; } RleEncoderV1::~RleEncoderV1() { delete [] literals; } -void RleEncoderV1::add(const int64_t* data, uint64_t numValues, - const char* notNull) { - for (uint64_t i = 0; i < numValues; ++i) { - if (!notNull || notNull[i]) { - write(data[i]); - } - } -} - -void RleEncoderV1::writeByte(char c) { - if (bufferPosition == bufferLength) { - int addedSize = 0; - if (!outputStream->Next(reinterpret_cast<void **>(&buffer), &addedSize)) { - throw std::bad_alloc(); - } - bufferPosition = 0; - bufferLength = addedSize; - } - buffer[bufferPosition++] = c; -} - void RleEncoderV1::writeValues() { if (numLiterals != 0) { if (repeat) { @@ -87,7 +61,7 @@ void 
RleEncoderV1::writeValues() { } } else { writeByte(static_cast<char>(-numLiterals)); - for(int i=0; i < numLiterals; ++i) { + for(size_t i=0; i < numLiterals; ++i) { if (isSigned) { writeVslong(literals[i]); } else { @@ -103,7 +77,7 @@ void RleEncoderV1::writeValues() { uint64_t RleEncoderV1::flush() { writeValues(); - outputStream->BackUp(bufferLength - bufferPosition); + outputStream->BackUp(static_cast<int>(bufferLength - bufferPosition)); uint64_t dataSize = outputStream->flush(); bufferLength = bufferPosition = 0; return dataSize; @@ -114,7 +88,7 @@ void RleEncoderV1::write(int64_t value) { literals[numLiterals++] = value; tailRunLength = 1; } else if (repeat) { - if (value == literals[0] + delta * numLiterals) { + if (value == literals[0] + delta * static_cast<int64_t>(numLiterals)) { numLiterals += 1; if (numLiterals == MAXIMUM_REPEAT) { writeValues(); @@ -163,36 +137,6 @@ void RleEncoderV1::write(int64_t value) { } } -void RleEncoderV1::writeVslong(int64_t val) { - writeVulong((val << 1) ^ (val >> 63)); -} - -void RleEncoderV1::writeVulong(int64_t val) { - while (true) { - if ((val & ~BASE_128_MASK) == 0) { - writeByte(static_cast<char>(val)); - return; - } else { - writeByte(static_cast<char>(0x80 | (val & BASE_128_MASK))); - // cast val to unsigned so as to force 0-fill right shift - val = (static_cast<uint64_t>(val) >> 7); - } - } -} - -void RleEncoderV1::recordPosition(PositionRecorder* recorder) const { - uint64_t flushedSize = outputStream->getSize(); - uint64_t unflushedSize = static_cast<uint64_t>(bufferPosition); - if (outputStream->isCompressed()) { - recorder->add(flushedSize); - recorder->add(unflushedSize); - } else { - flushedSize -= static_cast<uint64_t>(bufferLength); - recorder->add(flushedSize + unflushedSize); - } - recorder->add(static_cast<uint64_t>(numLiterals)); -} - signed char RleDecoderV1::readByte() { if (bufferStart == bufferEnd) { int bufferLength; http://git-wip-us.apache.org/repos/asf/orc/blob/f31c80bd/c++/src/RLEv1.hh 
---------------------------------------------------------------------- diff --git a/c++/src/RLEv1.hh b/c++/src/RLEv1.hh index a4be19a..b83c51c 100644 --- a/c++/src/RLEv1.hh +++ b/c++/src/RLEv1.hh @@ -30,52 +30,19 @@ class RleEncoderV1 : public RleEncoder { public: RleEncoderV1(std::unique_ptr<BufferedOutputStream> outStream, bool hasSigned); - ~RleEncoderV1() override; - - /** - * Encode the next batch of values. - * @param data the array to be written - * @param numValues the number of values to write - * @param notNull If the pointer is null, all values are writen. If the - * pointer is not null, positions that are false are skipped. - */ - void add(const int64_t* data, uint64_t numValues, - const char* notNull) override; - - /** - * Get size of buffer used so far. - */ - uint64_t getBufferSize() const override { - return outputStream->getSize(); - } + ~RleEncoderV1() override ; /** * Flushing underlying BufferedOutputStream */ uint64_t flush() override; - /** - * record current position - * @param recorder use the recorder to record current positions - */ - virtual void recordPosition(PositionRecorder* recorder) const override; - private: - std::unique_ptr<BufferedOutputStream> outputStream; - bool isSigned; - int64_t* literals; - int numLiterals; int64_t delta; bool repeat; int tailRunLength; - int bufferPosition; - int bufferLength; - char* buffer; - - void write(int64_t val); - void writeByte(char c); - void writeVulong(int64_t val); - void writeVslong(int64_t val); + + void write(int64_t val) override; void writeValues(); }; @@ -113,8 +80,8 @@ private: const bool isSigned; uint64_t remainingValues; int64_t value; - const char* bufferStart; - const char* bufferEnd; + const char *bufferStart; + const char *bufferEnd; int64_t delta; bool repeating; }; http://git-wip-us.apache.org/repos/asf/orc/blob/f31c80bd/c++/src/RLEv2.cc ---------------------------------------------------------------------- diff --git a/c++/src/RLEv2.cc b/c++/src/RLEv2.cc deleted file mode 
100644 index 10c6190..0000000 --- a/c++/src/RLEv2.cc +++ /dev/null @@ -1,484 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "Adaptor.hh" -#include "Compression.hh" -#include "RLEv2.hh" - -#define MIN_REPEAT 3 - -namespace orc { - -struct FixedBitSizes { - enum FBS { - ONE = 0, TWO, THREE, FOUR, FIVE, SIX, SEVEN, EIGHT, NINE, TEN, ELEVEN, TWELVE, - THIRTEEN, FOURTEEN, FIFTEEN, SIXTEEN, SEVENTEEN, EIGHTEEN, NINETEEN, - TWENTY, TWENTYONE, TWENTYTWO, TWENTYTHREE, TWENTYFOUR, TWENTYSIX, - TWENTYEIGHT, THIRTY, THIRTYTWO, FORTY, FORTYEIGHT, FIFTYSIX, SIXTYFOUR - }; -}; - -inline uint32_t decodeBitWidth(uint32_t n) { - if (n <= FixedBitSizes::TWENTYFOUR) { - return n + 1; - } else if (n == FixedBitSizes::TWENTYSIX) { - return 26; - } else if (n == FixedBitSizes::TWENTYEIGHT) { - return 28; - } else if (n == FixedBitSizes::THIRTY) { - return 30; - } else if (n == FixedBitSizes::THIRTYTWO) { - return 32; - } else if (n == FixedBitSizes::FORTY) { - return 40; - } else if (n == FixedBitSizes::FORTYEIGHT) { - return 48; - } else if (n == FixedBitSizes::FIFTYSIX) { - return 56; - } else { - return 64; - } -} - -inline uint32_t getClosestFixedBits(uint32_t n) { - if (n == 0) { - return 1; - } - - if (n >= 
1 && n <= 24) { - return n; - } else if (n > 24 && n <= 26) { - return 26; - } else if (n > 26 && n <= 28) { - return 28; - } else if (n > 28 && n <= 30) { - return 30; - } else if (n > 30 && n <= 32) { - return 32; - } else if (n > 32 && n <= 40) { - return 40; - } else if (n > 40 && n <= 48) { - return 48; - } else if (n > 48 && n <= 56) { - return 56; - } else { - return 64; - } -} - -int64_t RleDecoderV2::readLongBE(uint64_t bsz) { - int64_t ret = 0, val; - uint64_t n = bsz; - while (n > 0) { - n--; - val = readByte(); - ret |= (val << (n * 8)); - } - return ret; -} - -inline int64_t RleDecoderV2::readVslong() { - return unZigZag(readVulong()); -} - -uint64_t RleDecoderV2::readVulong() { - uint64_t ret = 0, b; - uint64_t offset = 0; - do { - b = readByte(); - ret |= (0x7f & b) << offset; - offset += 7; - } while (b >= 0x80); - return ret; -} - -RleDecoderV2::RleDecoderV2(std::unique_ptr<SeekableInputStream> input, - bool _isSigned, MemoryPool& pool - ): inputStream(std::move(input)), - isSigned(_isSigned), - firstByte(0), - runLength(0), - runRead(0), - bufferStart(nullptr), - bufferEnd(bufferStart), - deltaBase(0), - byteSize(0), - firstValue(0), - prevValue(0), - bitSize(0), - bitsLeft(0), - curByte(0), - patchBitSize(0), - unpackedIdx(0), - patchIdx(0), - base(0), - curGap(0), - curPatch(0), - patchMask(0), - actualGap(0), - unpacked(pool, 0), - unpackedPatch(pool, 0) { - // PASS -} - -void RleDecoderV2::seek(PositionProvider& location) { - // move the input stream - inputStream->seek(location); - // clear state - bufferEnd = bufferStart = nullptr; - runRead = runLength = 0; - // skip ahead the given number of records - skip(location.next()); -} - -void RleDecoderV2::skip(uint64_t numValues) { - // simple for now, until perf tests indicate something encoding specific is - // needed - const uint64_t N = 64; - int64_t dummy[N]; - - while (numValues) { - uint64_t nRead = std::min(N, numValues); - next(dummy, nRead, nullptr); - numValues -= nRead; - } -} - -void 
RleDecoderV2::next(int64_t* const data, - const uint64_t numValues, - const char* const notNull) { - uint64_t nRead = 0; - - while (nRead < numValues) { - // Skip any nulls before attempting to read first byte. - while (notNull && !notNull[nRead]) { - if (++nRead == numValues) { - return; // ended with null values - } - } - - if (runRead == runLength) { - resetRun(); - firstByte = readByte(); - } - - uint64_t offset = nRead, length = numValues - nRead; - - EncodingType enc = static_cast<EncodingType> - ((firstByte >> 6) & 0x03); - switch(static_cast<int64_t>(enc)) { - case SHORT_REPEAT: - nRead += nextShortRepeats(data, offset, length, notNull); - break; - case DIRECT: - nRead += nextDirect(data, offset, length, notNull); - break; - case PATCHED_BASE: - nRead += nextPatched(data, offset, length, notNull); - break; - case DELTA: - nRead += nextDelta(data, offset, length, notNull); - break; - default: - throw ParseError("unknown encoding"); - } - } -} - -uint64_t RleDecoderV2::nextShortRepeats(int64_t* const data, - uint64_t offset, - uint64_t numValues, - const char* const notNull) { - if (runRead == runLength) { - // extract the number of fixed bytes - byteSize = (firstByte >> 3) & 0x07; - byteSize += 1; - - runLength = firstByte & 0x07; - // run lengths values are stored only after MIN_REPEAT value is met - runLength += MIN_REPEAT; - runRead = 0; - - // read the repeated value which is store using fixed bytes - firstValue = readLongBE(byteSize); - - if (isSigned) { - firstValue = unZigZag(static_cast<uint64_t>(firstValue)); - } - } - - uint64_t nRead = std::min(runLength - runRead, numValues); - - if (notNull) { - for(uint64_t pos = offset; pos < offset + nRead; ++pos) { - if (notNull[pos]) { - data[pos] = firstValue; - ++runRead; - } - } - } else { - for(uint64_t pos = offset; pos < offset + nRead; ++pos) { - data[pos] = firstValue; - ++runRead; - } - } - - return nRead; -} - -uint64_t RleDecoderV2::nextDirect(int64_t* const data, - uint64_t offset, - uint64_t 
numValues, - const char* const notNull) { - if (runRead == runLength) { - // extract the number of fixed bits - unsigned char fbo = (firstByte >> 1) & 0x1f; - bitSize = decodeBitWidth(fbo); - - // extract the run length - runLength = static_cast<uint64_t>(firstByte & 0x01) << 8; - runLength |= readByte(); - // runs are one off - runLength += 1; - runRead = 0; - } - - uint64_t nRead = std::min(runLength - runRead, numValues); - - runRead += readLongs(data, offset, nRead, bitSize, notNull); - - if (isSigned) { - if (notNull) { - for (uint64_t pos = offset; pos < offset + nRead; ++pos) { - if (notNull[pos]) { - data[pos] = unZigZag(static_cast<uint64_t>(data[pos])); - } - } - } else { - for (uint64_t pos = offset; pos < offset + nRead; ++pos) { - data[pos] = unZigZag(static_cast<uint64_t>(data[pos])); - } - } - } - - return nRead; -} - -uint64_t RleDecoderV2::nextPatched(int64_t* const data, - uint64_t offset, - uint64_t numValues, - const char* const notNull) { - if (runRead == runLength) { - // extract the number of fixed bits - unsigned char fbo = (firstByte >> 1) & 0x1f; - bitSize = decodeBitWidth(fbo); - - // extract the run length - runLength = static_cast<uint64_t>(firstByte & 0x01) << 8; - runLength |= readByte(); - // runs are one off - runLength += 1; - runRead = 0; - - // extract the number of bytes occupied by base - uint64_t thirdByte = readByte(); - byteSize = (thirdByte >> 5) & 0x07; - // base width is one off - byteSize += 1; - - // extract patch width - uint32_t pwo = thirdByte & 0x1f; - patchBitSize = decodeBitWidth(pwo); - - // read fourth byte and extract patch gap width - uint64_t fourthByte = readByte(); - uint32_t pgw = (fourthByte >> 5) & 0x07; - // patch gap width is one off - pgw += 1; - - // extract the length of the patch list - size_t pl = fourthByte & 0x1f; - if (pl == 0) { - throw ParseError("Corrupt PATCHED_BASE encoded data (pl==0)!"); - } - - // read the next base width number of bytes to extract base value - base = 
readLongBE(byteSize); - int64_t mask = (static_cast<int64_t>(1) << ((byteSize * 8) - 1)); - // if mask of base value is 1 then base is negative value else positive - if ((base & mask) != 0) { - base = base & ~mask; - base = -base; - } - - // TODO: something more efficient than resize - unpacked.resize(runLength); - unpackedIdx = 0; - readLongs(unpacked.data(), 0, runLength, bitSize); - // any remaining bits are thrown out - resetReadLongs(); - - // TODO: something more efficient than resize - unpackedPatch.resize(pl); - patchIdx = 0; - // TODO: Skip corrupt? - // if ((patchBitSize + pgw) > 64 && !skipCorrupt) { - if ((patchBitSize + pgw) > 64) { - throw ParseError("Corrupt PATCHED_BASE encoded data " - "(patchBitSize + pgw > 64)!"); - } - uint32_t cfb = getClosestFixedBits(patchBitSize + pgw); - readLongs(unpackedPatch.data(), 0, pl, cfb); - // any remaining bits are thrown out - resetReadLongs(); - - // apply the patch directly when decoding the packed data - patchMask = ((static_cast<int64_t>(1) << patchBitSize) - 1); - - adjustGapAndPatch(); - } - - uint64_t nRead = std::min(runLength - runRead, numValues); - - for(uint64_t pos = offset; pos < offset + nRead; ++pos) { - // skip null positions - if (notNull && !notNull[pos]) { - continue; - } - if (static_cast<int64_t>(unpackedIdx) != actualGap) { - // no patching required. 
add base to unpacked value to get final value - data[pos] = base + unpacked[unpackedIdx]; - } else { - // extract the patch value - int64_t patchedVal = unpacked[unpackedIdx] | (curPatch << bitSize); - - // add base to patched value - data[pos] = base + patchedVal; - - // increment the patch to point to next entry in patch list - ++patchIdx; - - if (patchIdx < unpackedPatch.size()) { - adjustGapAndPatch(); - - // next gap is relative to the current gap - actualGap += unpackedIdx; - } - } - - ++runRead; - ++unpackedIdx; - } - - return nRead; -} - -uint64_t RleDecoderV2::nextDelta(int64_t* const data, - uint64_t offset, - uint64_t numValues, - const char* const notNull) { - if (runRead == runLength) { - // extract the number of fixed bits - unsigned char fbo = (firstByte >> 1) & 0x1f; - if (fbo != 0) { - bitSize = decodeBitWidth(fbo); - } else { - bitSize = 0; - } - - // extract the run length - runLength = static_cast<uint64_t>(firstByte & 0x01) << 8; - runLength |= readByte(); - ++runLength; // account for first value - runRead = deltaBase = 0; - - // read the first value stored as vint - if (isSigned) { - firstValue = static_cast<int64_t>(readVslong()); - } else { - firstValue = static_cast<int64_t>(readVulong()); - } - - prevValue = firstValue; - - // read the fixed delta value stored as vint (deltas can be negative even - // if all number are positive) - deltaBase = static_cast<int64_t>(readVslong()); - } - - uint64_t nRead = std::min(runLength - runRead, numValues); - - uint64_t pos = offset; - for ( ; pos < offset + nRead; ++pos) { - // skip null positions - if (!notNull || notNull[pos]) break; - } - if (runRead == 0 && pos < offset + nRead) { - data[pos++] = firstValue; - ++runRead; - } - - if (bitSize == 0) { - // add fixed deltas to adjacent values - for ( ; pos < offset + nRead; ++pos) { - // skip null positions - if (notNull && !notNull[pos]) { - continue; - } - prevValue = data[pos] = prevValue + deltaBase; - ++runRead; - } - } else { - for ( ; pos < 
offset + nRead; ++pos) { - // skip null positions - if (!notNull || notNull[pos]) break; - } - if (runRead < 2 && pos < offset + nRead) { - // add delta base and first value - prevValue = data[pos++] = firstValue + deltaBase; - ++runRead; - } - - // write the unpacked values, add it to previous value and store final - // value to result buffer. if the delta base value is negative then it - // is a decreasing sequence else an increasing sequence - uint64_t remaining = (offset + nRead) - pos; - runRead += readLongs(data, pos, remaining, bitSize, notNull); - - if (deltaBase < 0) { - for ( ; pos < offset + nRead; ++pos) { - // skip null positions - if (notNull && !notNull[pos]) { - continue; - } - prevValue = data[pos] = prevValue - data[pos]; - } - } else { - for ( ; pos < offset + nRead; ++pos) { - // skip null positions - if (notNull && !notNull[pos]) { - continue; - } - prevValue = data[pos] = prevValue + data[pos]; - } - } - } - return nRead; -} - -} // namespace orc http://git-wip-us.apache.org/repos/asf/orc/blob/f31c80bd/c++/src/RLEv2.hh ---------------------------------------------------------------------- diff --git a/c++/src/RLEv2.hh b/c++/src/RLEv2.hh index 5b86abc..a51b276 100644 --- a/c++/src/RLEv2.hh +++ b/c++/src/RLEv2.hh @@ -25,13 +25,89 @@ #include <vector> +#define MIN_REPEAT 3 +#define HIST_LEN 32 namespace orc { -class RleDecoderV2 : public RleDecoder { +struct FixedBitSizes { + enum FBS { + ONE = 0, TWO, THREE, FOUR, FIVE, SIX, SEVEN, EIGHT, NINE, TEN, ELEVEN, TWELVE, + THIRTEEN, FOURTEEN, FIFTEEN, SIXTEEN, SEVENTEEN, EIGHTEEN, NINETEEN, + TWENTY, TWENTYONE, TWENTYTWO, TWENTYTHREE, TWENTYFOUR, TWENTYSIX, + TWENTYEIGHT, THIRTY, THIRTYTWO, FORTY, FORTYEIGHT, FIFTYSIX, SIXTYFOUR, SIZE + }; +}; + +enum EncodingType { SHORT_REPEAT=0, DIRECT=1, PATCHED_BASE=2, DELTA=3 }; + +struct EncodingOption { + EncodingType encoding; + int64_t fixedDelta; + int64_t gapVsPatchListCount; + int64_t zigzagLiteralsCount; + int64_t baseRedLiteralsCount; + int64_t 
adjDeltasCount; + uint32_t zzBits90p; + uint32_t zzBits100p; + uint32_t brBits95p; + uint32_t brBits100p; + uint32_t bitsDeltaMax; + uint32_t patchWidth; + uint32_t patchGapWidth; + uint32_t patchLength; + int64_t min; + bool isFixedDelta; +}; + +class RleEncoderV2 : public RleEncoder { public: + RleEncoderV2(std::unique_ptr<BufferedOutputStream> outStream, bool hasSigned, bool alignBitPacking = true); + + ~RleEncoderV2() override { + delete [] literals; + delete [] gapVsPatchList; + delete [] zigzagLiterals; + delete [] baseRedLiterals; + delete [] adjDeltas; + } + /** + * Flushing underlying BufferedOutputStream + */ + uint64_t flush() override; - enum EncodingType { SHORT_REPEAT=0, DIRECT=1, PATCHED_BASE=2, DELTA=3 }; +private: + + const bool alignedBitPacking; + uint32_t fixedRunLength; + uint32_t variableRunLength; + int64_t prevDelta; + int32_t histgram[HIST_LEN]; + + // The four list below should actually belong to EncodingOption since it only holds temporal values in write(int64_t val), + // it is move here for performance consideration. 
+ int64_t* gapVsPatchList; + int64_t* zigzagLiterals; + int64_t* baseRedLiterals; + int64_t* adjDeltas; + + uint32_t getOpCode(EncodingType encoding); + void determineEncoding(EncodingOption& option); + void computeZigZagLiterals(EncodingOption& option); + void preparePatchedBlob(EncodingOption& option); + + void write(int64_t val) override ; + void writeInts(int64_t* input, uint32_t offset, size_t len, uint32_t bitSize); + void initializeLiterals(int64_t val); + void writeValues(EncodingOption& option); + void writeShortRepeatValues(EncodingOption& option); + void writeDirectValues(EncodingOption& option); + void writePatchedBasedValues(EncodingOption& option); + void writeDeltaValues(EncodingOption& option); + uint32_t percentileBits(int64_t* data, size_t offset, size_t length, double p, bool reuseHist = false); +}; +class RleDecoderV2 : public RleDecoder { +public: RleDecoderV2(std::unique_ptr<SeekableInputStream> input, bool isSigned, MemoryPool& pool); @@ -134,7 +210,6 @@ private: return ret; } - uint64_t nextShortRepeats(int64_t* data, uint64_t offset, uint64_t numValues, const char* notNull); uint64_t nextDirect(int64_t* data, uint64_t offset, uint64_t numValues, http://git-wip-us.apache.org/repos/asf/orc/blob/f31c80bd/c++/src/RleDecoderV2.cc ---------------------------------------------------------------------- diff --git a/c++/src/RleDecoderV2.cc b/c++/src/RleDecoderV2.cc new file mode 100644 index 0000000..c5c6f6a --- /dev/null +++ b/c++/src/RleDecoderV2.cc @@ -0,0 +1,426 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "Adaptor.hh" +#include "Compression.hh" +#include "RLEv2.hh" +#include "RLEV2Util.hh" + +namespace orc { + +int64_t RleDecoderV2::readLongBE(uint64_t bsz) { + int64_t ret = 0, val; + uint64_t n = bsz; + while (n > 0) { + n--; + val = readByte(); + ret |= (val << (n * 8)); + } + return ret; +} + +inline int64_t RleDecoderV2::readVslong() { + return unZigZag(readVulong()); +} + +uint64_t RleDecoderV2::readVulong() { + uint64_t ret = 0, b; + uint64_t offset = 0; + do { + b = readByte(); + ret |= (0x7f & b) << offset; + offset += 7; + } while (b >= 0x80); + return ret; +} + +RleDecoderV2::RleDecoderV2(std::unique_ptr<SeekableInputStream> input, + bool _isSigned, MemoryPool& pool + ): inputStream(std::move(input)), + isSigned(_isSigned), + firstByte(0), + runLength(0), + runRead(0), + bufferStart(nullptr), + bufferEnd(bufferStart), + deltaBase(0), + byteSize(0), + firstValue(0), + prevValue(0), + bitSize(0), + bitsLeft(0), + curByte(0), + patchBitSize(0), + unpackedIdx(0), + patchIdx(0), + base(0), + curGap(0), + curPatch(0), + patchMask(0), + actualGap(0), + unpacked(pool, 0), + unpackedPatch(pool, 0) { + // PASS +} + +void RleDecoderV2::seek(PositionProvider& location) { + // move the input stream + inputStream->seek(location); + // clear state + bufferEnd = bufferStart = nullptr; + runRead = runLength = 0; + // skip ahead the given number of records + skip(location.next()); +} + +void RleDecoderV2::skip(uint64_t numValues) { + // simple for now, until perf tests indicate something encoding specific is + // needed + const uint64_t N = 64; 
+ int64_t dummy[N]; + + while (numValues) { + uint64_t nRead = std::min(N, numValues); + next(dummy, nRead, nullptr); + numValues -= nRead; + } +} + +void RleDecoderV2::next(int64_t* const data, + const uint64_t numValues, + const char* const notNull) { + uint64_t nRead = 0; + + while (nRead < numValues) { + // Skip any nulls before attempting to read first byte. + while (notNull && !notNull[nRead]) { + if (++nRead == numValues) { + return; // ended with null values + } + } + + if (runRead == runLength) { + resetRun(); + firstByte = readByte(); + } + + uint64_t offset = nRead, length = numValues - nRead; + + EncodingType enc = static_cast<EncodingType> + ((firstByte >> 6) & 0x03); + switch(static_cast<int64_t>(enc)) { + case SHORT_REPEAT: + nRead += nextShortRepeats(data, offset, length, notNull); + break; + case DIRECT: + nRead += nextDirect(data, offset, length, notNull); + break; + case PATCHED_BASE: + nRead += nextPatched(data, offset, length, notNull); + break; + case DELTA: + nRead += nextDelta(data, offset, length, notNull); + break; + default: + throw ParseError("unknown encoding"); + } + } +} + +uint64_t RleDecoderV2::nextShortRepeats(int64_t* const data, + uint64_t offset, + uint64_t numValues, + const char* const notNull) { + if (runRead == runLength) { + // extract the number of fixed bytes + byteSize = (firstByte >> 3) & 0x07; + byteSize += 1; + + runLength = firstByte & 0x07; + // run lengths values are stored only after MIN_REPEAT value is met + runLength += MIN_REPEAT; + runRead = 0; + + // read the repeated value which is store using fixed bytes + firstValue = readLongBE(byteSize); + + if (isSigned) { + firstValue = unZigZag(static_cast<uint64_t>(firstValue)); + } + } + + uint64_t nRead = std::min(runLength - runRead, numValues); + + if (notNull) { + for(uint64_t pos = offset; pos < offset + nRead; ++pos) { + if (notNull[pos]) { + data[pos] = firstValue; + ++runRead; + } + } + } else { + for(uint64_t pos = offset; pos < offset + nRead; ++pos) { + 
data[pos] = firstValue; + ++runRead; + } + } + + return nRead; +} + +uint64_t RleDecoderV2::nextDirect(int64_t* const data, + uint64_t offset, + uint64_t numValues, + const char* const notNull) { + if (runRead == runLength) { + // extract the number of fixed bits + unsigned char fbo = (firstByte >> 1) & 0x1f; + bitSize = decodeBitWidth(fbo); + + // extract the run length + runLength = static_cast<uint64_t>(firstByte & 0x01) << 8; + runLength |= readByte(); + // runs are one off + runLength += 1; + runRead = 0; + } + + uint64_t nRead = std::min(runLength - runRead, numValues); + + runRead += readLongs(data, offset, nRead, bitSize, notNull); + + if (isSigned) { + if (notNull) { + for (uint64_t pos = offset; pos < offset + nRead; ++pos) { + if (notNull[pos]) { + data[pos] = unZigZag(static_cast<uint64_t>(data[pos])); + } + } + } else { + for (uint64_t pos = offset; pos < offset + nRead; ++pos) { + data[pos] = unZigZag(static_cast<uint64_t>(data[pos])); + } + } + } + + return nRead; +} + +uint64_t RleDecoderV2::nextPatched(int64_t* const data, + uint64_t offset, + uint64_t numValues, + const char* const notNull) { + if (runRead == runLength) { + // extract the number of fixed bits + unsigned char fbo = (firstByte >> 1) & 0x1f; + bitSize = decodeBitWidth(fbo); + + // extract the run length + runLength = static_cast<uint64_t>(firstByte & 0x01) << 8; + runLength |= readByte(); + // runs are one off + runLength += 1; + runRead = 0; + + // extract the number of bytes occupied by base + uint64_t thirdByte = readByte(); + byteSize = (thirdByte >> 5) & 0x07; + // base width is one off + byteSize += 1; + + // extract patch width + uint32_t pwo = thirdByte & 0x1f; + patchBitSize = decodeBitWidth(pwo); + + // read fourth byte and extract patch gap width + uint64_t fourthByte = readByte(); + uint32_t pgw = (fourthByte >> 5) & 0x07; + // patch gap width is one off + pgw += 1; + + // extract the length of the patch list + size_t pl = fourthByte & 0x1f; + if (pl == 0) { + throw 
ParseError("Corrupt PATCHED_BASE encoded data (pl==0)!"); + } + + // read the next base width number of bytes to extract base value + base = readLongBE(byteSize); + int64_t mask = (static_cast<int64_t>(1) << ((byteSize * 8) - 1)); + // if mask of base value is 1 then base is negative value else positive + if ((base & mask) != 0) { + base = base & ~mask; + base = -base; + } + + // TODO: something more efficient than resize + unpacked.resize(runLength); + unpackedIdx = 0; + readLongs(unpacked.data(), 0, runLength, bitSize); + // any remaining bits are thrown out + resetReadLongs(); + + // TODO: something more efficient than resize + unpackedPatch.resize(pl); + patchIdx = 0; + // TODO: Skip corrupt? + // if ((patchBitSize + pgw) > 64 && !skipCorrupt) { + if ((patchBitSize + pgw) > 64) { + throw ParseError("Corrupt PATCHED_BASE encoded data " + "(patchBitSize + pgw > 64)!"); + } + uint32_t cfb = getClosestFixedBits(patchBitSize + pgw); + readLongs(unpackedPatch.data(), 0, pl, cfb); + // any remaining bits are thrown out + resetReadLongs(); + + // apply the patch directly when decoding the packed data + patchMask = ((static_cast<int64_t>(1) << patchBitSize) - 1); + + adjustGapAndPatch(); + } + + uint64_t nRead = std::min(runLength - runRead, numValues); + + for(uint64_t pos = offset; pos < offset + nRead; ++pos) { + // skip null positions + if (notNull && !notNull[pos]) { + continue; + } + if (static_cast<int64_t>(unpackedIdx) != actualGap) { + // no patching required. 
add base to unpacked value to get final value + data[pos] = base + unpacked[unpackedIdx]; + } else { + // extract the patch value + int64_t patchedVal = unpacked[unpackedIdx] | (curPatch << bitSize); + + // add base to patched value + data[pos] = base + patchedVal; + + // increment the patch to point to next entry in patch list + ++patchIdx; + + if (patchIdx < unpackedPatch.size()) { + adjustGapAndPatch(); + + // next gap is relative to the current gap + actualGap += unpackedIdx; + } + } + + ++runRead; + ++unpackedIdx; + } + + return nRead; +} + +uint64_t RleDecoderV2::nextDelta(int64_t* const data, + uint64_t offset, + uint64_t numValues, + const char* const notNull) { + if (runRead == runLength) { + // extract the number of fixed bits + unsigned char fbo = (firstByte >> 1) & 0x1f; + if (fbo != 0) { + bitSize = decodeBitWidth(fbo); + } else { + bitSize = 0; + } + + // extract the run length + runLength = static_cast<uint64_t>(firstByte & 0x01) << 8; + runLength |= readByte(); + ++runLength; // account for first value + runRead = deltaBase = 0; + + // read the first value stored as vint + if (isSigned) { + firstValue = static_cast<int64_t>(readVslong()); + } else { + firstValue = static_cast<int64_t>(readVulong()); + } + + prevValue = firstValue; + + // read the fixed delta value stored as vint (deltas can be negative even + // if all number are positive) + deltaBase = static_cast<int64_t>(readVslong()); + } + + uint64_t nRead = std::min(runLength - runRead, numValues); + + uint64_t pos = offset; + for ( ; pos < offset + nRead; ++pos) { + // skip null positions + if (!notNull || notNull[pos]) break; + } + if (runRead == 0 && pos < offset + nRead) { + data[pos++] = firstValue; + ++runRead; + } + + if (bitSize == 0) { + // add fixed deltas to adjacent values + for ( ; pos < offset + nRead; ++pos) { + // skip null positions + if (notNull && !notNull[pos]) { + continue; + } + prevValue = data[pos] = prevValue + deltaBase; + ++runRead; + } + } else { + for ( ; pos < 
offset + nRead; ++pos) { + // skip null positions + if (!notNull || notNull[pos]) break; + } + if (runRead < 2 && pos < offset + nRead) { + // add delta base and first value + prevValue = data[pos++] = firstValue + deltaBase; + ++runRead; + } + + // write the unpacked values, add it to previous value and store final + // value to result buffer. if the delta base value is negative then it + // is a decreasing sequence else an increasing sequence + uint64_t remaining = (offset + nRead) - pos; + runRead += readLongs(data, pos, remaining, bitSize, notNull); + + if (deltaBase < 0) { + for ( ; pos < offset + nRead; ++pos) { + // skip null positions + if (notNull && !notNull[pos]) { + continue; + } + prevValue = data[pos] = prevValue - data[pos]; + } + } else { + for ( ; pos < offset + nRead; ++pos) { + // skip null positions + if (notNull && !notNull[pos]) { + continue; + } + prevValue = data[pos] = prevValue + data[pos]; + } + } + } + return nRead; +} + +} // namespace orc http://git-wip-us.apache.org/repos/asf/orc/blob/f31c80bd/c++/src/RleEncoderV2.cc ---------------------------------------------------------------------- diff --git a/c++/src/RleEncoderV2.cc b/c++/src/RleEncoderV2.cc new file mode 100644 index 0000000..4bd4c9d --- /dev/null +++ b/c++/src/RleEncoderV2.cc @@ -0,0 +1,767 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with option work for additional information + * regarding copyright ownership. The ASF licenses option file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use option file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "Adaptor.hh" +#include "Compression.hh" +#include "RLEv2.hh" +#include "RLEV2Util.hh" + +#define MAX_LITERAL_SIZE 512 +#define MAX_SHORT_REPEAT_LENGTH 10 + +namespace orc { + +/** + * Compute the bits required to represent pth percentile value + * @param data - array + * @param p - percentile value (>=0.0 to <=1.0) + * @return pth percentile bits + */ +uint32_t RleEncoderV2::percentileBits(int64_t* data, size_t offset, size_t length, double p, bool reuseHist) { + if ((p > 1.0) || (p <= 0.0)) { + throw InvalidArgument("Invalid p value: " + std::to_string(p)); + } + + if (!reuseHist) { + // histogram that store the encoded bit requirement for each values. 
+ // maximum number of bits that can encoded is 32 (refer FixedBitSizes) + memset(histgram, 0, FixedBitSizes::SIZE * sizeof(int32_t)); + // compute the histogram + for(size_t i = offset; i < (offset + length); i++) { + uint32_t idx = encodeBitWidth(findClosestNumBits(data[i])); + histgram[idx] += 1; + } + } + + int32_t perLen = static_cast<int32_t>(static_cast<double>(length) * (1.0 - p)); + + // return the bits required by pth percentile length + for(int32_t i = HIST_LEN - 1; i >= 0; i--) { + perLen -= histgram[i]; + if (perLen < 0) { + return decodeBitWidth(static_cast<uint32_t>(i)); + } + } + return 0; +} + +RleEncoderV2::RleEncoderV2(std::unique_ptr<BufferedOutputStream> outStream, + bool hasSigned, bool alignBitPacking) : + RleEncoder(std::move(outStream), hasSigned), + alignedBitPacking(alignBitPacking), + prevDelta(0){ + literals = new int64_t[MAX_LITERAL_SIZE]; + gapVsPatchList = new int64_t[MAX_LITERAL_SIZE]; + zigzagLiterals = new int64_t[MAX_LITERAL_SIZE]; + baseRedLiterals = new int64_t[MAX_LITERAL_SIZE]; + adjDeltas = new int64_t[MAX_LITERAL_SIZE]; +} + +void RleEncoderV2::write(int64_t val) { + if(numLiterals == 0) { + initializeLiterals(val); + return; + } + + if(numLiterals == 1) { + prevDelta = val - literals[0]; + literals[numLiterals++] = val; + + if(val == literals[0]) { + fixedRunLength = 2; + variableRunLength = 0; + } else { + fixedRunLength = 0; + variableRunLength = 2; + } + return; + } + + int64_t currentDelta = val - literals[numLiterals - 1]; + EncodingOption option = {}; + if (prevDelta == 0 && currentDelta == 0) { + // case 1: fixed delta run + literals[numLiterals++] = val; + + if (variableRunLength > 0) { + // if variable run is non-zero then we are seeing repeating + // values at the end of variable run in which case fixed Run + // length is 2 + fixedRunLength = 2; + } + fixedRunLength++; + + // if fixed run met the minimum condition and if variable + // run is non-zero then flush the variable run and shift the + // tail fixed runs 
to start of the buffer + if (fixedRunLength >= MIN_REPEAT && variableRunLength > 0) { + numLiterals -= MIN_REPEAT; + variableRunLength -= (MIN_REPEAT - 1); + + int64_t tailVals[MIN_REPEAT] = {0}; + + memcpy(tailVals, literals + numLiterals, sizeof(int64_t) * MIN_REPEAT); + determineEncoding(option); + writeValues(option); + + // shift tail fixed runs to beginning of the buffer + for (size_t i = 0; i < MIN_REPEAT; ++i) { + literals[numLiterals++] = tailVals[i]; + } + } + + if (fixedRunLength == MAX_LITERAL_SIZE) {; + determineEncoding(option); + writeValues(option); + } + return; + } + + // case 2: variable delta run + + // if fixed run length is non-zero and if it satisfies the + // short repeat conditions then write the values as short repeats + // else use delta encoding + if (fixedRunLength >= MIN_REPEAT) { + if (fixedRunLength <= MAX_SHORT_REPEAT_LENGTH) { + option.encoding = SHORT_REPEAT; + } else { + option.encoding = DELTA; + option.isFixedDelta = true; + } + writeValues(option); + } + + // if fixed run length is <MIN_REPEAT and current value is + // different from previous then treat it as variable run + if (fixedRunLength > 0 && fixedRunLength < MIN_REPEAT && val != literals[numLiterals - 1]) { + variableRunLength = fixedRunLength; + fixedRunLength = 0; + } + + // after writing values re-initialize the variables + if (numLiterals == 0) { + initializeLiterals(val); + } else { + prevDelta = val - literals[numLiterals - 1]; + literals[numLiterals++] = val; + variableRunLength++; + + if (variableRunLength == MAX_LITERAL_SIZE) { + determineEncoding(option); + writeValues(option); + } + } +} + +void RleEncoderV2::computeZigZagLiterals(EncodingOption &option) { + int64_t zzEncVal = 0; + for (size_t i = 0; i < numLiterals; i++) { + if (isSigned) { + zzEncVal = zigZag(literals[i]); + } else { + zzEncVal = literals[i]; + } + zigzagLiterals[option.zigzagLiteralsCount++] = zzEncVal; + } +} + +void RleEncoderV2::preparePatchedBlob(EncodingOption& option) { + // mask 
will be max value beyond which patch will be generated + int64_t mask = static_cast<int64_t>(static_cast<uint64_t>(1) << option.brBits95p) - 1; + + // since we are considering only 95 percentile, the size of gap and + // patch array can contain only be 5% values + option.patchLength = static_cast<uint32_t>(std::ceil((numLiterals / 20))); + + // #bit for patch + option.patchWidth = option.brBits100p - option.brBits95p; + option.patchWidth = getClosestFixedBits(option.patchWidth); + + // if patch bit requirement is 64 then it will not possible to pack + // gap and patch together in a long. To make sure gap and patch can be + // packed together adjust the patch width + if (option.patchWidth == 64) { + option.patchWidth = 56; + option.brBits95p = 8; + mask = static_cast<int64_t>(static_cast<uint64_t>(1) << option.brBits95p) - 1; + } + + uint32_t gapIdx = 0; + uint32_t patchIdx = 0; + size_t prev = 0; + size_t maxGap = 0; + + std::vector<int64_t> gapList; + std::vector<int64_t> patchList; + + for(size_t i = 0; i < numLiterals; i++) { + // if value is above mask then create the patch and record the gap + if (baseRedLiterals[i] > mask) { + size_t gap = i - prev; + if (gap > maxGap) { + maxGap = gap; + } + + // gaps are relative, so store the previous patched value index + prev = i; + gapList.push_back(static_cast<int64_t>(gap)); + gapIdx++; + + // extract the most significant bits that are over mask bits + int64_t patch = baseRedLiterals[i] >> option.brBits95p; + patchList.push_back(patch); + patchIdx++; + + // strip off the MSB to enable safe bit packing + baseRedLiterals[i] &= mask; + } + } + + // adjust the patch length to number of entries in gap list + option.patchLength = gapIdx; + + // if the element to be patched is the first and only element then + // max gap will be 0, but to store the gap as 0 we need atleast 1 bit + if (maxGap == 0 && option.patchLength != 0) { + option.patchGapWidth = 1; + } else { + option.patchGapWidth = 
findClosestNumBits(static_cast<int64_t>(maxGap)); + } + + // special case: if the patch gap width is greater than 256, then + // we need 9 bits to encode the gap width. But we only have 3 bits in + // header to record the gap width. To deal with this case, we will save + // two entries in patch list in the following way + // 256 gap width => 0 for patch value + // actual gap - 256 => actual patch value + // We will do the same for gap width = 511. If the element to be patched is + // the last element in the scope then gap width will be 511. In this case we + // will have 3 entries in the patch list in the following way + // 255 gap width => 0 for patch value + // 255 gap width => 0 for patch value + // 1 gap width => actual patch value + if (option.patchGapWidth > 8) { + option.patchGapWidth = 8; + // for gap = 511, we need two additional entries in patch list + if (maxGap == 511) { + option.patchLength += 2; + } else { + option.patchLength += 1; + } + } + + // create gap vs patch list + gapIdx = 0; + patchIdx = 0; + for(size_t i = 0; i < option.patchLength; i++) { + int64_t g = gapList[gapIdx++]; + int64_t p = patchList[patchIdx++]; + while (g > 255) { + gapVsPatchList[option.gapVsPatchListCount++] = (255L << option.patchWidth); + i++; + g -= 255; + } + + // store patch value in LSBs and gap in MSBs + gapVsPatchList[option.gapVsPatchListCount++] = ((g << option.patchWidth) | p); + } +} + +void RleEncoderV2::determineEncoding(EncodingOption& option) { + // we need to compute zigzag values for DIRECT encoding if we decide to + // break early for delta overflows or for shorter runs + computeZigZagLiterals(option); + + option.zzBits100p = percentileBits(zigzagLiterals, 0, numLiterals, 1.0); + + // not a big win for shorter runs to determine encoding + if (numLiterals <= MIN_REPEAT) { + option.encoding = DIRECT; + return; + } + + // DELTA encoding check + + // for identifying monotonic sequences + bool isIncreasing = true; + bool isDecreasing = true; + 
option.isFixedDelta = true; + + option.min = literals[0]; + int64_t max = literals[0]; + int64_t initialDelta = literals[1] - literals[0]; + int64_t currDelta = 0; + int64_t deltaMax = 0; + adjDeltas[option.adjDeltasCount++] = initialDelta; + + for (size_t i = 1; i < numLiterals; i++) { + const int64_t l1 = literals[i]; + const int64_t l0 = literals[i - 1]; + currDelta = l1 - l0; + option.min = std::min(option.min, l1); + max = std::max(max, l1); + + isIncreasing &= (l0 <= l1); + isDecreasing &= (l0 >= l1); + + option.isFixedDelta &= (currDelta == initialDelta); + if (i > 1) { + adjDeltas[option.adjDeltasCount++] = std::abs(currDelta); + deltaMax = std::max(deltaMax, adjDeltas[i - 1]); + } + } + + // it's faster to exit under delta overflow condition without checking for + // PATCHED_BASE condition as encoding using DIRECT is faster and has less + // overhead than PATCHED_BASE + if (!isSafeSubtract(max, option.min)) { + option.encoding = DIRECT; + return; + } + + // invariant - subtracting any number from any other in the literals after + // option point won't overflow + + // if min is equal to max then the delta is 0, option condition happens for + // fixed values run >10 which cannot be encoded with SHORT_REPEAT + if (option.min == max) { + if (!option.isFixedDelta) { + throw InvalidArgument(std::to_string(option.min) + "==" + std::to_string(max) + ", isFixedDelta cannot be false"); + } + + if(currDelta != 0) { + throw InvalidArgument(std::to_string(option.min) + "==" + std::to_string(max) + ", currDelta should be zero"); + } + option.fixedDelta = 0; + option.encoding = DELTA; + return; + } + + if (option.isFixedDelta) { + if (currDelta != initialDelta) { + throw InvalidArgument("currDelta should be equal to initialDelta for fixed delta encoding"); + } + + option.encoding = DELTA; + option.fixedDelta = currDelta; + return; + } + + // if initialDelta is 0 then we cannot delta encode as we cannot identify + // the sign of deltas (increasing or decreasing) + if 
(initialDelta != 0) { + // stores the number of bits required for packing delta blob in + // delta encoding + option.bitsDeltaMax = findClosestNumBits(deltaMax); + + // monotonic condition + if (isIncreasing || isDecreasing) { + option.encoding = DELTA; + return; + } + } + + // PATCHED_BASE encoding check + + // percentile values are computed for the zigzag encoded values. if the + // number of bit requirement between 90th and 100th percentile varies + // beyond a threshold then we need to patch the values. if the variation + // is not significant then we can use direct encoding + + option.zzBits90p = percentileBits(zigzagLiterals, 0, numLiterals, 0.9, true); + uint32_t diffBitsLH = option.zzBits100p - option.zzBits90p; + + // if the difference between 90th percentile and 100th percentile fixed + // bits is > 1 then we need patch the values + if (diffBitsLH > 1) { + + // patching is done only on base reduced values. + // remove base from literals + for (size_t i = 0; i < numLiterals; i++) { + baseRedLiterals[option.baseRedLiteralsCount++] = (literals[i] - option.min); + } + + // 95th percentile width is used to determine max allowed value + // after which patching will be done + option.brBits95p = percentileBits(baseRedLiterals, 0, numLiterals, 0.95); + + // 100th percentile is used to compute the max patch width + option.brBits100p = percentileBits(baseRedLiterals, 0, numLiterals, 1.0, true); + + // after base reducing the values, if the difference in bits between + // 95th percentile and 100th percentile value is zero then there + // is no point in patching the values, in which case we will + // fallback to DIRECT encoding. + // The decision to use patched base was based on zigzag values, but the + // actual patching is done on base reduced literals. 
+ if ((option.brBits100p - option.brBits95p) != 0) { + option.encoding = PATCHED_BASE; + preparePatchedBlob(option); + return; + } else { + option.encoding = DIRECT; + return; + } + } else { + // if difference in bits between 95th percentile and 100th percentile is + // 0, then patch length will become 0. Hence we will fallback to direct + option.encoding = DIRECT; + return; + } +} + +uint64_t RleEncoderV2::flush() { + if (numLiterals != 0) { + EncodingOption option = {}; + if (variableRunLength != 0) { + determineEncoding(option); + writeValues(option); + } else if (fixedRunLength != 0) { + if (fixedRunLength < MIN_REPEAT) { + variableRunLength = fixedRunLength; + fixedRunLength = 0; + determineEncoding(option); + writeValues(option); + } else if (fixedRunLength >= MIN_REPEAT + && fixedRunLength <= MAX_SHORT_REPEAT_LENGTH) { + option.encoding = SHORT_REPEAT; + writeValues(option); + } else { + option.encoding = DELTA; + option.isFixedDelta = true; + writeValues(option); + } + } + } + + outputStream->BackUp(static_cast<int>(bufferLength - bufferPosition)); + uint64_t dataSize = outputStream->flush(); + bufferLength = bufferPosition = 0; + return dataSize; +} + +void RleEncoderV2::writeValues(EncodingOption& option) { + if (numLiterals != 0) { + switch (option.encoding) { + case SHORT_REPEAT: + writeShortRepeatValues(option); + break; + case DIRECT: + writeDirectValues(option); + break; + case PATCHED_BASE: + writePatchedBasedValues(option); + break; + case DELTA: + writeDeltaValues(option); + break; + default: + throw NotImplementedYet("Not implemented yet"); + } + + numLiterals = 0; + prevDelta = 0; + } +} + +void RleEncoderV2::writeShortRepeatValues(EncodingOption&) { + int64_t repeatVal; + if (isSigned) { + repeatVal = zigZag(literals[0]); + } else { + repeatVal = literals[0]; + } + + const uint32_t numBitsRepeatVal = findClosestNumBits(repeatVal); + const uint32_t numBytesRepeatVal = numBitsRepeatVal % 8 == 0 ? 
(numBitsRepeatVal >> 3) : ((numBitsRepeatVal >> 3) + 1); + + uint32_t header = getOpCode(SHORT_REPEAT); + + fixedRunLength -= MIN_REPEAT; + header |= fixedRunLength; + header |= ((numBytesRepeatVal - 1) << 3); + + writeByte(static_cast<char>(header)); + + for(int32_t i = static_cast<int32_t>(numBytesRepeatVal - 1); i >= 0; i--) { + int64_t b = ((repeatVal >> (i * 8)) & 0xff); + writeByte(static_cast<char>(b)); + } + + fixedRunLength = 0; +} + +void RleEncoderV2::writeDirectValues(EncodingOption& option) { + // write the number of fixed bits required in next 5 bits + uint32_t fb = option.zzBits100p; + if (alignedBitPacking) { + fb = getClosestAlignedFixedBits(fb); + } + + const uint32_t efb = encodeBitWidth(fb) << 1; + + // adjust variable run length + variableRunLength -= 1; + + // extract the 9th bit of run length + const uint32_t tailBits = (variableRunLength & 0x100) >> 8; + + // create first byte of the header + const char headerFirstByte = static_cast<char>(getOpCode(DIRECT) | efb | tailBits); + + // second byte of the header stores the remaining 8 bits of runlength + const char headerSecondByte = static_cast<char>(variableRunLength & 0xff); + + // write header + writeByte(headerFirstByte); + writeByte(headerSecondByte); + + // bit packing the zigzag encoded literals + writeInts(zigzagLiterals, 0, numLiterals, fb); + + // reset run length + variableRunLength = 0; +} + +void RleEncoderV2::writePatchedBasedValues(EncodingOption& option) { + // NOTE: Aligned bit packing cannot be applied for PATCHED_BASE encoding + // because patch is applied to MSB bits. For example: If fixed bit width of + // base value is 7 bits and if patch is 3 bits, the actual value is + // constructed by shifting the patch to left by 7 positions. + // actual_value = patch << 7 | base_value + // So, if we align base_value then actual_value can not be reconstructed. 
+ + // write the number of fixed bits required in next 5 bits + const uint32_t efb = encodeBitWidth(option.brBits95p) << 1; + + // adjust variable run length, they are one off + variableRunLength -= 1; + + // extract the 9th bit of run length + const uint32_t tailBits = (variableRunLength & 0x100) >> 8; + + // create first byte of the header + const char headerFirstByte = static_cast<char>(getOpCode(PATCHED_BASE) | efb | tailBits); + + // second byte of the header stores the remaining 8 bits of runlength + const char headerSecondByte = static_cast<char>(variableRunLength & 0xff); + + // if the min value is negative toggle the sign + const bool isNegative = (option.min < 0); + if (isNegative) { + option.min = -option.min; + } + + // find the number of bytes required for base and shift it by 5 bits + // to accommodate patch width. The additional bit is used to store the sign + // of the base value. + const uint32_t baseWidth = findClosestNumBits(option.min) + 1; + const uint32_t baseBytes = baseWidth % 8 == 0 ? 
baseWidth / 8 : (baseWidth / 8) + 1; + const uint32_t bb = (baseBytes - 1) << 5; + + // if the base value is negative then set MSB to 1 + if (isNegative) { + option.min |= (1LL << ((baseBytes * 8) - 1)); + } + + // third byte contains 3 bits for number of bytes occupied by base + // and 5 bits for patchWidth + const char headerThirdByte = static_cast<char>(bb | encodeBitWidth(option.patchWidth)); + + // fourth byte contains 3 bits for page gap width and 5 bits for + // patch length + const char headerFourthByte = static_cast<char>((option.patchGapWidth - 1) << 5 | option.patchLength); + + // write header + writeByte(headerFirstByte); + writeByte(headerSecondByte); + writeByte(headerThirdByte); + writeByte(headerFourthByte); + + // write the base value using fixed bytes in big endian order + for(int32_t i = static_cast<int32_t>(baseBytes - 1); i >= 0; i--) { + char b = static_cast<char>(((option.min >> (i * 8)) & 0xff)); + writeByte(b); + } + + // base reduced literals are bit packed + uint32_t closestFixedBits = getClosestFixedBits(option.brBits95p); + + writeInts(baseRedLiterals, 0, numLiterals, closestFixedBits); + + // write patch list + closestFixedBits = getClosestFixedBits(option.patchGapWidth + option.patchWidth); + + writeInts(gapVsPatchList, 0, option.patchLength, closestFixedBits); + + // reset run length + variableRunLength = 0; +} + +void RleEncoderV2::writeDeltaValues(EncodingOption& option) { + uint32_t len = 0; + uint32_t fb = option.bitsDeltaMax; + uint32_t efb = 0; + + if (alignedBitPacking) { + fb = getClosestAlignedFixedBits(fb); + } + + if (option.isFixedDelta) { + // if fixed run length is greater than threshold then it will be fixed + // delta sequence with delta value 0 else fixed delta sequence with + // non-zero delta value + if (fixedRunLength > MIN_REPEAT) { + // ex. sequence: 2 2 2 2 2 2 2 2 + len = fixedRunLength - 1; + fixedRunLength = 0; + } else { + // ex. 
sequence: 4 6 8 10 12 14 16 + len = variableRunLength - 1; + variableRunLength = 0; + } + } else { + // fixed width 0 is used for long repeating values. + // sequences that require only 1 bit to encode will have an additional bit + if (fb == 1) { + fb = 2; + } + efb = encodeBitWidth(fb) << 1; + len = variableRunLength - 1; + variableRunLength = 0; + } + + // extract the 9th bit of run length + const uint32_t tailBits = (len & 0x100) >> 8; + + // create first byte of the header + const char headerFirstByte = static_cast<char>(getOpCode(DELTA) | efb | tailBits); + + // second byte of the header stores the remaining 8 bits of runlength + const char headerSecondByte = static_cast<char>(len & 0xff); + + // write header + writeByte(headerFirstByte); + writeByte(headerSecondByte); + + // store the first value from zigzag literal array + if (isSigned) { + writeVslong(literals[0]); + } else { + writeVulong(literals[0]); + } + + if (option.isFixedDelta) { + // if delta is fixed then we don't need to store delta blob + writeVslong(option.fixedDelta); + } else { + // store the first value as delta value using zigzag encoding + writeVslong(adjDeltas[0]); + + // adjacent delta values are bit packed. The length of adjDeltas array is + // always one less than the number of literals (delta difference for n + // elements is n-1). 
We have already written one element, write the + // remaining numLiterals - 2 elements here + writeInts(adjDeltas, 1, numLiterals - 2, fb); + } +} + +void RleEncoderV2::writeInts(int64_t* input, uint32_t offset, size_t len, uint32_t bitSize) { + if(input == nullptr || len < 1 || bitSize < 1) { + return; + } + + if (getClosestAlignedFixedBits(bitSize) == bitSize) { + uint32_t numBytes; + uint32_t endOffSet = static_cast<uint32_t>(offset + len); + if (bitSize < 8 ) {; + char bitMask = static_cast<char>((1 << bitSize) - 1); + uint32_t numHops = 8 / bitSize; + uint32_t remainder = static_cast<uint32_t>(len % numHops); + uint32_t endUnroll = endOffSet - remainder; + for (uint32_t i = offset; i < endUnroll; i+=numHops) { + char toWrite = 0; + for (uint32_t j = 0; j < numHops; ++j) { + toWrite |= static_cast<char>((input[i+j] & bitMask) << (8 - (j + 1) * bitSize)); + } + writeByte(toWrite); + } + + if (remainder > 0) { + uint32_t startShift = 8 - bitSize; + char toWrite = 0; + for (uint32_t i = endUnroll; i < endOffSet; ++i) { + toWrite |= static_cast<char>((input[i] & bitMask) << startShift); + startShift -= bitSize; + } + writeByte(toWrite); + } + + } else { + numBytes = bitSize / 8; + + for (uint32_t i = offset; i < endOffSet; ++i) { + for (uint32_t j = 0; j < numBytes; ++j) { + char toWrite = static_cast<char>((input[i] >> (8 * (numBytes - j - 1))) & 255); + writeByte(toWrite); + } + } + } + + return; + } + + // write for unaligned bit size + uint32_t bitsLeft = 8; + char current = 0; + for(uint32_t i = offset; i < (offset + len); i++) { + int64_t value = input[i]; + uint32_t bitsToWrite = bitSize; + while (bitsToWrite > bitsLeft) { + // add the bits to the bottom of the current word + current |= static_cast<char>(value >> (bitsToWrite - bitsLeft)); + // subtract out the bits we just added + bitsToWrite -= bitsLeft; + // zero out the bits above bitsToWrite + value &= (static_cast<uint64_t>(1) << bitsToWrite) - 1; + writeByte(current); + current = 0; + bitsLeft = 8; + 
} + bitsLeft -= bitsToWrite; + current |= static_cast<char>(value << bitsLeft); + if (bitsLeft == 0) { + writeByte(current); + current = 0; + bitsLeft = 8; + } + } + + // flush + if (bitsLeft != 8) { + writeByte(current); + } +} + +void RleEncoderV2::initializeLiterals(int64_t val) { + literals[numLiterals++] = val; + fixedRunLength = 1; + variableRunLength = 1; +} +} http://git-wip-us.apache.org/repos/asf/orc/blob/f31c80bd/c++/src/Writer.cc ---------------------------------------------------------------------- diff --git a/c++/src/Writer.cc b/c++/src/Writer.cc index 16c2af4..3a56360 100644 --- a/c++/src/Writer.cc +++ b/c++/src/Writer.cc @@ -40,7 +40,7 @@ namespace orc { bool enableIndex; WriterOptionsPrivate() : - fileVersion(FileVersion::v_0_11()) { // default to Hive_0_11 + fileVersion(FileVersion::v_0_12()) { // default to Hive_0_11 stripeSize = 64 * 1024 * 1024; // 64M compressionBlockSize = 64 * 1024; // 64K rowIndexStride = 10000; @@ -83,6 +83,14 @@ namespace orc { WriterOptions::~WriterOptions() { // PASS } + RleVersion WriterOptions::getRleVersion() const { + if(privateBits->fileVersion == FileVersion::v_0_11()) + { + return RleVersion_1; + } + + return RleVersion_2; + } WriterOptions& WriterOptions::setStripeSize(uint64_t size) { privateBits->stripeSize = size; @@ -122,8 +130,8 @@ namespace orc { } WriterOptions& WriterOptions::setFileVersion(const FileVersion& version) { - // Only Hive_0_11 version is supported currently - if (version.getMajor() == 0 && version.getMinor() == 11) { + // Only Hive_0_11 and Hive_0_12 version are supported currently + if (version.getMajor() == 0 && (version.getMinor() == 11 || version.getMinor() == 12)) { privateBits->fileVersion = version; return *this; } @@ -153,6 +161,10 @@ namespace orc { return privateBits->compressionStrategy; } + bool WriterOptions::getAlignedBitpacking() const { + return privateBits->compressionStrategy == CompressionStrategy ::CompressionStrategy_SPEED; + } + WriterOptions& 
WriterOptions::setPaddingTolerance(double tolerance) { privateBits->paddingTolerance = tolerance; return *this; http://git-wip-us.apache.org/repos/asf/orc/blob/f31c80bd/c++/test/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/c++/test/CMakeLists.txt b/c++/test/CMakeLists.txt index 0a04aaf..015814a 100644 --- a/c++/test/CMakeLists.txt +++ b/c++/test/CMakeLists.txt @@ -37,8 +37,8 @@ add_executable (orc-test TestDriver.cc TestInt128.cc TestReader.cc - TestRle.cc - TestRLEv1Encoder.cc + TestRleDecoder.cc + TestRleEncoder.cc TestStripeIndexStatistics.cc TestTimestampStatistics.cc TestTimezone.cc