This is an automated email from the ASF dual-hosted git repository.

fsaintjacques pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new a39f7b2  ARROW-5974: [C++] Support reading concatenated compressed streams
a39f7b2 is described below

commit a39f7b27e2c76a323f7d05b7f9aa6331add86be6
Author: Antoine Pitrou <anto...@python.org>
AuthorDate: Thu Aug 1 10:04:42 2019 -0400

    ARROW-5974: [C++] Support reading concatenated compressed streams
    
    Multiple concatenated compressed streams are read back as a single decompressed stream.
    
    Closes #4923 from pitrou/ARROW-5974-concatenated-compressed-streams and squashes the following commits:
    
    582380301 <Antoine Pitrou> Refactor CompressedInputStream implementation to make control flow clearer
    c5d42b070 <Antoine Pitrou> Use unique_ptr<T>::reset()
    8913fda00 <Antoine Pitrou> Workaround for old lz4 versions
    dea8b99da <Antoine Pitrou> Use dedicated reset functions where available
    918502a35 <Antoine Pitrou> ARROW-5974:  Support reading concatenated compressed streams
    
    Authored-by: Antoine Pitrou <anto...@python.org>
    Signed-off-by: François Saint-Jacques <fsaintjacq...@gmail.com>
---
 cpp/src/arrow/io/compressed-test.cc      |  24 ++++++
 cpp/src/arrow/io/compressed.cc           | 127 +++++++++++++++++++------------
 cpp/src/arrow/util/compression-test.cc   |  39 ++++++++--
 cpp/src/arrow/util/compression.h         |   3 +
 cpp/src/arrow/util/compression_brotli.cc |   7 ++
 cpp/src/arrow/util/compression_bz2.cc    |   8 ++
 cpp/src/arrow/util/compression_lz4.cc    |  15 ++++
 cpp/src/arrow/util/compression_zlib.cc   |  23 ++++--
 cpp/src/arrow/util/compression_zstd.cc   |   2 +
 python/pyarrow/tests/test_csv.py         |  25 ++++--
 python/pyarrow/tests/test_io.py          |  17 +++++
 11 files changed, 225 insertions(+), 65 deletions(-)

diff --git a/cpp/src/arrow/io/compressed-test.cc b/cpp/src/arrow/io/compressed-test.cc
index fb4dcac..4bf9d08 100644
--- a/cpp/src/arrow/io/compressed-test.cc
+++ b/cpp/src/arrow/io/compressed-test.cc
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <algorithm>
+#include <iterator>
 #include <memory>
 #include <random>
 #include <string>
@@ -190,6 +192,28 @@ TEST_P(CompressedInputStreamTest, InvalidData) {
   ASSERT_RAISES(IOError, stream->Read(1024, &out_buf));
 }
 
+TEST_P(CompressedInputStreamTest, ConcatenatedStreams) {
+  // ARROW-5974: just like the "gunzip", "bzip2" and "xz" commands,
+  // decompressing concatenated compressed streams should yield the entire
+  // original data.
+  auto codec = MakeCodec();
+  auto data1 = MakeCompressibleData(100);
+  auto data2 = MakeCompressibleData(200);
+  auto compressed1 = CompressDataOneShot(codec.get(), data1);
+  auto compressed2 = CompressDataOneShot(codec.get(), data2);
+
+  std::shared_ptr<Buffer> concatenated;
+  ASSERT_OK(ConcatenateBuffers({compressed1, compressed2}, default_memory_pool(),
+                               &concatenated));
+  std::vector<uint8_t> decompressed, expected;
+  ASSERT_OK(RunCompressedInputStream(codec.get(), concatenated, &decompressed));
+  std::copy(data1.begin(), data1.end(), std::back_inserter(expected));
+  std::copy(data2.begin(), data2.end(), std::back_inserter(expected));
+
+  ASSERT_EQ(decompressed.size(), expected.size());
+  ASSERT_EQ(decompressed, expected);
+}
+
 // NOTE: Snappy doesn't support streaming decompression
 
 // NOTE: BZ2 doesn't support one-shot compression
