ORC-343: Enable C++ writer to support RleV2

Fixes #273

Signed-off-by: Gang Wu <gan...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/f31c80bd
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/f31c80bd
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/f31c80bd

Branch: refs/heads/master
Commit: f31c80bd8d786695a4e1e3f3146e42703ec1b207
Parents: 533e0c4
Author: Yurui Zhou <yurui....@alibaba-inc.com>
Authored: Fri Mar 9 13:23:37 2018 +0800
Committer: Gang Wu <gan...@alibaba-inc.com>
Committed: Tue Jun 12 15:59:10 2018 -0700

----------------------------------------------------------------------
 c++/include/orc/Writer.hh    |   16 +
 c++/src/CMakeLists.txt       |    6 +-
 c++/src/ColumnWriter.cc      |   64 +-
 c++/src/RLE.cc               |   56 +-
 c++/src/RLE.hh               |   45 +-
 c++/src/RLEV2Util.hh         |  145 ++
 c++/src/RLEv1.cc             |   64 +-
 c++/src/RLEv1.hh             |   43 +-
 c++/src/RLEv2.cc             |  484 -------
 c++/src/RLEv2.hh             |   81 +-
 c++/src/RleDecoderV2.cc      |  426 ++++++
 c++/src/RleEncoderV2.cc      |  767 +++++++++++
 c++/src/Writer.cc            |   18 +-
 c++/test/CMakeLists.txt      |    4 +-
 c++/test/TestRLEv1Encoder.cc |  246 ----
 c++/test/TestRle.cc          | 2656 -------------------------------------
 c++/test/TestRleDecoder.cc   | 2656 +++++++++++++++++++++++++++++++++++++
 c++/test/TestRleEncoder.cc   |  286 ++++
 c++/test/TestWriter.cc       |  107 +-
 19 files changed, 4605 insertions(+), 3565 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/orc/blob/f31c80bd/c++/include/orc/Writer.hh
----------------------------------------------------------------------
diff --git a/c++/include/orc/Writer.hh b/c++/include/orc/Writer.hh
index c91399a..cdda922 100644
--- a/c++/include/orc/Writer.hh
+++ b/c++/include/orc/Writer.hh
@@ -38,6 +38,11 @@ namespace orc {
     CompressionStrategy_COMPRESSION
   };
 
+  enum RleVersion {
+    RleVersion_1 = 0,
+    RleVersion_2 = 1
+  };
+
   class Timezone;
 
   /**
@@ -132,6 +137,12 @@ namespace orc {
     CompressionStrategy getCompressionStrategy() const;
 
     /**
+     * Get if the bitpacking should be aligned.
+     * @return true if should be aligned, return false otherwise
+     */
+    bool getAlignedBitpacking() const;
+
+    /**
      * Set the padding tolerance.
      */
     WriterOptions& setPaddingTolerance(double tolerance);
@@ -165,6 +176,11 @@ namespace orc {
     std::ostream * getErrorStream() const;
 
     /**
+     * Get the RLE version.
+     */
+    RleVersion getRleVersion() const;
+
+    /**
      * Get whether or not to write row group index
      * @return if not set, the default is false
      */

http://git-wip-us.apache.org/repos/asf/orc/blob/f31c80bd/c++/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/c++/src/CMakeLists.txt b/c++/src/CMakeLists.txt
index 695a0ae..c3d5993 100644
--- a/c++/src/CMakeLists.txt
+++ b/c++/src/CMakeLists.txt
@@ -179,15 +179,15 @@ set(SOURCE_FILES
   OrcFile.cc
   Reader.cc
   RLEv1.cc
-  RLEv2.cc
+  RleDecoderV2.cc
+  RleEncoderV2.cc
   RLE.cc
   Statistics.cc
   StripeStream.cc
   Timezone.cc
   TypeImpl.cc
   Vector.cc
-  Writer.cc
-  )
+  Writer.cc)
 
 if(ORC_CXX_HAS_THREAD_LOCAL AND BUILD_LIBHDFSPP)
   set(SOURCE_FILES ${SOURCE_FILES} OrcHdfsFile.cc)

http://git-wip-us.apache.org/repos/asf/orc/blob/f31c80bd/c++/src/ColumnWriter.cc
----------------------------------------------------------------------
diff --git a/c++/src/ColumnWriter.cc b/c++/src/ColumnWriter.cc
index 8b82783..eb2fc40 100644
--- a/c++/src/ColumnWriter.cc
+++ b/c++/src/ColumnWriter.cc
@@ -72,6 +72,19 @@ namespace orc {
     // PASS
   }
 
