Sorry for the delayed reply. I've tried to answer to the best of my ability.
Also, from the use case it sounds like this is something Plasma (no longer
maintained in Arrow, but now part of Ray) accomplishes through a separate
server, IIRC.

> 1. Is there a way to avoid the temporary buffer? Like, could I allocate
> straight into a bigger slab of shared memory, or something?

I believe this should work, and then you can write directly into the
shared-memory buffer through the Arrow libraries (a rough sketch follows at
the end of this message, below the quoted thread).

> 2. Relatedly, is there a way to predict the IPC-serialized message size
> without actually serializing it? I'm thinking of something like
> `pa.ipc.get_record_batch_size(batch)`.

This functionality does exist in C++ [1][2], but I'm not sure whether it is
surfaced through the Python APIs. I don't think there is a reason not to do
this. IIRC the implementation here does some amount of the Flatbuffer
serialization to get accurate calculations, but it is still cheaper than
copying all of the buffers.

> 3. Is it possible to write a pyarrow.MemoryAllocator in pure Python? I
> don't see obvious "allocate" and "deallocate" methods to hook into.

I'm not sure about this.

> 4. Are there other reasons this might be unsafe?

In general, shared memory should work, as memory mapping was one of the
design criteria here.

Cheers,
Micah

[1] https://github.com/apache/arrow/blob/main/cpp/src/arrow/ipc/writer.h#L385
[2] https://github.com/apache/arrow/blob/main/cpp/src/arrow/ipc/writer.h#L260

On Thu, Aug 17, 2023 at 2:21 PM Spencer Nelson <[email protected]> wrote:

> I'm working with some large-ish datasets that I want to share across a
> Python multiprocessing worker pool. Is multiprocessing.shared_memory safe
> for this job? I have a few questions about it.
>
> Here's a sketch:
> ```
> import multiprocessing.shared_memory
> import pyarrow as pa
>
> # Make some data in a RecordBatch
> data = pa.array([1, 2, 3, 4, 5])
> batch = pa.record_batch([data], names=["x"])
>
> # Write it to a temporary buffer
> sink = pa.BufferOutputStream()
> with pa.ipc.new_stream(sink, batch.schema) as writer:
>     writer.write_batch(batch)
> buf = sink.getvalue()
>
> # Allocate some shared memory
> shm = multiprocessing.shared_memory.SharedMemory(
>     "arrow_shmem_1", size=buf.size, create=True)
>
> # Copy the buffer into shared memory
> shm.buf[:buf.size] = buf.to_pybytes()
> ```
>
> Then, in a separate process:
>
> ```
> import multiprocessing.shared_memory
> import pyarrow as pa
>
> # Connect to the existing shared memory
> shm = multiprocessing.shared_memory.SharedMemory(
>     "arrow_shmem_1", create=False)
>
> # Read a batch out
> r = pa.ipc.RecordBatchStreamReader(source=shm.buf.obj)
> result = r.read_all()
> print(result)
> ```
>
> That second process prints the expected:
>
> ```
> pyarrow.Table
> x: int64
> ----
> x: [[1,2,3,4,5]]
> ```
>
> I have a few questions about this.
>
> 1. Is there a way to avoid the temporary buffer? Like, could I allocate
> straight into a bigger slab of shared memory, or something?
>
> 2. Relatedly, is there a way to predict the IPC-serialized message size
> without actually serializing it? I'm thinking of something like
> `pa.ipc.get_record_batch_size(batch)`.
>
> 3. Is it possible to write a pyarrow.MemoryAllocator in pure Python? I
> don't see obvious "allocate" and "deallocate" methods to hook into.
>
> 4. Are there other reasons this might be unsafe?
>
> Thanks,
> Spencer
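P.S. Here is a rough, untested sketch of what I mean for questions 1 and 2:
measure the stream size by writing to a `pa.MockOutputStream` (which discards
the bytes and only counts them), allocate the shared-memory segment with that
size, and then write the stream directly into the segment through a
`pa.FixedSizeBufferWriter` wrapped around `shm.buf`, which avoids the
intermediate `BufferOutputStream` copy. It reuses the `arrow_shmem_1` name
from your example. (`pa.ipc.get_record_batch_size(batch)` may also exist on
the Python side, but I haven't checked; it would only cover the record batch
message, not the schema message or end-of-stream marker, whereas the
mock-stream measurement covers the whole stream.)

```
import multiprocessing.shared_memory

import pyarrow as pa

data = pa.array([1, 2, 3, 4, 5])
batch = pa.record_batch([data], names=["x"])

# Pass 1: measure the full serialized stream size (schema + batch + EOS
# marker) without copying any buffer contents.
mock = pa.MockOutputStream()
with pa.ipc.new_stream(mock, batch.schema) as writer:
    writer.write_batch(batch)
stream_size = mock.size()

# Allocate a shared-memory segment of that size (the OS may round it up to a
# page boundary, which is harmless).
shm = multiprocessing.shared_memory.SharedMemory(
    "arrow_shmem_1", create=True, size=stream_size)

# Pass 2: write the stream straight into the shared memory.  py_buffer wraps
# the writable memoryview without copying; FixedSizeBufferWriter writes into
# that buffer in place, so there is no temporary BufferOutputStream.
sink = pa.FixedSizeBufferWriter(pa.py_buffer(shm.buf))
with pa.ipc.new_stream(sink, batch.schema) as writer:
    writer.write_batch(batch)
```

The trade-off is that the batch is serialized twice, but the first pass only
walks the metadata and buffer lengths through the mock stream; it never
copies the buffer contents.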

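On the reading side, the same shape as your second snippet should give you
zero-copy access if you hand pyarrow the buffer directly (again, a sketch I
haven't tested): the arrays in the resulting table reference the shared
memory itself, so the `SharedMemory` object needs to stay alive, and the
segment must not be unlinked, for as long as the table is in use.

```
import multiprocessing.shared_memory

import pyarrow as pa

# Attach to the segment created by the writer process.
shm = multiprocessing.shared_memory.SharedMemory("arrow_shmem_1", create=False)

# BufferReader wraps the memoryview without copying; open_stream stops at the
# end-of-stream marker, so any page-size padding at the end is ignored.
reader = pa.ipc.open_stream(pa.BufferReader(shm.buf))
table = reader.read_all()
print(table)

# The table's buffers point into shm's memory: keep `shm` referenced (and do
# not unlink the segment) until you are done with `table`.
```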