diff --git a/cpp/src/arrow/io/compressed.cc b/cpp/src/arrow/io/compressed.cc
index 43de132..10e7fd7 100644
--- a/cpp/src/arrow/io/compressed.cc
+++ b/cpp/src/arrow/io/compressed.cc
@@ -43,13 +43,13 @@ namespace io {
 
 class CompressedOutputStream::Impl {
  public:
-  Impl(MemoryPool* pool, Codec* codec, const std::shared_ptr<OutputStream>& raw)
-      : pool_(pool), raw_(raw), codec_(codec), is_open_(true), compressed_pos_(0) {}
+  Impl(MemoryPool* pool, const std::shared_ptr<OutputStream>& raw)
+      : pool_(pool), raw_(raw), is_open_(true), compressed_pos_(0) {}
 
   ~Impl() { ARROW_CHECK_OK(Close()); }
 
-  Status Init() {
-    RETURN_NOT_OK(codec_->MakeCompressor(&compressor_));
+  Status Init(Codec* codec) {
+    RETURN_NOT_OK(codec->MakeCompressor(&compressor_));
     RETURN_NOT_OK(AllocateResizableBuffer(pool_, kChunkSize, &compressed_));
     compressed_pos_ = 0;
     return Status::OK();
@@ -180,7 +180,6 @@ class CompressedOutputStream::Impl {
 
   MemoryPool* pool_;
   std::shared_ptr<OutputStream> raw_;
-  Codec* codec_;
   bool is_open_;
   std::shared_ptr<Compressor> compressor_;
   std::shared_ptr<ResizableBuffer> compressed_;
@@ -198,9 +197,10 @@ Status CompressedOutputStream::Make(util::Codec* codec,
 Status CompressedOutputStream::Make(MemoryPool* pool, util::Codec* codec,
                                     const std::shared_ptr<OutputStream>& raw,
                                     std::shared_ptr<CompressedOutputStream>* out) {
+  // CAUTION: codec is not owned
   std::shared_ptr<CompressedOutputStream> res(new CompressedOutputStream);
-  res->impl_ = std::unique_ptr<Impl>(new Impl(pool, codec, std::move(raw)));
-  RETURN_NOT_OK(res->impl_->Init());
+  res->impl_.reset(new Impl(pool, std::move(raw)));
+  RETURN_NOT_OK(res->impl_->Init(codec));
   *out = res;
   return Status::OK();
 }
@@ -228,16 +228,16 @@ std::shared_ptr<OutputStream> CompressedOutputStream::raw() const { return impl_
 
 class CompressedInputStream::Impl {
  public:
-  Impl(MemoryPool* pool, Codec* codec, const std::shared_ptr<InputStream>& raw)
+  Impl(MemoryPool* pool, const std::shared_ptr<InputStream>& raw)
       : pool_(pool),
         raw_(raw),
-        codec_(codec),
         is_open_(true),
         compressed_pos_(0),
         decompressed_pos_(0) {}
 
-  Status Init() {
-    RETURN_NOT_OK(codec_->MakeDecompressor(&decompressor_));
+  Status Init(Codec* codec) {
+    RETURN_NOT_OK(codec->MakeDecompressor(&decompressor_));
+    fresh_decompressor_ = true;
     return Status::OK();
   }
 
@@ -273,6 +273,8 @@ class CompressedInputStream::Impl {
     return Status::OK();
   }
 
+  // Decompress some data from the compressed_ buffer.
+  // Call this function only if the decompressed_ buffer is empty.
   Status DecompressData() {
     int64_t decompress_size = kDecompressSize;
 
@@ -291,6 +293,9 @@ class CompressedInputStream::Impl {
                                               &bytes_read, &bytes_written,
                                               &need_more_output));
       compressed_pos_ += bytes_read;
+      if (bytes_read > 0) {
+        fresh_decompressor_ = false;
+      }
       if (bytes_written > 0 || !need_more_output || input_len == 0) {
         RETURN_NOT_OK(decompressed_->Resize(bytes_written));
         break;
@@ -302,51 +307,73 @@ class CompressedInputStream::Impl {
     return Status::OK();
   }
 
-  Status Read(int64_t nbytes, int64_t* bytes_read, void* out) {
-    std::lock_guard<std::mutex> guard(lock_);
+  // Read a given number of bytes from the decompressed_ buffer.
+  int64_t ReadFromDecompressed(int64_t nbytes, uint8_t* out) {
+    int64_t readable = decompressed_ ? (decompressed_->size() - decompressed_pos_) : 0;
+    int64_t read_bytes = std::min(readable, nbytes);
 
-    *bytes_read = 0;
-    auto out_data = reinterpret_cast<uint8_t*>(out);
+    if (read_bytes > 0) {
+      memcpy(out, decompressed_->data() + decompressed_pos_, read_bytes);
+      decompressed_pos_ += read_bytes;
 
-    while (nbytes > 0) {
-      int64_t avail = decompressed_ ? (decompressed_->size() - decompressed_pos_) : 0;
-      if (avail > 0) {
-        // Pending decompressed data is available, use it
-        avail = std::min(avail, nbytes);
-        memcpy(out_data, decompressed_->data() + decompressed_pos_, avail);
-        decompressed_pos_ += avail;
-        out_data += avail;
-        *bytes_read += avail;
-        nbytes -= avail;
-        if (decompressed_pos_ == decompressed_->size()) {
-          // Decompressed data is exhausted, release buffer
-          decompressed_.reset();
-        }
-        if (nbytes == 0) {
-          // We're done
-          break;
-        }
+      if (decompressed_pos_ == decompressed_->size()) {
+        // Decompressed data is exhausted, release buffer
+        decompressed_.reset();
       }
+    }
 
-      // At this point, no more decompressed data remains,
-      // so we need to decompress more
+    return read_bytes;
+  }
+
+  // Try to feed more data into the decompressed_ buffer.
+  Status RefillDecompressed(bool* has_data) {
+    // First try to read data from the decompressor
+    if (compressed_) {
       if (decompressor_->IsFinished()) {
-        break;
-      }
-      // First try to read data from the decompressor
-      if (compressed_) {
-        RETURN_NOT_OK(DecompressData());
+        // We just went over the end of a previous compressed stream.
+        RETURN_NOT_OK(decompressor_->Reset());
+        fresh_decompressor_ = true;
       }
-      if (!decompressed_ || decompressed_->size() == 0) {
-        // Got nothing, need to read more compressed data
-        RETURN_NOT_OK(EnsureCompressedData());
-        if (compressed_pos_ == compressed_->size()) {
-          // Compressed stream unexpectedly exhausted
+      RETURN_NOT_OK(DecompressData());
+    }
+    if (!decompressed_ || decompressed_->size() == 0) {
+      // Got nothing, need to read more compressed data
+      RETURN_NOT_OK(EnsureCompressedData());
+      if (compressed_pos_ == compressed_->size()) {
+        // No more data to decompress
+        if (!fresh_decompressor_) {
           return Status::IOError("Truncated compressed stream");
         }
-        RETURN_NOT_OK(DecompressData());
+        *has_data = false;
+        return Status::OK();
       }
+      RETURN_NOT_OK(DecompressData());
     }
+    *has_data = true;
+    return Status::OK();
+  }
+
+  Status Read(int64_t nbytes, int64_t* bytes_read, void* out) {
+    std::lock_guard<std::mutex> guard(lock_);
+
+    auto out_data = reinterpret_cast<uint8_t*>(out);
+
+    int64_t total_read = 0;
+    bool decompressor_has_data = true;
+
+    while (nbytes - total_read > 0 && decompressor_has_data) {
+      total_read += ReadFromDecompressed(nbytes - total_read, out_data + total_read);
+
+      if (nbytes == total_read) {
+        break;
+      }
+
+      // At this point, no more decompressed data remains, so we need to
+      // decompress more
+      RETURN_NOT_OK(RefillDecompressed(&decompressor_has_data));
+    }
+
+    *bytes_read = total_read;
     return Status::OK();
   }
 
@@ -370,13 +397,14 @@ class CompressedInputStream::Impl {
 
   MemoryPool* pool_;
   std::shared_ptr<InputStream> raw_;
-  Codec* codec_;
   bool is_open_;
   std::shared_ptr<Decompressor> decompressor_;
   std::shared_ptr<Buffer> compressed_;
   int64_t compressed_pos_;
   std::shared_ptr<ResizableBuffer> decompressed_;
   int64_t decompressed_pos_;
+  // True if the decompressor hasn't read any data yet.
+  bool fresh_decompressor_;
 
   mutable std::mutex lock_;
 };
@@ -389,9 +417,10 @@ Status CompressedInputStream::Make(Codec* codec, const std::shared_ptr<InputStre
 Status CompressedInputStream::Make(MemoryPool* pool, Codec* codec,
                                    const std::shared_ptr<InputStream>& raw,
                                    std::shared_ptr<CompressedInputStream>* out) {
+  // CAUTION: codec is not owned
   std::shared_ptr<CompressedInputStream> res(new CompressedInputStream);
-  res->impl_ = std::unique_ptr<Impl>(new Impl(pool, codec, std::move(raw)));
-  RETURN_NOT_OK(res->impl_->Init());
+  res->impl_.reset(new Impl(pool, std::move(raw)));
+  RETURN_NOT_OK(res->impl_->Init(codec));
   *out = res;
   return Status::OK();
 }
diff --git a/cpp/src/arrow/util/compression-test.cc b/cpp/src/arrow/util/compression-test.cc
index 795e0cc..5f28f9c 100644
--- a/cpp/src/arrow/util/compression-test.cc
+++ b/cpp/src/arrow/util/compression-test.cc
@@ -233,12 +233,9 @@ void CheckStreamingDecompressor(Codec* codec, const std::vector<uint8_t>& data)
 
 // Check the streaming compressor and decompressor together
 
-void CheckStreamingRoundtrip(Codec* codec, const std::vector<uint8_t>& data) {
-  std::shared_ptr<Compressor> compressor;
-  std::shared_ptr<Decompressor> decompressor;
-  ASSERT_OK(codec->MakeCompressor(&compressor));
-  ASSERT_OK(codec->MakeDecompressor(&decompressor));
-
+void CheckStreamingRoundtrip(std::shared_ptr<Compressor> compressor,
+                             std::shared_ptr<Decompressor> decompressor,
+                             const std::vector<uint8_t>& data) {
   std::default_random_engine engine(42);
   std::uniform_int_distribution<int> buf_size_distribution(10, 40);
 
@@ -322,6 +319,15 @@ void CheckStreamingRoundtrip(Codec* codec, const std::vector<uint8_t>& data) {
   ASSERT_EQ(data, decompressed);
 }
 
+void CheckStreamingRoundtrip(Codec* codec, const std::vector<uint8_t>& data) {
+  std::shared_ptr<Compressor> compressor;
+  std::shared_ptr<Decompressor> decompressor;
+  ASSERT_OK(codec->MakeCompressor(&compressor));
+  ASSERT_OK(codec->MakeDecompressor(&decompressor));
+
+  CheckStreamingRoundtrip(compressor, decompressor, data);
+}
+
 class CodecTest : public ::testing::TestWithParam<Compression::type> {
  protected:
   Compression::type GetCompression() { return GetParam(); }
@@ -450,6 +456,27 @@ TEST_P(CodecTest, StreamingRoundtrip) {
   }
 }
 
+TEST_P(CodecTest, StreamingDecompressorReuse) {
+  if (GetCompression() == Compression::SNAPPY) {
+    // SKIP: snappy doesn't support streaming decompression
+    return;
+  }
+
+  auto codec = MakeCodec();
+  std::shared_ptr<Compressor> compressor;
+  std::shared_ptr<Decompressor> decompressor;
+  ASSERT_OK(codec->MakeCompressor(&compressor));
+  ASSERT_OK(codec->MakeDecompressor(&decompressor));
+
+  std::vector<uint8_t> data = MakeRandomData(100);
+  CheckStreamingRoundtrip(compressor, decompressor, data);
+  // Decompressor::Reset() should allow reusing decompressor for a new stream
+  ASSERT_OK(codec->MakeCompressor(&compressor));
+  ASSERT_OK(decompressor->Reset());
+  data = MakeRandomData(200);
+  CheckStreamingRoundtrip(compressor, decompressor, data);
+}
+
 INSTANTIATE_TEST_CASE_P(TestGZip, CodecTest, ::testing::Values(Compression::GZIP));
 
 INSTANTIATE_TEST_CASE_P(TestSnappy, CodecTest, ::testing::Values(Compression::SNAPPY));
diff --git a/cpp/src/arrow/util/compression.h b/cpp/src/arrow/util/compression.h
index 43174f4..8665374 100644
--- a/cpp/src/arrow/util/compression.h
+++ b/cpp/src/arrow/util/compression.h
@@ -87,6 +87,9 @@ class ARROW_EXPORT Decompressor {
   /// simply be that the underlying library isn't able to provide the information.
   virtual bool IsFinished() = 0;
 
+  /// \brief Reinitialize decompressor, making it ready for a new compressed stream.
+  virtual Status Reset() = 0;
+
   // XXX add methods for buffer size heuristics?
 };
 
diff --git a/cpp/src/arrow/util/compression_brotli.cc b/cpp/src/arrow/util/compression_brotli.cc
index 051b8c0..5f47db0 100644
--- a/cpp/src/arrow/util/compression_brotli.cc
+++ b/cpp/src/arrow/util/compression_brotli.cc
@@ -56,6 +56,13 @@ class BrotliDecompressor : public Decompressor {
     return Status::OK();
   }
 
+  Status Reset() override {
+    if (state_ != nullptr) {
+      BrotliDecoderDestroyInstance(state_);
+    }
+    return Init();
+  }
+
   Status Decompress(int64_t input_len, const uint8_t* input, int64_t output_len,
                     uint8_t* output, int64_t* bytes_read, int64_t* bytes_written,
                     bool* need_more_output) override {
diff --git a/cpp/src/arrow/util/compression_bz2.cc b/cpp/src/arrow/util/compression_bz2.cc
index a26bb3e..63f0308 100644
--- a/cpp/src/arrow/util/compression_bz2.cc
+++ b/cpp/src/arrow/util/compression_bz2.cc
@@ -108,6 +108,14 @@ class BZ2Decompressor : public Decompressor {
     return Status::OK();
   }
 
+  Status Reset() override {
+    if (initialized_) {
+      ARROW_UNUSED(BZ2_bzDecompressEnd(&stream_));
+      initialized_ = false;
+    }
+    return Init();
+  }
+
   Status Decompress(int64_t input_len, const uint8_t* input, int64_t output_len,
                     uint8_t* output, int64_t* bytes_read, int64_t* bytes_written,
                     bool* need_more_output) override {
diff --git a/cpp/src/arrow/util/compression_lz4.cc b/cpp/src/arrow/util/compression_lz4.cc
index 1efd4c6..356932d 100644
--- a/cpp/src/arrow/util/compression_lz4.cc
+++ b/cpp/src/arrow/util/compression_lz4.cc
@@ -63,6 +63,21 @@ class LZ4Decompressor : public Decompressor {
     }
   }
 
+  Status Reset() override {
+#if defined(LZ4_VERSION_NUMBER) && LZ4_VERSION_NUMBER >= 10800
+    // LZ4F_resetDecompressionContext appeared in 1.8.0
+    DCHECK_NE(ctx_, nullptr);
+    LZ4F_resetDecompressionContext(ctx_);
+    finished_ = false;
+    return Status::OK();
+#else
+    if (ctx_ != nullptr) {
+      ARROW_UNUSED(LZ4F_freeDecompressionContext(ctx_));
+    }
+    return Init();
+#endif
+  }
+
   Status Decompress(int64_t input_len, const uint8_t* input, int64_t output_len,
                     uint8_t* output, int64_t* bytes_read, int64_t* bytes_written,
                     bool* need_more_output) override {
diff --git a/cpp/src/arrow/util/compression_zlib.cc b/cpp/src/arrow/util/compression_zlib.cc
index cd8b912..a44a890 100644
--- a/cpp/src/arrow/util/compression_zlib.cc
+++ b/cpp/src/arrow/util/compression_zlib.cc
@@ -84,7 +84,8 @@ static Status ZlibErrorPrefix(const char* prefix_msg, const char* msg) {
 
 class GZipDecompressor : public Decompressor {
  public:
-  GZipDecompressor() : initialized_(false), finished_(false) {}
+  explicit GZipDecompressor(GZipCodec::Format format)
+      : format_(format), initialized_(false), finished_(false) {}
 
   ~GZipDecompressor() override {
     if (initialized_) {
@@ -92,13 +93,13 @@ class GZipDecompressor : public Decompressor {
     }
   }
 
-  Status Init(GZipCodec::Format format) {
+  Status Init() {
     DCHECK(!initialized_);
     memset(&stream_, 0, sizeof(stream_));
     finished_ = false;
 
     int ret;
-    int window_bits = DecompressionWindowBitsForFormat(format);
+    int window_bits = DecompressionWindowBitsForFormat(format_);
     if ((ret = inflateInit2(&stream_, window_bits)) != Z_OK) {
       return ZlibError("zlib inflateInit failed: ");
     } else {
@@ -107,6 +108,17 @@ class GZipDecompressor : public Decompressor {
     }
   }
 
+  Status Reset() override {
+    DCHECK(initialized_);
+    finished_ = false;
+    int ret;
+    if ((ret = inflateReset(&stream_)) != Z_OK) {
+      return ZlibError("zlib inflateReset failed: ");
+    } else {
+      return Status::OK();
+    }
+  }
+
   Status Decompress(int64_t input_len, const uint8_t* input, int64_t output_len,
                     uint8_t* output, int64_t* bytes_read, int64_t* bytes_written,
                     bool* need_more_output) override {
@@ -149,6 +161,7 @@ class GZipDecompressor : public Decompressor {
   }
 
   z_stream stream_;
+  GZipCodec::Format format_;
   bool initialized_;
   bool finished_;
 };
@@ -318,8 +331,8 @@ class GZipCodec::GZipCodecImpl {
   }
 
   Status MakeDecompressor(std::shared_ptr<Decompressor>* out) {
-    auto ptr = std::make_shared<GZipDecompressor>();
-    RETURN_NOT_OK(ptr->Init(format_));
+    auto ptr = std::make_shared<GZipDecompressor>(format_);
+    RETURN_NOT_OK(ptr->Init());
     *out = ptr;
     return Status::OK();
   }
diff --git a/cpp/src/arrow/util/compression_zstd.cc b/cpp/src/arrow/util/compression_zstd.cc
index 24a7329..87faf00 100644
--- a/cpp/src/arrow/util/compression_zstd.cc
+++ b/cpp/src/arrow/util/compression_zstd.cc
@@ -82,6 +82,8 @@ class ZSTDDecompressor : public Decompressor {
     return Status::OK();
   }
 
+  Status Reset() override { return Init(); }
+
   bool IsFinished() override { return finished_; }
 
  protected:
diff --git a/python/pyarrow/tests/test_csv.py b/python/pyarrow/tests/test_csv.py
index 0515365..213fbc5 100644
--- a/python/pyarrow/tests/test_csv.py
+++ b/python/pyarrow/tests/test_csv.py
@@ -584,15 +584,17 @@ class BaseTestCompressedCSVRead:
     def tearDown(self):
         shutil.rmtree(self.tmpdir)
 
+    def read_csv(self, csv_path):
+        try:
+            return read_csv(csv_path)
+        except pa.ArrowNotImplementedError as e:
+            pytest.skip(str(e))
+
     def test_random_csv(self):
         csv, expected = make_random_csv(num_cols=2, num_rows=100)
         csv_path = os.path.join(self.tmpdir, self.csv_filename)
         self.write_file(csv_path, csv)
-        try:
-            table = read_csv(csv_path)
-        except pa.ArrowNotImplementedError as e:
-            pytest.skip(str(e))
-            return
+        table = self.read_csv(csv_path)
         table._validate()
         assert table.schema == expected.schema
         assert table.equals(expected)
@@ -606,6 +608,19 @@ class TestGZipCSVRead(BaseTestCompressedCSVRead, unittest.TestCase):
         with gzip.open(path, 'wb', 3) as f:
             f.write(contents)
 
+    def test_concatenated(self):
+        # ARROW-5974
+        csv_path = os.path.join(self.tmpdir, self.csv_filename)
+        with gzip.open(csv_path, 'wb', 3) as f:
+            f.write(b"ab,cd\nef,gh\n")
+        with gzip.open(csv_path, 'ab', 3) as f:
+            f.write(b"ij,kl\nmn,op\n")
+        table = self.read_csv(csv_path)
+        assert table.to_pydict() == {
+            'ab': ['ef', 'ij', 'mn'],
+            'cd': ['gh', 'kl', 'op'],
+            }
+
 
 class TestBZ2CSVRead(BaseTestCompressedCSVRead, unittest.TestCase):
     csv_filename = "compressed.csv.bz2"
diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py
index 5eb4e13..11131cd 100644
--- a/python/pyarrow/tests/test_io.py
+++ b/python/pyarrow/tests/test_io.py
@@ -1017,6 +1017,23 @@ def test_compressed_input_bz2(tmpdir):
         pytest.skip(str(e))
 
 
+def check_compressed_concatenated(data, fn, compression):
+    raw = pa.OSFile(fn, mode="rb")
+    with pa.CompressedInputStream(raw, compression) as compressed:
+        got = compressed.read()
+        assert got == data
+
+
+def test_compressed_concatenated_gzip(tmpdir):
+    data = b"some test data\n" * 10 + b"eof\n"
+    fn = str(tmpdir / "compressed_input_test2.gz")
+    with gzip.open(fn, "wb") as f:
+        f.write(data[:50])
+    with gzip.open(fn, "ab") as f:
+        f.write(data[50:])
+    check_compressed_concatenated(data, fn, "gzip")
+
+
 def test_compressed_input_invalid():
     data = b"foo" * 10
     raw = pa.BufferReader(data)

Reply via email to