[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream
Github user asfgit closed the pull request at: https://github.com/apache/orc/pull/122 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream
Github user majetideepak commented on a diff in the pull request:

https://github.com/apache/orc/pull/122#discussion_r117825065

--- Diff: c++/src/Compression.cc ---
@@ -636,6 +884,33 @@
 DIAGNOSTIC_POP
     return static_cast(result);
   }

+  std::unique_ptr
+  createCompressor(
+      CompressionKind kind,
+      OutputStream * outStream,
+      CompressionStrategy strategy,
+      uint64_t bufferCapacity,
+      uint64_t blockSize,
+      MemoryPool& pool) {
+    switch (static_cast(kind)) {
+    case CompressionKind_NONE: {
+      return std::unique_ptr
+        (new BufferedOutputStream(pool, outStream, bufferCapacity, blockSize));
+    }
+    case CompressionKind_ZLIB: {
+      int level = (strategy == CompressionStrategy_SPEED) ? -1 : 9;
--- End diff --

You are right!
[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream
Github user xndai commented on a diff in the pull request:

https://github.com/apache/orc/pull/122#discussion_r117822601

--- Diff: c++/src/Compression.cc ---
@@ -636,6 +884,33 @@
 DIAGNOSTIC_POP
     return static_cast(result);
   }

+  std::unique_ptr
+  createCompressor(
+      CompressionKind kind,
+      OutputStream * outStream,
+      CompressionStrategy strategy,
+      uint64_t bufferCapacity,
+      uint64_t blockSize,
+      MemoryPool& pool) {
+    switch (static_cast(kind)) {
+    case CompressionKind_NONE: {
+      return std::unique_ptr
+        (new BufferedOutputStream(pool, outStream, bufferCapacity, blockSize));
+    }
+    case CompressionKind_ZLIB: {
+      int level = (strategy == CompressionStrategy_SPEED) ? -1 : 9;
--- End diff --

According to https://orc.apache.org/docs/hive-config.html, only two compression strategies are defined: SPEED and COMPRESSION. I also checked the Java implementation: SPEED maps to zlib level Z_BEST_SPEED + 1, and COMPRESSION maps to Z_DEFAULT_COMPRESSION. I will do the same for C++.

The Java implementation, for your reference.

WriterImpl.java

    CompressionCodec result = physicalWriter.getCompressionCodec();
    if (result != null) {
      switch (kind) {
        case BLOOM_FILTER:
        case DATA:
        case DICTIONARY_DATA:
        case BLOOM_FILTER_UTF8:
          if (compressionStrategy == OrcFile.CompressionStrategy.SPEED) {
            result = result.modify(EnumSet.of(CompressionCodec.Modifier.FAST,
                CompressionCodec.Modifier.TEXT));
          } else {
            result = result.modify(EnumSet.of(CompressionCodec.Modifier.DEFAULT,
                CompressionCodec.Modifier.TEXT));
          }
          break;
        case LENGTH:
        case DICTIONARY_COUNT:
        case PRESENT:
        case ROW_INDEX:
        case SECONDARY:
          // easily compressed using the fastest modes
          result = result.modify(EnumSet.of(CompressionCodec.Modifier.FASTEST,
              CompressionCodec.Modifier.BINARY));
          break;
        default:
          LOG.info("Missing ORC compression modifiers for " + kind);
          break;
      }
    }

ZlibCodec.java

    public CompressionCodec modify(/* @Nullable */ EnumSet modifiers) {
      if (modifiers == null) {
        return this;
      }
      int l = this.level;
      int s = this.strategy;
      for (Modifier m : modifiers) {
        switch (m) {
          case BINARY:
            /* filtered == less LZ77, more huffman */
            s = Deflater.FILTERED;
            break;
          case TEXT:
            s = Deflater.DEFAULT_STRATEGY;
            break;
          case FASTEST:
            // deflate_fast looking for 8 byte patterns
            l = Deflater.BEST_SPEED;
            break;
          case FAST:
            // deflate_fast looking for 16 byte patterns
            l = Deflater.BEST_SPEED + 1;
            break;
          case DEFAULT:
            // deflate_slow looking for 128 byte patterns
            l = Deflater.DEFAULT_COMPRESSION;
            break;
          default:
            break;
        }
      }
      return new ZlibCodec(l, s);
    }
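For readers following the thread, the mapping described in this comment (SPEED to Z_BEST_SPEED + 1, COMPRESSION to Z_DEFAULT_COMPRESSION) could be sketched in C++ roughly as below. This is only an illustration, not the code that was merged: the helper name `zlibLevelFor` and the `kZ...` constants are hypothetical, and the constants simply restate the zlib values quoted elsewhere in this thread so the sketch compiles without `<zlib.h>`.

```cpp
#include <cassert>

// Values as given in the zlib manual (Z_BEST_SPEED = 1,
// Z_DEFAULT_COMPRESSION = -1). Redefined under hypothetical names so
// this sketch does not depend on <zlib.h>.
constexpr int kZBestSpeed = 1;
constexpr int kZDefaultCompression = -1;

// The two strategies discussed in the review; the enumerator names
// follow the ones mentioned in the thread.
enum CompressionStrategy {
  CompressionStrategy_SPEED = 0,
  CompressionStrategy_COMPRESSION
};

// Hypothetical helper: mirror the Java behaviour described above.
//   SPEED       -> Z_BEST_SPEED + 1
//   COMPRESSION -> Z_DEFAULT_COMPRESSION
constexpr int zlibLevelFor(CompressionStrategy strategy) {
  return strategy == CompressionStrategy_SPEED ? kZBestSpeed + 1
                                               : kZDefaultCompression;
}

// Small self-check showing the two expected levels.
inline void checkStrategyLevels() {
  assert(zlibLevelFor(CompressionStrategy_SPEED) == 2);
  assert(zlibLevelFor(CompressionStrategy_COMPRESSION) == -1);
}
```

Note that the expression under review, `(strategy == CompressionStrategy_SPEED) ? -1 : 9`, would instead pick zlib's default level for SPEED and the maximum level for everything else.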
[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream
Github user majetideepak commented on a diff in the pull request:

https://github.com/apache/orc/pull/122#discussion_r117631322

--- Diff: c++/src/Compression.cc ---
@@ -636,6 +884,33 @@
 DIAGNOSTIC_POP
     return static_cast(result);
   }

+  std::unique_ptr
+  createCompressor(
+      CompressionKind kind,
+      OutputStream * outStream,
+      CompressionStrategy strategy,
+      uint64_t bufferCapacity,
+      uint64_t blockSize,
+      MemoryPool& pool) {
+    switch (static_cast(kind)) {
+    case CompressionKind_NONE: {
+      return std::unique_ptr
+        (new BufferedOutputStream(pool, outStream, bufferCapacity, blockSize));
+    }
+    case CompressionKind_ZLIB: {
+      int level = (strategy == CompressionStrategy_SPEED) ? -1 : 9;
--- End diff --

You can see the compression levels here: http://www.zlib.net/manual.html
```
#define Z_NO_COMPRESSION         0
#define Z_BEST_SPEED             1
#define Z_BEST_COMPRESSION       9
#define Z_DEFAULT_COMPRESSION  (-1)
```
[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream
Github user xndai commented on a diff in the pull request:

https://github.com/apache/orc/pull/122#discussion_r117622782

--- Diff: c++/src/Compression.cc ---
@@ -636,6 +884,33 @@
 DIAGNOSTIC_POP
     return static_cast(result);
   }

+  std::unique_ptr
+  createCompressor(
+      CompressionKind kind,
+      OutputStream * outStream,
+      CompressionStrategy strategy,
+      uint64_t bufferCapacity,
+      uint64_t blockSize,
+      MemoryPool& pool) {
+    switch (static_cast(kind)) {
+    case CompressionKind_NONE: {
+      return std::unique_ptr
+        (new BufferedOutputStream(pool, outStream, bufferCapacity, blockSize));
+    }
+    case CompressionKind_ZLIB: {
+      int level = (strategy == CompressionStrategy_SPEED) ? -1 : 9;
--- End diff --

What are the corresponding zlib levels for these three settings? Are they documented anywhere?
[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream
Github user majetideepak commented on a diff in the pull request:

https://github.com/apache/orc/pull/122#discussion_r117609935

--- Diff: c++/src/Compression.hh ---
@@ -20,9 +20,15 @@
 #define ORC_COMPRESSION_HH

 #include "io/InputStream.hh"
+#include "io/OutputStream.hh"

 namespace orc {

+  enum CompressionStrategy {
+    CompressionStrategy_SPEED = 0,
--- End diff --

Can you add a `CompressionStrategy_DEFAULT` here?
[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream
Github user majetideepak commented on a diff in the pull request:

https://github.com/apache/orc/pull/122#discussion_r117609952

--- Diff: c++/src/Compression.cc ---
@@ -636,6 +884,33 @@
 DIAGNOSTIC_POP
     return static_cast(result);
   }

+  std::unique_ptr
+  createCompressor(
+      CompressionKind kind,
+      OutputStream * outStream,
+      CompressionStrategy strategy,
+      uint64_t bufferCapacity,
+      uint64_t blockSize,
+      MemoryPool& pool) {
+    switch (static_cast(kind)) {
+    case CompressionKind_NONE: {
+      return std::unique_ptr
+        (new BufferedOutputStream(pool, outStream, bufferCapacity, blockSize));
+    }
+    case CompressionKind_ZLIB: {
+      int level = (strategy == CompressionStrategy_SPEED) ? -1 : 9;
--- End diff --

Compare the three options (`CompressionStrategy_DEFAULT`, `CompressionStrategy_SPEED`, `CompressionStrategy_COMPRESSION`) and set the level accordingly.
[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream
Github user majetideepak commented on a diff in the pull request: https://github.com/apache/orc/pull/122#discussion_r117388912 --- Diff: c++/src/Compression.cc --- @@ -33,6 +33,254 @@ namespace orc { + class CompressionStreamBase: public BufferedOutputStream { + public: +CompressionStreamBase(OutputStream * outStream, + int compressionLevel, + uint64_t capacity, + uint64_t blockSize, + MemoryPool& pool); + +virtual bool Next(void** data, int*size) override = 0; +virtual void BackUp(int count) override; + +virtual std::string getName() const override = 0; +virtual uint64_t flush() override; + +virtual bool isCompressed() const override { return true; } +virtual uint64_t getSize() const override; + + protected: +void writeHeader(char * buffer, size_t compressedSize, bool original) { + buffer[0] = static_cast((compressedSize << 1) + (original ? 1 : 0)); + buffer[1] = static_cast(compressedSize >> 7); + buffer[2] = static_cast(compressedSize >> 15); +} + +// Buffer to hold uncompressed data until user calls Next() +DataBuffer rawInputBuffer; + +// Compress level +int level; + +// Compressed data output buffer +char * outputBuffer; + +// Size for compressionBuffer +int bufferSize; + +// Compress output position +int outputPosition; + +// Compress output buffer size +int outputSize; + }; + + CompressionStreamBase::CompressionStreamBase(OutputStream * outStream, + int compressionLevel, + uint64_t capacity, + uint64_t blockSize, + MemoryPool& pool) : +BufferedOutputStream(pool, + outStream, + capacity, + blockSize), +rawInputBuffer(pool, blockSize), +level(compressionLevel), +outputBuffer(nullptr), +bufferSize(0), +outputPosition(0), +outputSize(0) { +// PASS + } + + void CompressionStreamBase::BackUp(int count) { +if (count > bufferSize) { + throw std::logic_error("Can't backup that much!"); +} +bufferSize -= count; + } + + uint64_t CompressionStreamBase::flush() { +void * data; +int size; +if (!Next(, )) { + throw std::logic_error("Failed to flush compression buffer."); 
--- End diff -- Yes! Use std::runtime_error for now.
[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream
Github user xndai commented on a diff in the pull request: https://github.com/apache/orc/pull/122#discussion_r117381049 --- Diff: c++/src/Compression.cc --- @@ -33,6 +33,254 @@ namespace orc { + class CompressionStreamBase: public BufferedOutputStream { + public: +CompressionStreamBase(OutputStream * outStream, + int compressionLevel, + uint64_t capacity, + uint64_t blockSize, + MemoryPool& pool); + +virtual bool Next(void** data, int*size) override = 0; +virtual void BackUp(int count) override; + +virtual std::string getName() const override = 0; +virtual uint64_t flush() override; + +virtual bool isCompressed() const override { return true; } +virtual uint64_t getSize() const override; + + protected: +void writeHeader(char * buffer, size_t compressedSize, bool original) { + buffer[0] = static_cast((compressedSize << 1) + (original ? 1 : 0)); + buffer[1] = static_cast(compressedSize >> 7); + buffer[2] = static_cast(compressedSize >> 15); +} + +// Buffer to hold uncompressed data until user calls Next() +DataBuffer rawInputBuffer; + +// Compress level +int level; + +// Compressed data output buffer +char * outputBuffer; + +// Size for compressionBuffer +int bufferSize; + +// Compress output position +int outputPosition; + +// Compress output buffer size +int outputSize; + }; + + CompressionStreamBase::CompressionStreamBase(OutputStream * outStream, + int compressionLevel, + uint64_t capacity, + uint64_t blockSize, + MemoryPool& pool) : +BufferedOutputStream(pool, + outStream, + capacity, + blockSize), +rawInputBuffer(pool, blockSize), +level(compressionLevel), +outputBuffer(nullptr), +bufferSize(0), +outputPosition(0), +outputSize(0) { +// PASS + } + + void CompressionStreamBase::BackUp(int count) { +if (count > bufferSize) { + throw std::logic_error("Can't backup that much!"); +} +bufferSize -= count; + } + + uint64_t CompressionStreamBase::flush() { +void * data; +int size; +if (!Next(, )) { + throw std::logic_error("Failed to flush compression buffer."); --- End 
diff -- For now, I will just use std::runtime_error in the compression path. We can come back and fix that later.
[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream
Github user majetideepak commented on a diff in the pull request: https://github.com/apache/orc/pull/122#discussion_r117380455 --- Diff: c++/src/Compression.cc --- @@ -33,6 +33,254 @@ namespace orc { + class CompressionStreamBase: public BufferedOutputStream { + public: +CompressionStreamBase(OutputStream * outStream, + int compressionLevel, + uint64_t capacity, + uint64_t blockSize, + MemoryPool& pool); + +virtual bool Next(void** data, int*size) override = 0; +virtual void BackUp(int count) override; + +virtual std::string getName() const override = 0; +virtual uint64_t flush() override; + +virtual bool isCompressed() const override { return true; } +virtual uint64_t getSize() const override; + + protected: +void writeHeader(char * buffer, size_t compressedSize, bool original) { + buffer[0] = static_cast((compressedSize << 1) + (original ? 1 : 0)); + buffer[1] = static_cast(compressedSize >> 7); + buffer[2] = static_cast(compressedSize >> 15); +} + +// Buffer to hold uncompressed data until user calls Next() +DataBuffer rawInputBuffer; + +// Compress level +int level; + +// Compressed data output buffer +char * outputBuffer; + +// Size for compressionBuffer +int bufferSize; + +// Compress output position +int outputPosition; + +// Compress output buffer size +int outputSize; + }; + + CompressionStreamBase::CompressionStreamBase(OutputStream * outStream, + int compressionLevel, + uint64_t capacity, + uint64_t blockSize, + MemoryPool& pool) : +BufferedOutputStream(pool, + outStream, + capacity, + blockSize), +rawInputBuffer(pool, blockSize), +level(compressionLevel), +outputBuffer(nullptr), +bufferSize(0), +outputPosition(0), +outputSize(0) { +// PASS + } + + void CompressionStreamBase::BackUp(int count) { +if (count > bufferSize) { + throw std::logic_error("Can't backup that much!"); +} +bufferSize -= count; + } + + uint64_t CompressionStreamBase::flush() { +void * data; +int size; +if (!Next(, )) { + throw std::logic_error("Failed to flush compression buffer."); 
--- End diff -- Filed ORC-196
[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream
Github user xndai commented on a diff in the pull request: https://github.com/apache/orc/pull/122#discussion_r117380313 --- Diff: c++/src/Compression.cc --- @@ -33,6 +33,254 @@ namespace orc { + class CompressionStreamBase: public BufferedOutputStream { + public: +CompressionStreamBase(OutputStream * outStream, + int compressionLevel, + uint64_t capacity, + uint64_t blockSize, + MemoryPool& pool); + +virtual bool Next(void** data, int*size) override = 0; +virtual void BackUp(int count) override; + +virtual std::string getName() const override = 0; +virtual uint64_t flush() override; + +virtual bool isCompressed() const override { return true; } +virtual uint64_t getSize() const override; + + protected: +void writeHeader(char * buffer, size_t compressedSize, bool original) { + buffer[0] = static_cast((compressedSize << 1) + (original ? 1 : 0)); + buffer[1] = static_cast(compressedSize >> 7); + buffer[2] = static_cast(compressedSize >> 15); +} + +// Buffer to hold uncompressed data until user calls Next() +DataBuffer rawInputBuffer; + +// Compress level +int level; + +// Compressed data output buffer +char * outputBuffer; + +// Size for compressionBuffer +int bufferSize; + +// Compress output position +int outputPosition; + +// Compress output buffer size +int outputSize; + }; + + CompressionStreamBase::CompressionStreamBase(OutputStream * outStream, + int compressionLevel, + uint64_t capacity, + uint64_t blockSize, + MemoryPool& pool) : +BufferedOutputStream(pool, + outStream, + capacity, + blockSize), +rawInputBuffer(pool, blockSize), +level(compressionLevel), +outputBuffer(nullptr), +bufferSize(0), +outputPosition(0), +outputSize(0) { +// PASS + } + + void CompressionStreamBase::BackUp(int count) { +if (count > bufferSize) { + throw std::logic_error("Can't backup that much!"); +} +bufferSize -= count; + } + + uint64_t CompressionStreamBase::flush() { +void * data; +int size; +if (!Next(, )) { + throw std::logic_error("Failed to flush compression buffer."); --- End 
diff -- Or we could keep both RuntimeError and LogicError, defined in the orc namespace, to distinguish the different types of errors.
[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream
Github user majetideepak commented on a diff in the pull request: https://github.com/apache/orc/pull/122#discussion_r117379977 --- Diff: c++/src/Compression.cc --- @@ -33,6 +33,254 @@ namespace orc { + class CompressionStreamBase: public BufferedOutputStream { + public: +CompressionStreamBase(OutputStream * outStream, + int compressionLevel, + uint64_t capacity, + uint64_t blockSize, + MemoryPool& pool); + +virtual bool Next(void** data, int*size) override = 0; +virtual void BackUp(int count) override; + +virtual std::string getName() const override = 0; +virtual uint64_t flush() override; + +virtual bool isCompressed() const override { return true; } +virtual uint64_t getSize() const override; + + protected: +void writeHeader(char * buffer, size_t compressedSize, bool original) { + buffer[0] = static_cast((compressedSize << 1) + (original ? 1 : 0)); + buffer[1] = static_cast(compressedSize >> 7); + buffer[2] = static_cast(compressedSize >> 15); +} + +// Buffer to hold uncompressed data until user calls Next() +DataBuffer rawInputBuffer; + +// Compress level +int level; + +// Compressed data output buffer +char * outputBuffer; + +// Size for compressionBuffer +int bufferSize; + +// Compress output position +int outputPosition; + +// Compress output buffer size +int outputSize; + }; + + CompressionStreamBase::CompressionStreamBase(OutputStream * outStream, + int compressionLevel, + uint64_t capacity, + uint64_t blockSize, + MemoryPool& pool) : +BufferedOutputStream(pool, + outStream, + capacity, + blockSize), +rawInputBuffer(pool, blockSize), +level(compressionLevel), +outputBuffer(nullptr), +bufferSize(0), +outputPosition(0), +outputSize(0) { +// PASS + } + + void CompressionStreamBase::BackUp(int count) { +if (count > bufferSize) { + throw std::logic_error("Can't backup that much!"); +} +bufferSize -= count; + } + + uint64_t CompressionStreamBase::flush() { +void * data; +int size; +if (!Next(, )) { + throw std::logic_error("Failed to flush compression buffer."); 
--- End diff -- Sounds good!
[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream
Github user xndai commented on a diff in the pull request: https://github.com/apache/orc/pull/122#discussion_r117379895 --- Diff: c++/src/Compression.cc --- @@ -33,6 +33,254 @@ namespace orc { + class CompressionStreamBase: public BufferedOutputStream { + public: +CompressionStreamBase(OutputStream * outStream, + int compressionLevel, + uint64_t capacity, + uint64_t blockSize, + MemoryPool& pool); + +virtual bool Next(void** data, int*size) override = 0; +virtual void BackUp(int count) override; + +virtual std::string getName() const override = 0; +virtual uint64_t flush() override; + +virtual bool isCompressed() const override { return true; } +virtual uint64_t getSize() const override; + + protected: +void writeHeader(char * buffer, size_t compressedSize, bool original) { + buffer[0] = static_cast((compressedSize << 1) + (original ? 1 : 0)); + buffer[1] = static_cast(compressedSize >> 7); + buffer[2] = static_cast(compressedSize >> 15); +} + +// Buffer to hold uncompressed data until user calls Next() +DataBuffer rawInputBuffer; + +// Compress level +int level; + +// Compressed data output buffer +char * outputBuffer; + +// Size for compressionBuffer +int bufferSize; + +// Compress output position +int outputPosition; + +// Compress output buffer size +int outputSize; + }; + + CompressionStreamBase::CompressionStreamBase(OutputStream * outStream, + int compressionLevel, + uint64_t capacity, + uint64_t blockSize, + MemoryPool& pool) : +BufferedOutputStream(pool, + outStream, + capacity, + blockSize), +rawInputBuffer(pool, blockSize), +level(compressionLevel), +outputBuffer(nullptr), +bufferSize(0), +outputPosition(0), +outputSize(0) { +// PASS + } + + void CompressionStreamBase::BackUp(int count) { +if (count > bufferSize) { + throw std::logic_error("Can't backup that much!"); +} +bufferSize -= count; + } + + uint64_t CompressionStreamBase::flush() { +void * data; +int size; +if (!Next(, )) { + throw std::logic_error("Failed to flush compression buffer."); --- End 
diff -- We also throw std::logic_error in other places. If that's the goal, I'd suggest we use a single exception type for all ORC code, e.g. OrcException. You can open a JIRA for this, and it can be done in a separate change.
[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream
Github user majetideepak commented on a diff in the pull request: https://github.com/apache/orc/pull/122#discussion_r117379212 --- Diff: c++/src/Compression.cc --- @@ -33,6 +33,254 @@ namespace orc { + class CompressionStreamBase: public BufferedOutputStream { + public: +CompressionStreamBase(OutputStream * outStream, + int compressionLevel, + uint64_t capacity, + uint64_t blockSize, + MemoryPool& pool); + +virtual bool Next(void** data, int*size) override = 0; +virtual void BackUp(int count) override; + +virtual std::string getName() const override = 0; +virtual uint64_t flush() override; + +virtual bool isCompressed() const override { return true; } +virtual uint64_t getSize() const override; + + protected: +void writeHeader(char * buffer, size_t compressedSize, bool original) { + buffer[0] = static_cast((compressedSize << 1) + (original ? 1 : 0)); + buffer[1] = static_cast(compressedSize >> 7); + buffer[2] = static_cast(compressedSize >> 15); +} + +// Buffer to hold uncompressed data until user calls Next() +DataBuffer rawInputBuffer; + +// Compress level +int level; + +// Compressed data output buffer +char * outputBuffer; + +// Size for compressionBuffer +int bufferSize; + +// Compress output position +int outputPosition; + +// Compress output buffer size +int outputSize; + }; + + CompressionStreamBase::CompressionStreamBase(OutputStream * outStream, + int compressionLevel, + uint64_t capacity, + uint64_t blockSize, + MemoryPool& pool) : +BufferedOutputStream(pool, + outStream, + capacity, + blockSize), +rawInputBuffer(pool, blockSize), +level(compressionLevel), +outputBuffer(nullptr), +bufferSize(0), +outputPosition(0), +outputSize(0) { +// PASS + } + + void CompressionStreamBase::BackUp(int count) { +if (count > bufferSize) { + throw std::logic_error("Can't backup that much!"); +} +bufferSize -= count; + } + + uint64_t CompressionStreamBase::flush() { +void * data; +int size; +if (!Next(, )) { + throw std::logic_error("Failed to flush compression buffer."); 
--- End diff -- ParseError extends std::runtime_error. Applications that integrate with the ORC library would want to distinguish exceptions thrown by the ORC library from those thrown by other libraries.
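The point about distinguishing library exceptions from others can be illustrated with a small sketch. Everything here is hypothetical: `OrcException` is only the name floated in this thread, not an existing ORC class, and `flushOrThrow`, `Origin`, and `classify` are stand-ins invented for the example.

```cpp
#include <stdexcept>
#include <string>

namespace sketch {

  // Hypothetical library-wide exception type. Deriving from
  // std::runtime_error keeps existing catch(std::exception&) handlers
  // working while giving callers a more specific type to catch.
  class OrcException : public std::runtime_error {
  public:
    explicit OrcException(const std::string& what)
        : std::runtime_error(what) {}
  };

  // Stand-in for a library call that can fail, e.g. a flush.
  inline void flushOrThrow(bool ok) {
    if (!ok) {
      throw OrcException("Failed to flush compression buffer.");
    }
  }

  enum class Origin { None, Orc, Other };

  // An application can tell the two sources apart by catching the
  // library type first and the generic base class second.
  template <typename Fn>
  Origin classify(Fn&& fn) {
    try {
      fn();
      return Origin::None;
    } catch (const OrcException&) {
      return Origin::Orc;
    } catch (const std::exception&) {
      return Origin::Other;
    }
  }

}  // namespace sketch
```

With a plain std::runtime_error or std::logic_error thrown from library code, the second handler is the only one that fires, so the caller cannot tell where the failure came from.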
[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream
Github user xndai commented on a diff in the pull request: https://github.com/apache/orc/pull/122#discussion_r117378369 --- Diff: c++/src/Compression.cc --- @@ -33,6 +33,254 @@ namespace orc { + class CompressionStreamBase: public BufferedOutputStream { + public: +CompressionStreamBase(OutputStream * outStream, + int compressionLevel, + uint64_t capacity, + uint64_t blockSize, + MemoryPool& pool); + +virtual bool Next(void** data, int*size) override = 0; +virtual void BackUp(int count) override; + +virtual std::string getName() const override = 0; +virtual uint64_t flush() override; + +virtual bool isCompressed() const override { return true; } +virtual uint64_t getSize() const override; + + protected: +void writeHeader(char * buffer, size_t compressedSize, bool original) { + buffer[0] = static_cast((compressedSize << 1) + (original ? 1 : 0)); + buffer[1] = static_cast(compressedSize >> 7); + buffer[2] = static_cast(compressedSize >> 15); +} + +// Buffer to hold uncompressed data until user calls Next() +DataBuffer rawInputBuffer; + +// Compress level +int level; + +// Compressed data output buffer +char * outputBuffer; + +// Size for compressionBuffer +int bufferSize; + +// Compress output position +int outputPosition; + +// Compress output buffer size +int outputSize; + }; + + CompressionStreamBase::CompressionStreamBase(OutputStream * outStream, + int compressionLevel, + uint64_t capacity, + uint64_t blockSize, + MemoryPool& pool) : +BufferedOutputStream(pool, + outStream, + capacity, + blockSize), +rawInputBuffer(pool, blockSize), +level(compressionLevel), +outputBuffer(nullptr), +bufferSize(0), +outputPosition(0), +outputSize(0) { +// PASS + } + + void CompressionStreamBase::BackUp(int count) { +if (count > bufferSize) { + throw std::logic_error("Can't backup that much!"); +} +bufferSize -= count; + } + + uint64_t CompressionStreamBase::flush() { +void * data; +int size; +if (!Next(, )) { + throw std::logic_error("Failed to flush compression buffer."); --- End 
diff -- How about just using std::runtime_error? I still don't see a need to create yet another exception class.
[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream
Github user xndai commented on a diff in the pull request:

https://github.com/apache/orc/pull/122#discussion_r117363862

--- Diff: c++/src/Compression.cc ---
@@ -636,6 +884,33 @@
 DIAGNOSTIC_POP
     return static_cast(result);
   }

+  std::unique_ptr
+  createCompressor(
+      CompressionKind kind,
+      OutputStream * outStream,
+      CompressionStrategy strategy,
+      uint64_t bufferCapacity,
+      uint64_t blockSize,
+      MemoryPool& pool) {
+    switch (static_cast(kind)) {
+    case CompressionKind_NONE: {
+      return std::unique_ptr
+        (new BufferedOutputStream(pool, outStream, bufferCapacity, blockSize));
+    }
+    case CompressionKind_ZLIB: {
+      int level = strategy == CompressionStrategy_SPEED ? 1 : 9;
--- End diff --

Will do. Thx.
[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream
Github user majetideepak commented on a diff in the pull request:

    https://github.com/apache/orc/pull/122#discussion_r117354764

    --- Diff: c++/src/Compression.cc ---
    @@ -33,6 +33,254 @@ namespace orc {

    +  class CompressionStreamBase: public BufferedOutputStream {
    +  public:
    +    CompressionStreamBase(OutputStream * outStream,
    +                          int compressionLevel,
    +                          uint64_t capacity,
    +                          uint64_t blockSize,
    +                          MemoryPool& pool);
    +
    +    virtual bool Next(void** data, int*size) override = 0;
    +    virtual void BackUp(int count) override;
    +
    +    virtual std::string getName() const override = 0;
    +    virtual uint64_t flush() override;
    +
    +    virtual bool isCompressed() const override { return true; }
    +    virtual uint64_t getSize() const override;
    +
    +  protected:
    +    void writeHeader(char * buffer, size_t compressedSize, bool original) {
    +      buffer[0] = static_cast<char>((compressedSize << 1) + (original ? 1 : 0));
    +      buffer[1] = static_cast<char>(compressedSize >> 7);
    +      buffer[2] = static_cast<char>(compressedSize >> 15);
    +    }
    +
    +    // Buffer to hold uncompressed data until user calls Next()
    +    DataBuffer<unsigned char> rawInputBuffer;
    +
    +    // Compress level
    +    int level;
    +
    +    // Compressed data output buffer
    +    char * outputBuffer;
    +
    +    // Size for compressionBuffer
    +    int bufferSize;
    +
    +    // Compress output position
    +    int outputPosition;
    +
    +    // Compress output buffer size
    +    int outputSize;
    +  };
    +
    +  CompressionStreamBase::CompressionStreamBase(OutputStream * outStream,
    +                                               int compressionLevel,
    +                                               uint64_t capacity,
    +                                               uint64_t blockSize,
    +                                               MemoryPool& pool) :
    +    BufferedOutputStream(pool,
    +                         outStream,
    +                         capacity,
    +                         blockSize),
    +    rawInputBuffer(pool, blockSize),
    +    level(compressionLevel),
    +    outputBuffer(nullptr),
    +    bufferSize(0),
    +    outputPosition(0),
    +    outputSize(0) {
    +    // PASS
    +  }
    +
    +  void CompressionStreamBase::BackUp(int count) {
    +    if (count > bufferSize) {
    +      throw std::logic_error("Can't backup that much!");
    +    }
    +    bufferSize -= count;
    +  }
    +
    +  uint64_t CompressionStreamBase::flush() {
    +    void * data;
    +    int size;
    +    if (!Next(&data, &size)) {
    +      throw std::logic_error("Failed to flush compression buffer.");
    +    }
    +    BufferedOutputStream::BackUp(outputSize - outputPosition);
    +    bufferSize = outputSize = outputPosition = 0;
    +    return BufferedOutputStream::flush();
    +  }
    +
    +  uint64_t CompressionStreamBase::getSize() const {
    +    return BufferedOutputStream::getSize() -
    +           static_cast<uint64_t>(outputSize - outputPosition);
    +  }
    +
    +  /**
    +   * Streaming compression base class
    +   */
    +  class CompressionStream: public CompressionStreamBase {
    +  public:
    +    CompressionStream(OutputStream * outStream,
    +                      int compressionLevel,
    +                      uint64_t capacity,
    +                      uint64_t blockSize,
    +                      MemoryPool& pool);
    +
    +    virtual bool Next(void** data, int*size) override;
    +    virtual std::string getName() const override = 0;
    +
    +  protected:
    +    // return total compressed size
    +    virtual uint64_t doStreamingCompression() = 0;
    +  };
    +
    +  CompressionStream::CompressionStream(OutputStream * outStream,
    +                                       int compressionLevel,
    +                                       uint64_t capacity,
    +                                       uint64_t blockSize,
    +                                       MemoryPool& pool) :
    +    CompressionStreamBase(outStream,
    +                          compressionLevel,
    +                          capacity,
    +                          blockSize,
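The `writeHeader` helper in the quoted hunk packs the chunk length and the "original" (stored uncompressed) flag into three bytes, least significant byte first, with the flag in the lowest bit. The following is a minimal sketch of that layout with a matching decoder, assuming the length fits in 23 bits. The names `ChunkHeader`, `encodeHeader` and `decodeHeader` are illustrative only and are not part of the patch.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Illustrative type, not from the patch: the two fields stored in the header.
struct ChunkHeader {
  std::size_t length;  // size of the chunk body in bytes
  bool original;       // true when the chunk body is stored uncompressed
};

// Same arithmetic as writeHeader in the hunk: header = (length << 1) | flag,
// written as three little-endian bytes.
void encodeHeader(unsigned char* buf, std::size_t length, bool original) {
  assert(length < (1u << 23));  // assumption: 23 bits are left for the length
  buf[0] = static_cast<unsigned char>((length << 1) | (original ? 1 : 0));
  buf[1] = static_cast<unsigned char>(length >> 7);
  buf[2] = static_cast<unsigned char>(length >> 15);
}

// Reassemble the 24-bit value, then split off the flag bit.
ChunkHeader decodeHeader(const unsigned char* buf) {
  std::uint32_t raw = static_cast<std::uint32_t>(buf[0]) |
                      (static_cast<std::uint32_t>(buf[1]) << 8) |
                      (static_cast<std::uint32_t>(buf[2]) << 16);
  return ChunkHeader{raw >> 1, (raw & 1u) != 0};
}
```

For example, a compressed chunk of 100000 bytes encodes to the bytes `0x40 0x0d 0x03`, and an uncompressed chunk of 5 bytes encodes to `0x0b 0x00 0x00`.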
[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream
Github user majetideepak commented on a diff in the pull request:

    https://github.com/apache/orc/pull/122#discussion_r117355243

    --- Diff: c++/src/Compression.cc ---
    @@ -33,6 +33,254 @@ namespace orc {
    [...]
[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream
Github user majetideepak commented on a diff in the pull request:

    https://github.com/apache/orc/pull/122#discussion_r117354986

    --- Diff: c++/src/Compression.cc ---
    @@ -33,6 +33,254 @@ namespace orc {
    [...]
[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream
Github user majetideepak commented on a diff in the pull request:

    https://github.com/apache/orc/pull/122#discussion_r117357368

    --- Diff: c++/src/Compression.cc ---
    @@ -636,6 +884,33 @@ DIAGNOSTIC_POP
         return static_cast<uint64_t>(result);
       }

    +  std::unique_ptr<BufferedOutputStream>
    +     createCompressor(
    +                      CompressionKind kind,
    +                      OutputStream * outStream,
    +                      CompressionStrategy strategy,
    +                      uint64_t bufferCapacity,
    +                      uint64_t blockSize,
    +                      MemoryPool& pool) {
    +    switch (static_cast<int64_t>(kind)) {
    +    case CompressionKind_NONE: {
    +      return std::unique_ptr<BufferedOutputStream>
    +        (new BufferedOutputStream(pool, outStream, bufferCapacity, blockSize));
    +    }
    +    case CompressionKind_ZLIB: {
    +      int level = strategy == CompressionStrategy_SPEED ? 1 : 9;
    --- End diff --

    Wrapping the condition in parentheses here improves readability: `(strategy == CompressionStrategy_SPEED)`.
[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream
Github user majetideepak commented on a diff in the pull request:

    https://github.com/apache/orc/pull/122#discussion_r117360126

    --- Diff: c++/src/Compression.cc ---
    @@ -636,6 +884,33 @@ DIAGNOSTIC_POP
         return static_cast<uint64_t>(result);
       }

    +  std::unique_ptr<BufferedOutputStream>
    +     createCompressor(
    +                      CompressionKind kind,
    +                      OutputStream * outStream,
    +                      CompressionStrategy strategy,
    +                      uint64_t bufferCapacity,
    +                      uint64_t blockSize,
    +                      MemoryPool& pool) {
    +    switch (static_cast<int64_t>(kind)) {
    +    case CompressionKind_NONE: {
    +      return std::unique_ptr<BufferedOutputStream>
    +        (new BufferedOutputStream(pool, outStream, bufferCapacity, blockSize));
    +    }
    +    case CompressionKind_ZLIB: {
    +      int level = strategy == CompressionStrategy_SPEED ? 1 : 9;
    --- End diff --

    We should keep the default level at `Z_DEFAULT_COMPRESSION` (-1) and override it only when `strategy` is specified. I think the Java code does something similar.
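One possible reading of this suggestion, sketched in C++: fall back to zlib's default level unless a strategy was explicitly chosen. This is a sketch under stated assumptions, not the code that was merged. The `Strategy` enum (including its `Unspecified` value) and `zlibLevelFor` are hypothetical names, and the constants are redefined locally with the values zlib.h gives `Z_DEFAULT_COMPRESSION`, `Z_BEST_SPEED` and `Z_BEST_COMPRESSION`, so the snippet does not depend on zlib.

```cpp
// Local copies of the zlib.h level constants (values as defined by zlib).
constexpr int kZDefaultCompression = -1;  // Z_DEFAULT_COMPRESSION
constexpr int kZBestSpeed = 1;            // Z_BEST_SPEED
constexpr int kZBestCompression = 9;      // Z_BEST_COMPRESSION

// Hypothetical stand-in for orc::CompressionStrategy. The Unspecified value
// is an assumption made here to model "no strategy was given".
enum class Strategy { Unspecified, Speed, Compression };

// Hypothetical helper: choose the zlib level for a strategy, keeping the
// zlib default when the caller did not pick one. The 1 and 9 mirror the
// values in the line under review.
int zlibLevelFor(Strategy strategy) {
  switch (strategy) {
    case Strategy::Speed:
      return kZBestSpeed;
    case Strategy::Compression:
      return kZBestCompression;
    case Strategy::Unspecified:
      return kZDefaultCompression;
  }
  return kZDefaultCompression;  // unreachable for valid enum values
}
```

Keeping the mapping in one small function makes it easy to change the chosen levels later without touching the `switch` in `createCompressor`.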
[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream
Github user majetideepak commented on a diff in the pull request:

    https://github.com/apache/orc/pull/122#discussion_r117355066

    --- Diff: c++/src/Compression.cc ---
    @@ -33,6 +33,254 @@ namespace orc {
    [...]
    +  uint64_t CompressionStreamBase::flush() {
    +    void * data;
    +    int size;
    +    if (!Next(&data, &size)) {
    +      throw std::logic_error("Failed to flush compression buffer.");
    --- End diff --

    Use `ParseError` here instead of `std::logic_error`.
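A small sketch of what this change means for callers, assuming `ParseError` derives from `std::runtime_error`. The class below is a stand-in written for this example, not the ORC definition, and `checkFlush` and `flushFailureIsRuntimeError` are hypothetical helpers. A failed flush then surfaces as a runtime failure rather than as a programming error.

```cpp
#include <stdexcept>
#include <string>

// Stand-in for orc::ParseError; assumed here to derive from std::runtime_error.
class ParseError : public std::runtime_error {
 public:
  explicit ParseError(const std::string& what) : std::runtime_error(what) {}
};

// Hypothetical flush step: nextSucceeded stands in for the result of
// Next(&data, &size) in CompressionStreamBase::flush().
void checkFlush(bool nextSucceeded) {
  if (!nextSucceeded) {
    throw ParseError("Failed to flush compression buffer.");
  }
}

// Returns true when a failed flush is caught as std::runtime_error and
// not as std::logic_error.
bool flushFailureIsRuntimeError() {
  try {
    checkFlush(false);
  } catch (const std::logic_error&) {
    return false;
  } catch (const std::runtime_error&) {
    return true;
  }
  return false;
}
```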
[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream
GitHub user xndai opened a pull request:

    https://github.com/apache/orc/pull/122

    ORC-192 Implement zlib compresion stream

    Implement zlib compressor based on the output stream. Add corresponding UTs. Also rename the existing test suite from TestCompression into TestDecompression.

    Change-Id: I6c594cc8f8a769078bbe561056adfd5d0389dbc1

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/xndai/orc dev_compression

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/orc/pull/122.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #122

commit d4fcfe4a14386041f3080dfad95036c7fe5f8e9a
Author: Xiening.Dai
Date:   2017-05-16T22:29:01Z

    ORC-192 Implement zlib compresion stream

    Implement zlib compressor based on the output stream. Add corresponding UTs. Also rename the existing test suite from TestCompression into TestDecompression.

    Change-Id: I6c594cc8f8a769078bbe561056adfd5d0389dbc1