You could try pyarrow.ipc.read_message and pyarrow.ipc.read_record_batch to 
read individual messages from the appropriate stream type. I've never used 
either function, so I can't help with their usage and details (sorry!)



# ------------------------------

# Aldrin


https://github.com/drin/

https://gitlab.com/octalene

https://keybase.io/octalene


On Monday, April 15th, 2024 at 17:17, Aldrin <[email protected]> wrote:

> I could be wrong (there may have been many changes since I last experimented 
> with IPC API), but in my experience this issue happens when I have mixed up 
> IPC streaming types (feather/file vs in-memory).
> 

> I believe pyarrow.ipc.new_stream and open_stream use the streaming format 
> (they return pyarrow.ipc.RecordBatchStreamWriter and 
> pyarrow.ipc.RecordBatchStreamReader), as opposed to pyarrow.ipc.new_file and 
> open_file, which use the file (feather) format. The basic difference is that 
> the file format wraps the stream with magic bytes and a footer, so the two 
> are not interchangeable. The main point is that whichever method you're 
> using for writing the stream, you should double check you're using the 
> associated method for reading the same type of stream.
> 

> For sending over sockets I recommend using the RecordBatchStream equivalents. 
> If you're writing to (or reading from) files on either end, then you'll need 
> to do a bit of playing around to see what functions are ergonomic and 
> efficient.
> 


> On Monday, April 15th, 2024 at 13:42, Kevin Liu <[email protected]> 
> wrote:
> 

> > What is the code used to send bytes over the wire?
> > My hunch is that there's an issue from the sending side which caused the 
> > bytes to be smaller than expected.
> > 

> > The example in the doc constructed a writer using a provided Schema.
> > ```
> > import pyarrow as pa
> > 
> > # `batch` is an existing pa.RecordBatch
> > sink = pa.BufferOutputStream()
> > 
> > with pa.ipc.new_stream(sink, batch.schema) as writer:
> >     for i in range(5):
> >         writer.write_batch(batch)
> > ```
> > 

> > On Mon, Apr 15, 2024 at 10:56 AM Amanda Weirich <[email protected]> 
> > wrote:
> > 

> > > When I try this I receive the following error:
> > > 

> > > Expected to be able to read 824 bytes for message body, got 384
> > > 

> > > I'm assuming this is because the expected schema is missing?
> > > 

> > > 

> > > On Mon, Apr 15, 2024 at 1:49 PM Kevin Liu <[email protected]> wrote:
> > > 

> > > > From the example in the Streaming, Serialization, and IPC doc, it looks 
> > > > like you don't need to create/open a stream with a schema, the schema 
> > > > can be inferred from the RecordBatchStreamReader object.
> > > > 

> > > > ```
> > > > with pa.ipc.open_stream(buf) as reader:
> > > >     schema = reader.schema
> > > >     batches = [b for b in reader]
> > > > ```
> > > > 

> > > > On Mon, Apr 15, 2024 at 10:35 AM Amanda Weirich <[email protected]> 
> > > > wrote:
> > > > 

> > > > > Hello,
> > > > > I have an incoming arrow record batch without a schema attached 
> > > > > coming in over a UDP port as buf.to_pybytes(). We don't want to attach 
> > > > > the schema because the schema is already known. So in my receive 
> > > > > script I create my schema, and I am trying to create an arrow stream 
> > > > > reader where I pass in the batch and the schema, but it says only one 
> > > > > argument can be accepted (meaning it only expected to see the stream 
> > > > > and not the schema):
> > > > > 

> > > > > # Receive data
> > > > > received_data, addr = sock.recvfrom(1024)
> > > > > # make byte array
> > > > > byte_stream = bytearray(received_data)
> > > > > # Create a BytesIO object from the received data
> > > > > stream = io.BytesIO(byte_stream)
> > > > > 

> > > > > schema = create_schema()
> > > > > reader = pa.ipc.open_stream(stream, schema=schema)
> > > > > 

> > > > > How can I create an arrow stream reader for this use case?
> > > > > 

> > > > > Thank you in advance!
> > > > > 

> > > > > Amanda
