This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 7fcad2c  ARROW-3126: [Python] Make Buffered* IO classes available to 
Python, incorporate into input_stream, output_stream factory functions
7fcad2c is described below

commit 7fcad2c29e3c3ac99b2f6c1f1fddc91c05b7f2b3
Author: Krisztián Szűcs <szucs.kriszt...@gmail.com>
AuthorDate: Wed Jan 9 22:38:12 2019 -0600

    ARROW-3126: [Python] Make Buffered* IO classes available to Python, 
incorporate into input_stream, output_stream factory functions
    
    We should add benchmarks too as a follow up PR.
    
    Author: Krisztián Szűcs <szucs.kriszt...@gmail.com>
    Author: Wes McKinney <wesm+...@apache.org>
    
    Closes #3252 from kszucs/ARROW-3126 and squashes the following commits:
    
    50118a639 <Wes McKinney> Fix API in file-benchmark.cc
    d3917d9e5 <Wes McKinney> Code review comments, buffer_size=0 means 
unbuffered
    88bed90ef <Krisztián Szűcs> lint
    5842eae0e <Krisztián Szűcs> remove test runner script
    fd729abdb <Krisztián Szűcs> don't typehint _detect_compression
    3d1e386ce <Krisztián Szűcs> tests
    5e8b38551 <Krisztián Szűcs> fix failing test
    e458db5a6 <Krisztián Szűcs> python support for buffered input and output 
streams
---
 cpp/CMakeLists.txt                   |   2 +-
 cpp/src/arrow/io/api.h               |   1 +
 cpp/src/arrow/io/buffered-test.cc    |   5 +-
 cpp/src/arrow/io/buffered.cc         |  30 +++---
 cpp/src/arrow/io/buffered.h          |  20 ++--
 cpp/src/arrow/io/file-benchmark.cc   |   9 +-
 python/pyarrow/includes/libarrow.pxd |  16 +++
 python/pyarrow/io.pxi                | 195 +++++++++++++++++------------------
 python/pyarrow/tests/test_io.py      |  86 +++++++++++++++
 9 files changed, 234 insertions(+), 130 deletions(-)

diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 0e4f395..08868af 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -80,7 +80,6 @@ if ("$ENV{CMAKE_EXPORT_COMPILE_COMMANDS}" STREQUAL "1" OR 
INFER_FOUND)
   # See http://clang.llvm.org/docs/JSONCompilationDatabase.html
   set(CMAKE_EXPORT_COMPILE_COMMANDS 1)
 endif()
-
 # ----------------------------------------------------------------------
 # cmake options
 
@@ -358,6 +357,7 @@ endif()
 if (ARROW_USE_CCACHE)
   find_program(CCACHE_FOUND ccache)
   if(CCACHE_FOUND)
+    message(STATUS "Using ccache: ${CCACHE_FOUND}")
     set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE ${CCACHE_FOUND})
     set_property(GLOBAL PROPERTY RULE_LAUNCH_LINK ${CCACHE_FOUND})
   endif(CCACHE_FOUND)
diff --git a/cpp/src/arrow/io/api.h b/cpp/src/arrow/io/api.h
index 0d5742a..cf1be33 100644
--- a/cpp/src/arrow/io/api.h
+++ b/cpp/src/arrow/io/api.h
@@ -18,6 +18,7 @@
 #ifndef ARROW_IO_API_H
 #define ARROW_IO_API_H
 
+#include "arrow/io/buffered.h"
 #include "arrow/io/compressed.h"
 #include "arrow/io/file.h"
 #include "arrow/io/hdfs.h"
diff --git a/cpp/src/arrow/io/buffered-test.cc 
b/cpp/src/arrow/io/buffered-test.cc
index 074833d..7b9ab0c 100644
--- a/cpp/src/arrow/io/buffered-test.cc
+++ b/cpp/src/arrow/io/buffered-test.cc
@@ -105,7 +105,8 @@ class TestBufferedOutputStream : public 
FileTestFixture<BufferedOutputStream> {
       lseek(fd_, 0, SEEK_END);
 #endif
     }