+  // Translate the writer's RLE version into the ColumnEncoding kind that the
+  // getColumnEncoding() methods report; internal linkage, used only here.
+  static proto::ColumnEncoding_Kind RleVersionMapper(RleVersion rleVersion) {
+    switch (rleVersion) {
+      case RleVersion_1:
+        return proto::ColumnEncoding_Kind_DIRECT;
+      case RleVersion_2:
+        return proto::ColumnEncoding_Kind_DIRECT_V2;
+      default:
+        throw InvalidArgument("Unknown RLE version");
+    }
+  }
+
   ColumnWriter::ColumnWriter(
                              const Type& type,
                              const StreamsFactory& factory,
@@ -399,14 +412,15 @@ namespace orc {
                            const StreamsFactory& factory,
                            const WriterOptions& options) :
                              ColumnWriter(type, factory, options),
-                             rleVersion(RleVersion_1) {
+                             rleVersion(options.getRleVersion()) {
     std::unique_ptr<BufferedOutputStream> dataStream =
       factory.createStream(proto::Stream_Kind_DATA);
     rleEncoder = createRleEncoder(
                                   std::move(dataStream),
                                   true,
                                   rleVersion,
-                                  memPool);
+                                  memPool,
+                                  options.getAlignedBitpacking());
 
     if (enableIndex) {
       recordPosition();
@@ -469,9 +483,7 @@ namespace orc {
   void IntegerColumnWriter::getColumnEncoding(
                        std::vector<proto::ColumnEncoding>& encodings) const {
     proto::ColumnEncoding encoding;
-    encoding.set_kind(rleVersion == RleVersion_1 ?
-                                proto::ColumnEncoding_Kind_DIRECT :
-                                proto::ColumnEncoding_Kind_DIRECT_V2);
+    encoding.set_kind(RleVersionMapper(rleVersion));
     encoding.set_dictionarysize(0);
     encodings.push_back(encoding);
   }
@@ -840,13 +852,14 @@ namespace orc {
                           const StreamsFactory& factory,
                           const WriterOptions& options) :
                               ColumnWriter(type, factory, options),
-                              rleVersion(RleVersion_1) {
+                              rleVersion(options.getRleVersion()) {
     std::unique_ptr<BufferedOutputStream> lengthStream =
         factory.createStream(proto::Stream_Kind_LENGTH);
     lengthEncoder = createRleEncoder(std::move(lengthStream),
                                      false,
                                      rleVersion,
-                                     memPool);
+                                     memPool,
+                                     options.getAlignedBitpacking());
     dataStream.reset(new AppendOnlyBufferedStream(
         factory.createStream(proto::Stream_Kind_DATA)));
 
@@ -916,9 +929,7 @@ namespace orc {
   void StringColumnWriter::getColumnEncoding(
     std::vector<proto::ColumnEncoding>& encodings) const {
     proto::ColumnEncoding encoding;
-    encoding.set_kind(rleVersion == RleVersion_1 ?
-                      proto::ColumnEncoding_Kind_DIRECT :
-                      proto::ColumnEncoding_Kind_DIRECT_V2);
+    encoding.set_kind(RleVersionMapper(rleVersion));
     encoding.set_dictionarysize(0);
     encodings.push_back(encoding);
   }
@@ -1131,7 +1142,7 @@ namespace orc {
                              const StreamsFactory& factory,
                              const WriterOptions& options) :
                                  ColumnWriter(type, factory, options),
-                                 rleVersion(RleVersion_1),
+                                 rleVersion(options.getRleVersion()),
                                  timezone(getTimezoneByName("GMT")){
     std::unique_ptr<BufferedOutputStream> dataStream =
         factory.createStream(proto::Stream_Kind_DATA);
@@ -1140,11 +1151,13 @@ namespace orc {
     secRleEncoder = createRleEncoder(std::move(dataStream),
                                      true,
                                      rleVersion,
-                                     memPool);
+                                     memPool,
+                                     options.getAlignedBitpacking());
     nanoRleEncoder = createRleEncoder(std::move(secondaryStream),
                                       false,
                                       rleVersion,
-                                      memPool);
+                                      memPool,
+                                      options.getAlignedBitpacking());
 
     if (enableIndex) {
       recordPosition();
@@ -1241,9 +1254,7 @@ namespace orc {
   void TimestampColumnWriter::getColumnEncoding(
     std::vector<proto::ColumnEncoding>& encodings) const {
     proto::ColumnEncoding encoding;
-    encoding.set_kind(rleVersion == RleVersion_1 ?
-                      proto::ColumnEncoding_Kind_DIRECT :
-                      proto::ColumnEncoding_Kind_DIRECT_V2);
+    encoding.set_kind(RleVersionMapper(rleVersion));
     encoding.set_dictionarysize(0);
     encodings.push_back(encoding);
   }
@@ -1344,7 +1355,7 @@ namespace orc {
                              const StreamsFactory& factory,
                              const WriterOptions& options) :
                                  ColumnWriter(type, factory, options),
-                                 rleVersion(RleVersion_1),
+                                 rleVersion(options.getRleVersion()),
                                  precision(type.getPrecision()),
                                  scale(type.getScale()) {
     valueStream.reset(new AppendOnlyBufferedStream(
@@ -1354,7 +1365,8 @@ namespace orc {
     scaleEncoder = createRleEncoder(std::move(scaleStream),
                                     true,
                                     rleVersion,
-                                    memPool);
+                                    memPool,
+                                    options.getAlignedBitpacking());
 
     if (enableIndex) {
       recordPosition();
@@ -1435,7 +1447,7 @@ namespace orc {
   void Decimal64ColumnWriter::getColumnEncoding(
     std::vector<proto::ColumnEncoding>& encodings) const {
     proto::ColumnEncoding encoding;
-    encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
+    encoding.set_kind(RleVersionMapper(rleVersion));
     encoding.set_dictionarysize(0);
     encodings.push_back(encoding);
   }
@@ -1575,14 +1587,15 @@ namespace orc {
                                      const StreamsFactory& factory,
                                      const WriterOptions& options) :
                                        ColumnWriter(type, factory, options),
-                                       rleVersion(RleVersion_1){
+                                       rleVersion(options.getRleVersion()){
 
     std::unique_ptr<BufferedOutputStream> lengthStream =
       factory.createStream(proto::Stream_Kind_LENGTH);
     lengthEncoder = createRleEncoder(std::move(lengthStream),
                                      false,
                                      rleVersion,
-                                     memPool);
+                                     memPool,
+                                     options.getAlignedBitpacking());
 
     if (type.getSubtypeCount() == 1) {
       child = buildWriter(*type.getSubtype(0), factory, options);
@@ -1675,7 +1688,7 @@ namespace orc {
   void ListColumnWriter::getColumnEncoding(
                     std::vector<proto::ColumnEncoding>& encodings) const {
     proto::ColumnEncoding encoding;
-    encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
+    encoding.set_kind(RleVersionMapper(rleVersion));
     encoding.set_dictionarysize(0);
     encodings.push_back(encoding);
     if (child.get()) {
@@ -1771,13 +1784,14 @@ namespace orc {
                                    const StreamsFactory& factory,
                                    const WriterOptions& options) :
                                      ColumnWriter(type, factory, options),
-                                     rleVersion(RleVersion_1){
+                                     rleVersion(options.getRleVersion()){
     std::unique_ptr<BufferedOutputStream> lengthStream =
       factory.createStream(proto::Stream_Kind_LENGTH);
     lengthEncoder = createRleEncoder(std::move(lengthStream),
                                      false,
                                      rleVersion,
-                                     memPool);
+                                     memPool,
+                                     options.getAlignedBitpacking());
 
     if (type.getSubtypeCount() > 0) {
       keyWriter = buildWriter(*type.getSubtype(0), factory, options);
@@ -1888,7 +1902,7 @@ namespace orc {
   void MapColumnWriter::getColumnEncoding(
                    std::vector<proto::ColumnEncoding>& encodings) const {
     proto::ColumnEncoding encoding;
-    encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
+    encoding.set_kind(RleVersionMapper(rleVersion));
     encoding.set_dictionarysize(0);
     encodings.push_back(encoding);
     if (keyWriter.get()) {

http://git-wip-us.apache.org/repos/asf/orc/blob/f31c80bd/c++/src/RLE.cc
----------------------------------------------------------------------
diff --git a/c++/src/RLE.cc b/c++/src/RLE.cc
index f03c3ac..21f9082 100644
--- a/c++/src/RLE.cc
+++ b/c++/src/RLE.cc
@@ -34,13 +34,16 @@ namespace orc {
                          (std::unique_ptr<BufferedOutputStream> output,
                           bool isSigned,
                           RleVersion version,
-                          MemoryPool&) {
+                          MemoryPool&,
+                          bool alignedBitpacking) {
     switch (static_cast<int64_t>(version)) {
     case RleVersion_1:
       // We don't have std::make_unique() yet.
       return std::unique_ptr<RleEncoder>(new RleEncoderV1(std::move(output),
                                                           isSigned));
     case RleVersion_2:
+      return std::unique_ptr<RleEncoder>(new RleEncoderV2(std::move(output),
+                                                            isSigned, alignedBitpacking));
     default:
       throw NotImplementedYet("Not implemented yet");
     }
@@ -64,4 +67,83 @@ namespace orc {
     }
   }
 
+  /**
+   * Encode a batch of values by passing each one to write().
+   * @param data the values to encode
+   * @param numValues the number of entries in data
+   * @param notNull if not null, entries whose flag is zero are skipped
+   */
+  void RleEncoder::add(const int64_t* data, uint64_t numValues,
+                         const char* notNull) {
+    for (uint64_t i = 0; i < numValues; ++i) {
+      if (!notNull || notNull[i]) {
+        write(data[i]);
+      }
+    }
+  }
+
+  /**
+   * Write a signed value as a zigzag-encoded base-128 varint.
+   */
+  void RleEncoder::writeVslong(int64_t val) {
+    // Shift in the unsigned domain: left-shifting a negative signed value
+    // is undefined behavior before C++20.
+    const uint64_t zigzag = (static_cast<uint64_t>(val) << 1) ^
+                            static_cast<uint64_t>(val >> 63);
+    writeVulong(static_cast<int64_t>(zigzag));
+  }
+
+  /**
+   * Write a value as an unsigned base-128 varint: 7 bits per byte, lowest
+   * group first, with the high bit set on every byte except the last.
+   */
+  void RleEncoder::writeVulong(int64_t val) {
+    while (true) {
+      if ((val & ~0x7f) == 0) {
+        writeByte(static_cast<char>(val));
+        return;
+      } else {
+        writeByte(static_cast<char>(0x80 | (val & 0x7f)));
+        // cast val to unsigned so as to force 0-fill right shift
+        val = (static_cast<uint64_t>(val) >> 7);
+      }
+    }
+  }
+
+  /**
+   * Append one byte, asking the output stream for a new buffer when the
+   * current one is full.
+   * @throws std::bad_alloc if the stream cannot supply another buffer
+   */
+  void RleEncoder::writeByte(char c) {
+    if (bufferPosition == bufferLength) {
+      int addedSize = 0;
+      if (!outputStream->Next(reinterpret_cast<void **>(&buffer),
+                              &addedSize)) {
+        throw std::bad_alloc();
+      }
+      bufferPosition = 0;
+      bufferLength = static_cast<size_t>(addedSize);
+    }
+    buffer[bufferPosition++] = c;
+  }
+
+  /**
+   * Record the current position through recorder: the flushed and
+   * unflushed byte counts (as one combined offset when the stream is not
+   * compressed), followed by numLiterals.
+   */
+  void RleEncoder::recordPosition(PositionRecorder* recorder) const {
+    uint64_t flushedSize = outputStream->getSize();
+    uint64_t unflushedSize = static_cast<uint64_t>(bufferPosition);
+    if (outputStream->isCompressed()) {
+      recorder->add(flushedSize);
+      recorder->add(unflushedSize);
+    } else {
+      flushedSize -= static_cast<uint64_t>(bufferLength);
+      recorder->add(flushedSize + unflushedSize);
+    }
+    recorder->add(static_cast<uint64_t>(numLiterals));
+  }
+
 }  // namespace orc

http://git-wip-us.apache.org/repos/asf/orc/blob/f31c80bd/c++/src/RLE.hh
----------------------------------------------------------------------
diff --git a/c++/src/RLE.hh b/c++/src/RLE.hh
index b1d654c..3eb3489 100644
--- a/c++/src/RLE.hh
+++ b/c++/src/RLE.hh
@@ -26,11 +26,6 @@
 
 namespace orc {
 
-  enum RleVersion {
-    RleVersion_1,
-    RleVersion_2
-  };
-
   inline int64_t zigZag(int64_t value) {
     return (value << 1) ^ (value >> 63);
   }
@@ -44,6 +39,18 @@ namespace orc {
     // must be non-inline!
     virtual ~RleEncoder();
 
+    // Shared RLE encoder state; 'literals' is left null for subclasses to set.
+    RleEncoder(
+            std::unique_ptr<BufferedOutputStream> outStream,
+            bool hasSigned):
+            outputStream(std::move(outStream)),
+            bufferPosition(0),
+            bufferLength(0),
+            numLiterals(0),
+            literals(nullptr),
+            isSigned(hasSigned),
+            buffer(nullptr) {}
+
     /**
      * Encode the next batch of values.
      * @param data the array to read from
@@ -52,12 +59,14 @@ namespace orc {
      *    pointer is not null, positions that are false are skipped.
      */
     virtual void add(const int64_t* data, uint64_t numValues,
-                      const char* notNull) = 0;
+                      const char* notNull);
 
     /**
      * Get size of buffer used so far.
      */
-    virtual uint64_t getBufferSize() const = 0;
+    uint64_t getBufferSize() const {
+        return outputStream->getSize();
+    }
 
     /**
      * Flushing underlying BufferedOutputStream
@@ -68,7 +77,24 @@ namespace orc {
      * record current position
      * @param recorder use the recorder to record current positions
      */
-    virtual void recordPosition(PositionRecorder* recorder) const = 0;
+    virtual void recordPosition(PositionRecorder* recorder) const;
+
+  protected:
+    std::unique_ptr<BufferedOutputStream> outputStream;
+    size_t bufferPosition;
+    size_t bufferLength;
+    size_t numLiterals;
+    int64_t* literals;
+    bool isSigned;
+    char* buffer;
+
+    virtual void write(int64_t val) = 0;
+
+    virtual void writeByte(char c);
+
+    virtual void writeVulong(int64_t val);
+
+    virtual void writeVslong(int64_t val);
   };
 
   class RleDecoder {
@@ -108,7 +134,8 @@ namespace orc {
                          (std::unique_ptr<BufferedOutputStream> output,
                           bool isSigned,
                           RleVersion version,
-                          MemoryPool& pool);
+                          MemoryPool& pool,
+                          bool alignedBitpacking);
 
   /**
    * Create an RLE decoder.

http://git-wip-us.apache.org/repos/asf/orc/blob/f31c80bd/c++/src/RLEV2Util.hh
----------------------------------------------------------------------
diff --git a/c++/src/RLEV2Util.hh b/c++/src/RLEV2Util.hh
new file mode 100644
index 0000000..7928c17
--- /dev/null
+++ b/c++/src/RLEV2Util.hh
@@ -0,0 +1,177 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef ORC_RLEV2UTIL_HH
+#define ORC_RLEV2UTIL_HH
+
+#include "RLEv2.hh"
+
+namespace orc {
+  /**
+   * Map an encoded bit-width code (a FixedBitSizes value) back to the
+   * width in bits it stands for.
+   */
+  inline uint32_t decodeBitWidth(uint32_t n) {
+    if (n <= FixedBitSizes::TWENTYFOUR) {
+      return n + 1;
+    } else if (n == FixedBitSizes::TWENTYSIX) {
+      return 26;
+    } else if (n == FixedBitSizes::TWENTYEIGHT) {
+      return 28;
+    } else if (n == FixedBitSizes::THIRTY) {
+      return 30;
+    } else if (n == FixedBitSizes::THIRTYTWO) {
+      return 32;
+    } else if (n == FixedBitSizes::FORTY) {
+      return 40;
+    } else if (n == FixedBitSizes::FORTYEIGHT) {
+      return 48;
+    } else if (n == FixedBitSizes::FIFTYSIX) {
+      return 56;
+    } else {
+      return 64;
+    }
+  }
+
+  /**
+   * Round a bit count up to the nearest width that encodeBitWidth can
+   * represent: 1-24, 26, 28, 30, 32, 40, 48, 56 or 64. Zero maps to 1.
+   */
+  inline uint32_t getClosestFixedBits(uint32_t n) {
+    if (n == 0) {
+      return 1;
+    }
+
+    if (n <= 24) {
+      return n;
+    } else if (n <= 26) {
+      return 26;
+    } else if (n <= 28) {
+      return 28;
+    } else if (n <= 30) {
+      return 30;
+    } else if (n <= 32) {
+      return 32;
+    } else if (n <= 40) {
+      return 40;
+    } else if (n <= 48) {
+      return 48;
+    } else if (n <= 56) {
+      return 56;
+    } else {
+      return 64;
+    }
+  }
+
+  /**
+   * Round a bit count up to the nearest width in
+   * {1, 2, 4, 8, 16, 24, 32, 40, 48, 56, 64}. Zero maps to 1.
+   */
+  inline uint32_t getClosestAlignedFixedBits(uint32_t n) {
+    if (n <= 1) {
+      return 1;
+    } else if (n <= 2) {
+      return 2;
+    } else if (n <= 4) {
+      return 4;
+    } else if (n <= 8) {
+      return 8;
+    } else if (n <= 16) {
+      return 16;
+    } else if (n <= 24) {
+      return 24;
+    } else if (n <= 32) {
+      return 32;
+    } else if (n <= 40) {
+      return 40;
+    } else if (n <= 48) {
+      return 48;
+    } else if (n <= 56) {
+      return 56;
+    } else {
+      return 64;
+    }
+  }
+
+  /**
+   * Encode a bit count as its bit-width code (a FixedBitSizes value),
+   * rounding it up with getClosestFixedBits first.
+   */
+  inline uint32_t encodeBitWidth(uint32_t n) {
+    n = getClosestFixedBits(n);
+
+    if (n <= 24) {
+      return n - 1;
+    } else if (n <= 26) {
+      return FixedBitSizes::TWENTYSIX;
+    } else if (n <= 28) {
+      return FixedBitSizes::TWENTYEIGHT;
+    } else if (n <= 30) {
+      return FixedBitSizes::THIRTY;
+    } else if (n <= 32) {
+      return FixedBitSizes::THIRTYTWO;
+    } else if (n <= 40) {
+      return FixedBitSizes::FORTY;
+    } else if (n <= 48) {
+      return FixedBitSizes::FORTYEIGHT;
+    } else if (n <= 56) {
+      return FixedBitSizes::FIFTYSIX;
+    } else {
+      return FixedBitSizes::SIXTYFOUR;
+    }
+  }
+
+  /**
+   * Number of bits needed to hold value, rounded up with getClosestFixedBits.
+   * Negative values always need the full 64 bits.
+   */
+  inline uint32_t findClosestNumBits(int64_t value) {
+    if (value < 0) {
+      return getClosestFixedBits(64);
+    }
+
+    uint32_t count = 0;
+    while (value != 0) {
+      count++;
+      value = value >> 1;
+    }
+    return getClosestFixedBits(count);
+  }
+
+  /**
+   * Return true if left - right does not overflow int64_t: operands with the
+   * same sign can never overflow, and otherwise the difference must keep the
+   * sign of left.
+   */
+  inline bool isSafeSubtract(int64_t left, int64_t right) {
+    // Subtract in the unsigned domain: it wraps instead of invoking signed
+    // overflow, which is undefined behavior in C++.
+    const int64_t diff = static_cast<int64_t>(
+        static_cast<uint64_t>(left) - static_cast<uint64_t>(right));
+    return ((left ^ right) >= 0) || ((left ^ diff) >= 0);
+  }
+
+  /**
+   * Form the opcode by shifting the encoding type left by 6 bits.
+   */
+  inline uint32_t RleEncoderV2::getOpCode(EncodingType encoding) {
+    return static_cast<uint32_t>(encoding << 6);
+  }
+}
+
+#endif //ORC_RLEV2UTIL_HH

http://git-wip-us.apache.org/repos/asf/orc/blob/f31c80bd/c++/src/RLEv1.cc
----------------------------------------------------------------------
diff --git a/c++/src/RLEv1.cc b/c++/src/RLEv1.cc
index e588e6a..d8b5541 100644
--- a/c++/src/RLEv1.cc
+++ b/c++/src/RLEv1.cc
@@ -37,43 +37,17 @@ const int MAX_LITERAL_SIZE = 128;
 RleEncoderV1::RleEncoderV1(
                           std::unique_ptr<BufferedOutputStream> outStream,
                           bool hasSigned):
-                          outputStream(std::move(outStream)) {
-  isSigned = hasSigned;
+                          RleEncoder(std::move(outStream), hasSigned) {
   literals = new int64_t[MAX_LITERAL_SIZE];
-  numLiterals = 0;
   delta = 0;
   repeat = false;
   tailRunLength = 0;
-  bufferPosition = 0;
-  bufferLength = 0;
-  buffer = nullptr;
 }
 
 RleEncoderV1::~RleEncoderV1() {
   delete [] literals;
 }
 
-void RleEncoderV1::add(const int64_t* data, uint64_t numValues,
-                          const char* notNull) {
-  for (uint64_t i = 0; i < numValues; ++i) {
-    if (!notNull || notNull[i]) {
-      write(data[i]);
-    }
-  }
-}
-
-void RleEncoderV1::writeByte(char c) {
-  if (bufferPosition == bufferLength) {
-    int addedSize = 0;
-    if (!outputStream->Next(reinterpret_cast<void **>(&buffer), &addedSize)) {
-      throw std::bad_alloc();
-    }
-    bufferPosition = 0;
-    bufferLength = addedSize;
-  }
-  buffer[bufferPosition++] = c;
-}
-
 void RleEncoderV1::writeValues() {
   if (numLiterals != 0) {
     if (repeat) {
@@ -87,7 +61,7 @@ void RleEncoderV1::writeValues() {
       }
     } else {
       writeByte(static_cast<char>(-numLiterals));
-      for(int i=0; i < numLiterals; ++i) {
+      for(size_t i=0; i < numLiterals; ++i) {
         if (isSigned) {
           writeVslong(literals[i]);
         } else {
@@ -103,7 +77,7 @@ void RleEncoderV1::writeValues() {
 
 uint64_t RleEncoderV1::flush() {
   writeValues();
-  outputStream->BackUp(bufferLength - bufferPosition);
+  outputStream->BackUp(static_cast<int>(bufferLength - bufferPosition));
   uint64_t dataSize = outputStream->flush();
   bufferLength = bufferPosition = 0;
   return dataSize;
@@ -114,7 +88,7 @@ void RleEncoderV1::write(int64_t value) {
     literals[numLiterals++] = value;
     tailRunLength = 1;
   } else if (repeat) {
-    if (value == literals[0] + delta * numLiterals) {
+    if (value == literals[0] + delta * static_cast<int64_t>(numLiterals)) {
       numLiterals += 1;
       if (numLiterals == MAXIMUM_REPEAT) {
         writeValues();
@@ -163,36 +137,6 @@ void RleEncoderV1::write(int64_t value) {
   }
 }
 
-void RleEncoderV1::writeVslong(int64_t val) {
-  writeVulong((val << 1) ^ (val >> 63));
-}
-
-void RleEncoderV1::writeVulong(int64_t val) {
-  while (true) {
-    if ((val & ~BASE_128_MASK) == 0) {
-      writeByte(static_cast<char>(val));
-      return;
-    } else {
-      writeByte(static_cast<char>(0x80 | (val & BASE_128_MASK)));
-      // cast val to unsigned so as to force 0-fill right shift
-      val = (static_cast<uint64_t>(val) >> 7);
-    }
-  }
-}
-
-void RleEncoderV1::recordPosition(PositionRecorder* recorder) const {
-  uint64_t flushedSize = outputStream->getSize();
-  uint64_t unflushedSize = static_cast<uint64_t>(bufferPosition);
-  if (outputStream->isCompressed()) {
-    recorder->add(flushedSize);
-    recorder->add(unflushedSize);
-  } else {
-    flushedSize -= static_cast<uint64_t>(bufferLength);
-    recorder->add(flushedSize + unflushedSize);
-  }
-  recorder->add(static_cast<uint64_t>(numLiterals));
-}
-
 signed char RleDecoderV1::readByte() {
   if (bufferStart == bufferEnd) {
     int bufferLength;

http://git-wip-us.apache.org/repos/asf/orc/blob/f31c80bd/c++/src/RLEv1.hh
----------------------------------------------------------------------
diff --git a/c++/src/RLEv1.hh b/c++/src/RLEv1.hh
index a4be19a..b83c51c 100644
--- a/c++/src/RLEv1.hh
+++ b/c++/src/RLEv1.hh
@@ -30,52 +30,19 @@ class RleEncoderV1 : public RleEncoder {
 public:
     RleEncoderV1(std::unique_ptr<BufferedOutputStream> outStream,
                  bool hasSigned);
-    ~RleEncoderV1() override;
-
-    /**
-     * Encode the next batch of values.
-     * @param data the array to be written
-     * @param numValues the number of values to write
-     * @param notNull If the pointer is null, all values are writen. If the
-     *    pointer is not null, positions that are false are skipped.
-     */
-    void add(const int64_t* data, uint64_t numValues,
-             const char* notNull) override;
-
-    /**
-     * Get size of buffer used so far.
-     */
-    uint64_t getBufferSize() const override {
-        return outputStream->getSize();
-    }
+    ~RleEncoderV1() override ;
 
     /**
      * Flushing underlying BufferedOutputStream
      */
     uint64_t flush() override;
 
-    /**
-     * record current position
-     * @param recorder use the recorder to record current positions
-     */
-    virtual void recordPosition(PositionRecorder* recorder) const override;
-
 private:
-    std::unique_ptr<BufferedOutputStream> outputStream;
-    bool isSigned;
-    int64_t* literals;
-    int numLiterals;
     int64_t delta;
     bool repeat;
     int tailRunLength;
-    int bufferPosition;
-    int bufferLength;
-    char* buffer;
-
-    void write(int64_t val);
-    void writeByte(char c);
-    void writeVulong(int64_t val);
-    void writeVslong(int64_t val);
+
+    void write(int64_t val) override;
     void writeValues();
 };
 
@@ -113,8 +80,8 @@ private:
     const bool isSigned;
     uint64_t remainingValues;
     int64_t value;
-    const char* bufferStart;
-    const char* bufferEnd;
+    const char *bufferStart;
+    const char *bufferEnd;
     int64_t delta;
     bool repeating;
 };

http://git-wip-us.apache.org/repos/asf/orc/blob/f31c80bd/c++/src/RLEv2.cc
----------------------------------------------------------------------
diff --git a/c++/src/RLEv2.cc b/c++/src/RLEv2.cc
deleted file mode 100644
index 10c6190..0000000
--- a/c++/src/RLEv2.cc
+++ /dev/null
@@ -1,484 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "Adaptor.hh"
-#include "Compression.hh"
-#include "RLEv2.hh"
-
-#define MIN_REPEAT 3
-
-namespace orc {
-
-struct FixedBitSizes {
-  enum FBS {
-    ONE = 0, TWO, THREE, FOUR, FIVE, SIX, SEVEN, EIGHT, NINE, TEN, ELEVEN, TWELVE,
-    THIRTEEN, FOURTEEN, FIFTEEN, SIXTEEN, SEVENTEEN, EIGHTEEN, NINETEEN,
-    TWENTY, TWENTYONE, TWENTYTWO, TWENTYTHREE, TWENTYFOUR, TWENTYSIX,
-    TWENTYEIGHT, THIRTY, THIRTYTWO, FORTY, FORTYEIGHT, FIFTYSIX, SIXTYFOUR
-  };
-};
-
-inline uint32_t decodeBitWidth(uint32_t n) {
-  if (n <= FixedBitSizes::TWENTYFOUR) {
-    return n + 1;
-  } else if (n == FixedBitSizes::TWENTYSIX) {
-    return 26;
-  } else if (n == FixedBitSizes::TWENTYEIGHT) {
-    return 28;
-  } else if (n == FixedBitSizes::THIRTY) {
-    return 30;
-  } else if (n == FixedBitSizes::THIRTYTWO) {
-    return 32;
-  } else if (n == FixedBitSizes::FORTY) {
-    return 40;
-  } else if (n == FixedBitSizes::FORTYEIGHT) {
-    return 48;
-  } else if (n == FixedBitSizes::FIFTYSIX) {
-    return 56;
-  } else {
-    return 64;
-  }
-}
-
-inline uint32_t getClosestFixedBits(uint32_t n) {
-  if (n == 0) {
-    return 1;
-  }
-
-  if (n >= 1 && n <= 24) {
-    return n;
-  } else if (n > 24 && n <= 26) {
-    return 26;
-  } else if (n > 26 && n <= 28) {
-    return 28;
-  } else if (n > 28 && n <= 30) {
-    return 30;
-  } else if (n > 30 && n <= 32) {
-    return 32;
-  } else if (n > 32 && n <= 40) {
-    return 40;
-  } else if (n > 40 && n <= 48) {
-    return 48;
-  } else if (n > 48 && n <= 56) {
-    return 56;
-  } else {
-    return 64;
-  }
-}
-
-int64_t RleDecoderV2::readLongBE(uint64_t bsz) {
-  int64_t ret = 0, val;
-  uint64_t n = bsz;
-  while (n > 0) {
-    n--;
-    val = readByte();
-    ret |= (val << (n * 8));
-  }
-  return ret;
-}
-
-inline int64_t RleDecoderV2::readVslong() {
-  return unZigZag(readVulong());
-}
-
-uint64_t RleDecoderV2::readVulong() {
-  uint64_t ret = 0, b;
-  uint64_t offset = 0;
-  do {
-    b = readByte();
-    ret |= (0x7f & b) << offset;
-    offset += 7;
-  } while (b >= 0x80);
-  return ret;
-}
-
-RleDecoderV2::RleDecoderV2(std::unique_ptr<SeekableInputStream> input,
-                           bool _isSigned, MemoryPool& pool
-                           ): inputStream(std::move(input)),
-                              isSigned(_isSigned),
-                              firstByte(0),
-                              runLength(0),
-                              runRead(0),
-                              bufferStart(nullptr),
-                              bufferEnd(bufferStart),
-                              deltaBase(0),
-                              byteSize(0),
-                              firstValue(0),
-                              prevValue(0),
-                              bitSize(0),
-                              bitsLeft(0),
-                              curByte(0),
-                              patchBitSize(0),
-                              unpackedIdx(0),
-                              patchIdx(0),
-                              base(0),
-                              curGap(0),
-                              curPatch(0),
-                              patchMask(0),
-                              actualGap(0),
-                              unpacked(pool, 0),
-                              unpackedPatch(pool, 0) {
-  // PASS
-}
-
-void RleDecoderV2::seek(PositionProvider& location) {
-  // move the input stream
-  inputStream->seek(location);
-  // clear state
-  bufferEnd = bufferStart = nullptr;
-  runRead = runLength = 0;
-  // skip ahead the given number of records
-  skip(location.next());
-}
-
-void RleDecoderV2::skip(uint64_t numValues) {
-  // simple for now, until perf tests indicate something encoding specific is
-  // needed
-  const uint64_t N = 64;
-  int64_t dummy[N];
-
-  while (numValues) {
-    uint64_t nRead = std::min(N, numValues);
-    next(dummy, nRead, nullptr);
-    numValues -= nRead;
-  }
-}
-
-void RleDecoderV2::next(int64_t* const data,
-                        const uint64_t numValues,
-                        const char* const notNull) {
-  uint64_t nRead = 0;
-
-  while (nRead < numValues) {
-    // Skip any nulls before attempting to read first byte.
-    while (notNull && !notNull[nRead]) {
-      if (++nRead == numValues) {
-        return; // ended with null values
-      }
-    }
-
-    if (runRead == runLength) {
-      resetRun();
-      firstByte = readByte();
-    }
-
-    uint64_t offset = nRead, length = numValues - nRead;
-
-    EncodingType enc = static_cast<EncodingType>
-        ((firstByte >> 6) & 0x03);
-    switch(static_cast<int64_t>(enc)) {
-    case SHORT_REPEAT:
-      nRead += nextShortRepeats(data, offset, length, notNull);
-      break;
-    case DIRECT:
-      nRead += nextDirect(data, offset, length, notNull);
-      break;
-    case PATCHED_BASE:
-      nRead += nextPatched(data, offset, length, notNull);
-      break;
-    case DELTA:
-      nRead += nextDelta(data, offset, length, notNull);
-      break;
-    default:
-      throw ParseError("unknown encoding");
-    }
-  }
-}
-
-uint64_t RleDecoderV2::nextShortRepeats(int64_t* const data,
-                                        uint64_t offset,
-                                        uint64_t numValues,
-                                        const char* const notNull) {
-  if (runRead == runLength) {
-    // extract the number of fixed bytes
-    byteSize = (firstByte >> 3) & 0x07;
-    byteSize += 1;
-
-    runLength = firstByte & 0x07;
-    // run lengths values are stored only after MIN_REPEAT value is met
-    runLength += MIN_REPEAT;
-    runRead = 0;
-
-    // read the repeated value which is store using fixed bytes
-    firstValue = readLongBE(byteSize);
-
-    if (isSigned) {
-      firstValue = unZigZag(static_cast<uint64_t>(firstValue));
-    }
-  }
-
-  uint64_t nRead = std::min(runLength - runRead, numValues);
-
-  if (notNull) {
-    for(uint64_t pos = offset; pos < offset + nRead; ++pos) {
-      if (notNull[pos]) {
-        data[pos] = firstValue;
-        ++runRead;
-      }
-    }
-  } else {
-    for(uint64_t pos = offset; pos < offset + nRead; ++pos) {
-      data[pos] = firstValue;
-      ++runRead;
-    }
-  }
-
-  return nRead;
-}
-
-uint64_t RleDecoderV2::nextDirect(int64_t* const data,
-                                  uint64_t offset,
-                                  uint64_t numValues,
-                                  const char* const notNull) {
-  if (runRead == runLength) {
-    // extract the number of fixed bits
-    unsigned char fbo = (firstByte >> 1) & 0x1f;
-    bitSize = decodeBitWidth(fbo);
-
-    // extract the run length
-    runLength = static_cast<uint64_t>(firstByte & 0x01) << 8;
-    runLength |= readByte();
-    // runs are one off
-    runLength += 1;
-    runRead = 0;
-  }
-
-  uint64_t nRead = std::min(runLength - runRead, numValues);
-
-  runRead += readLongs(data, offset, nRead, bitSize, notNull);
-
-  if (isSigned) {
-    if (notNull) {
-      for (uint64_t pos = offset; pos < offset + nRead; ++pos) {
-        if (notNull[pos]) {
-          data[pos] = unZigZag(static_cast<uint64_t>(data[pos]));
-        }
-      }
-    } else {
-      for (uint64_t pos = offset; pos < offset + nRead; ++pos) {
-        data[pos] = unZigZag(static_cast<uint64_t>(data[pos]));
-      }
-    }
-  }
-
-  return nRead;
-}
-
-uint64_t RleDecoderV2::nextPatched(int64_t* const data,
-                                   uint64_t offset,
-                                   uint64_t numValues,
-                                   const char* const notNull) {
-  if (runRead == runLength) {
-    // extract the number of fixed bits
-    unsigned char fbo = (firstByte >> 1) & 0x1f;
-    bitSize = decodeBitWidth(fbo);
-
-    // extract the run length
-    runLength = static_cast<uint64_t>(firstByte & 0x01) << 8;
-    runLength |= readByte();
-    // runs are one off
-    runLength += 1;
-    runRead = 0;
-
-    // extract the number of bytes occupied by base
-    uint64_t thirdByte = readByte();
-    byteSize = (thirdByte >> 5) & 0x07;
-    // base width is one off
-    byteSize += 1;
-
-    // extract patch width
-    uint32_t pwo = thirdByte & 0x1f;
-    patchBitSize = decodeBitWidth(pwo);
-
-    // read fourth byte and extract patch gap width
-    uint64_t fourthByte = readByte();
-    uint32_t pgw = (fourthByte >> 5) & 0x07;
-    // patch gap width is one off
-    pgw += 1;
-
-    // extract the length of the patch list
-    size_t pl = fourthByte & 0x1f;
-    if (pl == 0) {
-      throw ParseError("Corrupt PATCHED_BASE encoded data (pl==0)!");
-    }
-
-    // read the next base width number of bytes to extract base value
-    base = readLongBE(byteSize);
-    int64_t mask = (static_cast<int64_t>(1) << ((byteSize * 8) - 1));
-    // if mask of base value is 1 then base is negative value else positive
-    if ((base & mask) != 0) {
-      base = base & ~mask;
-      base = -base;
-    }
-
-    // TODO: something more efficient than resize
-    unpacked.resize(runLength);
-    unpackedIdx = 0;
-    readLongs(unpacked.data(), 0, runLength, bitSize);
-    // any remaining bits are thrown out
-    resetReadLongs();
-
-    // TODO: something more efficient than resize
-    unpackedPatch.resize(pl);
-    patchIdx = 0;
-    // TODO: Skip corrupt?
-    //    if ((patchBitSize + pgw) > 64 && !skipCorrupt) {
-    if ((patchBitSize + pgw) > 64) {
-      throw ParseError("Corrupt PATCHED_BASE encoded data "
-                       "(patchBitSize + pgw > 64)!");
-    }
-    uint32_t cfb = getClosestFixedBits(patchBitSize + pgw);
-    readLongs(unpackedPatch.data(), 0, pl, cfb);
-    // any remaining bits are thrown out
-    resetReadLongs();
-
-    // apply the patch directly when decoding the packed data
-    patchMask = ((static_cast<int64_t>(1) << patchBitSize) - 1);
-
-    adjustGapAndPatch();
-  }
-
-  uint64_t nRead = std::min(runLength - runRead, numValues);
-
-  for(uint64_t pos = offset; pos < offset + nRead; ++pos) {
-    // skip null positions
-    if (notNull && !notNull[pos]) {
-      continue;
-    }
-    if (static_cast<int64_t>(unpackedIdx) != actualGap) {
-      // no patching required. add base to unpacked value to get final value
-      data[pos] = base + unpacked[unpackedIdx];
-    } else {
-      // extract the patch value
-      int64_t patchedVal = unpacked[unpackedIdx] | (curPatch << bitSize);
-
-      // add base to patched value
-      data[pos] = base + patchedVal;
-
-      // increment the patch to point to next entry in patch list
-      ++patchIdx;
-
-      if (patchIdx < unpackedPatch.size()) {
-        adjustGapAndPatch();
-
-        // next gap is relative to the current gap
-        actualGap += unpackedIdx;
-      }
-    }
-
-    ++runRead;
-    ++unpackedIdx;
-  }
-
-  return nRead;
-}
-
-uint64_t RleDecoderV2::nextDelta(int64_t* const data,
-                                 uint64_t offset,
-                                 uint64_t numValues,
-                                 const char* const notNull) {
-  if (runRead == runLength) {
-    // extract the number of fixed bits
-    unsigned char fbo = (firstByte >> 1) & 0x1f;
-    if (fbo != 0) {
-      bitSize = decodeBitWidth(fbo);
-    } else {
-      bitSize = 0;
-    }
-
-    // extract the run length
-    runLength = static_cast<uint64_t>(firstByte & 0x01) << 8;
-    runLength |= readByte();
-    ++runLength; // account for first value
-    runRead = deltaBase = 0;
-
-    // read the first value stored as vint
-    if (isSigned) {
-      firstValue = static_cast<int64_t>(readVslong());
-    } else {
-      firstValue = static_cast<int64_t>(readVulong());
-    }
-
-    prevValue = firstValue;
-
-    // read the fixed delta value stored as vint (deltas can be negative even
-    // if all number are positive)
-    deltaBase = static_cast<int64_t>(readVslong());
-  }
-
-  uint64_t nRead = std::min(runLength - runRead, numValues);
-
-  uint64_t pos = offset;
-  for ( ; pos < offset + nRead; ++pos) {
-    // skip null positions
-    if (!notNull || notNull[pos]) break;
-  }
-  if (runRead == 0 && pos < offset + nRead) {
-    data[pos++] = firstValue;
-    ++runRead;
-  }
-
-  if (bitSize == 0) {
-    // add fixed deltas to adjacent values
-    for ( ; pos < offset + nRead; ++pos) {
-      // skip null positions
-      if (notNull && !notNull[pos]) {
-        continue;
-      }
-      prevValue = data[pos] = prevValue + deltaBase;
-      ++runRead;
-    }
-  } else {
-    for ( ; pos < offset + nRead; ++pos) {
-      // skip null positions
-      if (!notNull || notNull[pos]) break;
-    }
-    if (runRead < 2 && pos < offset + nRead) {
-      // add delta base and first value
-      prevValue = data[pos++] = firstValue + deltaBase;
-      ++runRead;
-    }
-
-    // write the unpacked values, add it to previous value and store final
-    // value to result buffer. if the delta base value is negative then it
-    // is a decreasing sequence else an increasing sequence
-    uint64_t remaining = (offset + nRead) - pos;
-    runRead += readLongs(data, pos, remaining, bitSize, notNull);
-
-    if (deltaBase < 0) {
-      for ( ; pos < offset + nRead; ++pos) {
-        // skip null positions
-        if (notNull && !notNull[pos]) {
-          continue;
-        }
-        prevValue = data[pos] = prevValue - data[pos];
-      }
-    } else {
-      for ( ; pos < offset + nRead; ++pos) {
-        // skip null positions
-        if (notNull && !notNull[pos]) {
-          continue;
-        }
-        prevValue = data[pos] = prevValue + data[pos];
-      }
-    }
-  }
-  return nRead;
-}
-
-}  // namespace orc

http://git-wip-us.apache.org/repos/asf/orc/blob/f31c80bd/c++/src/RLEv2.hh
----------------------------------------------------------------------
diff --git a/c++/src/RLEv2.hh b/c++/src/RLEv2.hh
index 5b86abc..a51b276 100644
--- a/c++/src/RLEv2.hh
+++ b/c++/src/RLEv2.hh
@@ -25,13 +25,89 @@
 
 #include <vector>
 
+#define MIN_REPEAT 3
+#define HIST_LEN 32
 namespace orc {
 
-class RleDecoderV2 : public RleDecoder {
+struct FixedBitSizes {
+    enum FBS {
+        ONE = 0, TWO, THREE, FOUR, FIVE, SIX, SEVEN, EIGHT, NINE, TEN, ELEVEN, TWELVE,
+        THIRTEEN, FOURTEEN, FIFTEEN, SIXTEEN, SEVENTEEN, EIGHTEEN, NINETEEN,
+        TWENTY, TWENTYONE, TWENTYTWO, TWENTYTHREE, TWENTYFOUR, TWENTYSIX,
+        TWENTYEIGHT, THIRTY, THIRTYTWO, FORTY, FORTYEIGHT, FIFTYSIX, SIXTYFOUR, SIZE
+    };
+};
+
+enum EncodingType { SHORT_REPEAT=0, DIRECT=1, PATCHED_BASE=2, DELTA=3 };
+
+struct EncodingOption {
+  EncodingType encoding;
+  int64_t fixedDelta;
+  int64_t gapVsPatchListCount;
+  int64_t zigzagLiteralsCount;
+  int64_t baseRedLiteralsCount;
+  int64_t adjDeltasCount;
+  uint32_t zzBits90p;
+  uint32_t zzBits100p;
+  uint32_t brBits95p;
+  uint32_t brBits100p;
+  uint32_t bitsDeltaMax;
+  uint32_t patchWidth;
+  uint32_t patchGapWidth;
+  uint32_t patchLength;
+  int64_t min;
+  bool isFixedDelta;
+};
+
+class RleEncoderV2 : public RleEncoder {
 public:
+    RleEncoderV2(std::unique_ptr<BufferedOutputStream> outStream, bool hasSigned, bool alignBitPacking = true);
+
+    ~RleEncoderV2() override {
+      delete [] literals;
+      delete [] gapVsPatchList;
+      delete [] zigzagLiterals;
+      delete [] baseRedLiterals;
+      delete [] adjDeltas;
+    }
+    /**
+     * Flushing underlying BufferedOutputStream
+     */
+    uint64_t flush() override;
 
-  enum EncodingType { SHORT_REPEAT=0, DIRECT=1, PATCHED_BASE=2, DELTA=3 };
+private:
+
+    const bool alignedBitPacking;
+    uint32_t fixedRunLength;
+    uint32_t variableRunLength;
+    int64_t prevDelta;
+    int32_t histgram[HIST_LEN];
+
+    // The four lists below should actually belong to EncodingOption since they only hold temporary values in write(int64_t val),
+    // they are moved here for performance reasons.
+    int64_t* gapVsPatchList;
+    int64_t*  zigzagLiterals;
+    int64_t*  baseRedLiterals;
+    int64_t*  adjDeltas;
+
+    uint32_t getOpCode(EncodingType encoding);
+    void determineEncoding(EncodingOption& option);
+    void computeZigZagLiterals(EncodingOption& option);
+    void preparePatchedBlob(EncodingOption& option);
+
+    void write(int64_t val) override ;
+    void writeInts(int64_t* input, uint32_t offset, size_t len, uint32_t bitSize);
+    void initializeLiterals(int64_t val);
+    void writeValues(EncodingOption& option);
+    void writeShortRepeatValues(EncodingOption& option);
+    void writeDirectValues(EncodingOption& option);
+    void writePatchedBasedValues(EncodingOption& option);
+    void writeDeltaValues(EncodingOption& option);
+    uint32_t percentileBits(int64_t* data, size_t offset, size_t length, double p, bool reuseHist = false);
+};
 
+class RleDecoderV2 : public RleDecoder {
+public:
   RleDecoderV2(std::unique_ptr<SeekableInputStream> input,
                bool isSigned, MemoryPool& pool);
 
@@ -134,7 +210,6 @@ private:
   return ret;
 }
 
-
   uint64_t nextShortRepeats(int64_t* data, uint64_t offset, uint64_t numValues,
                             const char* notNull);
   uint64_t nextDirect(int64_t* data, uint64_t offset, uint64_t numValues,

http://git-wip-us.apache.org/repos/asf/orc/blob/f31c80bd/c++/src/RleDecoderV2.cc
----------------------------------------------------------------------
diff --git a/c++/src/RleDecoderV2.cc b/c++/src/RleDecoderV2.cc
new file mode 100644
index 0000000..c5c6f6a
--- /dev/null
+++ b/c++/src/RleDecoderV2.cc
@@ -0,0 +1,426 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Adaptor.hh"
+#include "Compression.hh"
+#include "RLEv2.hh"
+#include "RLEV2Util.hh"
+
+namespace orc {
+
+int64_t RleDecoderV2::readLongBE(uint64_t bsz) {
+  int64_t ret = 0, val;
+  uint64_t n = bsz;
+  while (n > 0) {
+    n--;
+    val = readByte();
+    ret |= (val << (n * 8));
+  }
+  return ret;
+}
+
+inline int64_t RleDecoderV2::readVslong() {
+  return unZigZag(readVulong());
+}
+
+uint64_t RleDecoderV2::readVulong() {
+  uint64_t ret = 0, b;
+  uint64_t offset = 0;
+  do {
+    b = readByte();
+    ret |= (0x7f & b) << offset;
+    offset += 7;
+  } while (b >= 0x80);
+  return ret;
+}
+
+RleDecoderV2::RleDecoderV2(std::unique_ptr<SeekableInputStream> input,
+                           bool _isSigned, MemoryPool& pool
+                           ): inputStream(std::move(input)),
+                              isSigned(_isSigned),
+                              firstByte(0),
+                              runLength(0),
+                              runRead(0),
+                              bufferStart(nullptr),
+                              bufferEnd(bufferStart),
+                              deltaBase(0),
+                              byteSize(0),
+                              firstValue(0),
+                              prevValue(0),
+                              bitSize(0),
+                              bitsLeft(0),
+                              curByte(0),
+                              patchBitSize(0),
+                              unpackedIdx(0),
+                              patchIdx(0),
+                              base(0),
+                              curGap(0),
+                              curPatch(0),
+                              patchMask(0),
+                              actualGap(0),
+                              unpacked(pool, 0),
+                              unpackedPatch(pool, 0) {
+  // PASS
+}
+
+void RleDecoderV2::seek(PositionProvider& location) {
+  // move the input stream
+  inputStream->seek(location);
+  // clear state
+  bufferEnd = bufferStart = nullptr;
+  runRead = runLength = 0;
+  // skip ahead the given number of records
+  skip(location.next());
+}
+
+void RleDecoderV2::skip(uint64_t numValues) {
+  // simple for now, until perf tests indicate something encoding specific is
+  // needed
+  const uint64_t N = 64;
+  int64_t dummy[N];
+
+  while (numValues) {
+    uint64_t nRead = std::min(N, numValues);
+    next(dummy, nRead, nullptr);
+    numValues -= nRead;
+  }
+}
+
+void RleDecoderV2::next(int64_t* const data,
+                        const uint64_t numValues,
+                        const char* const notNull) {
+  uint64_t nRead = 0;
+
+  while (nRead < numValues) {
+    // Skip any nulls before attempting to read first byte.
+    while (notNull && !notNull[nRead]) {
+      if (++nRead == numValues) {
+        return; // ended with null values
+      }
+    }
+
+    if (runRead == runLength) {
+      resetRun();
+      firstByte = readByte();
+    }
+
+    uint64_t offset = nRead, length = numValues - nRead;
+
+    EncodingType enc = static_cast<EncodingType>
+        ((firstByte >> 6) & 0x03);
+    switch(static_cast<int64_t>(enc)) {
+    case SHORT_REPEAT:
+      nRead += nextShortRepeats(data, offset, length, notNull);
+      break;
+    case DIRECT:
+      nRead += nextDirect(data, offset, length, notNull);
+      break;
+    case PATCHED_BASE:
+      nRead += nextPatched(data, offset, length, notNull);
+      break;
+    case DELTA:
+      nRead += nextDelta(data, offset, length, notNull);
+      break;
+    default:
+      throw ParseError("unknown encoding");
+    }
+  }
+}
+
+uint64_t RleDecoderV2::nextShortRepeats(int64_t* const data,
+                                        uint64_t offset,
+                                        uint64_t numValues,
+                                        const char* const notNull) {
+  if (runRead == runLength) {
+    // extract the number of fixed bytes
+    byteSize = (firstByte >> 3) & 0x07;
+    byteSize += 1;
+
+    runLength = firstByte & 0x07;
+    // run lengths values are stored only after MIN_REPEAT value is met
+    runLength += MIN_REPEAT;
+    runRead = 0;
+
+    // read the repeated value which is store using fixed bytes
+    firstValue = readLongBE(byteSize);
+
+    if (isSigned) {
+      firstValue = unZigZag(static_cast<uint64_t>(firstValue));
+    }
+  }
+
+  uint64_t nRead = std::min(runLength - runRead, numValues);
+
+  if (notNull) {
+    for(uint64_t pos = offset; pos < offset + nRead; ++pos) {
+      if (notNull[pos]) {
+        data[pos] = firstValue;
+        ++runRead;
+      }
+    }
+  } else {
+    for(uint64_t pos = offset; pos < offset + nRead; ++pos) {
+      data[pos] = firstValue;
+      ++runRead;
+    }
+  }
+
+  return nRead;
+}
+
+uint64_t RleDecoderV2::nextDirect(int64_t* const data,
+                                  uint64_t offset,
+                                  uint64_t numValues,
+                                  const char* const notNull) {
+  if (runRead == runLength) {
+    // extract the number of fixed bits
+    unsigned char fbo = (firstByte >> 1) & 0x1f;
+    bitSize = decodeBitWidth(fbo);
+
+    // extract the run length
+    runLength = static_cast<uint64_t>(firstByte & 0x01) << 8;
+    runLength |= readByte();
+    // runs are one off
+    runLength += 1;
+    runRead = 0;
+  }
+
+  uint64_t nRead = std::min(runLength - runRead, numValues);
+
+  runRead += readLongs(data, offset, nRead, bitSize, notNull);
+
+  if (isSigned) {
+    if (notNull) {
+      for (uint64_t pos = offset; pos < offset + nRead; ++pos) {
+        if (notNull[pos]) {
+          data[pos] = unZigZag(static_cast<uint64_t>(data[pos]));
+        }
+      }
+    } else {
+      for (uint64_t pos = offset; pos < offset + nRead; ++pos) {
+        data[pos] = unZigZag(static_cast<uint64_t>(data[pos]));
+      }
+    }
+  }
+
+  return nRead;
+}
+
+uint64_t RleDecoderV2::nextPatched(int64_t* const data,
+                                   uint64_t offset,
+                                   uint64_t numValues,
+                                   const char* const notNull) {
+  if (runRead == runLength) {
+    // extract the number of fixed bits
+    unsigned char fbo = (firstByte >> 1) & 0x1f;
+    bitSize = decodeBitWidth(fbo);
+
+    // extract the run length
+    runLength = static_cast<uint64_t>(firstByte & 0x01) << 8;
+    runLength |= readByte();
+    // runs are one off
+    runLength += 1;
+    runRead = 0;
+
+    // extract the number of bytes occupied by base
+    uint64_t thirdByte = readByte();
+    byteSize = (thirdByte >> 5) & 0x07;
+    // base width is one off
+    byteSize += 1;
+
+    // extract patch width
+    uint32_t pwo = thirdByte & 0x1f;
+    patchBitSize = decodeBitWidth(pwo);
+
+    // read fourth byte and extract patch gap width
+    uint64_t fourthByte = readByte();
+    uint32_t pgw = (fourthByte >> 5) & 0x07;
+    // patch gap width is one off
+    pgw += 1;
+
+    // extract the length of the patch list
+    size_t pl = fourthByte & 0x1f;
+    if (pl == 0) {
+      throw ParseError("Corrupt PATCHED_BASE encoded data (pl==0)!");
+    }
+
+    // read the next base width number of bytes to extract base value
+    base = readLongBE(byteSize);
+    int64_t mask = (static_cast<int64_t>(1) << ((byteSize * 8) - 1));
+    // if mask of base value is 1 then base is negative value else positive
+    if ((base & mask) != 0) {
+      base = base & ~mask;
+      base = -base;
+    }
+
+    // TODO: something more efficient than resize
+    unpacked.resize(runLength);
+    unpackedIdx = 0;
+    readLongs(unpacked.data(), 0, runLength, bitSize);
+    // any remaining bits are thrown out
+    resetReadLongs();
+
+    // TODO: something more efficient than resize
+    unpackedPatch.resize(pl);
+    patchIdx = 0;
+    // TODO: Skip corrupt?
+    //    if ((patchBitSize + pgw) > 64 && !skipCorrupt) {
+    if ((patchBitSize + pgw) > 64) {
+      throw ParseError("Corrupt PATCHED_BASE encoded data "
+                       "(patchBitSize + pgw > 64)!");
+    }
+    uint32_t cfb = getClosestFixedBits(patchBitSize + pgw);
+    readLongs(unpackedPatch.data(), 0, pl, cfb);
+    // any remaining bits are thrown out
+    resetReadLongs();
+
+    // apply the patch directly when decoding the packed data
+    patchMask = ((static_cast<int64_t>(1) << patchBitSize) - 1);
+
+    adjustGapAndPatch();
+  }
+
+  uint64_t nRead = std::min(runLength - runRead, numValues);
+
+  for(uint64_t pos = offset; pos < offset + nRead; ++pos) {
+    // skip null positions
+    if (notNull && !notNull[pos]) {
+      continue;
+    }
+    if (static_cast<int64_t>(unpackedIdx) != actualGap) {
+      // no patching required. add base to unpacked value to get final value
+      data[pos] = base + unpacked[unpackedIdx];
+    } else {
+      // extract the patch value
+      int64_t patchedVal = unpacked[unpackedIdx] | (curPatch << bitSize);
+
+      // add base to patched value
+      data[pos] = base + patchedVal;
+
+      // increment the patch to point to next entry in patch list
+      ++patchIdx;
+
+      if (patchIdx < unpackedPatch.size()) {
+        adjustGapAndPatch();
+
+        // next gap is relative to the current gap
+        actualGap += unpackedIdx;
+      }
+    }
+
+    ++runRead;
+    ++unpackedIdx;
+  }
+
+  return nRead;
+}
+
+uint64_t RleDecoderV2::nextDelta(int64_t* const data,
+                                 uint64_t offset,
+                                 uint64_t numValues,
+                                 const char* const notNull) {
+  if (runRead == runLength) {
+    // extract the number of fixed bits
+    unsigned char fbo = (firstByte >> 1) & 0x1f;
+    if (fbo != 0) {
+      bitSize = decodeBitWidth(fbo);
+    } else {
+      bitSize = 0;
+    }
+
+    // extract the run length
+    runLength = static_cast<uint64_t>(firstByte & 0x01) << 8;
+    runLength |= readByte();
+    ++runLength; // account for first value
+    runRead = deltaBase = 0;
+
+    // read the first value stored as vint
+    if (isSigned) {
+      firstValue = static_cast<int64_t>(readVslong());
+    } else {
+      firstValue = static_cast<int64_t>(readVulong());
+    }
+
+    prevValue = firstValue;
+
+    // read the fixed delta value stored as vint (deltas can be negative even
+    // if all number are positive)
+    deltaBase = static_cast<int64_t>(readVslong());
+  }
+
+  uint64_t nRead = std::min(runLength - runRead, numValues);
+
+  uint64_t pos = offset;
+  for ( ; pos < offset + nRead; ++pos) {
+    // skip null positions
+    if (!notNull || notNull[pos]) break;
+  }
+  if (runRead == 0 && pos < offset + nRead) {
+    data[pos++] = firstValue;
+    ++runRead;
+  }
+
+  if (bitSize == 0) {
+    // add fixed deltas to adjacent values
+    for ( ; pos < offset + nRead; ++pos) {
+      // skip null positions
+      if (notNull && !notNull[pos]) {
+        continue;
+      }
+      prevValue = data[pos] = prevValue + deltaBase;
+      ++runRead;
+    }
+  } else {
+    for ( ; pos < offset + nRead; ++pos) {
+      // skip null positions
+      if (!notNull || notNull[pos]) break;
+    }
+    if (runRead < 2 && pos < offset + nRead) {
+      // add delta base and first value
+      prevValue = data[pos++] = firstValue + deltaBase;
+      ++runRead;
+    }
+
+    // write the unpacked values, add it to previous value and store final
+    // value to result buffer. if the delta base value is negative then it
+    // is a decreasing sequence else an increasing sequence
+    uint64_t remaining = (offset + nRead) - pos;
+    runRead += readLongs(data, pos, remaining, bitSize, notNull);
+
+    if (deltaBase < 0) {
+      for ( ; pos < offset + nRead; ++pos) {
+        // skip null positions
+        if (notNull && !notNull[pos]) {
+          continue;
+        }
+        prevValue = data[pos] = prevValue - data[pos];
+      }
+    } else {
+      for ( ; pos < offset + nRead; ++pos) {
+        // skip null positions
+        if (notNull && !notNull[pos]) {
+          continue;
+        }
+        prevValue = data[pos] = prevValue + data[pos];
+      }
+    }
+  }
+  return nRead;
+}
+
+}  // namespace orc

http://git-wip-us.apache.org/repos/asf/orc/blob/f31c80bd/c++/src/RleEncoderV2.cc
----------------------------------------------------------------------
diff --git a/c++/src/RleEncoderV2.cc b/c++/src/RleEncoderV2.cc
new file mode 100644
index 0000000..4bd4c9d
--- /dev/null
+++ b/c++/src/RleEncoderV2.cc
@@ -0,0 +1,767 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Adaptor.hh"
+#include "Compression.hh"
+#include "RLEv2.hh"
+#include "RLEV2Util.hh"
+
+#define MAX_LITERAL_SIZE 512
+#define MAX_SHORT_REPEAT_LENGTH 10
+
+namespace orc {
+
+/**
+ * Compute the number of bits required to represent the pth percentile value
+ * of data[offset, offset + length).
+ * @param data - array of values
+ * @param offset - index of the first value to consider
+ * @param length - number of values to consider
+ * @param p - percentile value, must satisfy 0.0 < p <= 1.0
+ * @param reuseHist - when true the histogram is NOT rebuilt; the histogram
+ *        left by the previous call is reused, so that call must have been
+ *        made on the same data range
+ * @return pth percentile bits (0 when the range is empty)
+ * @throws InvalidArgument if p is outside (0.0, 1.0]
+ */
+uint32_t RleEncoderV2::percentileBits(int64_t* data, size_t offset, size_t length, double p, bool reuseHist) {
+    if ((p > 1.0) || (p <= 0.0)) {
+        throw InvalidArgument("Invalid p value: " + std::to_string(p));
+    }
+
+    if (!reuseHist) {
+        // histogram with one bucket per encodable bit width (one entry per
+        // FixedBitSizes value), counting how many values need that width
+        memset(histgram, 0, FixedBitSizes::SIZE * sizeof(int32_t));
+        for (size_t i = offset; i < (offset + length); i++) {
+            uint32_t idx = encodeBitWidth(findClosestNumBits(data[i]));
+            histgram[idx] += 1;
+        }
+    }
+
+    // number of values allowed to lie above the pth percentile
+    int32_t perLen = static_cast<int32_t>(static_cast<double>(length) * (1.0 - p));
+
+    // walk buckets from the widest width down; the first bucket that exhausts
+    // the allowance holds the pth percentile width
+    for (int32_t i = HIST_LEN - 1; i >= 0; i--) {
+        perLen -= histgram[i];
+        if (perLen < 0) {
+            return decodeBitWidth(static_cast<uint32_t>(i));
+        }
+    }
+    return 0;
+}
+
+/**
+ * Create an RLEv2 encoder.
+ * @param outStream - destination stream; ownership moves to the RleEncoder base
+ * @param hasSigned - signedness flag forwarded to the RleEncoder base
+ * @param alignBitPacking - when true, DIRECT and DELTA runs round their bit
+ *        width up with getClosestAlignedFixedBits()
+ */
+RleEncoderV2::RleEncoderV2(std::unique_ptr<BufferedOutputStream> outStream,
+                           bool hasSigned, bool alignBitPacking)
+    : RleEncoder(std::move(outStream), hasSigned)
+    , alignedBitPacking(alignBitPacking)
+    , prevDelta(0) {
+    // scratch buffers, each able to hold one full run of MAX_LITERAL_SIZE values
+    literals        = new int64_t[MAX_LITERAL_SIZE];
+    gapVsPatchList  = new int64_t[MAX_LITERAL_SIZE];
+    zigzagLiterals  = new int64_t[MAX_LITERAL_SIZE];
+    baseRedLiterals = new int64_t[MAX_LITERAL_SIZE];
+    adjDeltas       = new int64_t[MAX_LITERAL_SIZE];
+}
+
+/**
+ * Append one value to the encoder. Values are buffered in literals while the
+ * encoder tracks a fixed run (repeated value) and a variable run; whenever a
+ * run boundary is detected, or a run reaches MAX_LITERAL_SIZE values, the
+ * buffered values are emitted through determineEncoding()/writeValues().
+ * @param val - value to encode
+ */
+void RleEncoderV2::write(int64_t val) {
+    // Differences between arbitrary int64_t values can overflow, and signed
+    // overflow is undefined behavior in C++. Compute them with wrapping
+    // unsigned arithmetic instead; this function only tests them against
+    // zero, which wrapping subtraction answers exactly (a - b == 0 iff a == b).
+    auto wrappingDelta = [](int64_t a, int64_t b) {
+        return static_cast<int64_t>(static_cast<uint64_t>(a) - static_cast<uint64_t>(b));
+    };
+
+    if (numLiterals == 0) {
+        initializeLiterals(val);
+        return;
+    }
+
+    if (numLiterals == 1) {
+        prevDelta = wrappingDelta(val, literals[0]);
+        literals[numLiterals++] = val;
+
+        // two equal values start a fixed run, otherwise a variable run
+        if (val == literals[0]) {
+            fixedRunLength = 2;
+            variableRunLength = 0;
+        } else {
+            fixedRunLength = 0;
+            variableRunLength = 2;
+        }
+        return;
+    }
+
+    const int64_t currentDelta = wrappingDelta(val, literals[numLiterals - 1]);
+    if (prevDelta == 0 && currentDelta == 0) {
+        // case 1: fixed delta run
+        literals[numLiterals++] = val;
+
+        if (variableRunLength > 0) {
+            // if variable run is non-zero then we are seeing repeating
+            // values at the end of variable run in which case fixed Run
+            // length is 2
+            fixedRunLength = 2;
+        }
+        fixedRunLength++;
+
+        // if fixed run met the minimum condition and if variable
+        // run is non-zero then flush the variable run and shift the
+        // tail fixed runs to start of the buffer
+        if (fixedRunLength >= MIN_REPEAT && variableRunLength > 0) {
+            numLiterals -= MIN_REPEAT;
+            variableRunLength -= (MIN_REPEAT - 1);
+
+            int64_t tailVals[MIN_REPEAT] = {0};
+            memcpy(tailVals, literals + numLiterals, sizeof(int64_t) * MIN_REPEAT);
+
+            // every flush gets a fresh option: determineEncoding() uses the
+            // option's *Count fields as append cursors into scratch buffers
+            EncodingOption option = {};
+            determineEncoding(option);
+            writeValues(option);
+
+            // shift tail fixed runs to beginning of the buffer
+            for (size_t i = 0; i < MIN_REPEAT; ++i) {
+                literals[numLiterals++] = tailVals[i];
+            }
+        }
+
+        if (fixedRunLength == MAX_LITERAL_SIZE) {
+            EncodingOption option = {};
+            determineEncoding(option);
+            writeValues(option);
+        }
+        return;
+    }
+
+    // case 2: variable delta run
+
+    // if fixed run length is non-zero and if it satisfies the
+    // short repeat conditions then write the values as short repeats
+    // else use delta encoding
+    if (fixedRunLength >= MIN_REPEAT) {
+        EncodingOption option = {};
+        if (fixedRunLength <= MAX_SHORT_REPEAT_LENGTH) {
+            option.encoding = SHORT_REPEAT;
+        } else {
+            option.encoding = DELTA;
+            option.isFixedDelta = true;
+        }
+        writeValues(option);
+    }
+
+    // if fixed run length is <MIN_REPEAT and current value is
+    // different from previous then treat it as variable run
+    if (fixedRunLength > 0 && fixedRunLength < MIN_REPEAT && val != literals[numLiterals - 1]) {
+        variableRunLength = fixedRunLength;
+        fixedRunLength = 0;
+    }
+
+    // after writing values re-initialize the variables
+    if (numLiterals == 0) {
+        initializeLiterals(val);
+    } else {
+        prevDelta = wrappingDelta(val, literals[numLiterals - 1]);
+        literals[numLiterals++] = val;
+        variableRunLength++;
+
+        if (variableRunLength == MAX_LITERAL_SIZE) {
+            EncodingOption option = {};
+            determineEncoding(option);
+            writeValues(option);
+        }
+    }
+}
+
+/**
+ * Append the DIRECT-encoding form of every buffered literal to
+ * zigzagLiterals, starting at option.zigzagLiteralsCount: zigzag-encoded for
+ * signed streams, unchanged for unsigned ones.
+ */
+void RleEncoderV2::computeZigZagLiterals(EncodingOption &option) {
+    for (size_t idx = 0; idx < numLiterals; ++idx) {
+        const int64_t encoded = isSigned ? zigZag(literals[idx]) : literals[idx];
+        zigzagLiterals[option.zigzagLiteralsCount++] = encoded;
+    }
+}
+
+/**
+ * Build the patch list for PATCHED_BASE encoding. Every base-reduced literal
+ * wider than option.brBits95p bits is truncated to its low brBits95p bits;
+ * the stripped high bits (the patch) are stored in gapVsPatchList together
+ * with the distance (gap) to the previously patched position.
+ * Sets option.patchWidth, option.patchGapWidth, option.patchLength and
+ * option.gapVsPatchListCount; lowers option.brBits95p to 8 when the patch
+ * would otherwise need 64 bits.
+ */
+void RleEncoderV2::preparePatchedBlob(EncodingOption& option) {
+    // #bit for patch
+    option.patchWidth = option.brBits100p - option.brBits95p;
+    option.patchWidth = getClosestFixedBits(option.patchWidth);
+
+    // if patch bit requirement is 64 then it will not be possible to pack
+    // gap and patch together in a 64-bit word. To make sure gap and patch can
+    // be packed together adjust the patch width
+    if (option.patchWidth == 64) {
+        option.patchWidth = 56;
+        option.brBits95p = 8;
+    }
+
+    // mask will be max value beyond which patch will be generated
+    const int64_t mask = static_cast<int64_t>(static_cast<uint64_t>(1) << option.brBits95p) - 1;
+
+    std::vector<int64_t> gapList;
+    std::vector<int64_t> patchList;
+    gapList.reserve(numLiterals);
+    patchList.reserve(numLiterals);
+
+    size_t prev = 0;
+    size_t maxGap = 0;
+    for (size_t i = 0; i < numLiterals; i++) {
+        // if value is above mask then create the patch and record the gap
+        if (baseRedLiterals[i] > mask) {
+            const size_t gap = i - prev;
+            if (gap > maxGap) {
+                maxGap = gap;
+            }
+
+            // gaps are relative, so store the previous patched value index
+            prev = i;
+            gapList.push_back(static_cast<int64_t>(gap));
+
+            // extract the most significant bits that are over mask bits
+            patchList.push_back(baseRedLiterals[i] >> option.brBits95p);
+
+            // strip off the MSB to enable safe bit packing
+            baseRedLiterals[i] &= mask;
+        }
+    }
+
+    // the patch length is the number of entries in the gap list
+    option.patchLength = static_cast<uint32_t>(gapList.size());
+
+    // if the element to be patched is the first and only element then
+    // max gap will be 0, but to store the gap as 0 we need at least 1 bit
+    if (maxGap == 0 && option.patchLength != 0) {
+        option.patchGapWidth = 1;
+    } else {
+        option.patchGapWidth = findClosestNumBits(static_cast<int64_t>(maxGap));
+    }
+
+    // special case: if the gap is greater than 255, then we need 9 bits to
+    // encode it. But we only have 3 bits in the header to record the gap
+    // width (at most 8 bits). To deal with this case, the gap is split into
+    // extra entries in the patch list in the following way
+    // 255 gap => 0 for patch value
+    // actual gap - 255 => actual patch value
+    // If the element to be patched is the last element in the scope then the
+    // gap can be 511. In this case we will have 3 entries in the patch list
+    // 255 gap => 0 for patch value
+    // 255 gap => 0 for patch value
+    // 1 gap => actual patch value
+    // (with at most MAX_LITERAL_SIZE literals only one gap can exceed 255)
+    if (option.patchGapWidth > 8) {
+        option.patchGapWidth = 8;
+        // for gap = 511, we need two additional entries in patch list
+        if (maxGap == 511) {
+            option.patchLength += 2;
+        } else {
+            option.patchLength += 1;
+        }
+    }
+
+    // create gap vs patch list: gap in the MSBs, patch value in the LSBs.
+    // int64_t (not `long`, which is 32-bit on LLP64 platforms) keeps the
+    // shift by patchWidth (up to 56) well defined.
+    const int64_t maxEncodableGap = 255;
+    for (size_t idx = 0; idx < gapList.size(); ++idx) {
+        int64_t g = gapList[idx];
+        while (g > maxEncodableGap) {
+            gapVsPatchList[option.gapVsPatchListCount++] = (maxEncodableGap << option.patchWidth);
+            g -= maxEncodableGap;
+        }
+        gapVsPatchList[option.gapVsPatchListCount++] = ((g << option.patchWidth) | patchList[idx]);
+    }
+}
+
+/**
+ * Choose the RLEv2 sub-encoding (DIRECT, DELTA or PATCHED_BASE) for the
+ * numLiterals buffered values and fill in the EncodingOption fields read by
+ * the matching write*Values() routine.
+ * @param option - receives the decision; its *Count fields are used as
+ *        append cursors, so callers pass a zero-initialized option
+ * @throws InvalidArgument if an internal consistency check fails
+ */
+void RleEncoderV2::determineEncoding(EncodingOption& option) {
+    // Adjacent differences are computed before the overflow guard below, and
+    // signed overflow is undefined behavior in C++. Use wrapping unsigned
+    // arithmetic instead: when no difference overflows the results are
+    // identical, and when one does, max - min overflows as well, so the
+    // isSafeSubtract() guard selects DIRECT before any delta is used.
+    auto wrappingDelta = [](int64_t a, int64_t b) {
+        return static_cast<int64_t>(static_cast<uint64_t>(a) - static_cast<uint64_t>(b));
+    };
+    // |d| without the undefined behavior of std::abs(INT64_MIN); that value
+    // maps to itself and only occurs when max - min overflows (DIRECT path)
+    auto magnitude = [](int64_t d) {
+        return d < 0 ? static_cast<int64_t>(static_cast<uint64_t>(0) - static_cast<uint64_t>(d)) : d;
+    };
+
+    // we need to compute zigzag values for DIRECT encoding if we decide to
+    // break early for delta overflows or for shorter runs
+    computeZigZagLiterals(option);
+
+    option.zzBits100p = percentileBits(zigzagLiterals, 0, numLiterals, 1.0);
+
+    // not a big win for shorter runs to determine encoding
+    if (numLiterals <= MIN_REPEAT) {
+        option.encoding = DIRECT;
+        return;
+    }
+
+    // DELTA encoding check
+
+    // for identifying monotonic sequences
+    bool isIncreasing = true;
+    bool isDecreasing = true;
+    option.isFixedDelta = true;
+
+    option.min = literals[0];
+    int64_t max = literals[0];
+    const int64_t initialDelta = wrappingDelta(literals[1], literals[0]);
+    int64_t currDelta = 0;
+    int64_t deltaMax = 0;
+    adjDeltas[option.adjDeltasCount++] = initialDelta;
+
+    for (size_t i = 1; i < numLiterals; i++) {
+        const int64_t l1 = literals[i];
+        const int64_t l0 = literals[i - 1];
+        currDelta = wrappingDelta(l1, l0);
+        option.min = std::min(option.min, l1);
+        max = std::max(max, l1);
+
+        isIncreasing &= (l0 <= l1);
+        isDecreasing &= (l0 >= l1);
+
+        option.isFixedDelta &= (currDelta == initialDelta);
+        if (i > 1) {
+            adjDeltas[option.adjDeltasCount++] = magnitude(currDelta);
+            deltaMax = std::max(deltaMax, adjDeltas[i - 1]);
+        }
+    }
+
+    // it's faster to exit under delta overflow condition without checking for
+    // PATCHED_BASE condition as encoding using DIRECT is faster and has less
+    // overhead than PATCHED_BASE
+    if (!isSafeSubtract(max, option.min)) {
+        option.encoding = DIRECT;
+        return;
+    }
+
+    // invariant - subtracting any number from any other in the literals after
+    // this point won't overflow
+
+    // if min is equal to max then the delta is 0, this condition happens for
+    // fixed values run >10 which cannot be encoded with SHORT_REPEAT
+    if (option.min == max) {
+        if (!option.isFixedDelta) {
+            throw InvalidArgument(std::to_string(option.min) + "==" + std::to_string(max) + ", isFixedDelta cannot be false");
+        }
+
+        if (currDelta != 0) {
+            throw InvalidArgument(std::to_string(option.min) + "==" + std::to_string(max) + ", currDelta should be zero");
+        }
+        option.fixedDelta = 0;
+        option.encoding = DELTA;
+        return;
+    }
+
+    if (option.isFixedDelta) {
+        if (currDelta != initialDelta) {
+            throw InvalidArgument("currDelta should be equal to initialDelta for fixed delta encoding");
+        }
+
+        option.encoding = DELTA;
+        option.fixedDelta = currDelta;
+        return;
+    }
+
+    // if initialDelta is 0 then we cannot delta encode as we cannot identify
+    // the sign of deltas (increasing or decreasing)
+    if (initialDelta != 0) {
+        // stores the number of bits required for packing delta blob in
+        // delta encoding
+        option.bitsDeltaMax = findClosestNumBits(deltaMax);
+
+        // monotonic condition
+        if (isIncreasing || isDecreasing) {
+            option.encoding = DELTA;
+            return;
+        }
+    }
+
+    // PATCHED_BASE encoding check
+
+    // percentile values are computed for the zigzag encoded values. if the
+    // number of bit requirement between 90th and 100th percentile varies
+    // beyond a threshold then we need to patch the values. if the variation
+    // is not significant then we can use direct encoding
+
+    option.zzBits90p = percentileBits(zigzagLiterals, 0, numLiterals, 0.9, true);
+    uint32_t diffBitsLH = option.zzBits100p - option.zzBits90p;
+
+    // if the difference between 90th percentile and 100th percentile fixed
+    // bits is > 1 then we need patch the values
+    if (diffBitsLH > 1) {
+
+        // patching is done only on base reduced values.
+        // remove base from literals
+        for (size_t i = 0; i < numLiterals; i++) {
+            baseRedLiterals[option.baseRedLiteralsCount++] = (literals[i] - option.min);
+        }
+
+        // 95th percentile width is used to determine max allowed value
+        // after which patching will be done
+        option.brBits95p = percentileBits(baseRedLiterals, 0, numLiterals, 0.95);
+
+        // 100th percentile is used to compute the max patch width
+        option.brBits100p = percentileBits(baseRedLiterals, 0, numLiterals, 1.0, true);
+
+        // after base reducing the values, if the difference in bits between
+        // 95th percentile and 100th percentile value is zero then there
+        // is no point in patching the values, in which case we will
+        // fallback to DIRECT encoding.
+        // The decision to use patched base was based on zigzag values, but the
+        // actual patching is done on base reduced literals.
+        if ((option.brBits100p - option.brBits95p) != 0) {
+            option.encoding = PATCHED_BASE;
+            preparePatchedBlob(option);
+            return;
+        } else {
+            option.encoding = DIRECT;
+            return;
+        }
+    } else {
+        // the 90th and 100th percentile zigzag widths differ by at most one
+        // bit, so patching would not pay off. Hence we fall back to direct
+        option.encoding = DIRECT;
+        return;
+    }
+}
+
+/**
+ * Emit any buffered literals, hand the unused part of the current output
+ * buffer back to the stream and flush it.
+ * @return the value returned by the output stream's flush()
+ */
+uint64_t RleEncoderV2::flush() {
+    if (numLiterals != 0) {
+        EncodingOption option = {};
+        if (variableRunLength == 0 && fixedRunLength >= MIN_REPEAT) {
+            // a pure repeated-value run: short ones fit SHORT_REPEAT, longer
+            // ones become a fixed-delta DELTA run
+            if (fixedRunLength <= MAX_SHORT_REPEAT_LENGTH) {
+                option.encoding = SHORT_REPEAT;
+            } else {
+                option.encoding = DELTA;
+                option.isFixedDelta = true;
+            }
+            writeValues(option);
+        } else if (variableRunLength != 0 || fixedRunLength != 0) {
+            if (variableRunLength == 0) {
+                // repeated run too short for SHORT_REPEAT: treat it as variable
+                variableRunLength = fixedRunLength;
+                fixedRunLength = 0;
+            }
+            determineEncoding(option);
+            writeValues(option);
+        }
+    }
+
+    outputStream->BackUp(static_cast<int>(bufferLength - bufferPosition));
+    const uint64_t flushedSize = outputStream->flush();
+    bufferPosition = 0;
+    bufferLength = 0;
+    return flushedSize;
+}
+
+/**
+ * Write the buffered literals using option.encoding, then reset the buffer.
+ * Does nothing when no literals are buffered.
+ * @throws NotImplementedYet for an unknown encoding
+ */
+void RleEncoderV2::writeValues(EncodingOption& option) {
+    if (numLiterals == 0) {
+        return;
+    }
+
+    switch (option.encoding) {
+    case SHORT_REPEAT:
+        writeShortRepeatValues(option);
+        break;
+    case DIRECT:
+        writeDirectValues(option);
+        break;
+    case PATCHED_BASE:
+        writePatchedBasedValues(option);
+        break;
+    case DELTA:
+        writeDeltaValues(option);
+        break;
+    default:
+        throw NotImplementedYet("Not implemented yet");
+    }
+
+    // the run has been emitted; start a new one
+    prevDelta = 0;
+    numLiterals = 0;
+}
+
+/**
+ * Emit a SHORT_REPEAT run: a one-byte header (opcode, value byte count - 1,
+ * run length - MIN_REPEAT) followed by the repeated value in big-endian
+ * order. Resets fixedRunLength.
+ */
+void RleEncoderV2::writeShortRepeatValues(EncodingOption&) {
+    // signed streams store the repeated value zigzag-encoded
+    const int64_t value = isSigned ? zigZag(literals[0]) : literals[0];
+
+    // smallest number of whole bytes holding the value's bit width
+    const uint32_t valueBits = findClosestNumBits(value);
+    const uint32_t valueBytes = (valueBits + 7) / 8;
+
+    uint32_t header = getOpCode(SHORT_REPEAT);
+    header |= ((valueBytes - 1) << 3);
+    fixedRunLength -= MIN_REPEAT;
+    header |= fixedRunLength;
+    writeByte(static_cast<char>(header));
+
+    // most significant byte first
+    for (uint32_t remaining = valueBytes; remaining > 0; --remaining) {
+        const int64_t octet = ((value >> ((remaining - 1) * 8)) & 0xff);
+        writeByte(static_cast<char>(octet));
+    }
+
+    fixedRunLength = 0;
+}
+
+/**
+ * Emit a DIRECT run: a two-byte header (opcode, encoded bit width, 9-bit
+ * run length - 1) followed by the bit-packed zigzag literals.
+ * Resets variableRunLength.
+ */
+void RleEncoderV2::writeDirectValues(EncodingOption& option) {
+    // bit width of the packed values, optionally rounded up to an aligned width
+    const uint32_t width = alignedBitPacking
+        ? getClosestAlignedFixedBits(option.zzBits100p)
+        : option.zzBits100p;
+    const uint32_t encodedWidth = encodeBitWidth(width) << 1;
+
+    // the run length is stored minus one, split into a high bit in the first
+    // header byte and the low 8 bits in the second
+    variableRunLength -= 1;
+    const uint32_t lengthHighBit = (variableRunLength & 0x100) >> 8;
+
+    writeByte(static_cast<char>(getOpCode(DIRECT) | encodedWidth | lengthHighBit));
+    writeByte(static_cast<char>(variableRunLength & 0xff));
+
+    writeInts(zigzagLiterals, 0, numLiterals, width);
+
+    variableRunLength = 0;
+}
+
+/**
+ * Emit a PATCHED_BASE run: a four-byte header, the base (option.min) in
+ * sign-magnitude big-endian form, the bit-packed base-reduced literals and
+ * the bit-packed gap/patch list. Resets variableRunLength.
+ */
+void RleEncoderV2::writePatchedBasedValues(EncodingOption& option) {
+    // NOTE: Aligned bit packing cannot be applied for PATCHED_BASE encoding
+    // because patch is applied to MSB bits. For example: If fixed bit width of
+    // base value is 7 bits and if patch is 3 bits, the actual value is
+    // constructed by shifting the patch to left by 7 positions.
+    // actual_value = patch << 7 | base_value
+    // So, if we align base_value then actual_value can not be reconstructed.
+
+    // encoded bit width of the base-reduced values, placed above the length bit
+    const uint32_t encodedWidth = encodeBitWidth(option.brBits95p) << 1;
+
+    // the run length is stored minus one (9 bits across two header bytes)
+    variableRunLength -= 1;
+    const uint32_t lengthHighBit = (variableRunLength & 0x100) >> 8;
+    const char firstByte = static_cast<char>(getOpCode(PATCHED_BASE) | encodedWidth | lengthHighBit);
+    const char secondByte = static_cast<char>(variableRunLength & 0xff);
+
+    // the base is written as sign-magnitude: keep the magnitude here and set
+    // the top bit of the base field afterwards when it was negative
+    const bool negativeBase = (option.min < 0);
+    if (negativeBase) {
+        option.min = -option.min;
+    }
+
+    // one extra bit for the sign, rounded up to whole bytes
+    const uint32_t baseBits = findClosestNumBits(option.min) + 1;
+    const uint32_t baseBytes = (baseBits + 7) / 8;
+    if (negativeBase) {
+        option.min |= (1LL << ((baseBytes * 8) - 1));
+    }
+
+    // third byte: 3 bits of (base byte count - 1), 5 bits of patch width;
+    // fourth byte: 3 bits of (patch gap width - 1), 5 bits of patch length
+    const char thirdByte = static_cast<char>(((baseBytes - 1) << 5) | encodeBitWidth(option.patchWidth));
+    const char fourthByte = static_cast<char>((option.patchGapWidth - 1) << 5 | option.patchLength);
+
+    writeByte(firstByte);
+    writeByte(secondByte);
+    writeByte(thirdByte);
+    writeByte(fourthByte);
+
+    // base value, most significant byte first
+    for (uint32_t remaining = baseBytes; remaining > 0; --remaining) {
+        writeByte(static_cast<char>(((option.min >> ((remaining - 1) * 8)) & 0xff)));
+    }
+
+    writeInts(baseRedLiterals, 0, numLiterals, getClosestFixedBits(option.brBits95p));
+    writeInts(gapVsPatchList, 0, option.patchLength,
+              getClosestFixedBits(option.patchGapWidth + option.patchWidth));
+
+    variableRunLength = 0;
+}
+
+/**
+ * Emit a DELTA run: a two-byte header (opcode, encoded delta width, 9-bit
+ * run length - 1), the first literal as a varint, then either the fixed
+ * delta or the first delta followed by the bit-packed remaining deltas.
+ * Resets whichever run length (fixed or variable) was consumed.
+ */
+void RleEncoderV2::writeDeltaValues(EncodingOption& option) {
+    uint32_t bitWidth = option.bitsDeltaMax;
+    if (alignedBitPacking) {
+        bitWidth = getClosestAlignedFixedBits(bitWidth);
+    }
+
+    uint32_t encodedWidth = 0;
+    uint32_t storedLength = 0;
+    if (!option.isFixedDelta) {
+        // width 0 is reserved for fixed-delta runs, so a 1-bit delta blob is
+        // widened to 2 bits
+        if (bitWidth == 1) {
+            bitWidth = 2;
+        }
+        encodedWidth = encodeBitWidth(bitWidth) << 1;
+        storedLength = variableRunLength - 1;
+        variableRunLength = 0;
+    } else if (fixedRunLength > MIN_REPEAT) {
+        // repeated value, e.g. 2 2 2 2 2 2 2 2
+        storedLength = fixedRunLength - 1;
+        fixedRunLength = 0;
+    } else {
+        // constant non-zero step, e.g. 4 6 8 10 12 14 16
+        storedLength = variableRunLength - 1;
+        variableRunLength = 0;
+    }
+
+    const uint32_t lengthHighBit = (storedLength & 0x100) >> 8;
+    writeByte(static_cast<char>(getOpCode(DELTA) | encodedWidth | lengthHighBit));
+    writeByte(static_cast<char>(storedLength & 0xff));
+
+    // the first literal itself, as a signed or unsigned varint
+    if (isSigned) {
+        writeVslong(literals[0]);
+    } else {
+        writeVulong(literals[0]);
+    }
+
+    if (option.isFixedDelta) {
+        // a fixed delta needs no delta blob
+        writeVslong(option.fixedDelta);
+        return;
+    }
+
+    // first delta as a signed varint, then the remaining numLiterals - 2
+    // delta magnitudes bit-packed
+    writeVslong(adjDeltas[0]);
+    writeInts(adjDeltas, 1, numLiterals - 2, bitWidth);
+}
+
+/**
+ * Bit-pack input[offset, offset + len) using bitSize bits per value, most
+ * significant bit first. Does nothing for a null input, an empty range or a
+ * zero width.
+ */
+void RleEncoderV2::writeInts(int64_t* input, uint32_t offset, size_t len, uint32_t bitSize) {
+    if (input == nullptr || len < 1 || bitSize < 1) {
+        return;
+    }
+
+    if (getClosestAlignedFixedBits(bitSize) == bitSize) {
+        const uint32_t end = static_cast<uint32_t>(offset + len);
+
+        if (bitSize >= 8) {
+            // whole-byte widths: every value big-endian in bitSize / 8 bytes
+            const uint32_t bytesPerValue = bitSize / 8;
+            for (uint32_t i = offset; i < end; ++i) {
+                for (uint32_t b = bytesPerValue; b > 0; --b) {
+                    writeByte(static_cast<char>((input[i] >> (8 * (b - 1))) & 255));
+                }
+            }
+            return;
+        }
+
+        // sub-byte widths: several values share a byte, earliest value in
+        // the most significant bits
+        const char mask = static_cast<char>((1 << bitSize) - 1);
+        const uint32_t perByte = 8 / bitSize;
+        const uint32_t leftover = static_cast<uint32_t>(len % perByte);
+        const uint32_t fullEnd = end - leftover;
+        for (uint32_t i = offset; i < fullEnd; i += perByte) {
+            char packed = 0;
+            for (uint32_t j = 0; j < perByte; ++j) {
+                packed |= static_cast<char>((input[i + j] & mask) << (8 - (j + 1) * bitSize));
+            }
+            writeByte(packed);
+        }
+
+        if (leftover > 0) {
+            char packed = 0;
+            uint32_t shift = 8 - bitSize;
+            for (uint32_t i = fullEnd; i < end; ++i) {
+                packed |= static_cast<char>((input[i] & mask) << shift);
+                shift -= bitSize;
+            }
+            writeByte(packed);
+        }
+        return;
+    }
+
+    // any other width: a continuous MSB-first bit stream
+    uint32_t freeBits = 8;
+    char pending = 0;
+    for (uint32_t i = offset; i < (offset + len); i++) {
+        int64_t value = input[i];
+        uint32_t bitsRemaining = bitSize;
+        while (bitsRemaining > freeBits) {
+            // the top part of the value completes the pending byte
+            pending |= static_cast<char>(value >> (bitsRemaining - freeBits));
+            bitsRemaining -= freeBits;
+            // drop the bits that have just been emitted
+            value &= (static_cast<uint64_t>(1) << bitsRemaining) - 1;
+            writeByte(pending);
+            pending = 0;
+            freeBits = 8;
+        }
+        freeBits -= bitsRemaining;
+        pending |= static_cast<char>(value << freeBits);
+        if (freeBits == 0) {
+            writeByte(pending);
+            pending = 0;
+            freeBits = 8;
+        }
+    }
+
+    // a partially filled final byte is still written
+    if (freeBits != 8) {
+        writeByte(pending);
+    }
+}
+
+/**
+ * Start a new run with val as its only literal; a single value counts as
+ * both a fixed run and a variable run of length one.
+ */
+void RleEncoderV2::initializeLiterals(int64_t val) {
+    variableRunLength = 1;
+    fixedRunLength = 1;
+    literals[numLiterals++] = val;
+}
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/f31c80bd/c++/src/Writer.cc
----------------------------------------------------------------------
diff --git a/c++/src/Writer.cc b/c++/src/Writer.cc
index 16c2af4..3a56360 100644
--- a/c++/src/Writer.cc
+++ b/c++/src/Writer.cc
@@ -40,7 +40,7 @@ namespace orc {
     bool enableIndex;
 
     WriterOptionsPrivate() :
-                            fileVersion(FileVersion::v_0_11()) { // default to 
Hive_0_11
+                            fileVersion(FileVersion::v_0_12()) { // default to 
Hive_0_11
       stripeSize = 64 * 1024 * 1024; // 64M
       compressionBlockSize = 64 * 1024; // 64K
       rowIndexStride = 10000;
@@ -83,6 +83,14 @@ namespace orc {
   // Out-of-line destructor with an empty body; anything WriterOptions owns is
   // released by its member destructors (presumably privateBits is a smart
   // pointer -- TODO confirm against Writer.hh).
   WriterOptions::~WriterOptions() {
     // PASS
   }
+  /**
+   * RLE version used for integer streams: RleVersion_1 for files written as
+   * version 0.11, RleVersion_2 for any other configured file version.
+   */
+  RleVersion WriterOptions::getRleVersion() const {
+    const bool isHive011 = privateBits->fileVersion == FileVersion::v_0_11();
+    return isHive011 ? RleVersion_1 : RleVersion_2;
+  }
 
   WriterOptions& WriterOptions::setStripeSize(uint64_t size) {
     privateBits->stripeSize = size;
@@ -122,8 +130,8 @@ namespace orc {
   }
 
   WriterOptions& WriterOptions::setFileVersion(const FileVersion& version) {
-    // Only Hive_0_11 version is supported currently
-    if (version.getMajor() == 0 && version.getMinor() == 11) {
+    // Only Hive_0_11 and Hive_0_12 version are supported currently
+    if (version.getMajor() == 0 && (version.getMinor() == 11 || 
version.getMinor() == 12)) {
       privateBits->fileVersion = version;
       return *this;
     }
@@ -153,6 +161,10 @@ namespace orc {
     return privateBits->compressionStrategy;
   }
 
+  /**
+   * Aligned bit packing is enabled only when the compression strategy is
+   * CompressionStrategy_SPEED.
+   */
+  bool WriterOptions::getAlignedBitpacking() const {
+    const CompressionStrategy strategy = privateBits->compressionStrategy;
+    return strategy == CompressionStrategy::CompressionStrategy_SPEED;
+  }
+
   WriterOptions& WriterOptions::setPaddingTolerance(double tolerance) {
     privateBits->paddingTolerance = tolerance;
     return *this;

http://git-wip-us.apache.org/repos/asf/orc/blob/f31c80bd/c++/test/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/c++/test/CMakeLists.txt b/c++/test/CMakeLists.txt
index 0a04aaf..015814a 100644
--- a/c++/test/CMakeLists.txt
+++ b/c++/test/CMakeLists.txt
@@ -37,8 +37,8 @@ add_executable (orc-test
   TestDriver.cc
   TestInt128.cc
   TestReader.cc
-  TestRle.cc
-  TestRLEv1Encoder.cc
+  TestRleDecoder.cc
+  TestRleEncoder.cc
   TestStripeIndexStatistics.cc
   TestTimestampStatistics.cc
   TestTimezone.cc

Reply via email to