Ah, right. You'd first serialize the batch to a byte buffer, then read the 
flatbuffer metadata length (a little-endian int32) from the second 4 bytes 
(the first 4 bytes are the IPC continuation token, 0xFFFFFFFF). Slice out 
that many bytes after the 8-byte prefix and use them as the message header, 
and use the remainder of the buffer as the message body. (But this is 
starting to get into "your warranty is void"/"double-check with the Arrow 
specification" territory.)

-David

On Tue, Feb 8, 2022, at 15:25, R KB wrote:
> I'm confident I can do that with a RecordBatchWriter and a BytesIO object that 
> I can get the value of; however, what do I do about the message header?
> 
> On Tue, 8 Feb 2022 at 19:17, David Li <[email protected]> wrote:
>> No, you'd have to serialize the batch, chop off the first 8 bytes yourself, 
>> and generate the FlightData Protobuf message.
>> 
>> On Tue, Feb 8, 2022, at 13:59, R KB wrote:
>>> Thanks for that; hopefully those links will be fruitful.
>>> 
>>> One question before I get started: is there an API exposed that would let me 
>>> do the reverse of this: 
>>> 
>>> `import asyncio
>>> import pathlib
>>> import struct
>>> 
>>> import grpc
>>> import pyarrow as pa
>>> import pyarrow.flight as pf
>>> 
>>> import Flight_pb2, Flight_pb2_grpc
>>> 
>>> async def main():
>>>     ticket = pf.Ticket("tick")
>>>     async with grpc.aio.insecure_channel("localhost:1234") as channel:
>>>         stub = Flight_pb2_grpc.FlightServiceStub(channel)
>>>         schema = None
>>>         async for data in stub.DoGet(Flight_pb2.Ticket(ticket=ticket.ticket)):
>>>             # 4 bytes: Need IPC continuation token
>>>             token = b'\xff\xff\xff\xff'
>>>             # 4 bytes: message length (little-endian)
>>>             length = struct.pack('<I', len(data.data_header))
>>>             buf = pa.py_buffer(token + length + data.data_header + data.data_body)
>>>             message = pa.ipc.read_message(buf)
>>>             print(message)
>>>             if schema is None:
>>>                 # This should work but is unimplemented
>>>                 # print(pa.ipc.read_schema(message))
>>>                 schema = pa.ipc.read_schema(buf)
>>>                 print(schema)
>>>             else:
>>>                 batch = pa.ipc.read_record_batch(message, schema)
>>>                 print(batch)
>>>                 print(batch.to_pydict())
>>> 
>>> asyncio.run(main())`
>>> 
>>> On Tue, 8 Feb 2022 at 18:33, David Li <[email protected]> wrote:
>>>> Unfortunately, pyarrow.flight wraps the C++ Flight implementation, which 
>>>> uses gRPC/C++. That is mostly a separate library from grpcio and does not 
>>>> benefit from any improvements there. (The two share a common network 
>>>> stack, but that's all; also, grpcio doesn't expose any of the lower-level 
>>>> APIs that might make it possible to combine the two somehow.)
>>>> 
>>>> You might ask why pyarrow.flight didn't use grpcio directly (with bindings 
>>>> to transmute from FlightData to RecordBatch). The answer is that, at the 
>>>> time, the thinking was that we would also have non-gRPC transports (which 
>>>> are finally being worked on), so a from-scratch grpcio/Python 
>>>> implementation was not desirable.
>>>> 
>>>> That said, there are issues filed about better documenting FlightData. See 
>>>> ARROW-15287 [1], which links a StackOverflow answer demonstrating how to 
>>>> glue together asyncio, grpcio, and PyArrow.
>>>> 
>>>> There's also some previous discussion about adding async to Flight more 
>>>> generally [2].
>>>> 
>>>> [1]: https://issues.apache.org/jira/browse/ARROW-15287
>>>> [2]: "[C++] Async Arrow Flight" 2021/06/02 
>>>> https://lists.apache.org/thread/jrj6yx53gyj0tr18pfdghtb8krp4gpfv
>>>> 
>>>> -David
>>>> 
>>>> On Tue, Feb 8, 2022, at 13:24, R KB wrote:
>>>>> gRPC has pretty good asyncio support at this point, and Flight is 
>>>>> essentially a wrapper around some gRPC types, so why can't we just expose 
>>>>> something that generates FlightData gRPC objects?