This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
     new 7fcad2c  ARROW-3126: [Python] Make Buffered* IO classes available to Python, incorporate into input_stream, output_stream factory functions
7fcad2c is described below

commit 7fcad2c29e3c3ac99b2f6c1f1fddc91c05b7f2b3
Author: Krisztián Szűcs <szucs.kriszt...@gmail.com>
AuthorDate: Wed Jan 9 22:38:12 2019 -0600

    ARROW-3126: [Python] Make Buffered* IO classes available to Python, incorporate into input_stream, output_stream factory functions

    We should add benchmarks too as a follow-up PR.

    Author: Krisztián Szűcs <szucs.kriszt...@gmail.com>
    Author: Wes McKinney <wesm+...@apache.org>

    Closes #3252 from kszucs/ARROW-3126 and squashes the following commits:

    50118a639 <Wes McKinney> Fix API in file-benchmark.cc
    d3917d9e5 <Wes McKinney> Code review comments, buffer_size=0 means unbuffered
    88bed90ef <Krisztián Szűcs> lint
    5842eae0e <Krisztián Szűcs> remove test runner script
    fd729abdb <Krisztián Szűcs> don't typehint _detect_compression
    3d1e386ce <Krisztián Szűcs> tests
    5e8b38551 <Krisztián Szűcs> fix failing test
    e458db5a6 <Krisztián Szűcs> python support for buffered input and output streams
---
 cpp/CMakeLists.txt                   |   2 +-
 cpp/src/arrow/io/api.h               |   1 +
 cpp/src/arrow/io/buffered-test.cc    |   5 +-
 cpp/src/arrow/io/buffered.cc         |  30 +++---
 cpp/src/arrow/io/buffered.h          |  20 ++--
 cpp/src/arrow/io/file-benchmark.cc   |   9 +-
 python/pyarrow/includes/libarrow.pxd |  16 +++
 python/pyarrow/io.pxi                | 195 +++++++++++++++++------------------
 python/pyarrow/tests/test_io.py      |  86 +++++++++++++++
 9 files changed, 234 insertions(+), 130 deletions(-)
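Note: the practical effect of this change is that buffering can be requested straight from the factory functions. A minimal sketch of the new usage (the file name is illustrative, not from this commit; per the new docstrings, buffer_size=None or 0 keeps the previous unbuffered behavior):

    import pyarrow as pa

    # Compression is detected from the '.gz' extension (compression='detect'
    # is the default); a positive buffer_size adds a buffered wrapper.
    with pa.output_stream('example.csv.gz', buffer_size=1024) as f:
        f.write(b'a,b,c\n')

    with pa.input_stream('example.csv.gz', buffer_size=1024) as f:
        data = f.read()  # transparently decompressed and buffered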
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 0e4f395..08868af 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -80,7 +80,6 @@ if ("$ENV{CMAKE_EXPORT_COMPILE_COMMANDS}" STREQUAL "1" OR INFER_FOUND)
   # See http://clang.llvm.org/docs/JSONCompilationDatabase.html
   set(CMAKE_EXPORT_COMPILE_COMMANDS 1)
 endif()
-
 # ----------------------------------------------------------------------
 # cmake options
 
@@ -358,6 +357,7 @@ endif()
 if (ARROW_USE_CCACHE)
   find_program(CCACHE_FOUND ccache)
   if(CCACHE_FOUND)
+    message(STATUS "Using ccache: ${CCACHE_FOUND}")
     set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE ${CCACHE_FOUND})
     set_property(GLOBAL PROPERTY RULE_LAUNCH_LINK ${CCACHE_FOUND})
   endif(CCACHE_FOUND)
diff --git a/cpp/src/arrow/io/api.h b/cpp/src/arrow/io/api.h
index 0d5742a..cf1be33 100644
--- a/cpp/src/arrow/io/api.h
+++ b/cpp/src/arrow/io/api.h
@@ -18,6 +18,7 @@
 #ifndef ARROW_IO_API_H
 #define ARROW_IO_API_H
 
+#include "arrow/io/buffered.h"
 #include "arrow/io/compressed.h"
 #include "arrow/io/file.h"
 #include "arrow/io/hdfs.h"
diff --git a/cpp/src/arrow/io/buffered-test.cc b/cpp/src/arrow/io/buffered-test.cc
index 074833d..7b9ab0c 100644
--- a/cpp/src/arrow/io/buffered-test.cc
+++ b/cpp/src/arrow/io/buffered-test.cc
@@ -105,7 +105,8 @@ class TestBufferedOutputStream : public FileTestFixture<BufferedOutputStream> {
       lseek(fd_, 0, SEEK_END);
 #endif
     }
-    ASSERT_OK(BufferedOutputStream::Create(file, buffer_size, &buffered_));
+    ASSERT_OK(BufferedOutputStream::Create(buffer_size, default_memory_pool(), file,
+                                           &buffered_));
   }
 
   void WriteChunkwise(const std::string& datastr, const std::valarray<int64_t>& sizes) {
@@ -321,7 +322,7 @@ class TestBufferedInputStream : public FileTestFixture<BufferedInputStream> {
     std::shared_ptr<ReadableFile> file_in;
     ASSERT_OK(ReadableFile::Open(path_, &file_in));
     raw_ = file_in;
-    ASSERT_OK(BufferedInputStream::Create(raw_, buffer_size, pool, &buffered_));
+    ASSERT_OK(BufferedInputStream::Create(buffer_size, pool, raw_, &buffered_));
   }
 
  protected:
diff --git a/cpp/src/arrow/io/buffered.cc b/cpp/src/arrow/io/buffered.cc
index f3eae39..0b1431f 100644
--- a/cpp/src/arrow/io/buffered.cc
+++ b/cpp/src/arrow/io/buffered.cc
@@ -91,8 +91,8 @@ class BufferedBase {
 
 class BufferedOutputStream::Impl : public BufferedBase {
  public:
-  explicit Impl(std::shared_ptr<OutputStream> raw)
-      : BufferedBase(default_memory_pool()), raw_(std::move(raw)) {}
+  explicit Impl(std::shared_ptr<OutputStream> raw, MemoryPool* pool)
+      : BufferedBase(pool), raw_(std::move(raw)) {}
 
   Status Close() {
     std::lock_guard<std::mutex> guard(lock_);
@@ -173,14 +173,16 @@ class BufferedOutputStream::Impl : public BufferedBase {
   std::shared_ptr<OutputStream> raw_;
 };
 
-BufferedOutputStream::BufferedOutputStream(std::shared_ptr<OutputStream> raw)
-    : impl_(new BufferedOutputStream::Impl(std::move(raw))) {}
+BufferedOutputStream::BufferedOutputStream(std::shared_ptr<OutputStream> raw,
+                                           MemoryPool* pool) {
+  impl_.reset(new Impl(std::move(raw), pool));
+}
 
-Status BufferedOutputStream::Create(std::shared_ptr<OutputStream> raw,
-                                    int64_t buffer_size,
+Status BufferedOutputStream::Create(int64_t buffer_size, MemoryPool* pool,
+                                    std::shared_ptr<OutputStream> raw,
                                     std::shared_ptr<BufferedOutputStream>* out) {
-  auto result =
-      std::shared_ptr<BufferedOutputStream>(new BufferedOutputStream(std::move(raw)));
+  auto result = std::shared_ptr<BufferedOutputStream>(
+      new BufferedOutputStream(std::move(raw), pool));
   RETURN_NOT_OK(result->SetBufferSize(buffer_size));
   *out = std::move(result);
   return Status::OK();
@@ -217,12 +219,12 @@ std::shared_ptr<OutputStream> BufferedOutputStream::raw() const { return impl_->
 // ----------------------------------------------------------------------
 // BufferedInputStream implementation
 
-class BufferedInputStream::BufferedInputStreamImpl : public BufferedBase {
+class BufferedInputStream::Impl : public BufferedBase {
  public:
-  BufferedInputStreamImpl(std::shared_ptr<InputStream> raw, MemoryPool* pool)
+  Impl(std::shared_ptr<InputStream> raw, MemoryPool* pool)
       : BufferedBase(pool), raw_(std::move(raw)), bytes_buffered_(0) {}
 
-  ~BufferedInputStreamImpl() { DCHECK_OK(Close()); }
+  ~Impl() { DCHECK_OK(Close()); }
 
   Status Close() {
     std::lock_guard<std::mutex> guard(lock_);
@@ -350,13 +352,13 @@ class BufferedInputStream::BufferedInputStreamImpl : public BufferedBase {
 
 BufferedInputStream::BufferedInputStream(std::shared_ptr<InputStream> raw,
                                          MemoryPool* pool) {
-  impl_.reset(new BufferedInputStreamImpl(std::move(raw), pool));
+  impl_.reset(new Impl(std::move(raw), pool));
 }
 
 BufferedInputStream::~BufferedInputStream() { DCHECK_OK(impl_->Close()); }
 
-Status BufferedInputStream::Create(std::shared_ptr<InputStream> raw, int64_t buffer_size,
-                                   MemoryPool* pool,
+Status BufferedInputStream::Create(int64_t buffer_size, MemoryPool* pool,
+                                   std::shared_ptr<InputStream> raw,
                                    std::shared_ptr<BufferedInputStream>* out) {
   auto result =
       std::shared_ptr<BufferedInputStream>(new BufferedInputStream(std::move(raw), pool));
diff --git a/cpp/src/arrow/io/buffered.h b/cpp/src/arrow/io/buffered.h
index d507955..945915b 100644
--- a/cpp/src/arrow/io/buffered.h
+++ b/cpp/src/arrow/io/buffered.h
@@ -40,12 +40,13 @@ class ARROW_EXPORT BufferedOutputStream : public OutputStream {
   ~BufferedOutputStream() override;
 
   /// \brief Create a buffered output stream wrapping the given output stream.
+  /// \param[in] buffer_size the size of the temporary write buffer
+  /// \param[in] pool a MemoryPool to use for allocations
   /// \param[in] raw another OutputStream
-  /// \param[in] buffer_size the size of the temporary buffer. Allocates from
-  /// the default memory pool
   /// \param[out] out the created BufferedOutputStream
   /// \return Status
-  static Status Create(std::shared_ptr<OutputStream> raw, int64_t buffer_size,
+  static Status Create(int64_t buffer_size, MemoryPool* pool,
+                       std::shared_ptr<OutputStream> raw,
                        std::shared_ptr<BufferedOutputStream>* out);
 
   /// \brief Resize internal buffer
@@ -79,7 +80,7 @@ class ARROW_EXPORT BufferedOutputStream : public OutputStream {
   std::shared_ptr<OutputStream> raw() const;
 
  private:
-  explicit BufferedOutputStream(std::shared_ptr<OutputStream> raw);
+  explicit BufferedOutputStream(std::shared_ptr<OutputStream> raw, MemoryPool* pool);
 
   class ARROW_NO_EXPORT Impl;
   std::unique_ptr<Impl> impl_;
@@ -94,12 +95,13 @@ class ARROW_EXPORT BufferedInputStream : public InputStream {
   ~BufferedInputStream() override;
 
   /// \brief Create a BufferedInputStream from a raw InputStream
-  /// \param[in] raw a raw InputStream
   /// \param[in] buffer_size the size of the temporary read buffer
   /// \param[in] pool a MemoryPool to use for allocations
+  /// \param[in] raw a raw InputStream
   /// \param[out] out the created BufferedInputStream
-  static Status Create(std::shared_ptr<InputStream> raw, int64_t buffer_size,
-                       MemoryPool* pool, std::shared_ptr<BufferedInputStream>* out);
+  static Status Create(int64_t buffer_size, MemoryPool* pool,
+                       std::shared_ptr<InputStream> raw,
+                       std::shared_ptr<BufferedInputStream>* out);
 
   /// \brief Resize internal read buffer; calls to Read(...) will read at least
   /// \param[in] new_buffer_size the new read buffer size
@@ -138,8 +140,8 @@ class ARROW_EXPORT BufferedInputStream : public InputStream {
  private:
   explicit BufferedInputStream(std::shared_ptr<InputStream> raw, MemoryPool* pool);
 
-  class ARROW_NO_EXPORT BufferedInputStreamImpl;
-  std::unique_ptr<BufferedInputStreamImpl> impl_;
+  class ARROW_NO_EXPORT Impl;
+  std::unique_ptr<Impl> impl_;
 };
 
 }  // namespace io
diff --git a/cpp/src/arrow/io/file-benchmark.cc b/cpp/src/arrow/io/file-benchmark.cc
index c57fa6d..4439a18 100644
--- a/cpp/src/arrow/io/file-benchmark.cc
+++ b/cpp/src/arrow/io/file-benchmark.cc
@@ -163,7 +163,8 @@ static void BM_BufferedOutputStreamSmallWritesToNull(
   ABORT_NOT_OK(io::FileOutputStream::Open(GetNullFile(), &file));
 
   std::shared_ptr<io::BufferedOutputStream> buffered_file;
-  ABORT_NOT_OK(io::BufferedOutputStream::Create(file, kBufferSize, &buffered_file));
+  ABORT_NOT_OK(io::BufferedOutputStream::Create(kBufferSize, default_memory_pool(), file,
+                                                &buffered_file));
   BenchmarkStreamingWrites(state, small_sizes, buffered_file.get());
 }
 
@@ -196,7 +197,8 @@ static void BM_BufferedOutputStreamSmallWritesToPipe(
   SetupPipeWriter(&stream, &reader);
 
   std::shared_ptr<io::BufferedOutputStream> buffered_stream;
-  ABORT_NOT_OK(io::BufferedOutputStream::Create(stream, kBufferSize, &buffered_stream));
+  ABORT_NOT_OK(io::BufferedOutputStream::Create(kBufferSize, default_memory_pool(),
+                                                stream, &buffered_stream));
   BenchmarkStreamingWrites(state, small_sizes, buffered_stream.get(), reader.get());
 }
 
@@ -207,7 +209,8 @@ static void BM_BufferedOutputStreamLargeWritesToPipe(
   SetupPipeWriter(&stream, &reader);
 
   std::shared_ptr<io::BufferedOutputStream> buffered_stream;
-  ABORT_NOT_OK(io::BufferedOutputStream::Create(stream, kBufferSize, &buffered_stream));
+  ABORT_NOT_OK(io::BufferedOutputStream::Create(kBufferSize, default_memory_pool(),
+                                                stream, &buffered_stream));
   BenchmarkStreamingWrites(state, large_sizes, buffered_stream.get(), reader.get());
 }
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index cc77ff4..97bc892 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -697,6 +697,22 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" nogil:
         CStatus Make(CCodec* codec, shared_ptr[OutputStream] raw,
                      shared_ptr[CCompressedOutputStream]* out)
 
+    cdef cppclass CBufferedInputStream \
+            " arrow::io::BufferedInputStream"(InputStream):
+
+        @staticmethod
+        CStatus Create(int64_t buffer_size, CMemoryPool* pool,
+                       shared_ptr[InputStream] raw,
+                       shared_ptr[CBufferedInputStream]* out)
+
+    cdef cppclass CBufferedOutputStream \
+            " arrow::io::BufferedOutputStream"(OutputStream):
+
+        @staticmethod
+        CStatus Create(int64_t buffer_size, CMemoryPool* pool,
+                       shared_ptr[OutputStream] raw,
+                       shared_ptr[CBufferedOutputStream]* out)
+
 
 # ----------------------------------------------------------------------
 # HDFS
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index 97abde8..5212274 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -1064,32 +1064,6 @@ cdef class BufferReader(NativeFile):
         self.is_readable = True
 
 
-cdef shared_ptr[InputStream] _make_compressed_input_stream(
-        shared_ptr[InputStream] stream,
-        CompressionType compression_type) except *:
-    cdef:
-        shared_ptr[CCompressedInputStream] compressed_stream
-        unique_ptr[CCodec] codec
-
-    check_status(CCodec.Create(compression_type, &codec))
-    check_status(CCompressedInputStream.Make(codec.get(), stream,
-                                             &compressed_stream))
-    return <shared_ptr[InputStream]> compressed_stream
-
-
-cdef shared_ptr[OutputStream] _make_compressed_output_stream(
-        shared_ptr[OutputStream] stream,
-        CompressionType compression_type) except *:
-    cdef:
-        shared_ptr[CCompressedOutputStream] compressed_stream
-        unique_ptr[CCodec] codec
-
-    check_status(CCodec.Create(compression_type, &codec))
-    check_status(CCompressedOutputStream.Make(codec.get(), stream,
-                                              &compressed_stream))
-    return <shared_ptr[OutputStream]> compressed_stream
-
-
 cdef class CompressedInputStream(NativeFile):
     """
     An input stream wrapper which decompresses data on the fly.
@@ -1104,26 +1078,19 @@ cdef class CompressedInputStream(NativeFile):
     def __init__(self, NativeFile stream, compression):
         cdef:
             CompressionType compression_type
+            unique_ptr[CCodec] codec
+            shared_ptr[CCompressedInputStream] compressed_stream
 
         compression_type = _get_compression_type(compression)
         if compression_type == CompressionType_UNCOMPRESSED:
-            raise ValueError("Invalid value for compression: %r"
-                             % (compression,))
-        self._init(stream, compression_type)
+            raise ValueError('Invalid value for compression: {!r}'
+                             .format(compression))
 
-    @staticmethod
-    cdef create(NativeFile stream, CompressionType compression_type):
-        cdef:
-            CompressedInputStream self
-
-        self = CompressedInputStream.__new__(CompressedInputStream)
-        self._init(stream, compression_type)
-        return self
+        check_status(CCodec.Create(compression_type, &codec))
+        check_status(CCompressedInputStream.Make(
+            codec.get(), stream.get_input_stream(), &compressed_stream))
 
-    cdef _init(self, NativeFile stream, CompressionType compression_type):
-        self.set_input_stream(
-            _make_compressed_input_stream(stream.get_input_stream(),
-                                          compression_type))
+        self.set_input_stream(<shared_ptr[InputStream]> compressed_stream)
         self.is_readable = True
 
 
@@ -1138,29 +1105,55 @@ cdef class CompressedOutputStream(NativeFile):
         The compression type ("bz2", "brotli", "gzip", "lz4", "snappy"
         or "zstd")
     """
+
     def __init__(self, NativeFile stream, compression):
         cdef:
             CompressionType compression_type
+            unique_ptr[CCodec] codec
+            shared_ptr[CCompressedOutputStream] compressed_stream
 
         compression_type = _get_compression_type(compression)
         if compression_type == CompressionType_UNCOMPRESSED:
-            raise ValueError("Invalid value for compression: %r"
-                             % (compression,))
-        self._init(stream, compression_type)
+            raise ValueError('Invalid value for compression: {!r}'
+                             .format(compression))
 
-    @staticmethod
-    cdef create(NativeFile stream, CompressionType compression_type):
-        cdef:
-            CompressedOutputStream self
+        check_status(CCodec.Create(compression_type, &codec))
+        check_status(CCompressedOutputStream.Make(
+            codec.get(), stream.get_output_stream(), &compressed_stream))
 
-        self = CompressedOutputStream.__new__(CompressedOutputStream)
-        self._init(stream, compression_type)
-        return self
+        self.set_output_stream(<shared_ptr[OutputStream]> compressed_stream)
+        self.is_writable = True
+
+
+cdef class BufferedInputStream(NativeFile):
+
+    def __init__(self, NativeFile stream, int buffer_size,
+                 MemoryPool memory_pool=None):
+        cdef shared_ptr[CBufferedInputStream] buffered_stream
+
+        if buffer_size <= 0:
+            raise ValueError('Buffer size must be larger than zero')
+        check_status(CBufferedInputStream.Create(
+            buffer_size, maybe_unbox_memory_pool(memory_pool),
+            stream.get_input_stream(), &buffered_stream))
+
+        self.set_input_stream(<shared_ptr[InputStream]> buffered_stream)
+        self.is_readable = True
+
+
+cdef class BufferedOutputStream(NativeFile):
+
+    def __init__(self, NativeFile stream, int buffer_size,
+                 MemoryPool memory_pool=None):
+        cdef shared_ptr[CBufferedOutputStream] buffered_stream
+
+        if buffer_size <= 0:
+            raise ValueError('Buffer size must be larger than zero')
+        check_status(CBufferedOutputStream.Create(
+            buffer_size, maybe_unbox_memory_pool(memory_pool),
+            stream.get_output_stream(), &buffered_stream))
 
-    cdef _init(self, NativeFile stream, CompressionType compression_type):
-        self.set_output_stream(
-            _make_compressed_output_stream(stream.get_output_stream(),
-                                           compression_type))
+        self.set_output_stream(<shared_ptr[OutputStream]> buffered_stream)
+        self.is_writable = True
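Note: the two new cdef classes can also be wrapped around any NativeFile directly. A minimal sketch, assuming access through pyarrow.lib (this commit does not touch pyarrow/__init__.py, so a top-level pa.BufferedInputStream alias is not shown by the diff):

    import pyarrow as pa
    from pyarrow.lib import BufferedInputStream

    # Wrap an existing file in a 4 KB read buffer. A non-positive
    # buffer_size raises ValueError('Buffer size must be larger than zero').
    raw = pa.OSFile('data.bin', 'rb')  # hypothetical input file
    buffered = BufferedInputStream(raw, 4096)
    header = buffered.read(16)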
@@ -1232,24 +1225,27 @@ cdef get_input_stream(object source, c_bool use_memory_map,
     """
     cdef:
         NativeFile nf
+        unique_ptr[CCodec] codec
        shared_ptr[InputStream] input_stream
        shared_ptr[CCompressedInputStream] compressed_stream
-        CompressionType compression_type = CompressionType_UNCOMPRESSED
-        unique_ptr[CCodec] codec
+        CompressionType compression_type
 
     try:
         source_path = _stringify_path(source)
     except TypeError:
-        pass
+        compression = None
     else:
-        compression_type = _get_compression_type_by_filename(source_path)
+        compression = _detect_compression(source_path)
+
+    compression_type = _get_compression_type(compression)
 
     nf = _get_native_file(source, use_memory_map)
     input_stream = nf.get_input_stream()
 
     if compression_type != CompressionType_UNCOMPRESSED:
-        input_stream = _make_compressed_input_stream(input_stream,
-                                                     compression_type)
+        check_status(CCodec.Create(compression_type, &codec))
+        check_status(CCompressedInputStream.Make(codec.get(), input_stream,
+                                                 &compressed_stream))
+        input_stream = <shared_ptr[InputStream]> compressed_stream
 
     out[0] = input_stream
 
@@ -1292,21 +1288,19 @@ cdef CompressionType _get_compression_type(object name) except *:
     elif name == 'zstd':
         return CompressionType_ZSTD
     else:
-        raise ValueError("Unrecognized compression type: {0}"
-                         .format(str(name)))
+        raise ValueError('Unrecognized compression type: {}'.format(name))
 
 
-cdef CompressionType _get_compression_type_by_filename(filename) except *:
-    if filename.endswith('.bz2'):
-        return CompressionType_BZ2
-    elif filename.endswith('.gz'):
-        return CompressionType_GZIP
-    elif filename.endswith('.lz4'):
-        return CompressionType_LZ4
-    elif filename.endswith('.zst'):
-        return CompressionType_ZSTD
-    else:
-        return CompressionType_UNCOMPRESSED
+def _detect_compression(path):
+    if isinstance(path, six.string_types):
+        if path.endswith('.bz2'):
+            return 'bz2'
+        elif path.endswith('.gz'):
+            return 'gzip'
+        elif path.endswith('.lz4'):
+            return 'lz4'
+        elif path.endswith('.zst'):
+            return 'zstd'
 
 
 def compress(object buf, codec='lz4', asbytes=False, memory_pool=None):
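Note: unlike the removed _get_compression_type_by_filename, the new _detect_compression helper returns a codec name rather than an enum value, and it implicitly returns None both for unrecognized extensions and for non-string input (the isinstance guard). Expected behavior, illustrated:

    _detect_compression('data.csv.gz')  # -> 'gzip'
    _detect_compression('archive.zst')  # -> 'zstd'
    _detect_compression('plain.csv')    # -> None (no known extension)
    _detect_compression(None)           # -> None (not a string)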
""" - cdef: - CompressionType compression_type - NativeFile stream + cdef NativeFile stream try: source_path = _stringify_path(source) except TypeError: source_path = None - compression_type = _stream_compression_argument(compression, source_path) - if isinstance(source, NativeFile): stream = source elif source_path is not None: @@ -1479,13 +1461,19 @@ def input_stream(source, compression='detect'): raise TypeError("pa.input_stream() called with instance of '{}'" .format(source.__class__)) - if compression_type != CompressionType_UNCOMPRESSED: - stream = CompressedInputStream.create(stream, compression_type) + if compression == 'detect': + compression = _detect_compression(source_path) + + if buffer_size is not None and buffer_size != 0: + stream = BufferedInputStream(stream, buffer_size) + + if compression is not None: + stream = CompressedInputStream(stream, compression) return stream -def output_stream(source, compression='detect'): +def output_stream(source, compression='detect', buffer_size=None): """ Create an Arrow output stream. @@ -1499,18 +1487,17 @@ def output_stream(source, compression='detect'): chosen based on the file extension. If None, no compression will be applied. Otherwise, a well-known algorithm name must be supplied (e.g. "gzip") + buffer_size: int, default None + If None or 0, no buffering will happen. Otherwise the size of the + temporary write buffer. """ - cdef: - CompressionType compression_type - NativeFile stream + cdef NativeFile stream try: source_path = _stringify_path(source) except TypeError: source_path = None - compression_type = _stream_compression_argument(compression, source_path) - if isinstance(source, NativeFile): stream = source elif source_path is not None: @@ -1526,7 +1513,13 @@ def output_stream(source, compression='detect'): raise TypeError("pa.output_stream() called with instance of '{}'" .format(source.__class__)) - if compression_type != CompressionType_UNCOMPRESSED: - stream = CompressedOutputStream.create(stream, compression_type) + if compression == 'detect': + compression = _detect_compression(source_path) + + if buffer_size is not None and buffer_size != 0: + stream = BufferedOutputStream(stream, buffer_size) + + if compression is not None: + stream = CompressedOutputStream(stream, compression) return stream diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py index f54f03a..77ed70c 100644 --- a/python/pyarrow/tests/test_io.py +++ b/python/pyarrow/tests/test_io.py @@ -1134,6 +1134,44 @@ def test_input_stream_file_path_compressed(tmpdir): assert stream.read() == gz_data +def test_input_stream_file_path_buffered(tmpdir): + data = b"some test data\n" * 10 + b"eof\n" + file_path = tmpdir / 'input_stream.buffered' + with open(str(file_path), 'wb') as f: + f.write(data) + + stream = pa.input_stream(file_path, buffer_size=32) + assert stream.read() == data + stream = pa.input_stream(str(file_path), buffer_size=64) + assert stream.read() == data + stream = pa.input_stream(pathlib.Path(str(file_path)), buffer_size=1024) + assert stream.read() == data + + unbuffered_stream = pa.input_stream(file_path, buffer_size=0) + assert isinstance(unbuffered_stream, pa.OSFile) + + msg = 'Buffer size must be larger than zero' + with pytest.raises(ValueError, match=msg): + pa.input_stream(file_path, buffer_size=-1) + with pytest.raises(TypeError): + pa.input_stream(file_path, buffer_size='million') + + +def test_input_stream_file_path_compressed_and_buffered(tmpdir): + data = b"some test data\n" * 100 + b"eof\n" + gz_data = 
diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py
index f54f03a..77ed70c 100644
--- a/python/pyarrow/tests/test_io.py
+++ b/python/pyarrow/tests/test_io.py
@@ -1134,6 +1134,44 @@ def test_input_stream_file_path_compressed(tmpdir):
     assert stream.read() == gz_data
 
 
+def test_input_stream_file_path_buffered(tmpdir):
+    data = b"some test data\n" * 10 + b"eof\n"
+    file_path = tmpdir / 'input_stream.buffered'
+    with open(str(file_path), 'wb') as f:
+        f.write(data)
+
+    stream = pa.input_stream(file_path, buffer_size=32)
+    assert stream.read() == data
+    stream = pa.input_stream(str(file_path), buffer_size=64)
+    assert stream.read() == data
+    stream = pa.input_stream(pathlib.Path(str(file_path)), buffer_size=1024)
+    assert stream.read() == data
+
+    unbuffered_stream = pa.input_stream(file_path, buffer_size=0)
+    assert isinstance(unbuffered_stream, pa.OSFile)
+
+    msg = 'Buffer size must be larger than zero'
+    with pytest.raises(ValueError, match=msg):
+        pa.input_stream(file_path, buffer_size=-1)
+    with pytest.raises(TypeError):
+        pa.input_stream(file_path, buffer_size='million')
+
+
+def test_input_stream_file_path_compressed_and_buffered(tmpdir):
+    data = b"some test data\n" * 100 + b"eof\n"
+    gz_data = gzip_compress(data)
+    file_path = tmpdir / 'input_stream_compressed_and_buffered.gz'
+    with open(str(file_path), 'wb') as f:
+        f.write(gz_data)
+
+    stream = pa.input_stream(file_path, buffer_size=32, compression='gzip')
+    assert stream.read() == data
+    stream = pa.input_stream(str(file_path), buffer_size=64)
+    assert stream.read() == data
+    stream = pa.input_stream(pathlib.Path(str(file_path)), buffer_size=1024)
+    assert stream.read() == data
+
+
 def test_input_stream_python_file(tmpdir):
     data = b"some test data\n" * 10 + b"eof\n"
     bio = BytesIO(data)
@@ -1232,6 +1270,54 @@ def test_output_stream_file_path_compressed(tmpdir):
     assert gzip_decompress(
            check_data(file_path, data, compression='gzip')) == data
     assert check_data(file_path, data, compression=None) == data
 
+    with pytest.raises(ValueError, match='Unrecognized compression type'):
+        assert check_data(file_path, data, compression='rabbit') == data
+
+
+def test_output_stream_file_path_buffered(tmpdir):
+    data = b"some test data\n" * 10 + b"eof\n"
+    file_path = tmpdir / 'output_stream.buffered'
+
+    def check_data(file_path, data, **kwargs):
+        with pa.output_stream(file_path, **kwargs) as stream:
+            stream.write(data)
+        with open(str(file_path), 'rb') as f:
+            return f.read()
+
+    unbuffered_stream = pa.output_stream(file_path, buffer_size=0)
+    assert isinstance(unbuffered_stream, pa.OSFile)
+
+    msg = 'Buffer size must be larger than zero'
+    with pytest.raises(ValueError, match=msg):
+        assert check_data(file_path, data, buffer_size=-128) == data
+
+    assert check_data(file_path, data, buffer_size=32) == data
+    assert check_data(file_path, data, buffer_size=1024) == data
+    assert check_data(str(file_path), data, buffer_size=32) == data
+
+    result = check_data(pathlib.Path(str(file_path)), data, buffer_size=32)
+    assert result == data
+
+
+def test_output_stream_file_path_compressed_and_buffered(tmpdir):
+    data = b"some test data\n" * 100 + b"eof\n"
+    file_path = tmpdir / 'output_stream_compressed_and_buffered.gz'
+
+    def check_data(file_path, data, **kwargs):
+        with pa.output_stream(file_path, **kwargs) as stream:
+            stream.write(data)
+        with open(str(file_path), 'rb') as f:
+            return f.read()
+
+    result = check_data(file_path, data, buffer_size=32)
+    assert gzip_decompress(result) == data
+
+    result = check_data(file_path, data, buffer_size=1024)
+    assert gzip_decompress(result) == data
+
+    result = check_data(file_path, data, buffer_size=1024, compression='gzip')
+    assert gzip_decompress(result) == data
+
 
 def test_output_stream_python_file(tmpdir):
     data = b"some test data\n" * 10 + b"eof\n"