http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/_io.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/_io.pyx b/python/pyarrow/_io.pyx new file mode 100644 index 0000000..9f067fb --- /dev/null +++ b/python/pyarrow/_io.pyx @@ -0,0 +1,1273 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Cython wrappers for IO interfaces defined in arrow::io and messaging in +# arrow::ipc + +# cython: profile=False +# distutils: language = c++ +# cython: embedsignature = True + +from cython.operator cimport dereference as deref +from libc.stdlib cimport malloc, free +from pyarrow.includes.libarrow cimport * +cimport pyarrow.includes.pyarrow as pyarrow +from pyarrow._array cimport Array, Tensor, box_tensor, Schema +from pyarrow._error cimport check_status +from pyarrow._memory cimport MemoryPool, maybe_unbox_memory_pool +from pyarrow._table cimport (Column, RecordBatch, batch_from_cbatch, + table_from_ctable) +cimport cpython as cp + +import pyarrow._config +from pyarrow.compat import frombytes, tobytes, encode_file_path + +import re +import six +import sys +import threading +import time + + +# 64K +DEFAULT_BUFFER_SIZE = 2 ** 16 + + +# To let us get a PyObject* and avoid Cython auto-ref-counting +cdef extern from "Python.h": + PyObject* PyBytes_FromStringAndSizeNative" PyBytes_FromStringAndSize"( + char *v, Py_ssize_t len) except NULL + +cdef class NativeFile: + + def __cinit__(self): + self.is_open = False + self.own_file = False + + def __dealloc__(self): + if self.is_open and self.own_file: + self.close() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, tb): + self.close() + + def close(self): + if self.is_open: + with nogil: + if self.is_readable: + check_status(self.rd_file.get().Close()) + else: + check_status(self.wr_file.get().Close()) + self.is_open = False + + cdef read_handle(self, shared_ptr[RandomAccessFile]* file): + self._assert_readable() + file[0] = <shared_ptr[RandomAccessFile]> self.rd_file + + cdef write_handle(self, shared_ptr[OutputStream]* file): + self._assert_writeable() + file[0] = <shared_ptr[OutputStream]> self.wr_file + + def _assert_readable(self): + if not self.is_readable: + raise IOError("only valid on readonly files") + + if not self.is_open: + raise IOError("file not open") + + def _assert_writeable(self): + if not self.is_writeable: + raise IOError("only valid on writeable files") + + if not self.is_open: + raise IOError("file not open") + + def size(self): + cdef int64_t size + self._assert_readable() + with nogil: + check_status(self.rd_file.get().GetSize(&size)) + return size + + def tell(self): + cdef int64_t position + with nogil: + if self.is_readable: + check_status(self.rd_file.get().Tell(&position)) + else: + 
check_status(self.wr_file.get().Tell(&position))
+        return position
+
+    def seek(self, int64_t position):
+        self._assert_readable()
+        with nogil:
+            check_status(self.rd_file.get().Seek(position))
+
+    def write(self, data):
+        """
+        Write bytes from any object implementing the buffer protocol (bytes,
+        bytearray, ndarray, pyarrow.Buffer)
+        """
+        self._assert_writeable()
+
+        if isinstance(data, six.string_types):
+            data = tobytes(data)
+
+        cdef Buffer arrow_buffer = frombuffer(data)
+
+        cdef const uint8_t* buf = arrow_buffer.buffer.get().data()
+        cdef int64_t bufsize = len(arrow_buffer)
+        with nogil:
+            check_status(self.wr_file.get().Write(buf, bufsize))
+
+    def read(self, nbytes=None):
+        cdef:
+            int64_t c_nbytes
+            int64_t bytes_read = 0
+            PyObject* obj
+
+        if nbytes is None:
+            c_nbytes = self.size() - self.tell()
+        else:
+            c_nbytes = nbytes
+
+        self._assert_readable()
+
+        # Allocate empty write space
+        obj = PyBytes_FromStringAndSizeNative(NULL, c_nbytes)
+
+        cdef uint8_t* buf = <uint8_t*> cp.PyBytes_AS_STRING(<object> obj)
+        with nogil:
+            check_status(self.rd_file.get().Read(c_nbytes, &bytes_read, buf))
+
+        if bytes_read < c_nbytes:
+            cp._PyBytes_Resize(&obj, <Py_ssize_t> bytes_read)
+
+        return PyObject_to_object(obj)
+
+    def read_buffer(self, nbytes=None):
+        cdef:
+            int64_t c_nbytes
+            int64_t bytes_read = 0
+            shared_ptr[CBuffer] output
+        self._assert_readable()
+
+        if nbytes is None:
+            c_nbytes = self.size() - self.tell()
+        else:
+            c_nbytes = nbytes
+
+        with nogil:
+            check_status(self.rd_file.get().ReadB(c_nbytes, &output))
+
+        return wrap_buffer(output)
+
+    def download(self, stream_or_path, buffer_size=None):
+        """
+        Read file completely to local path (rather than reading completely
+        into memory). First seeks to the beginning of the file.
+        """
+        cdef:
+            int64_t bytes_read = 0
+            uint8_t* buf
+        self._assert_readable()
+
+        buffer_size = buffer_size or DEFAULT_BUFFER_SIZE
+
+        write_queue = Queue(50)
+
+        if not hasattr(stream_or_path, 'read'):
+            stream = open(stream_or_path, 'wb')
+            cleanup = lambda: stream.close()
+        else:
+            stream = stream_or_path
+            cleanup = lambda: None
+
+        done = False
+        # Error channel for the writer thread; a list is used so the
+        # closure can hand the captured exc_info back to this scope
+        exc_info = []
+
+        def bg_write():
+            try:
+                while not done or write_queue.qsize() > 0:
+                    try:
+                        buf = write_queue.get(timeout=0.01)
+                    except QueueEmpty:
+                        continue
+                    stream.write(buf)
+            except Exception:
+                exc_info.append(sys.exc_info())
+            finally:
+                cleanup()
+
+        self.seek(0)
+
+        writer_thread = threading.Thread(target=bg_write)
+
+        # This isn't ideal -- PyBytes_FromStringAndSize copies the data from
+        # the passed buffer, so it's hard for us to avoid doubling the memory
+        buf = <uint8_t*> malloc(buffer_size)
+        if buf == NULL:
+            raise MemoryError("Failed to allocate {0} bytes"
+                              .format(buffer_size))
+
+        writer_thread.start()
+
+        cdef int64_t total_bytes = 0
+        cdef int32_t c_buffer_size = buffer_size
+
+        try:
+            while True:
+                with nogil:
+                    check_status(self.rd_file.get()
+                                 .Read(c_buffer_size, &bytes_read, buf))
+
+                total_bytes += bytes_read
+
+                # EOF
+                if bytes_read == 0:
+                    break
+
+                pybuf = cp.PyBytes_FromStringAndSize(<const char*>buf,
+                                                     bytes_read)
+
+                write_queue.put_nowait(pybuf)
+        finally:
+            free(buf)
+            done = True
+
+        writer_thread.join()
+        if exc_info:
+            six.reraise(*exc_info[0])
+
+    def upload(self, stream, buffer_size=None):
+        """
+        Pipe file-like object to file
+        """
+        write_queue = Queue(50)
+        self._assert_writeable()
+
+        buffer_size = buffer_size or DEFAULT_BUFFER_SIZE
+
+        done = False
+        # Same list-based error channel as in download above
+        exc_info = []
+
+        def bg_write():
+            try:
+                while not done or write_queue.qsize() > 0:
+                    try:
+                        buf = write_queue.get(timeout=0.01)
+                    except QueueEmpty:
+                        continue
+
+                    self.write(buf)
+
+            except Exception:
+                exc_info.append(sys.exc_info())
+
+        writer_thread = threading.Thread(target=bg_write)
+        writer_thread.start()
+
+        try:
+            while True:
+                buf = stream.read(buffer_size)
+                if not buf:
+                    break
+
+                if writer_thread.is_alive():
+                    while write_queue.full():
+                        time.sleep(0.01)
+                else:
+                    break
+
+                write_queue.put_nowait(buf)
+        finally:
+            done = True
+
+        writer_thread.join()
+        if exc_info:
+            six.reraise(*exc_info[0])
+
+
+# ----------------------------------------------------------------------
+# Python file-like objects
+
+
+cdef class PythonFileInterface(NativeFile):
+    cdef:
+        object handle
+
+    def __cinit__(self, handle, mode='w'):
+        self.handle = handle
+
+        if mode.startswith('w'):
+            self.wr_file.reset(new pyarrow.PyOutputStream(handle))
+            self.is_readable = 0
+            self.is_writeable = 1
+        elif mode.startswith('r'):
+            self.rd_file.reset(new pyarrow.PyReadableFile(handle))
+            self.is_readable = 1
+            self.is_writeable = 0
+        else:
+            raise ValueError('Invalid file mode: {0}'.format(mode))
+
+        self.is_open = True
+
+
+cdef class MemoryMappedFile(NativeFile):
+    """
+    Supports 'r', 'r+w', 'w' modes
+    """
+    cdef:
+        object path
+
+    def __cinit__(self):
+        self.is_open = False
+        self.is_readable = 0
+        self.is_writeable = 0
+
+    @staticmethod
+    def create(path, size):
+        cdef:
+            shared_ptr[CMemoryMappedFile] handle
+            c_string c_path = encode_file_path(path)
+            int64_t c_size = size
+
+        with nogil:
+            check_status(CMemoryMappedFile.Create(c_path, c_size, &handle))
+
+        cdef MemoryMappedFile result = MemoryMappedFile()
+        result.path = path
+        result.is_readable = 1
+        result.is_writeable = 1
+        result.wr_file = <shared_ptr[OutputStream]> handle
+        result.rd_file = <shared_ptr[RandomAccessFile]> handle
+        result.is_open = True
+
+        return result
+
+    def open(self, path, mode='r'):
+        self.path = path
+
+        cdef:
+            FileMode c_mode
+            shared_ptr[CMemoryMappedFile] handle
+            c_string c_path = encode_file_path(path)
+
+        if mode in ('r', 'rb'):
+            c_mode = FileMode_READ
+            self.is_readable = 1
+        elif mode in ('w', 'wb'):
+            c_mode = FileMode_WRITE
+            self.is_writeable = 1
+        elif mode == 'r+w':
+            c_mode = FileMode_READWRITE
+            self.is_readable = 1
+            self.is_writeable = 1
+        else:
+            raise ValueError('Invalid file mode: {0}'.format(mode))
+
+        check_status(CMemoryMappedFile.Open(c_path, c_mode, &handle))
+
+        self.wr_file = <shared_ptr[OutputStream]> handle
+        self.rd_file = <shared_ptr[RandomAccessFile]> handle
+        self.is_open = True
+
+
+def memory_map(path, mode='r'):
+    """
+    Open memory map at file path.
Size of the memory map cannot change + + Parameters + ---------- + path : string + mode : {'r', 'w'}, default 'r' + + Returns + ------- + mmap : MemoryMappedFile + """ + cdef MemoryMappedFile mmap = MemoryMappedFile() + mmap.open(path, mode) + return mmap + + +def create_memory_map(path, size): + """ + Create memory map at indicated path of the given size, return open + writeable file object + + Parameters + ---------- + path : string + size : int + + Returns + ------- + mmap : MemoryMappedFile + """ + return MemoryMappedFile.create(path, size) + + +cdef class OSFile(NativeFile): + """ + Supports 'r', 'w' modes + """ + cdef: + object path + + def __cinit__(self, path, mode='r', MemoryPool memory_pool=None): + self.path = path + + cdef: + FileMode c_mode + shared_ptr[Readable] handle + c_string c_path = encode_file_path(path) + + self.is_readable = self.is_writeable = 0 + + if mode in ('r', 'rb'): + self._open_readable(c_path, maybe_unbox_memory_pool(memory_pool)) + elif mode in ('w', 'wb'): + self._open_writeable(c_path) + else: + raise ValueError('Invalid file mode: {0}'.format(mode)) + + self.is_open = True + + cdef _open_readable(self, c_string path, CMemoryPool* pool): + cdef shared_ptr[ReadableFile] handle + + with nogil: + check_status(ReadableFile.Open(path, pool, &handle)) + + self.is_readable = 1 + self.rd_file = <shared_ptr[RandomAccessFile]> handle + + cdef _open_writeable(self, c_string path): + cdef shared_ptr[FileOutputStream] handle + + with nogil: + check_status(FileOutputStream.Open(path, &handle)) + self.is_writeable = 1 + self.wr_file = <shared_ptr[OutputStream]> handle + + +# ---------------------------------------------------------------------- +# Arrow buffers + + +cdef class Buffer: + + def __cinit__(self): + pass + + cdef init(self, const shared_ptr[CBuffer]& buffer): + self.buffer = buffer + self.shape[0] = self.size + self.strides[0] = <Py_ssize_t>(1) + + def __len__(self): + return self.size + + property size: + + def __get__(self): + return self.buffer.get().size() + + property parent: + + def __get__(self): + cdef shared_ptr[CBuffer] parent_buf = self.buffer.get().parent() + + if parent_buf.get() == NULL: + return None + else: + return wrap_buffer(parent_buf) + + def __getitem__(self, key): + # TODO(wesm): buffer slicing + raise NotImplementedError + + def to_pybytes(self): + return cp.PyBytes_FromStringAndSize( + <const char*>self.buffer.get().data(), + self.buffer.get().size()) + + def __getbuffer__(self, cp.Py_buffer* buffer, int flags): + + buffer.buf = <char *>self.buffer.get().data() + buffer.format = 'b' + buffer.internal = NULL + buffer.itemsize = 1 + buffer.len = self.size + buffer.ndim = 1 + buffer.obj = self + buffer.readonly = 1 + buffer.shape = self.shape + buffer.strides = self.strides + buffer.suboffsets = NULL + +cdef shared_ptr[PoolBuffer] allocate_buffer(CMemoryPool* pool): + cdef shared_ptr[PoolBuffer] result + result.reset(new PoolBuffer(pool)) + return result + + +cdef class InMemoryOutputStream(NativeFile): + + cdef: + shared_ptr[PoolBuffer] buffer + + def __cinit__(self, MemoryPool memory_pool=None): + self.buffer = allocate_buffer(maybe_unbox_memory_pool(memory_pool)) + self.wr_file.reset(new BufferOutputStream( + <shared_ptr[ResizableBuffer]> self.buffer)) + self.is_readable = 0 + self.is_writeable = 1 + self.is_open = True + + def get_result(self): + check_status(self.wr_file.get().Close()) + self.is_open = False + return wrap_buffer(<shared_ptr[CBuffer]> self.buffer) + + +cdef class BufferReader(NativeFile): + """ + Zero-copy reader 
from objects convertible to Arrow buffer
+
+    Parameters
+    ----------
+    obj : Python bytes or pyarrow.io.Buffer
+    """
+    cdef:
+        Buffer buffer
+
+    def __cinit__(self, object obj):
+
+        if isinstance(obj, Buffer):
+            self.buffer = obj
+        else:
+            self.buffer = frombuffer(obj)
+
+        self.rd_file.reset(new CBufferReader(self.buffer.buffer))
+        self.is_readable = 1
+        self.is_writeable = 0
+        self.is_open = True
+
+
+def frombuffer(object obj):
+    """
+    Construct an Arrow buffer from any Python object implementing the
+    buffer protocol
+    """
+    cdef shared_ptr[CBuffer] buf
+    try:
+        memoryview(obj)
+        buf.reset(new pyarrow.PyBuffer(obj))
+        return wrap_buffer(buf)
+    except TypeError:
+        raise ValueError('Must pass object that implements buffer protocol')
+
+
+cdef Buffer wrap_buffer(const shared_ptr[CBuffer]& buf):
+    cdef Buffer result = Buffer()
+    result.init(buf)
+    return result
+
+
+cdef get_reader(object source, shared_ptr[RandomAccessFile]* reader):
+    cdef NativeFile nf
+
+    if isinstance(source, six.string_types):
+        source = memory_map(source, mode='r')
+    elif isinstance(source, Buffer):
+        source = BufferReader(source)
+    elif not isinstance(source, NativeFile) and hasattr(source, 'read'):
+        # Optimistically hope this is file-like
+        source = PythonFileInterface(source, mode='r')
+
+    if isinstance(source, NativeFile):
+        nf = source
+
+        # TODO: what about read-write sources (e.g. memory maps)
+        if not nf.is_readable:
+            raise IOError('Native file is not readable')
+
+        nf.read_handle(reader)
+    else:
+        raise TypeError('Unable to read from object of type: {0}'
+                        .format(type(source)))
+
+
+cdef get_writer(object source, shared_ptr[OutputStream]* writer):
+    cdef NativeFile nf
+
+    if isinstance(source, six.string_types):
+        source = OSFile(source, mode='w')
+    elif not isinstance(source, NativeFile) and hasattr(source, 'write'):
+        # Optimistically hope this is file-like
+        source = PythonFileInterface(source, mode='w')
+
+    if isinstance(source, NativeFile):
+        nf = source
+
+        if not nf.is_writeable:
+            raise IOError('Native file is not writeable')
+
+        nf.write_handle(writer)
+    else:
+        raise TypeError('Unable to write to object of type: {0}'
+                        .format(type(source)))
+
+# ----------------------------------------------------------------------
+# HDFS IO implementation
+
+_HDFS_PATH_RE = re.compile(r'hdfs://(.*):(\d+)(.*)')
+
+try:
+    # Python 3
+    from queue import Queue, Empty as QueueEmpty, Full as QueueFull
+except ImportError:
+    from Queue import Queue, Empty as QueueEmpty, Full as QueueFull
+
+
+def have_libhdfs():
+    try:
+        check_status(HaveLibHdfs())
+        return True
+    except Exception:
+        return False
+
+
+def have_libhdfs3():
+    try:
+        check_status(HaveLibHdfs3())
+        return True
+    except Exception:
+        return False
+
+
+def strip_hdfs_abspath(path):
+    m = _HDFS_PATH_RE.match(path)
+    if m:
+        return m.group(3)
+    else:
+        return path
+
+
+cdef class _HdfsClient:
+    cdef:
+        shared_ptr[CHdfsClient] client
+
+    cdef readonly:
+        bint is_open
+
+    def __cinit__(self):
+        pass
+
+    def _connect(self, host, port, user, kerb_ticket, driver):
+        cdef HdfsConnectionConfig conf
+
+        if host is not None:
+            conf.host = tobytes(host)
+        conf.port = port
+        if user is not None:
+            conf.user = tobytes(user)
+        if kerb_ticket is not None:
+            conf.kerb_ticket = tobytes(kerb_ticket)
+
+        if driver == 'libhdfs':
+            check_status(HaveLibHdfs())
+            conf.driver = HdfsDriver_LIBHDFS
+        else:
+            check_status(HaveLibHdfs3())
+            conf.driver = HdfsDriver_LIBHDFS3
+
+        with nogil:
+            check_status(CHdfsClient.Connect(&conf, &self.client))
+        self.is_open = True
+
+    @classmethod
+    def connect(cls, *args, **kwargs):
+        return
cls(*args, **kwargs) + + def __dealloc__(self): + if self.is_open: + self.close() + + def close(self): + """ + Disconnect from the HDFS cluster + """ + self._ensure_client() + with nogil: + check_status(self.client.get().Disconnect()) + self.is_open = False + + cdef _ensure_client(self): + if self.client.get() == NULL: + raise IOError('HDFS client improperly initialized') + elif not self.is_open: + raise IOError('HDFS client is closed') + + def exists(self, path): + """ + Returns True if the path is known to the cluster, False if it does not + (or there is an RPC error) + """ + self._ensure_client() + + cdef c_string c_path = tobytes(path) + cdef c_bool result + with nogil: + result = self.client.get().Exists(c_path) + return result + + def isdir(self, path): + cdef HdfsPathInfo info + self._path_info(path, &info) + return info.kind == ObjectType_DIRECTORY + + def isfile(self, path): + cdef HdfsPathInfo info + self._path_info(path, &info) + return info.kind == ObjectType_FILE + + cdef _path_info(self, path, HdfsPathInfo* info): + cdef c_string c_path = tobytes(path) + + with nogil: + check_status(self.client.get() + .GetPathInfo(c_path, info)) + + + def ls(self, path, bint full_info): + cdef: + c_string c_path = tobytes(path) + vector[HdfsPathInfo] listing + list results = [] + int i + + self._ensure_client() + + with nogil: + check_status(self.client.get() + .ListDirectory(c_path, &listing)) + + cdef const HdfsPathInfo* info + for i in range(<int> listing.size()): + info = &listing[i] + + # Try to trim off the hdfs://HOST:PORT piece + name = strip_hdfs_abspath(frombytes(info.name)) + + if full_info: + kind = ('file' if info.kind == ObjectType_FILE + else 'directory') + + results.append({ + 'kind': kind, + 'name': name, + 'owner': frombytes(info.owner), + 'group': frombytes(info.group), + 'list_modified_time': info.last_modified_time, + 'list_access_time': info.last_access_time, + 'size': info.size, + 'replication': info.replication, + 'block_size': info.block_size, + 'permissions': info.permissions + }) + else: + results.append(name) + + return results + + def mkdir(self, path): + """ + Create indicated directory and any necessary parent directories + """ + self._ensure_client() + + cdef c_string c_path = tobytes(path) + with nogil: + check_status(self.client.get() + .CreateDirectory(c_path)) + + def delete(self, path, bint recursive=False): + """ + Delete the indicated file or directory + + Parameters + ---------- + path : string + recursive : boolean, default False + If True, also delete child paths for directories + """ + self._ensure_client() + + cdef c_string c_path = tobytes(path) + with nogil: + check_status(self.client.get() + .Delete(c_path, recursive)) + + def open(self, path, mode='rb', buffer_size=None, replication=None, + default_block_size=None): + """ + Parameters + ---------- + mode : string, 'rb', 'wb', 'ab' + """ + self._ensure_client() + + cdef HdfsFile out = HdfsFile() + + if mode not in ('rb', 'wb', 'ab'): + raise Exception("Mode must be 'rb' (read), " + "'wb' (write, new file), or 'ab' (append)") + + cdef c_string c_path = tobytes(path) + cdef c_bool append = False + + # 0 in libhdfs means "use the default" + cdef int32_t c_buffer_size = buffer_size or 0 + cdef int16_t c_replication = replication or 0 + cdef int64_t c_default_block_size = default_block_size or 0 + + cdef shared_ptr[HdfsOutputStream] wr_handle + cdef shared_ptr[HdfsReadableFile] rd_handle + + if mode in ('wb', 'ab'): + if mode == 'ab': + append = True + + with nogil: + check_status( + 
self.client.get() + .OpenWriteable(c_path, append, c_buffer_size, + c_replication, c_default_block_size, + &wr_handle)) + + out.wr_file = <shared_ptr[OutputStream]> wr_handle + + out.is_readable = False + out.is_writeable = 1 + else: + with nogil: + check_status(self.client.get() + .OpenReadable(c_path, &rd_handle)) + + out.rd_file = <shared_ptr[RandomAccessFile]> rd_handle + out.is_readable = True + out.is_writeable = 0 + + if c_buffer_size == 0: + c_buffer_size = 2 ** 16 + + out.mode = mode + out.buffer_size = c_buffer_size + out.parent = _HdfsFileNanny(self, out) + out.is_open = True + out.own_file = True + + return out + + def download(self, path, stream, buffer_size=None): + with self.open(path, 'rb') as f: + f.download(stream, buffer_size=buffer_size) + + def upload(self, path, stream, buffer_size=None): + """ + Upload file-like object to HDFS path + """ + with self.open(path, 'wb') as f: + f.upload(stream, buffer_size=buffer_size) + + +# ARROW-404: Helper class to ensure that files are closed before the +# client. During deallocation of the extension class, the attributes are +# decref'd which can cause the client to get closed first if the file has the +# last remaining reference +cdef class _HdfsFileNanny: + cdef: + object client + object file_handle_ref + + def __cinit__(self, client, file_handle): + import weakref + self.client = client + self.file_handle_ref = weakref.ref(file_handle) + + def __dealloc__(self): + fh = self.file_handle_ref() + if fh: + fh.close() + # avoid cyclic GC + self.file_handle_ref = None + self.client = None + + +cdef class HdfsFile(NativeFile): + cdef readonly: + int32_t buffer_size + object mode + object parent + + cdef object __weakref__ + + def __dealloc__(self): + self.parent = None + +# ---------------------------------------------------------------------- +# File and stream readers and writers + +cdef class _StreamWriter: + cdef: + shared_ptr[CStreamWriter] writer + shared_ptr[OutputStream] sink + bint closed + + def __cinit__(self): + self.closed = True + + def __dealloc__(self): + if not self.closed: + self.close() + + def _open(self, sink, Schema schema): + get_writer(sink, &self.sink) + + with nogil: + check_status(CStreamWriter.Open(self.sink.get(), schema.sp_schema, + &self.writer)) + + self.closed = False + + def write_batch(self, RecordBatch batch): + with nogil: + check_status(self.writer.get() + .WriteRecordBatch(deref(batch.batch))) + + def close(self): + with nogil: + check_status(self.writer.get().Close()) + self.closed = True + + +cdef class _StreamReader: + cdef: + shared_ptr[CStreamReader] reader + + cdef readonly: + Schema schema + + def __cinit__(self): + pass + + def _open(self, source): + cdef: + shared_ptr[RandomAccessFile] reader + shared_ptr[InputStream] in_stream + + get_reader(source, &reader) + in_stream = <shared_ptr[InputStream]> reader + + with nogil: + check_status(CStreamReader.Open(in_stream, &self.reader)) + + self.schema = Schema() + self.schema.init_schema(self.reader.get().schema()) + + def get_next_batch(self): + """ + Read next RecordBatch from the stream. 
Raises StopIteration at end of + stream + """ + cdef shared_ptr[CRecordBatch] batch + + with nogil: + check_status(self.reader.get().GetNextRecordBatch(&batch)) + + if batch.get() == NULL: + raise StopIteration + + return batch_from_cbatch(batch) + + def read_all(self): + """ + Read all record batches as a pyarrow.Table + """ + cdef: + vector[shared_ptr[CRecordBatch]] batches + shared_ptr[CRecordBatch] batch + shared_ptr[CTable] table + + with nogil: + while True: + check_status(self.reader.get().GetNextRecordBatch(&batch)) + if batch.get() == NULL: + break + batches.push_back(batch) + + check_status(CTable.FromRecordBatches(batches, &table)) + + return table_from_ctable(table) + + +cdef class _FileWriter(_StreamWriter): + + def _open(self, sink, Schema schema): + cdef shared_ptr[CFileWriter] writer + get_writer(sink, &self.sink) + + with nogil: + check_status(CFileWriter.Open(self.sink.get(), schema.sp_schema, + &writer)) + + # Cast to base class, because has same interface + self.writer = <shared_ptr[CStreamWriter]> writer + self.closed = False + + +cdef class _FileReader: + cdef: + shared_ptr[CFileReader] reader + + def __cinit__(self): + pass + + def _open(self, source, footer_offset=None): + cdef shared_ptr[RandomAccessFile] reader + get_reader(source, &reader) + + cdef int64_t offset = 0 + if footer_offset is not None: + offset = footer_offset + + with nogil: + if offset != 0: + check_status(CFileReader.Open2(reader, offset, &self.reader)) + else: + check_status(CFileReader.Open(reader, &self.reader)) + + property num_record_batches: + + def __get__(self): + return self.reader.get().num_record_batches() + + def get_batch(self, int i): + cdef shared_ptr[CRecordBatch] batch + + if i < 0 or i >= self.num_record_batches: + raise ValueError('Batch number {0} out of range'.format(i)) + + with nogil: + check_status(self.reader.get().GetRecordBatch(i, &batch)) + + return batch_from_cbatch(batch) + + # TODO(wesm): ARROW-503: Function was renamed. 
Remove after a period of + # time has passed + get_record_batch = get_batch + + def read_all(self): + """ + Read all record batches as a pyarrow.Table + """ + cdef: + vector[shared_ptr[CRecordBatch]] batches + shared_ptr[CTable] table + int i, nbatches + + nbatches = self.num_record_batches + + batches.resize(nbatches) + with nogil: + for i in range(nbatches): + check_status(self.reader.get().GetRecordBatch(i, &batches[i])) + check_status(CTable.FromRecordBatches(batches, &table)) + + return table_from_ctable(table) + + +#---------------------------------------------------------------------- +# Implement legacy Feather file format + + +class FeatherError(Exception): + pass + + +cdef class FeatherWriter: + cdef: + unique_ptr[CFeatherWriter] writer + + cdef public: + int64_t num_rows + + def __cinit__(self): + self.num_rows = -1 + + def open(self, object dest): + cdef shared_ptr[OutputStream] sink + get_writer(dest, &sink) + + with nogil: + check_status(CFeatherWriter.Open(sink, &self.writer)) + + def close(self): + if self.num_rows < 0: + self.num_rows = 0 + self.writer.get().SetNumRows(self.num_rows) + check_status(self.writer.get().Finalize()) + + def write_array(self, object name, object col, object mask=None): + cdef Array arr + + if self.num_rows >= 0: + if len(col) != self.num_rows: + raise ValueError('prior column had a different number of rows') + else: + self.num_rows = len(col) + + if isinstance(col, Array): + arr = col + else: + arr = Array.from_numpy(col, mask=mask) + + cdef c_string c_name = tobytes(name) + + with nogil: + check_status( + self.writer.get().Append(c_name, deref(arr.sp_array))) + + +cdef class FeatherReader: + cdef: + unique_ptr[CFeatherReader] reader + + def __cinit__(self): + pass + + def open(self, source): + cdef shared_ptr[RandomAccessFile] reader + get_reader(source, &reader) + + with nogil: + check_status(CFeatherReader.Open(reader, &self.reader)) + + property num_rows: + + def __get__(self): + return self.reader.get().num_rows() + + property num_columns: + + def __get__(self): + return self.reader.get().num_columns() + + def get_column_name(self, int i): + cdef c_string name = self.reader.get().GetColumnName(i) + return frombytes(name) + + def get_column(self, int i): + if i < 0 or i >= self.num_columns: + raise IndexError(i) + + cdef shared_ptr[CColumn] sp_column + with nogil: + check_status(self.reader.get() + .GetColumn(i, &sp_column)) + + cdef Column col = Column() + col.init(sp_column) + return col + + +def get_tensor_size(Tensor tensor): + """ + Return total size of serialized Tensor including metadata and padding + """ + cdef int64_t size + with nogil: + check_status(GetTensorSize(deref(tensor.tp), &size)) + return size + + +def get_record_batch_size(RecordBatch batch): + """ + Return total size of serialized RecordBatch including metadata and padding + """ + cdef int64_t size + with nogil: + check_status(GetRecordBatchSize(deref(batch.batch), &size)) + return size + + +def write_tensor(Tensor tensor, NativeFile dest): + """ + Write pyarrow.Tensor to pyarrow.NativeFile object its current position + + Parameters + ---------- + tensor : pyarrow.Tensor + dest : pyarrow.NativeFile + + Returns + ------- + bytes_written : int + Total number of bytes written to the file + """ + cdef: + int32_t metadata_length + int64_t body_length + + dest._assert_writeable() + + with nogil: + check_status( + WriteTensor(deref(tensor.tp), dest.wr_file.get(), + &metadata_length, &body_length)) + + return metadata_length + body_length + + +def read_tensor(NativeFile 
source):
+    """
+    Read pyarrow.Tensor from pyarrow.NativeFile object at its current
+    position. If the file source supports zero copy (e.g. a memory map),
+    then this operation does not allocate any memory
+
+    Parameters
+    ----------
+    source : pyarrow.NativeFile
+
+    Returns
+    -------
+    tensor : Tensor
+    """
+    cdef:
+        shared_ptr[CTensor] sp_tensor
+
+    source._assert_readable()
+
+    cdef int64_t offset = source.tell()
+    with nogil:
+        check_status(ReadTensor(offset, source.rd_file.get(), &sp_tensor))
+
+    return box_tensor(sp_tensor)
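For orientation, the pieces of _io.pyx above compose as follows. A minimal usage sketch, assuming the build re-exports these names at the top-level pyarrow namespace (this commit only defines the private pyarrow._io module, so the import paths here are an assumption):

    import pyarrow as pa

    # Wrap a buffer-protocol object (zero-copy) and read it back
    buf = pa.frombuffer(b'arrow data')   # Buffer wrapping the bytes object
    reader = pa.BufferReader(buf)        # readable NativeFile over the Buffer
    assert reader.read(5) == b'arrow'
    reader.seek(0)

    # Accumulate writes in memory, then take the result as a Buffer
    stream = pa.InMemoryOutputStream()
    stream.write(b'arrow data')
    result = stream.get_result()         # closes the stream
    assert result.to_pybytes() == b'arrow data'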
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/_jemalloc.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/_jemalloc.pyx b/python/pyarrow/_jemalloc.pyx new file mode 100644 index 0000000..3b41964 --- /dev/null +++ b/python/pyarrow/_jemalloc.pyx @@ -0,0 +1,28 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# cython: profile=False +# distutils: language = c++ +# cython: embedsignature = True + +from pyarrow.includes.libarrow_jemalloc cimport CJemallocMemoryPool +from pyarrow._memory cimport MemoryPool + +def default_pool(): + cdef MemoryPool pool = MemoryPool() + pool.init(CJemallocMemoryPool.default_pool()) + return pool http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/_memory.pxd ---------------------------------------------------------------------- diff --git a/python/pyarrow/_memory.pxd b/python/pyarrow/_memory.pxd new file mode 100644 index 0000000..bb1af85 --- /dev/null +++ b/python/pyarrow/_memory.pxd @@ -0,0 +1,30 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from pyarrow.includes.libarrow cimport CMemoryPool, CLoggingMemoryPool + + +cdef class MemoryPool: + cdef: + CMemoryPool* pool + + cdef init(self, CMemoryPool* pool) + +cdef class LoggingMemoryPool(MemoryPool): + pass + +cdef CMemoryPool* maybe_unbox_memory_pool(MemoryPool memory_pool) http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/_memory.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/_memory.pyx b/python/pyarrow/_memory.pyx new file mode 100644 index 0000000..98dbf66 --- /dev/null +++ b/python/pyarrow/_memory.pyx @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# cython: profile=False +# distutils: language = c++ +# cython: embedsignature = True + +from pyarrow.includes.libarrow cimport CMemoryPool, CLoggingMemoryPool +from pyarrow.includes.pyarrow cimport set_default_memory_pool, get_memory_pool + +cdef class MemoryPool: + cdef init(self, CMemoryPool* pool): + self.pool = pool + + def bytes_allocated(self): + return self.pool.bytes_allocated() + +cdef CMemoryPool* maybe_unbox_memory_pool(MemoryPool memory_pool): + if memory_pool is None: + return get_memory_pool() + else: + return memory_pool.pool + +cdef class LoggingMemoryPool(MemoryPool): + pass + +def default_pool(): + cdef: + MemoryPool pool = MemoryPool() + pool.init(get_memory_pool()) + return pool + +def set_default_pool(MemoryPool pool): + set_default_memory_pool(pool.pool) + +def total_allocated_bytes(): + cdef CMemoryPool* pool = get_memory_pool() + return pool.bytes_allocated() http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/_parquet.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx index 079bf5e..5418e1d 100644 --- a/python/pyarrow/_parquet.pyx +++ b/python/pyarrow/_parquet.pyx @@ -20,20 +20,18 @@ # cython: embedsignature = True from cython.operator cimport dereference as deref - from pyarrow.includes.common cimport * from pyarrow.includes.libarrow cimport * cimport pyarrow.includes.pyarrow as pyarrow +from pyarrow._array cimport Array +from pyarrow._error cimport check_status +from pyarrow._memory cimport MemoryPool, maybe_unbox_memory_pool +from pyarrow._table cimport Table, table_from_ctable +from pyarrow._io cimport NativeFile, get_reader, get_writer -from pyarrow.array cimport Array from pyarrow.compat import tobytes, frombytes -from pyarrow.error import ArrowException -from pyarrow.error cimport check_status -from pyarrow.io import NativeFile -from pyarrow.memory cimport MemoryPool, maybe_unbox_memory_pool -from pyarrow.table cimport Table, table_from_ctable - -from pyarrow.io cimport NativeFile, get_reader, get_writer +from pyarrow._error import ArrowException +from pyarrow._io import NativeFile import six http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/_table.pxd ---------------------------------------------------------------------- diff --git a/python/pyarrow/_table.pxd b/python/pyarrow/_table.pxd new file mode 100644 index 0000000..e61e90d --- /dev/null +++ b/python/pyarrow/_table.pxd @@ -0,0 +1,62 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from pyarrow.includes.common cimport shared_ptr +from pyarrow.includes.libarrow cimport (CChunkedArray, CColumn, CTable, + CRecordBatch) +from pyarrow._array cimport Schema + + +cdef class ChunkedArray: + cdef: + shared_ptr[CChunkedArray] sp_chunked_array + CChunkedArray* chunked_array + + cdef init(self, const shared_ptr[CChunkedArray]& chunked_array) + cdef _check_nullptr(self) + + +cdef class Column: + cdef: + shared_ptr[CColumn] sp_column + CColumn* column + + cdef init(self, const shared_ptr[CColumn]& column) + cdef _check_nullptr(self) + + +cdef class Table: + cdef: + shared_ptr[CTable] sp_table + CTable* table + + cdef init(self, const shared_ptr[CTable]& table) + cdef _check_nullptr(self) + + +cdef class RecordBatch: + cdef: + shared_ptr[CRecordBatch] sp_batch + CRecordBatch* batch + Schema _schema + + cdef init(self, const shared_ptr[CRecordBatch]& table) + cdef _check_nullptr(self) + +cdef object box_column(const shared_ptr[CColumn]& ccolumn) +cdef api object table_from_ctable(const shared_ptr[CTable]& ctable) +cdef api object batch_from_cbatch(const shared_ptr[CRecordBatch]& cbatch) http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/_table.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/_table.pyx b/python/pyarrow/_table.pyx new file mode 100644 index 0000000..6558b2e --- /dev/null +++ b/python/pyarrow/_table.pyx @@ -0,0 +1,913 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# cython: profile=False +# distutils: language = c++ +# cython: embedsignature = True + +from cython.operator cimport dereference as deref + +from pyarrow.includes.libarrow cimport * +from pyarrow.includes.common cimport * +cimport pyarrow.includes.pyarrow as pyarrow +from pyarrow._array cimport (Array, box_array, wrap_array_output, + box_data_type, box_schema, DataType, Field) +from pyarrow._error cimport check_status +cimport cpython + +import pyarrow._config +from pyarrow._error import ArrowException +from pyarrow._array import field +from pyarrow.compat import frombytes, tobytes + + +from collections import OrderedDict + + +cdef _pandas(): + import pandas as pd + return pd + + +cdef class ChunkedArray: + """ + Array backed via one or more memory chunks. + + Warning + ------- + Do not call this class's constructor directly. 
+ """ + + def __cinit__(self): + self.chunked_array = NULL + + cdef init(self, const shared_ptr[CChunkedArray]& chunked_array): + self.sp_chunked_array = chunked_array + self.chunked_array = chunked_array.get() + + cdef _check_nullptr(self): + if self.chunked_array == NULL: + raise ReferenceError("ChunkedArray object references a NULL " + "pointer. Not initialized.") + + def length(self): + self._check_nullptr() + return self.chunked_array.length() + + def __len__(self): + return self.length() + + @property + def null_count(self): + """ + Number of null entires + + Returns + ------- + int + """ + self._check_nullptr() + return self.chunked_array.null_count() + + @property + def num_chunks(self): + """ + Number of underlying chunks + + Returns + ------- + int + """ + self._check_nullptr() + return self.chunked_array.num_chunks() + + def chunk(self, i): + """ + Select a chunk by its index + + Parameters + ---------- + i : int + + Returns + ------- + pyarrow.array.Array + """ + self._check_nullptr() + return box_array(self.chunked_array.chunk(i)) + + def iterchunks(self): + for i in range(self.num_chunks): + yield self.chunk(i) + + def to_pylist(self): + """ + Convert to a list of native Python objects. + """ + result = [] + for i in range(self.num_chunks): + result += self.chunk(i).to_pylist() + return result + + +cdef class Column: + """ + Named vector of elements of equal type. + + Warning + ------- + Do not call this class's constructor directly. + """ + + def __cinit__(self): + self.column = NULL + + cdef init(self, const shared_ptr[CColumn]& column): + self.sp_column = column + self.column = column.get() + + @staticmethod + def from_array(object field_or_name, Array arr): + cdef Field boxed_field + + if isinstance(field_or_name, Field): + boxed_field = field_or_name + else: + boxed_field = field(field_or_name, arr.type) + + cdef shared_ptr[CColumn] sp_column + sp_column.reset(new CColumn(boxed_field.sp_field, arr.sp_array)) + return box_column(sp_column) + + def to_pandas(self): + """ + Convert the arrow::Column to a pandas.Series + + Returns + ------- + pandas.Series + """ + cdef: + PyObject* out + + check_status(pyarrow.ConvertColumnToPandas(self.sp_column, + <PyObject*> self, &out)) + + return _pandas().Series(wrap_array_output(out), name=self.name) + + def equals(self, Column other): + """ + Check if contents of two columns are equal + + Parameters + ---------- + other : pyarrow.Column + + Returns + ------- + are_equal : boolean + """ + cdef: + CColumn* my_col = self.column + CColumn* other_col = other.column + c_bool result + + self._check_nullptr() + other._check_nullptr() + + with nogil: + result = my_col.Equals(deref(other_col)) + + return result + + def to_pylist(self): + """ + Convert to a list of native Python objects. + """ + return self.data.to_pylist() + + cdef _check_nullptr(self): + if self.column == NULL: + raise ReferenceError("Column object references a NULL pointer." 
+ "Not initialized.") + + def __len__(self): + self._check_nullptr() + return self.column.length() + + def length(self): + self._check_nullptr() + return self.column.length() + + @property + def shape(self): + """ + Dimensions of this columns + + Returns + ------- + (int,) + """ + self._check_nullptr() + return (self.length(),) + + @property + def null_count(self): + """ + Number of null entires + + Returns + ------- + int + """ + self._check_nullptr() + return self.column.null_count() + + @property + def name(self): + """ + Label of the column + + Returns + ------- + str + """ + return bytes(self.column.name()).decode('utf8') + + @property + def type(self): + """ + Type information for this column + + Returns + ------- + pyarrow.schema.DataType + """ + return box_data_type(self.column.type()) + + @property + def data(self): + """ + The underlying data + + Returns + ------- + pyarrow.table.ChunkedArray + """ + cdef ChunkedArray chunked_array = ChunkedArray() + chunked_array.init(self.column.data()) + return chunked_array + + +cdef _schema_from_arrays(arrays, names, shared_ptr[CSchema]* schema): + cdef: + Array arr + Column col + c_string c_name + vector[shared_ptr[CField]] fields + cdef shared_ptr[CDataType] type_ + + cdef int K = len(arrays) + + fields.resize(K) + + if len(arrays) == 0: + raise ValueError('Must pass at least one array') + + if isinstance(arrays[0], Array): + if names is None: + raise ValueError('Must pass names when constructing ' + 'from Array objects') + for i in range(K): + arr = arrays[i] + type_ = arr.type.sp_type + c_name = tobytes(names[i]) + fields[i].reset(new CField(c_name, type_, True)) + elif isinstance(arrays[0], Column): + for i in range(K): + col = arrays[i] + type_ = col.sp_column.get().type() + c_name = tobytes(col.name) + fields[i].reset(new CField(c_name, type_, True)) + else: + raise TypeError(type(arrays[0])) + + schema.reset(new CSchema(fields)) + + + +cdef _dataframe_to_arrays(df, timestamps_to_ms, Schema schema): + cdef: + list names = [] + list arrays = [] + DataType type = None + + for name in df.columns: + col = df[name] + if schema is not None: + type = schema.field_by_name(name).type + + arr = Array.from_numpy(col, type=type, + timestamps_to_ms=timestamps_to_ms) + names.append(name) + arrays.append(arr) + + return names, arrays + + +cdef class RecordBatch: + """ + Batch of rows of columns of equal length + + Warning + ------- + Do not call this class's constructor directly, use one of the ``from_*`` + methods instead. + """ + + def __cinit__(self): + self.batch = NULL + self._schema = None + + cdef init(self, const shared_ptr[CRecordBatch]& batch): + self.sp_batch = batch + self.batch = batch.get() + + cdef _check_nullptr(self): + if self.batch == NULL: + raise ReferenceError("Object not initialized") + + def __len__(self): + self._check_nullptr() + return self.batch.num_rows() + + @property + def num_columns(self): + """ + Number of columns + + Returns + ------- + int + """ + self._check_nullptr() + return self.batch.num_columns() + + @property + def num_rows(self): + """ + Number of rows + + Due to the definition of a RecordBatch, all columns have the same + number of rows. 
+ + Returns + ------- + int + """ + return len(self) + + @property + def schema(self): + """ + Schema of the RecordBatch and its columns + + Returns + ------- + pyarrow.schema.Schema + """ + cdef Schema schema + self._check_nullptr() + if self._schema is None: + schema = Schema() + schema.init_schema(self.batch.schema()) + self._schema = schema + + return self._schema + + def __getitem__(self, i): + return box_array(self.batch.column(i)) + + def slice(self, offset=0, length=None): + """ + Compute zero-copy slice of this RecordBatch + + Parameters + ---------- + offset : int, default 0 + Offset from start of array to slice + length : int, default None + Length of slice (default is until end of batch starting from + offset) + + Returns + ------- + sliced : RecordBatch + """ + cdef shared_ptr[CRecordBatch] result + + if offset < 0: + raise IndexError('Offset must be non-negative') + + if length is None: + result = self.batch.Slice(offset) + else: + result = self.batch.Slice(offset, length) + + return batch_from_cbatch(result) + + def equals(self, RecordBatch other): + cdef: + CRecordBatch* my_batch = self.batch + CRecordBatch* other_batch = other.batch + c_bool result + + self._check_nullptr() + other._check_nullptr() + + with nogil: + result = my_batch.Equals(deref(other_batch)) + + return result + + def to_pydict(self): + """ + Converted the arrow::RecordBatch to an OrderedDict + + Returns + ------- + OrderedDict + """ + entries = [] + for i in range(self.batch.num_columns()): + name = bytes(self.batch.column_name(i)).decode('utf8') + column = self[i].to_pylist() + entries.append((name, column)) + return OrderedDict(entries) + + + def to_pandas(self, nthreads=None): + """ + Convert the arrow::RecordBatch to a pandas DataFrame + + Returns + ------- + pandas.DataFrame + """ + return Table.from_batches([self]).to_pandas(nthreads=nthreads) + + @classmethod + def from_pandas(cls, df, schema=None): + """ + Convert pandas.DataFrame to an Arrow RecordBatch + + Parameters + ---------- + df: pandas.DataFrame + schema: pyarrow.Schema (optional) + The expected schema of the RecordBatch. This can be used to + indicate the type of columns if we cannot infer it automatically. 
+ + Returns + ------- + pyarrow.table.RecordBatch + """ + names, arrays = _dataframe_to_arrays(df, False, schema) + return cls.from_arrays(arrays, names) + + @staticmethod + def from_arrays(arrays, names): + """ + Construct a RecordBatch from multiple pyarrow.Arrays + + Parameters + ---------- + arrays: list of pyarrow.Array + column-wise data vectors + names: list of str + Labels for the columns + + Returns + ------- + pyarrow.table.RecordBatch + """ + cdef: + Array arr + c_string c_name + shared_ptr[CSchema] schema + shared_ptr[CRecordBatch] batch + vector[shared_ptr[CArray]] c_arrays + int64_t num_rows + + if len(arrays) == 0: + raise ValueError('Record batch cannot contain no arrays (for now)') + + num_rows = len(arrays[0]) + _schema_from_arrays(arrays, names, &schema) + + for i in range(len(arrays)): + arr = arrays[i] + c_arrays.push_back(arr.sp_array) + + batch.reset(new CRecordBatch(schema, num_rows, c_arrays)) + return batch_from_cbatch(batch) + + +cdef table_to_blockmanager(const shared_ptr[CTable]& table, int nthreads): + cdef: + PyObject* result_obj + CColumn* col + int i + + import pandas.core.internals as _int + from pandas import RangeIndex, Categorical + from pyarrow.compat import DatetimeTZDtype + + with nogil: + check_status(pyarrow.ConvertTableToPandas(table, nthreads, + &result_obj)) + + result = PyObject_to_object(result_obj) + + blocks = [] + for item in result: + block_arr = item['block'] + placement = item['placement'] + if 'dictionary' in item: + cat = Categorical(block_arr, + categories=item['dictionary'], + ordered=False, fastpath=True) + block = _int.make_block(cat, placement=placement, + klass=_int.CategoricalBlock, + fastpath=True) + elif 'timezone' in item: + dtype = DatetimeTZDtype('ns', tz=item['timezone']) + block = _int.make_block(block_arr, placement=placement, + klass=_int.DatetimeTZBlock, + dtype=dtype, fastpath=True) + else: + block = _int.make_block(block_arr, placement=placement) + blocks.append(block) + + names = [] + for i in range(table.get().num_columns()): + col = table.get().column(i).get() + names.append(frombytes(col.name())) + + axes = [names, RangeIndex(table.get().num_rows())] + return _int.BlockManager(blocks, axes) + + +cdef class Table: + """ + A collection of top-level named, equal length Arrow arrays. + + Warning + ------- + Do not call this class's constructor directly, use one of the ``from_*`` + methods instead. + """ + + def __cinit__(self): + self.table = NULL + + def __repr__(self): + return 'pyarrow.Table\n{0}'.format(str(self.schema)) + + cdef init(self, const shared_ptr[CTable]& table): + self.sp_table = table + self.table = table.get() + + cdef _check_nullptr(self): + if self.table == NULL: + raise ReferenceError("Table object references a NULL pointer." + "Not initialized.") + + def equals(self, Table other): + """ + Check if contents of two tables are equal + + Parameters + ---------- + other : pyarrow.Table + + Returns + ------- + are_equal : boolean + """ + cdef: + CTable* my_table = self.table + CTable* other_table = other.table + c_bool result + + self._check_nullptr() + other._check_nullptr() + + with nogil: + result = my_table.Equals(deref(other_table)) + + return result + + @classmethod + def from_pandas(cls, df, timestamps_to_ms=False, schema=None): + """ + Convert pandas.DataFrame to an Arrow Table + + Parameters + ---------- + df: pandas.DataFrame + + timestamps_to_ms: bool + Convert datetime columns to ms resolution. 
This is needed for + compability with other functionality like Parquet I/O which + only supports milliseconds. + + schema: pyarrow.Schema (optional) + The expected schema of the Arrow Table. This can be used to + indicate the type of columns if we cannot infer it automatically. + + Returns + ------- + pyarrow.table.Table + + Examples + -------- + + >>> import pandas as pd + >>> import pyarrow as pa + >>> df = pd.DataFrame({ + ... 'int': [1, 2], + ... 'str': ['a', 'b'] + ... }) + >>> pa.Table.from_pandas(df) + <pyarrow.table.Table object at 0x7f05d1fb1b40> + """ + names, arrays = _dataframe_to_arrays(df, + timestamps_to_ms=timestamps_to_ms, + schema=schema) + return cls.from_arrays(arrays, names=names) + + @staticmethod + def from_arrays(arrays, names=None): + """ + Construct a Table from Arrow arrays or columns + + Parameters + ---------- + arrays: list of pyarrow.Array or pyarrow.Column + Equal-length arrays that should form the table. + names: list of str, optional + Names for the table columns. If Columns passed, will be + inferred. If Arrays passed, this argument is required + + Returns + ------- + pyarrow.table.Table + + """ + cdef: + vector[shared_ptr[CField]] fields + vector[shared_ptr[CColumn]] columns + shared_ptr[CSchema] schema + shared_ptr[CTable] table + + _schema_from_arrays(arrays, names, &schema) + + cdef int K = len(arrays) + columns.resize(K) + + for i in range(K): + if isinstance(arrays[i], Array): + columns[i].reset(new CColumn(schema.get().field(i), + (<Array> arrays[i]).sp_array)) + elif isinstance(arrays[i], Column): + columns[i] = (<Column> arrays[i]).sp_column + else: + raise ValueError(type(arrays[i])) + + table.reset(new CTable(schema, columns)) + return table_from_ctable(table) + + @staticmethod + def from_batches(batches): + """ + Construct a Table from a list of Arrow RecordBatches + + Parameters + ---------- + + batches: list of RecordBatch + RecordBatch list to be converted, schemas must be equal + """ + cdef: + vector[shared_ptr[CRecordBatch]] c_batches + shared_ptr[CTable] c_table + RecordBatch batch + + for batch in batches: + c_batches.push_back(batch.sp_batch) + + with nogil: + check_status(CTable.FromRecordBatches(c_batches, &c_table)) + + return table_from_ctable(c_table) + + def to_pandas(self, nthreads=None): + """ + Convert the arrow::Table to a pandas DataFrame + + Parameters + ---------- + nthreads : int, default max(1, multiprocessing.cpu_count() / 2) + For the default, we divide the CPU count by 2 because most modern + computers have hyperthreading turned on, so doubling the CPU count + beyond the number of physical cores does not help + + Returns + ------- + pandas.DataFrame + """ + if nthreads is None: + nthreads = pyarrow._config.cpu_count() + + mgr = table_to_blockmanager(self.sp_table, nthreads) + return _pandas().DataFrame(mgr) + + def to_pydict(self): + """ + Converted the arrow::Table to an OrderedDict + + Returns + ------- + OrderedDict + """ + entries = [] + for i in range(self.table.num_columns()): + name = self.column(i).name + column = self.column(i).to_pylist() + entries.append((name, column)) + return OrderedDict(entries) + + @property + def schema(self): + """ + Schema of the table and its columns + + Returns + ------- + pyarrow.schema.Schema + """ + return box_schema(self.table.schema()) + + def column(self, index): + """ + Select a column by its numeric index. 
+ + Parameters + ---------- + index: int + + Returns + ------- + pyarrow.table.Column + """ + self._check_nullptr() + cdef Column column = Column() + column.init(self.table.column(index)) + return column + + def __getitem__(self, i): + return self.column(i) + + def itercolumns(self): + """ + Iterator over all columns in their numerical order + """ + for i in range(self.num_columns): + yield self.column(i) + + @property + def num_columns(self): + """ + Number of columns in this table + + Returns + ------- + int + """ + self._check_nullptr() + return self.table.num_columns() + + @property + def num_rows(self): + """ + Number of rows in this table. + + Due to the definition of a table, all columns have the same number of rows. + + Returns + ------- + int + """ + self._check_nullptr() + return self.table.num_rows() + + def __len__(self): + return self.num_rows + + @property + def shape(self): + """ + Dimensions of the table: (#rows, #columns) + + Returns + ------- + (int, int) + """ + return (self.num_rows, self.num_columns) + + def add_column(self, int i, Column column): + """ + Add column to Table at position. Returns new table + """ + cdef: + shared_ptr[CTable] c_table + + with nogil: + check_status(self.table.AddColumn(i, column.sp_column, &c_table)) + + return table_from_ctable(c_table) + + def append_column(self, Column column): + """ + Append column at end of columns. Returns new table + """ + return self.add_column(self.num_columns, column) + + def remove_column(self, int i): + """ + Create new Table with the indicated column removed + """ + cdef shared_ptr[CTable] c_table + + with nogil: + check_status(self.table.RemoveColumn(i, &c_table)) + + return table_from_ctable(c_table) + + +def concat_tables(tables): + """ + Perform zero-copy concatenation of pyarrow.Table objects. Raises exception + if all of the Table schemas are not the same + + Parameters + ---------- + tables : iterable of pyarrow.Table objects + output_name : string, default None + A name for the output table, if any + """ + cdef: + vector[shared_ptr[CTable]] c_tables + shared_ptr[CTable] c_result + Table table + + for table in tables: + c_tables.push_back(table.sp_table) + + with nogil: + check_status(ConcatenateTables(c_tables, &c_result)) + + return table_from_ctable(c_result) + + +cdef object box_column(const shared_ptr[CColumn]& ccolumn): + cdef Column column = Column() + column.init(ccolumn) + return column + + +cdef api object table_from_ctable(const shared_ptr[CTable]& ctable): + cdef Table table = Table() + table.init(ctable) + return table + + +cdef api object batch_from_cbatch(const shared_ptr[CRecordBatch]& cbatch): + cdef RecordBatch batch = RecordBatch() + batch.init(cbatch) + return batch http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/array.pxd ---------------------------------------------------------------------- diff --git a/python/pyarrow/array.pxd b/python/pyarrow/array.pxd deleted file mode 100644 index 3ba4871..0000000 --- a/python/pyarrow/array.pxd +++ /dev/null @@ -1,141 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -from pyarrow.includes.common cimport shared_ptr, int64_t -from pyarrow.includes.libarrow cimport CArray, CTensor - -from pyarrow.scalar import NA - -from pyarrow.schema cimport DataType - -from cpython cimport PyObject - - -cdef extern from "Python.h": - int PySlice_Check(object) - - -cdef class Array: - cdef: - shared_ptr[CArray] sp_array - CArray* ap - - cdef readonly: - DataType type - - cdef init(self, const shared_ptr[CArray]& sp_array) - cdef getitem(self, int64_t i) - - -cdef class Tensor: - cdef: - shared_ptr[CTensor] sp_tensor - CTensor* tp - - cdef readonly: - DataType type - - cdef init(self, const shared_ptr[CTensor]& sp_tensor) - - -cdef object box_array(const shared_ptr[CArray]& sp_array) -cdef object box_tensor(const shared_ptr[CTensor]& sp_tensor) - - -cdef class BooleanArray(Array): - pass - - -cdef class NumericArray(Array): - pass - - -cdef class IntegerArray(NumericArray): - pass - - -cdef class FloatingPointArray(NumericArray): - pass - - -cdef class Int8Array(IntegerArray): - pass - - -cdef class UInt8Array(IntegerArray): - pass - - -cdef class Int16Array(IntegerArray): - pass - - -cdef class UInt16Array(IntegerArray): - pass - - -cdef class Int32Array(IntegerArray): - pass - - -cdef class UInt32Array(IntegerArray): - pass - - -cdef class Int64Array(IntegerArray): - pass - - -cdef class UInt64Array(IntegerArray): - pass - - -cdef class FloatArray(FloatingPointArray): - pass - - -cdef class DoubleArray(FloatingPointArray): - pass - - -cdef class FixedSizeBinaryArray(Array): - pass - - -cdef class DecimalArray(FixedSizeBinaryArray): - pass - - -cdef class ListArray(Array): - pass - - -cdef class StringArray(Array): - pass - - -cdef class BinaryArray(Array): - pass - - -cdef class DictionaryArray(Array): - cdef: - object _indices, _dictionary - - - -cdef wrap_array_output(PyObject* output)
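Taken together, the new _table module gives a pandas round trip through the classes above. A minimal sketch, again assuming the private modules (pyarrow._table and friends) are re-exported at the top level, so treat the pa.* paths as assumptions:

    import pandas as pd
    import pyarrow as pa

    df = pd.DataFrame({'int': [1, 2], 'str': ['a', 'b']})

    # One-shot conversion, as in the Table.from_pandas docstring
    table = pa.Table.from_pandas(df)
    assert (table.num_rows, table.num_columns) == (2, 2)

    # The same data as a RecordBatch, stitched back into a Table
    batch = pa.RecordBatch.from_pandas(df)
    roundtrip = pa.Table.from_batches([batch, batch])  # schemas must be equal
    assert roundtrip.num_rows == 4

    # Zero-copy concatenation; raises if the schemas differ
    combined = pa.concat_tables([table, table])
    assert combined.to_pydict()['int'] == [1, 2, 1, 2]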