-    ASSERT_OK(BufferedOutputStream::Create(file, buffer_size, &buffered_));
+    ASSERT_OK(BufferedOutputStream::Create(buffer_size, default_memory_pool(), 
file,
+                                           &buffered_));
   }
 
   void WriteChunkwise(const std::string& datastr, const 
std::valarray<int64_t>& sizes) {
@@ -321,7 +322,7 @@ class TestBufferedInputStream : public 
FileTestFixture<BufferedInputStream> {
     std::shared_ptr<ReadableFile> file_in;
     ASSERT_OK(ReadableFile::Open(path_, &file_in));
     raw_ = file_in;
-    ASSERT_OK(BufferedInputStream::Create(raw_, buffer_size, pool, 
&buffered_));
+    ASSERT_OK(BufferedInputStream::Create(buffer_size, pool, raw_, 
&buffered_));
   }
 
  protected:
diff --git a/cpp/src/arrow/io/buffered.cc b/cpp/src/arrow/io/buffered.cc
index f3eae39..0b1431f 100644
--- a/cpp/src/arrow/io/buffered.cc
+++ b/cpp/src/arrow/io/buffered.cc
@@ -91,8 +91,8 @@ class BufferedBase {
 
 class BufferedOutputStream::Impl : public BufferedBase {
  public:
-  explicit Impl(std::shared_ptr<OutputStream> raw)
-      : BufferedBase(default_memory_pool()), raw_(std::move(raw)) {}
+  explicit Impl(std::shared_ptr<OutputStream> raw, MemoryPool* pool)
+      : BufferedBase(pool), raw_(std::move(raw)) {}
 
   Status Close() {
     std::lock_guard<std::mutex> guard(lock_);
@@ -173,14 +173,16 @@ class BufferedOutputStream::Impl : public BufferedBase {
   std::shared_ptr<OutputStream> raw_;
 };
 
-BufferedOutputStream::BufferedOutputStream(std::shared_ptr<OutputStream> raw)
-    : impl_(new BufferedOutputStream::Impl(std::move(raw))) {}
+BufferedOutputStream::BufferedOutputStream(std::shared_ptr<OutputStream> raw,
+                                           MemoryPool* pool) {
+  impl_.reset(new Impl(std::move(raw), pool));
+}
 
-Status BufferedOutputStream::Create(std::shared_ptr<OutputStream> raw,
-                                    int64_t buffer_size,
+Status BufferedOutputStream::Create(int64_t buffer_size, MemoryPool* pool,
+                                    std::shared_ptr<OutputStream> raw,
                                     std::shared_ptr<BufferedOutputStream>* 
out) {
-  auto result =
-      std::shared_ptr<BufferedOutputStream>(new 
BufferedOutputStream(std::move(raw)));
+  auto result = std::shared_ptr<BufferedOutputStream>(
+      new BufferedOutputStream(std::move(raw), pool));
   RETURN_NOT_OK(result->SetBufferSize(buffer_size));
   *out = std::move(result);
   return Status::OK();
@@ -217,12 +219,12 @@ std::shared_ptr<OutputStream> BufferedOutputStream::raw() 
const { return impl_->
 // ----------------------------------------------------------------------
 // BufferedInputStream implementation
 
-class BufferedInputStream::BufferedInputStreamImpl : public BufferedBase {
+class BufferedInputStream::Impl : public BufferedBase {
  public:
-  BufferedInputStreamImpl(std::shared_ptr<InputStream> raw, MemoryPool* pool)
+  Impl(std::shared_ptr<InputStream> raw, MemoryPool* pool)
       : BufferedBase(pool), raw_(std::move(raw)), bytes_buffered_(0) {}
 
-  ~BufferedInputStreamImpl() { DCHECK_OK(Close()); }
+  ~Impl() { DCHECK_OK(Close()); }
 
   Status Close() {
     std::lock_guard<std::mutex> guard(lock_);
@@ -350,13 +352,13 @@ class BufferedInputStream::BufferedInputStreamImpl : 
public BufferedBase {
 
 BufferedInputStream::BufferedInputStream(std::shared_ptr<InputStream> raw,
                                          MemoryPool* pool) {
-  impl_.reset(new BufferedInputStreamImpl(std::move(raw), pool));
+  impl_.reset(new Impl(std::move(raw), pool));
 }
 
 BufferedInputStream::~BufferedInputStream() { DCHECK_OK(impl_->Close()); }
 
-Status BufferedInputStream::Create(std::shared_ptr<InputStream> raw, int64_t 
buffer_size,
-                                   MemoryPool* pool,
+Status BufferedInputStream::Create(int64_t buffer_size, MemoryPool* pool,
+                                   std::shared_ptr<InputStream> raw,
                                    std::shared_ptr<BufferedInputStream>* out) {
   auto result =
       std::shared_ptr<BufferedInputStream>(new 
BufferedInputStream(std::move(raw), pool));
diff --git a/cpp/src/arrow/io/buffered.h b/cpp/src/arrow/io/buffered.h
index d507955..945915b 100644
--- a/cpp/src/arrow/io/buffered.h
+++ b/cpp/src/arrow/io/buffered.h
@@ -40,12 +40,13 @@ class ARROW_EXPORT BufferedOutputStream : public 
OutputStream {
   ~BufferedOutputStream() override;
 
   /// \brief Create a buffered output stream wrapping the given output stream.
+  /// \param[in] buffer_size the size of the temporary write buffer
+  /// \param[in] pool a MemoryPool to use for allocations
   /// \param[in] raw another OutputStream
-  /// \param[in] buffer_size the size of the temporary buffer. Allocates from
-  /// the default memory pool
   /// \param[out] out the created BufferedOutputStream
   /// \return Status
-  static Status Create(std::shared_ptr<OutputStream> raw, int64_t buffer_size,
+  static Status Create(int64_t buffer_size, MemoryPool* pool,
+                       std::shared_ptr<OutputStream> raw,
                        std::shared_ptr<BufferedOutputStream>* out);
 
   /// \brief Resize internal buffer
@@ -79,7 +80,7 @@ class ARROW_EXPORT BufferedOutputStream : public OutputStream 
{
   std::shared_ptr<OutputStream> raw() const;
 
  private:
-  explicit BufferedOutputStream(std::shared_ptr<OutputStream> raw);
+  explicit BufferedOutputStream(std::shared_ptr<OutputStream> raw, MemoryPool* 
pool);
 
   class ARROW_NO_EXPORT Impl;
   std::unique_ptr<Impl> impl_;
@@ -94,12 +95,13 @@ class ARROW_EXPORT BufferedInputStream : public InputStream 
{
   ~BufferedInputStream() override;
 
   /// \brief Create a BufferedInputStream from a raw InputStream
-  /// \param[in] raw a raw InputStream
   /// \param[in] buffer_size the size of the temporary read buffer
   /// \param[in] pool a MemoryPool to use for allocations
+  /// \param[in] raw a raw InputStream
   /// \param[out] out the created BufferedInputStream
-  static Status Create(std::shared_ptr<InputStream> raw, int64_t buffer_size,
-                       MemoryPool* pool, std::shared_ptr<BufferedInputStream>* 
out);
+  static Status Create(int64_t buffer_size, MemoryPool* pool,
+                       std::shared_ptr<InputStream> raw,
+                       std::shared_ptr<BufferedInputStream>* out);
 
   /// \brief Resize internal read buffer; calls to Read(...) will read at least
   /// \param[in] new_buffer_size the new read buffer size
@@ -138,8 +140,8 @@ class ARROW_EXPORT BufferedInputStream : public InputStream 
{
  private:
   explicit BufferedInputStream(std::shared_ptr<InputStream> raw, MemoryPool* 
pool);
 
-  class ARROW_NO_EXPORT BufferedInputStreamImpl;
-  std::unique_ptr<BufferedInputStreamImpl> impl_;
+  class ARROW_NO_EXPORT Impl;
+  std::unique_ptr<Impl> impl_;
 };
 
 }  // namespace io
diff --git a/cpp/src/arrow/io/file-benchmark.cc 
b/cpp/src/arrow/io/file-benchmark.cc
index c57fa6d..4439a18 100644
--- a/cpp/src/arrow/io/file-benchmark.cc
+++ b/cpp/src/arrow/io/file-benchmark.cc
@@ -163,7 +163,8 @@ static void BM_BufferedOutputStreamSmallWritesToNull(
   ABORT_NOT_OK(io::FileOutputStream::Open(GetNullFile(), &file));
 
   std::shared_ptr<io::BufferedOutputStream> buffered_file;
-  ABORT_NOT_OK(io::BufferedOutputStream::Create(file, kBufferSize, 
&buffered_file));
+  ABORT_NOT_OK(io::BufferedOutputStream::Create(kBufferSize, 
default_memory_pool(), file,
+                                                &buffered_file));
   BenchmarkStreamingWrites(state, small_sizes, buffered_file.get());
 }
 
@@ -196,7 +197,8 @@ static void BM_BufferedOutputStreamSmallWritesToPipe(
   SetupPipeWriter(&stream, &reader);
 
   std::shared_ptr<io::BufferedOutputStream> buffered_stream;
-  ABORT_NOT_OK(io::BufferedOutputStream::Create(stream, kBufferSize, 
&buffered_stream));
+  ABORT_NOT_OK(io::BufferedOutputStream::Create(kBufferSize, 
default_memory_pool(),
+                                                stream, &buffered_stream));
   BenchmarkStreamingWrites(state, small_sizes, buffered_stream.get(), 
reader.get());
 }
 
@@ -207,7 +209,8 @@ static void BM_BufferedOutputStreamLargeWritesToPipe(
   SetupPipeWriter(&stream, &reader);
 
   std::shared_ptr<io::BufferedOutputStream> buffered_stream;
-  ABORT_NOT_OK(io::BufferedOutputStream::Create(stream, kBufferSize, 
&buffered_stream));
+  ABORT_NOT_OK(io::BufferedOutputStream::Create(kBufferSize, 
default_memory_pool(),
+                                                stream, &buffered_stream));
 
   BenchmarkStreamingWrites(state, large_sizes, buffered_stream.get(), 
reader.get());
 }
diff --git a/python/pyarrow/includes/libarrow.pxd 
b/python/pyarrow/includes/libarrow.pxd
index cc77ff4..97bc892 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -697,6 +697,22 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" 
nogil:
         CStatus Make(CCodec* codec, shared_ptr[OutputStream] raw,
                      shared_ptr[CCompressedOutputStream]* out)
 
+    cdef cppclass CBufferedInputStream \
+            " arrow::io::BufferedInputStream"(InputStream):
+
+        @staticmethod
+        CStatus Create(int64_t buffer_size, CMemoryPool* pool,
+                       shared_ptr[InputStream] raw,
+                       shared_ptr[CBufferedInputStream]* out)
+
+    cdef cppclass CBufferedOutputStream \
+            " arrow::io::BufferedOutputStream"(OutputStream):
+
+        @staticmethod
+        CStatus Create(int64_t buffer_size, CMemoryPool* pool,
+                       shared_ptr[OutputStream] raw,
+                       shared_ptr[CBufferedOutputStream]* out)
+
     # ----------------------------------------------------------------------
     # HDFS
 
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index 97abde8..5212274 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -1064,32 +1064,6 @@ cdef class BufferReader(NativeFile):
         self.is_readable = True
 
 
-cdef shared_ptr[InputStream] _make_compressed_input_stream(
-        shared_ptr[InputStream] stream,
-        CompressionType compression_type) except *:
-    cdef:
-        shared_ptr[CCompressedInputStream] compressed_stream
-        unique_ptr[CCodec] codec
-
-    check_status(CCodec.Create(compression_type, &codec))
-    check_status(CCompressedInputStream.Make(codec.get(), stream,
-                                             &compressed_stream))
-    return <shared_ptr[InputStream]> compressed_stream
-
-
-cdef shared_ptr[OutputStream] _make_compressed_output_stream(
-        shared_ptr[OutputStream] stream,
-        CompressionType compression_type) except *:
-    cdef:
-        shared_ptr[CCompressedOutputStream] compressed_stream
-        unique_ptr[CCodec] codec
-
-    check_status(CCodec.Create(compression_type, &codec))
-    check_status(CCompressedOutputStream.Make(codec.get(), stream,
-                                              &compressed_stream))
-    return <shared_ptr[OutputStream]> compressed_stream
-
-
 cdef class CompressedInputStream(NativeFile):
     """
     An input stream wrapper which decompresses data on the fly.
@@ -1104,26 +1078,19 @@ cdef class CompressedInputStream(NativeFile):
     def __init__(self, NativeFile stream, compression):
         cdef:
             CompressionType compression_type
+            unique_ptr[CCodec] codec
+            shared_ptr[CCompressedInputStream] compressed_stream
 
         compression_type = _get_compression_type(compression)
         if compression_type == CompressionType_UNCOMPRESSED:
-            raise ValueError("Invalid value for compression: %r"
-                             % (compression,))
-        self._init(stream, compression_type)
+            raise ValueError('Invalid value for compression: {!r}'
+                             .format(compression))
 
-    @staticmethod
-    cdef create(NativeFile stream, CompressionType compression_type):
-        cdef:
-            CompressedInputStream self
-
-        self = CompressedInputStream.__new__(CompressedInputStream)
-        self._init(stream, compression_type)
-        return self
+        check_status(CCodec.Create(compression_type, &codec))
+        check_status(CCompressedInputStream.Make(
+            codec.get(), stream.get_input_stream(), &compressed_stream))
 
-    cdef _init(self, NativeFile stream, CompressionType compression_type):
-        self.set_input_stream(
-            _make_compressed_input_stream(stream.get_input_stream(),
-                                          compression_type))
+        self.set_input_stream(<shared_ptr[InputStream]> compressed_stream)
         self.is_readable = True
 
 
@@ -1138,29 +1105,55 @@ cdef class CompressedOutputStream(NativeFile):
         The compression type ("bz2", "brotli", "gzip", "lz4", "snappy"
         or "zstd")
     """
+
     def __init__(self, NativeFile stream, compression):
         cdef:
             CompressionType compression_type
+            unique_ptr[CCodec] codec
+            shared_ptr[CCompressedOutputStream] compressed_stream
 
         compression_type = _get_compression_type(compression)
         if compression_type == CompressionType_UNCOMPRESSED:
-            raise ValueError("Invalid value for compression: %r"
-                             % (compression,))
-        self._init(stream, compression_type)
+            raise ValueError('Invalid value for compression: {!r}'
+                             .format(compression))
 
-    @staticmethod
-    cdef create(NativeFile stream, CompressionType compression_type):
-        cdef:
-            CompressedOutputStream self
+        check_status(CCodec.Create(compression_type, &codec))
+        check_status(CCompressedOutputStream.Make(
+            codec.get(), stream.get_output_stream(), &compressed_stream))
 
-        self = CompressedOutputStream.__new__(CompressedOutputStream)
-        self._init(stream, compression_type)
-        return self
+        self.set_output_stream(<shared_ptr[OutputStream]> compressed_stream)
+        self.is_writable = True
+
+
+cdef class BufferedInputStream(NativeFile):
+
+    def __init__(self, NativeFile stream, int buffer_size,
+                 MemoryPool memory_pool=None):
+        cdef shared_ptr[CBufferedInputStream] buffered_stream
+
+        if buffer_size <= 0:
+            raise ValueError('Buffer size must be larger than zero')
+        check_status(CBufferedInputStream.Create(
+            buffer_size, maybe_unbox_memory_pool(memory_pool),
+            stream.get_input_stream(), &buffered_stream))
+
+        self.set_input_stream(<shared_ptr[InputStream]> buffered_stream)
+        self.is_readable = True
+
+
+cdef class BufferedOutputStream(NativeFile):
+
+    def __init__(self, NativeFile stream, int buffer_size,
+                 MemoryPool memory_pool=None):
+        cdef shared_ptr[CBufferedOutputStream] buffered_stream
+
+        if buffer_size <= 0:
+            raise ValueError('Buffer size must be larger than zero')
+        check_status(CBufferedOutputStream.Create(
+            buffer_size, maybe_unbox_memory_pool(memory_pool),
+            stream.get_output_stream(), &buffered_stream))
 
-    cdef _init(self, NativeFile stream, CompressionType compression_type):
-        self.set_output_stream(
-            _make_compressed_output_stream(stream.get_output_stream(),
-                                           compression_type))
+        self.set_output_stream(<shared_ptr[OutputStream]> buffered_stream)
         self.is_writable = True
 
 
@@ -1232,24 +1225,27 @@ cdef get_input_stream(object source, c_bool 
use_memory_map,
     """
     cdef:
         NativeFile nf
+        unique_ptr[CCodec] codec
         shared_ptr[InputStream] input_stream
         shared_ptr[CCompressedInputStream] compressed_stream
-        CompressionType compression_type = CompressionType_UNCOMPRESSED
-        unique_ptr[CCodec] codec
+        CompressionType compression_type
 
     try:
         source_path = _stringify_path(source)
     except TypeError:
-        pass
+        compression = None
     else:
-        compression_type = _get_compression_type_by_filename(source_path)
+        compression = _detect_compression(source_path)
 
+    compression_type = _get_compression_type(compression)
     nf = _get_native_file(source, use_memory_map)
     input_stream = nf.get_input_stream()
 
     if compression_type != CompressionType_UNCOMPRESSED:
-        input_stream = _make_compressed_input_stream(input_stream,
-                                                     compression_type)
+        check_status(CCodec.Create(compression_type, &codec))
+        check_status(CCompressedInputStream.Make(codec.get(), input_stream,
+                                                 &compressed_stream))
+        input_stream = <shared_ptr[InputStream]> compressed_stream
 
     out[0] = input_stream
 
@@ -1292,21 +1288,19 @@ cdef CompressionType _get_compression_type(object name) 
except *:
     elif name == 'zstd':
         return CompressionType_ZSTD
     else:
-        raise ValueError("Unrecognized compression type: {0}"
-                         .format(str(name)))
+        raise ValueError('Unrecognized compression type: {}'.format(name))
 
 
-cdef CompressionType _get_compression_type_by_filename(filename) except *:
-    if filename.endswith('.bz2'):
-        return CompressionType_BZ2
-    elif filename.endswith('.gz'):
-        return CompressionType_GZIP
-    elif filename.endswith('.lz4'):
-        return CompressionType_LZ4
-    elif filename.endswith('.zst'):
-        return CompressionType_ZSTD
-    else:
-        return CompressionType_UNCOMPRESSED
+def _detect_compression(path):
+    if isinstance(path, six.string_types):
+        if path.endswith('.bz2'):
+            return 'bz2'
+        elif path.endswith('.gz'):
+            return 'gzip'
+        elif path.endswith('.lz4'):
+            return 'lz4'
+        elif path.endswith('.zst'):
+            return 'zstd'
 
 
 def compress(object buf, codec='lz4', asbytes=False, memory_pool=None):
@@ -1427,18 +1421,7 @@ def decompress(object buf, decompressed_size=None, 
codec='lz4',
     return pybuf if asbytes else out_buf
 
 
-cdef CompressionType _stream_compression_argument(
-        compression, source_path) except *:
-    if compression == 'detect':
-        if source_path is not None:
-            return _get_compression_type_by_filename(source_path)
-        else:
-            return CompressionType_UNCOMPRESSED
-    else:
-        return _get_compression_type(compression)
-
-
-def input_stream(source, compression='detect'):
+def input_stream(source, compression='detect', buffer_size=None):
     """
     Create an Arrow input stream.
 
@@ -1452,18 +1435,17 @@ def input_stream(source, compression='detect'):
         chosen based on the file extension.
         If None, no compression will be applied.
         Otherwise, a well-known algorithm name must be supplied (e.g. "gzip")
+    buffer_size: int, default None
+        If None or 0, no buffering will happen.  Otherwise the size of the
+        temporary read buffer.
     """
-    cdef:
-        CompressionType compression_type
-        NativeFile stream
+    cdef NativeFile stream
 
     try:
         source_path = _stringify_path(source)
     except TypeError:
         source_path = None
 
-    compression_type = _stream_compression_argument(compression, source_path)
-
     if isinstance(source, NativeFile):
         stream = source
     elif source_path is not None:
@@ -1479,13 +1461,19 @@ def input_stream(source, compression='detect'):
         raise TypeError("pa.input_stream() called with instance of '{}'"
                         .format(source.__class__))
 
-    if compression_type != CompressionType_UNCOMPRESSED:
-        stream = CompressedInputStream.create(stream, compression_type)
+    if compression == 'detect':
+        compression = _detect_compression(source_path)
+
+    if buffer_size is not None and buffer_size != 0:
+        stream = BufferedInputStream(stream, buffer_size)
+
+    if compression is not None:
+        stream = CompressedInputStream(stream, compression)
 
     return stream
 
 
-def output_stream(source, compression='detect'):
+def output_stream(source, compression='detect', buffer_size=None):
     """
     Create an Arrow output stream.
 
@@ -1499,18 +1487,17 @@ def output_stream(source, compression='detect'):
         chosen based on the file extension.
         If None, no compression will be applied.
         Otherwise, a well-known algorithm name must be supplied (e.g. "gzip")
+    buffer_size: int, default None
+        If None or 0, no buffering will happen.  Otherwise the size of the
+        temporary write buffer.
     """
-    cdef:
-        CompressionType compression_type
-        NativeFile stream
+    cdef NativeFile stream
 
     try:
         source_path = _stringify_path(source)
     except TypeError:
         source_path = None
 
-    compression_type = _stream_compression_argument(compression, source_path)
-
     if isinstance(source, NativeFile):
         stream = source
     elif source_path is not None:
@@ -1526,7 +1513,13 @@ def output_stream(source, compression='detect'):
         raise TypeError("pa.output_stream() called with instance of '{}'"
                         .format(source.__class__))
 
-    if compression_type != CompressionType_UNCOMPRESSED:
-        stream = CompressedOutputStream.create(stream, compression_type)
+    if compression == 'detect':
+        compression = _detect_compression(source_path)
+
+    if buffer_size is not None and buffer_size != 0:
+        stream = BufferedOutputStream(stream, buffer_size)
+
+    if compression is not None:
+        stream = CompressedOutputStream(stream, compression)
 
     return stream
diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py
index f54f03a..77ed70c 100644
--- a/python/pyarrow/tests/test_io.py
+++ b/python/pyarrow/tests/test_io.py
@@ -1134,6 +1134,44 @@ def test_input_stream_file_path_compressed(tmpdir):
     assert stream.read() == gz_data
 
 
+def test_input_stream_file_path_buffered(tmpdir):
+    data = b"some test data\n" * 10 + b"eof\n"
+    file_path = tmpdir / 'input_stream.buffered'
+    with open(str(file_path), 'wb') as f:
+        f.write(data)
+
+    stream = pa.input_stream(file_path, buffer_size=32)
+    assert stream.read() == data
+    stream = pa.input_stream(str(file_path), buffer_size=64)
+    assert stream.read() == data
+    stream = pa.input_stream(pathlib.Path(str(file_path)), buffer_size=1024)
+    assert stream.read() == data
+
+    unbuffered_stream = pa.input_stream(file_path, buffer_size=0)
+    assert isinstance(unbuffered_stream, pa.OSFile)
+
+    msg = 'Buffer size must be larger than zero'
+    with pytest.raises(ValueError, match=msg):
+        pa.input_stream(file_path, buffer_size=-1)
+    with pytest.raises(TypeError):
+        pa.input_stream(file_path, buffer_size='million')
+
+
+def test_input_stream_file_path_compressed_and_buffered(tmpdir):
+    data = b"some test data\n" * 100 + b"eof\n"
+    gz_data = gzip_compress(data)
+    file_path = tmpdir / 'input_stream_compressed_and_buffered.gz'
+    with open(str(file_path), 'wb') as f:
+        f.write(gz_data)
+
+    stream = pa.input_stream(file_path, buffer_size=32, compression='gzip')
+    assert stream.read() == data
+    stream = pa.input_stream(str(file_path), buffer_size=64)
+    assert stream.read() == data
+    stream = pa.input_stream(pathlib.Path(str(file_path)), buffer_size=1024)
+    assert stream.read() == data
+
+
 def test_input_stream_python_file(tmpdir):
     data = b"some test data\n" * 10 + b"eof\n"
     bio = BytesIO(data)
@@ -1232,6 +1270,54 @@ def test_output_stream_file_path_compressed(tmpdir):
         check_data(file_path, data, compression='gzip')) == data
     assert check_data(file_path, data, compression=None) == data
 
+    with pytest.raises(ValueError, match='Unrecognized compression type'):
+        assert check_data(file_path, data, compression='rabbit') == data
+
+
+def test_output_stream_file_path_buffered(tmpdir):
+    data = b"some test data\n" * 10 + b"eof\n"
+    file_path = tmpdir / 'output_stream.buffered'
+
+    def check_data(file_path, data, **kwargs):
+        with pa.output_stream(file_path, **kwargs) as stream:
+            stream.write(data)
+        with open(str(file_path), 'rb') as f:
+            return f.read()
+
+    unbuffered_stream = pa.output_stream(file_path, buffer_size=0)
+    assert isinstance(unbuffered_stream, pa.OSFile)
+
+    msg = 'Buffer size must be larger than zero'
+    with pytest.raises(ValueError, match=msg):
+        assert check_data(file_path, data, buffer_size=-128) == data
+
+    assert check_data(file_path, data, buffer_size=32) == data
+    assert check_data(file_path, data, buffer_size=1024) == data
+    assert check_data(str(file_path), data, buffer_size=32) == data
+
+    result = check_data(pathlib.Path(str(file_path)), data, buffer_size=32)
+    assert result == data
+
+
+def test_output_stream_file_path_compressed_and_buffered(tmpdir):
+    data = b"some test data\n" * 100 + b"eof\n"
+    file_path = tmpdir / 'output_stream_compressed_and_buffered.gz'
+
+    def check_data(file_path, data, **kwargs):
+        with pa.output_stream(file_path, **kwargs) as stream:
+            stream.write(data)
+        with open(str(file_path), 'rb') as f:
+            return f.read()
+
+    result = check_data(file_path, data, buffer_size=32)
+    assert gzip_decompress(result) == data
+
+    result = check_data(file_path, data, buffer_size=1024)
+    assert gzip_decompress(result) == data
+
+    result = check_data(file_path, data, buffer_size=1024, compression='gzip')
+    assert gzip_decompress(result) == data
+
 
 def test_output_stream_python_file(tmpdir):
     data = b"some test data\n" * 10 + b"eof\n"

Reply via email to