You could try using pyarrow.ipc.read_message and pyarrow.ipc.read_record_batch to read individual messages from the appropriate stream type. I've never used either function myself, so I can't vouch for the details of their usage (sorry!)
# ------------------------------
# Aldrin

https://github.com/drin/
https://gitlab.com/octalene
https://keybase.io/octalene

On Monday, April 15th, 2024 at 17:17, Aldrin <[email protected]> wrote:

> I could be wrong (there may have been many changes since I last experimented
> with the IPC API), but in my experience this issue happens when I have mixed
> up IPC streaming types (feather/file vs in-memory).
>
> I believe pyarrow.ipc.new_stream and open_stream are essentially the feather
> stream format, as opposed to pyarrow.ipc.RecordBatchStreamReader and
> pyarrow.ipc.RecordBatchStreamWriter, which are "in-memory". The basic
> difference is that feather and "in-memory" have different headers. The main
> point is that whichever method you're using for writing the stream, you
> should double-check that you're using the associated method for reading the
> same type of stream.
>
> For sending over sockets I recommend using the RecordBatchStream
> equivalents. If you're writing to (or reading from) files on either end,
> then you'll need to do a bit of playing around to see which functions are
> ergonomic and efficient.
>
> # ------------------------------
> # Aldrin
>
> https://github.com/drin/
> https://gitlab.com/octalene
> https://keybase.io/octalene
>
> On Monday, April 15th, 2024 at 13:42, Kevin Liu <[email protected]>
> wrote:
>
> > What is the code used to send bytes over the wire?
> > My hunch is that there's an issue on the sending side which caused the
> > bytes to be smaller than expected.
> >
> > The example in the doc constructed a writer using a provided Schema.
> > ```
> > sink = pa.BufferOutputStream()
> >
> > with pa.ipc.new_stream(sink, batch.schema) as writer:
> >     for i in range(5):
> >         writer.write_batch(batch)
> > ```
> >
> > On Mon, Apr 15, 2024 at 10:56 AM Amanda Weirich <[email protected]>
> > wrote:
> >
> > > When I try this I receive the following error:
> > >
> > > Expected to be able to read 824 bytes for message body, got 384
> > >
> > > I'm assuming this is because the expected schema is missing?
> > >
> > > On Mon, Apr 15, 2024 at 1:49 PM Kevin Liu <[email protected]> wrote:
> > >
> > > > From the example in the Streaming, Serialization, and IPC doc, it
> > > > looks like you don't need to create/open a stream with a schema; the
> > > > schema can be inferred from the RecordBatchStreamReader object.
> > > >
> > > > ```
> > > > with pa.ipc.open_stream(buf) as reader:
> > > >     schema = reader.schema
> > > >     batches = [b for b in reader]
> > > > ```
> > > >
> > > > On Mon, Apr 15, 2024 at 10:35 AM Amanda Weirich <[email protected]>
> > > > wrote:
> > > >
> > > > > Hello,
> > > > > I have an incoming arrow record batch without a schema attached,
> > > > > coming in over a UDP port as buf.to_pybytes. We don't want to attach
> > > > > the schema because the schema is already known. So in my receive
> > > > > script I create my schema, and I am trying to create an arrow stream
> > > > > reader where I pass in the batch and the schema, but it says only
> > > > > one argument can be accepted (meaning it only expected to see the
> > > > > stream and not the schema):
> > > > >
> > > > > ```
> > > > > # Receive data
> > > > > received_data, addr = sock.recvfrom(1024)
> > > > > # Make a byte array
> > > > > byte_stream = bytearray(received_data)
> > > > > # Create a BytesIO object from the received data
> > > > > stream = io.BytesIO(byte_stream)
> > > > >
> > > > > schema = create_schema()
> > > > > reader = pa.ipc.open_stream(stream, schema=schema)
> > > > > ```
> > > > >
> > > > > How can I create an arrow stream reader for this use case?
> > > > > Thank you in advance!
> > > > >
> > > > > Amanda
