This is an automated email from the ASF dual-hosted git repository. fsaintjacques pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push: new a39f7b2 ARROW-5974: [C++] Support reading concatenated compressed streams a39f7b2 is described below commit a39f7b27e2c76a323f7d05b7f9aa6331add86be6 Author: Antoine Pitrou <anto...@python.org> AuthorDate: Thu Aug 1 10:04:42 2019 -0400 ARROW-5974: [C++] Support reading concatenated compressed streams Multiple concatenated compressed streams are read back as a single decompressed stream. Closes #4923 from pitrou/ARROW-5974-concatenated-compressed-streams and squashes the following commits: 582380301 <Antoine Pitrou> Refactor CompressedInputStream implementation to make control flow clearer c5d42b070 <Antoine Pitrou> Use unique_ptr<T>::reset() 8913fda00 <Antoine Pitrou> Workaround for old lz4 versions dea8b99da <Antoine Pitrou> Use dedicated reset functions where available 918502a35 <Antoine Pitrou> ARROW-5974: Support reading concatenated compressed streams Authored-by: Antoine Pitrou <anto...@python.org> Signed-off-by: François Saint-Jacques <fsaintjacq...@gmail.com> --- cpp/src/arrow/io/compressed-test.cc | 24 ++++++ cpp/src/arrow/io/compressed.cc | 127 +++++++++++++++++++------------ cpp/src/arrow/util/compression-test.cc | 39 ++++++++-- cpp/src/arrow/util/compression.h | 3 + cpp/src/arrow/util/compression_brotli.cc | 7 ++ cpp/src/arrow/util/compression_bz2.cc | 8 ++ cpp/src/arrow/util/compression_lz4.cc | 15 ++++ cpp/src/arrow/util/compression_zlib.cc | 23 ++++-- cpp/src/arrow/util/compression_zstd.cc | 2 + python/pyarrow/tests/test_csv.py | 25 ++++-- python/pyarrow/tests/test_io.py | 17 +++++ 11 files changed, 225 insertions(+), 65 deletions(-) diff --git a/cpp/src/arrow/io/compressed-test.cc b/cpp/src/arrow/io/compressed-test.cc index fb4dcac..4bf9d08 100644 --- a/cpp/src/arrow/io/compressed-test.cc +++ b/cpp/src/arrow/io/compressed-test.cc @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +#include <algorithm> +#include <iterator> #include <memory> #include <random> #include <string> @@ -190,6 +192,28 @@ TEST_P(CompressedInputStreamTest, InvalidData) { ASSERT_RAISES(IOError, stream->Read(1024, &out_buf)); } +TEST_P(CompressedInputStreamTest, ConcatenatedStreams) { + // ARROW-5974: just like the "gunzip", "bzip2" and "xz" commands, + // decompressing concatenated compressed streams should yield the entire + // original data. + auto codec = MakeCodec(); + auto data1 = MakeCompressibleData(100); + auto data2 = MakeCompressibleData(200); + auto compressed1 = CompressDataOneShot(codec.get(), data1); + auto compressed2 = CompressDataOneShot(codec.get(), data2); + + std::shared_ptr<Buffer> concatenated; + ASSERT_OK(ConcatenateBuffers({compressed1, compressed2}, default_memory_pool(), + &concatenated)); + std::vector<uint8_t> decompressed, expected; + ASSERT_OK(RunCompressedInputStream(codec.get(), concatenated, &decompressed)); + std::copy(data1.begin(), data1.end(), std::back_inserter(expected)); + std::copy(data2.begin(), data2.end(), std::back_inserter(expected)); + + ASSERT_EQ(decompressed.size(), expected.size()); + ASSERT_EQ(decompressed, expected); +} + // NOTE: Snappy doesn't support streaming decompression // NOTE: BZ2 doesn't support one-shot compression diff --git a/cpp/src/arrow/io/compressed.cc b/cpp/src/arrow/io/compressed.cc index 43de132..10e7fd7 100644 --- a/cpp/src/arrow/io/compressed.cc +++ b/cpp/src/arrow/io/compressed.cc @@ -43,13 +43,13 @@ namespace io { class CompressedOutputStream::Impl { public: - Impl(MemoryPool* pool, Codec* codec, const std::shared_ptr<OutputStream>& raw) - : pool_(pool), raw_(raw), codec_(codec), is_open_(true), compressed_pos_(0) {} + Impl(MemoryPool* pool, const std::shared_ptr<OutputStream>& raw) + : pool_(pool), raw_(raw), is_open_(true), compressed_pos_(0) {} ~Impl() { ARROW_CHECK_OK(Close()); } - Status Init() { - RETURN_NOT_OK(codec_->MakeCompressor(&compressor_)); + Status Init(Codec* codec) { + RETURN_NOT_OK(codec->MakeCompressor(&compressor_)); RETURN_NOT_OK(AllocateResizableBuffer(pool_, kChunkSize, &compressed_)); compressed_pos_ = 0; return Status::OK(); @@ -180,7 +180,6 @@ class CompressedOutputStream::Impl { MemoryPool* pool_; std::shared_ptr<OutputStream> raw_; - Codec* codec_; bool is_open_; std::shared_ptr<Compressor> compressor_; std::shared_ptr<ResizableBuffer> compressed_; @@ -198,9 +197,10 @@ Status CompressedOutputStream::Make(util::Codec* codec, Status CompressedOutputStream::Make(MemoryPool* pool, util::Codec* codec, const std::shared_ptr<OutputStream>& raw, std::shared_ptr<CompressedOutputStream>* out) { + // CAUTION: codec is not owned std::shared_ptr<CompressedOutputStream> res(new CompressedOutputStream); - res->impl_ = std::unique_ptr<Impl>(new Impl(pool, codec, std::move(raw))); - RETURN_NOT_OK(res->impl_->Init()); + res->impl_.reset(new Impl(pool, std::move(raw))); + RETURN_NOT_OK(res->impl_->Init(codec)); *out = res; return Status::OK(); } @@ -228,16 +228,16 @@ std::shared_ptr<OutputStream> CompressedOutputStream::raw() const { return impl_ class CompressedInputStream::Impl { public: - Impl(MemoryPool* pool, Codec* codec, const std::shared_ptr<InputStream>& raw) + Impl(MemoryPool* pool, const std::shared_ptr<InputStream>& raw) : pool_(pool), raw_(raw), - codec_(codec), is_open_(true), compressed_pos_(0), decompressed_pos_(0) {} - Status Init() { - RETURN_NOT_OK(codec_->MakeDecompressor(&decompressor_)); + Status Init(Codec* codec) { + RETURN_NOT_OK(codec->MakeDecompressor(&decompressor_)); + fresh_decompressor_ = true; return Status::OK(); } @@ -273,6 +273,8 @@ class CompressedInputStream::Impl { return Status::OK(); } + // Decompress some data from the compressed_ buffer. + // Call this function only if the decompressed_ buffer is empty. Status DecompressData() { int64_t decompress_size = kDecompressSize; @@ -291,6 +293,9 @@ class CompressedInputStream::Impl { &bytes_read, &bytes_written, &need_more_output)); compressed_pos_ += bytes_read; + if (bytes_read > 0) { + fresh_decompressor_ = false; + } if (bytes_written > 0 || !need_more_output || input_len == 0) { RETURN_NOT_OK(decompressed_->Resize(bytes_written)); break; @@ -302,51 +307,73 @@ class CompressedInputStream::Impl { return Status::OK(); } - Status Read(int64_t nbytes, int64_t* bytes_read, void* out) { - std::lock_guard<std::mutex> guard(lock_); + // Read a given number of bytes from the decompressed_ buffer. + int64_t ReadFromDecompressed(int64_t nbytes, uint8_t* out) { + int64_t readable = decompressed_ ? (decompressed_->size() - decompressed_pos_) : 0; + int64_t read_bytes = std::min(readable, nbytes); - *bytes_read = 0; - auto out_data = reinterpret_cast<uint8_t*>(out); + if (read_bytes > 0) { + memcpy(out, decompressed_->data() + decompressed_pos_, read_bytes); + decompressed_pos_ += read_bytes; - while (nbytes > 0) { - int64_t avail = decompressed_ ? (decompressed_->size() - decompressed_pos_) : 0; - if (avail > 0) { - // Pending decompressed data is available, use it - avail = std::min(avail, nbytes); - memcpy(out_data, decompressed_->data() + decompressed_pos_, avail); - decompressed_pos_ += avail; - out_data += avail; - *bytes_read += avail; - nbytes -= avail; - if (decompressed_pos_ == decompressed_->size()) { - // Decompressed data is exhausted, release buffer - decompressed_.reset(); - } - if (nbytes == 0) { - // We're done - break; - } + if (decompressed_pos_ == decompressed_->size()) { + // Decompressed data is exhausted, release buffer + decompressed_.reset(); } + } - // At this point, no more decompressed data remains, - // so we need to decompress more + return read_bytes; + } + + // Try to feed more data into the decompressed_ buffer. + Status RefillDecompressed(bool* has_data) { + // First try to read data from the decompressor + if (compressed_) { if (decompressor_->IsFinished()) { - break; - } - // First try to read data from the decompressor - if (compressed_) { - RETURN_NOT_OK(DecompressData()); + // We just went over the end of a previous compressed stream. + RETURN_NOT_OK(decompressor_->Reset()); + fresh_decompressor_ = true; } - if (!decompressed_ || decompressed_->size() == 0) { - // Got nothing, need to read more compressed data - RETURN_NOT_OK(EnsureCompressedData()); - if (compressed_pos_ == compressed_->size()) { - // Compressed stream unexpectedly exhausted + RETURN_NOT_OK(DecompressData()); + } + if (!decompressed_ || decompressed_->size() == 0) { + // Got nothing, need to read more compressed data + RETURN_NOT_OK(EnsureCompressedData()); + if (compressed_pos_ == compressed_->size()) { + // No more data to decompress + if (!fresh_decompressor_) { return Status::IOError("Truncated compressed stream"); } - RETURN_NOT_OK(DecompressData()); + *has_data = false; + return Status::OK(); } + RETURN_NOT_OK(DecompressData()); } + *has_data = true; + return Status::OK(); + } + + Status Read(int64_t nbytes, int64_t* bytes_read, void* out) { + std::lock_guard<std::mutex> guard(lock_); + + auto out_data = reinterpret_cast<uint8_t*>(out); + + int64_t total_read = 0; + bool decompressor_has_data = true; + + while (nbytes - total_read > 0 && decompressor_has_data) { + total_read += ReadFromDecompressed(nbytes - total_read, out_data + total_read); + + if (nbytes == total_read) { + break; + } + + // At this point, no more decompressed data remains, so we need to + // decompress more + RETURN_NOT_OK(RefillDecompressed(&decompressor_has_data)); + } + + *bytes_read = total_read; return Status::OK(); } @@ -370,13 +397,14 @@ class CompressedInputStream::Impl { MemoryPool* pool_; std::shared_ptr<InputStream> raw_; - Codec* codec_; bool is_open_; std::shared_ptr<Decompressor> decompressor_; std::shared_ptr<Buffer> compressed_; int64_t compressed_pos_; std::shared_ptr<ResizableBuffer> decompressed_; int64_t decompressed_pos_; + // True if the decompressor hasn't read any data yet. + bool fresh_decompressor_; mutable std::mutex lock_; }; @@ -389,9 +417,10 @@ Status CompressedInputStream::Make(Codec* codec, const std::shared_ptr<InputStre Status CompressedInputStream::Make(MemoryPool* pool, Codec* codec, const std::shared_ptr<InputStream>& raw, std::shared_ptr<CompressedInputStream>* out) { + // CAUTION: codec is not owned std::shared_ptr<CompressedInputStream> res(new CompressedInputStream); - res->impl_ = std::unique_ptr<Impl>(new Impl(pool, codec, std::move(raw))); - RETURN_NOT_OK(res->impl_->Init()); + res->impl_.reset(new Impl(pool, std::move(raw))); + RETURN_NOT_OK(res->impl_->Init(codec)); *out = res; return Status::OK(); } diff --git a/cpp/src/arrow/util/compression-test.cc b/cpp/src/arrow/util/compression-test.cc index 795e0cc..5f28f9c 100644 --- a/cpp/src/arrow/util/compression-test.cc +++ b/cpp/src/arrow/util/compression-test.cc @@ -233,12 +233,9 @@ void CheckStreamingDecompressor(Codec* codec, const std::vector<uint8_t>& data) // Check the streaming compressor and decompressor together -void CheckStreamingRoundtrip(Codec* codec, const std::vector<uint8_t>& data) { - std::shared_ptr<Compressor> compressor; - std::shared_ptr<Decompressor> decompressor; - ASSERT_OK(codec->MakeCompressor(&compressor)); - ASSERT_OK(codec->MakeDecompressor(&decompressor)); - +void CheckStreamingRoundtrip(std::shared_ptr<Compressor> compressor, + std::shared_ptr<Decompressor> decompressor, + const std::vector<uint8_t>& data) { std::default_random_engine engine(42); std::uniform_int_distribution<int> buf_size_distribution(10, 40); @@ -322,6 +319,15 @@ void CheckStreamingRoundtrip(Codec* codec, const std::vector<uint8_t>& data) { ASSERT_EQ(data, decompressed); } +void CheckStreamingRoundtrip(Codec* codec, const std::vector<uint8_t>& data) { + std::shared_ptr<Compressor> compressor; + std::shared_ptr<Decompressor> decompressor; + ASSERT_OK(codec->MakeCompressor(&compressor)); + ASSERT_OK(codec->MakeDecompressor(&decompressor)); + + CheckStreamingRoundtrip(compressor, decompressor, data); +} + class CodecTest : public ::testing::TestWithParam<Compression::type> { protected: Compression::type GetCompression() { return GetParam(); } @@ -450,6 +456,27 @@ TEST_P(CodecTest, StreamingRoundtrip) { } } +TEST_P(CodecTest, StreamingDecompressorReuse) { + if (GetCompression() == Compression::SNAPPY) { + // SKIP: snappy doesn't support streaming decompression + return; + } + + auto codec = MakeCodec(); + std::shared_ptr<Compressor> compressor; + std::shared_ptr<Decompressor> decompressor; + ASSERT_OK(codec->MakeCompressor(&compressor)); + ASSERT_OK(codec->MakeDecompressor(&decompressor)); + + std::vector<uint8_t> data = MakeRandomData(100); + CheckStreamingRoundtrip(compressor, decompressor, data); + // Decompressor::Reset() should allow reusing decompressor for a new stream + ASSERT_OK(codec->MakeCompressor(&compressor)); + ASSERT_OK(decompressor->Reset()); + data = MakeRandomData(200); + CheckStreamingRoundtrip(compressor, decompressor, data); +} + INSTANTIATE_TEST_CASE_P(TestGZip, CodecTest, ::testing::Values(Compression::GZIP)); INSTANTIATE_TEST_CASE_P(TestSnappy, CodecTest, ::testing::Values(Compression::SNAPPY)); diff --git a/cpp/src/arrow/util/compression.h b/cpp/src/arrow/util/compression.h index 43174f4..8665374 100644 --- a/cpp/src/arrow/util/compression.h +++ b/cpp/src/arrow/util/compression.h @@ -87,6 +87,9 @@ class ARROW_EXPORT Decompressor { /// simply be that the underlying library isn't able to provide the information. virtual bool IsFinished() = 0; + /// \brief Reinitialize decompressor, making it ready for a new compressed stream. + virtual Status Reset() = 0; + // XXX add methods for buffer size heuristics? }; diff --git a/cpp/src/arrow/util/compression_brotli.cc b/cpp/src/arrow/util/compression_brotli.cc index 051b8c0..5f47db0 100644 --- a/cpp/src/arrow/util/compression_brotli.cc +++ b/cpp/src/arrow/util/compression_brotli.cc @@ -56,6 +56,13 @@ class BrotliDecompressor : public Decompressor { return Status::OK(); } + Status Reset() override { + if (state_ != nullptr) { + BrotliDecoderDestroyInstance(state_); + } + return Init(); + } + Status Decompress(int64_t input_len, const uint8_t* input, int64_t output_len, uint8_t* output, int64_t* bytes_read, int64_t* bytes_written, bool* need_more_output) override { diff --git a/cpp/src/arrow/util/compression_bz2.cc b/cpp/src/arrow/util/compression_bz2.cc index a26bb3e..63f0308 100644 --- a/cpp/src/arrow/util/compression_bz2.cc +++ b/cpp/src/arrow/util/compression_bz2.cc @@ -108,6 +108,14 @@ class BZ2Decompressor : public Decompressor { return Status::OK(); } + Status Reset() override { + if (initialized_) { + ARROW_UNUSED(BZ2_bzDecompressEnd(&stream_)); + initialized_ = false; + } + return Init(); + } + Status Decompress(int64_t input_len, const uint8_t* input, int64_t output_len, uint8_t* output, int64_t* bytes_read, int64_t* bytes_written, bool* need_more_output) override { diff --git a/cpp/src/arrow/util/compression_lz4.cc b/cpp/src/arrow/util/compression_lz4.cc index 1efd4c6..356932d 100644 --- a/cpp/src/arrow/util/compression_lz4.cc +++ b/cpp/src/arrow/util/compression_lz4.cc @@ -63,6 +63,21 @@ class LZ4Decompressor : public Decompressor { } } + Status Reset() override { +#if defined(LZ4_VERSION_NUMBER) && LZ4_VERSION_NUMBER >= 10800 + // LZ4F_resetDecompressionContext appeared in 1.8.0 + DCHECK_NE(ctx_, nullptr); + LZ4F_resetDecompressionContext(ctx_); + finished_ = false; + return Status::OK(); +#else + if (ctx_ != nullptr) { + ARROW_UNUSED(LZ4F_freeDecompressionContext(ctx_)); + } + return Init(); +#endif + } + Status Decompress(int64_t input_len, const uint8_t* input, int64_t output_len, uint8_t* output, int64_t* bytes_read, int64_t* bytes_written, bool* need_more_output) override { diff --git a/cpp/src/arrow/util/compression_zlib.cc b/cpp/src/arrow/util/compression_zlib.cc index cd8b912..a44a890 100644 --- a/cpp/src/arrow/util/compression_zlib.cc +++ b/cpp/src/arrow/util/compression_zlib.cc @@ -84,7 +84,8 @@ static Status ZlibErrorPrefix(const char* prefix_msg, const char* msg) { class GZipDecompressor : public Decompressor { public: - GZipDecompressor() : initialized_(false), finished_(false) {} + explicit GZipDecompressor(GZipCodec::Format format) + : format_(format), initialized_(false), finished_(false) {} ~GZipDecompressor() override { if (initialized_) { @@ -92,13 +93,13 @@ class GZipDecompressor : public Decompressor { } } - Status Init(GZipCodec::Format format) { + Status Init() { DCHECK(!initialized_); memset(&stream_, 0, sizeof(stream_)); finished_ = false; int ret; - int window_bits = DecompressionWindowBitsForFormat(format); + int window_bits = DecompressionWindowBitsForFormat(format_); if ((ret = inflateInit2(&stream_, window_bits)) != Z_OK) { return ZlibError("zlib inflateInit failed: "); } else { @@ -107,6 +108,17 @@ class GZipDecompressor : public Decompressor { } } + Status Reset() override { + DCHECK(initialized_); + finished_ = false; + int ret; + if ((ret = inflateReset(&stream_)) != Z_OK) { + return ZlibError("zlib inflateReset failed: "); + } else { + return Status::OK(); + } + } + Status Decompress(int64_t input_len, const uint8_t* input, int64_t output_len, uint8_t* output, int64_t* bytes_read, int64_t* bytes_written, bool* need_more_output) override { @@ -149,6 +161,7 @@ class GZipDecompressor : public Decompressor { } z_stream stream_; + GZipCodec::Format format_; bool initialized_; bool finished_; }; @@ -318,8 +331,8 @@ class GZipCodec::GZipCodecImpl { } Status MakeDecompressor(std::shared_ptr<Decompressor>* out) { - auto ptr = std::make_shared<GZipDecompressor>(); - RETURN_NOT_OK(ptr->Init(format_)); + auto ptr = std::make_shared<GZipDecompressor>(format_); + RETURN_NOT_OK(ptr->Init()); *out = ptr; return Status::OK(); } diff --git a/cpp/src/arrow/util/compression_zstd.cc b/cpp/src/arrow/util/compression_zstd.cc index 24a7329..87faf00 100644 --- a/cpp/src/arrow/util/compression_zstd.cc +++ b/cpp/src/arrow/util/compression_zstd.cc @@ -82,6 +82,8 @@ class ZSTDDecompressor : public Decompressor { return Status::OK(); } + Status Reset() override { return Init(); } + bool IsFinished() override { return finished_; } protected: diff --git a/python/pyarrow/tests/test_csv.py b/python/pyarrow/tests/test_csv.py index 0515365..213fbc5 100644 --- a/python/pyarrow/tests/test_csv.py +++ b/python/pyarrow/tests/test_csv.py @@ -584,15 +584,17 @@ class BaseTestCompressedCSVRead: def tearDown(self): shutil.rmtree(self.tmpdir) + def read_csv(self, csv_path): + try: + return read_csv(csv_path) + except pa.ArrowNotImplementedError as e: + pytest.skip(str(e)) + def test_random_csv(self): csv, expected = make_random_csv(num_cols=2, num_rows=100) csv_path = os.path.join(self.tmpdir, self.csv_filename) self.write_file(csv_path, csv) - try: - table = read_csv(csv_path) - except pa.ArrowNotImplementedError as e: - pytest.skip(str(e)) - return + table = self.read_csv(csv_path) table._validate() assert table.schema == expected.schema assert table.equals(expected) @@ -606,6 +608,19 @@ class TestGZipCSVRead(BaseTestCompressedCSVRead, unittest.TestCase): with gzip.open(path, 'wb', 3) as f: f.write(contents) + def test_concatenated(self): + # ARROW-5974 + csv_path = os.path.join(self.tmpdir, self.csv_filename) + with gzip.open(csv_path, 'wb', 3) as f: + f.write(b"ab,cd\nef,gh\n") + with gzip.open(csv_path, 'ab', 3) as f: + f.write(b"ij,kl\nmn,op\n") + table = self.read_csv(csv_path) + assert table.to_pydict() == { + 'ab': ['ef', 'ij', 'mn'], + 'cd': ['gh', 'kl', 'op'], + } + class TestBZ2CSVRead(BaseTestCompressedCSVRead, unittest.TestCase): csv_filename = "compressed.csv.bz2" diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py index 5eb4e13..11131cd 100644 --- a/python/pyarrow/tests/test_io.py +++ b/python/pyarrow/tests/test_io.py @@ -1017,6 +1017,23 @@ def test_compressed_input_bz2(tmpdir): pytest.skip(str(e)) +def check_compressed_concatenated(data, fn, compression): + raw = pa.OSFile(fn, mode="rb") + with pa.CompressedInputStream(raw, compression) as compressed: + got = compressed.read() + assert got == data + + +def test_compressed_concatenated_gzip(tmpdir): + data = b"some test data\n" * 10 + b"eof\n" + fn = str(tmpdir / "compressed_input_test2.gz") + with gzip.open(fn, "wb") as f: + f.write(data[:50]) + with gzip.open(fn, "ab") as f: + f.write(data[50:]) + check_compressed_concatenated(data, fn, "gzip") + + def test_compressed_input_invalid(): data = b"foo" * 10 raw = pa.BufferReader(data)