IIRC header_length does not include those first 8 bytes, so every slice here
needs an 8-byte offset: the header is buffer_[8:8 + header_length] and the
body is buffer_[8 + header_length:].
Also, the first FlightData is expected to contain the schema.
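
A minimal sketch of that slicing, in case it helps. It works on plain bytes
and assumes each input is one encapsulated IPC message that starts with the
4-byte continuation token followed by a 4-byte little-endian header length.
The names split_ipc_message and split_stream are made up for illustration;
they are not part of PyArrow or Flight. As far as I know the length prefix
also counts any alignment padding, so the header slice may carry a few
trailing padding bytes.

```python
import struct
from typing import Iterable, Iterator, Tuple

CONTINUATION = b"\xff\xff\xff\xff"  # IPC continuation token
PREFIX_LEN = 8  # 4-byte token + 4-byte little-endian header length


def split_ipc_message(buf: bytes) -> Tuple[bytes, bytes]:
    """Split one encapsulated IPC message into (header, body).

    Hypothetical helper: the header length does not count the 8-byte
    prefix, so both slices are offset by PREFIX_LEN.
    """
    if len(buf) < PREFIX_LEN or buf[:4] != CONTINUATION:
        raise ValueError("expected an IPC continuation token")
    (header_length,) = struct.unpack("<I", buf[4:8])
    header_end = PREFIX_LEN + header_length
    if header_end > len(buf):
        raise ValueError("truncated message")
    return buf[PREFIX_LEN:header_end], buf[header_end:]


def split_stream(messages: Iterable[bytes]) -> Iterator[Tuple[bytes, bytes]]:
    """Yield (header, body) pairs in order; pass the schema message first."""
    for buf in messages:
        yield split_ipc_message(buf)
```

With PyArrow the inputs would presumably be schema.serialize().to_pybytes()
followed by batch.serialize().to_pybytes() for each batch, and each pair would
become the data_header and data_body of one FlightData message. I have not
tested that end to end against a Flight client.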
On Tue, Feb 8, 2022, at 16:32, R KB wrote:
> Yeah, I think we're getting into voided-warranty territory.
>
> I've done this:
>
> class FlightService(flight_grpc.FlightServiceServicer):
>     def DoGet(self, request, ctx) -> Iterator[flight_proto.FlightData]:
>         tbl = pyarrow.Table.from_pydict({"hello_world": [1,2,3]})
>         batches = tbl.to_batches()
>         buf = batches[0].serialize()
>
>         buffer_ = buf.to_pybytes()
>         header_length = struct.unpack("<I", buffer_[4:8])[0]
>
>         yield flight_proto.FlightData(
>             flight_descriptor=flight_proto.FlightDescriptor(),
>             data_header=buffer_[8:header_length],
>             data_body=buffer_[header_length+8:]
>         )
>
>
>
> Which looks kinda right... but I'm getting:
>
> FlightInternalError: Server never sent a data message
>
> When I try to request with the flight client.
>
> On Tue, 8 Feb 2022 at 20:58, David Li <[email protected]> wrote:
>> Ah, right. You'd first serialize the batch to a byte buffer, then read the
>> flatbuffer length from the second 4 bytes (since the first 4 bytes are the
>> IPC continuation token), slice the buffer, and use that as the message
>> header, and the remainder of the buffer as the message body. (But this is
>> starting to get into "your warranty is void"/"double-check with the Arrow
>> specification" territory.)
>>
>> -David
>>
>> On Tue, Feb 8, 2022, at 15:25, R KB wrote:
>>> I'm confident I can do that with a RecordBatchWriter and a BytesIO object
>>> that I can get the value of; however, what do I do about the message header?
>>>
>>> On Tue, 8 Feb 2022 at 19:17, David Li <[email protected]> wrote:
>>>> No, you'd have to serialize the buffer, chop off the first 8 bytes
>>>> yourself, and generate the Protobuf.
>>>>
>>>> On Tue, Feb 8, 2022, at 13:59, R KB wrote:
>>>>> Thanks for that; hopefully those links should be fruitful.
>>>>>
>>>>> Question, before I get started, is there an API exposed so that I could
>>>>> do the reverse of this:
>>>>>
>>>>> `import asyncio
>>>>> import pathlib
>>>>> import struct
>>>>>
>>>>> import grpc
>>>>> import pyarrow as pa
>>>>> import pyarrow.flight as pf
>>>>>
>>>>> import Flight_pb2, Flight_pb2_grpc
>>>>>
>>>>> async def main():
>>>>>     ticket = pf.Ticket("tick")
>>>>>     async with grpc.aio.insecure_channel("localhost:1234") as channel:
>>>>>         stub = Flight_pb2_grpc.FlightServiceStub(channel)
>>>>>         schema = None
>>>>>         async for data in stub.DoGet(Flight_pb2.Ticket(ticket=ticket.ticket)):
>>>>>             # 4 bytes: Need IPC continuation token
>>>>>             token = b'\xff\xff\xff\xff'
>>>>>             # 4 bytes: message length (little-endian)
>>>>>             length = struct.pack('<I', len(data.data_header))
>>>>>             buf = pa.py_buffer(token + length + data.data_header + data.data_body)
>>>>>             message = pa.ipc.read_message(buf)
>>>>>             print(message)
>>>>>             if schema is None:
>>>>>                 # This should work but is unimplemented
>>>>>                 # print(pa.ipc.read_schema(message))
>>>>>                 schema = pa.ipc.read_schema(buf)
>>>>>                 print(schema)
>>>>>             else:
>>>>>                 batch = pa.ipc.read_record_batch(message, schema)
>>>>>                 print(batch)
>>>>>                 print(batch.to_pydict())
>>>>>
>>>>> asyncio.run(main())`
>>>>>
>>>>> On Tue, 8 Feb 2022 at 18:33, David Li <[email protected]> wrote:
>>>>>> Unfortunately Flight wraps the C++ Flight implementation, which uses
>>>>>> gRPC/C++, which is mostly a separate library entirely from grpcio and
>>>>>> does not benefit from any improvements there. (The two do share a common
>>>>>> network stack, but that's all; also, grpcio doesn't expose any of the
>>>>>> lower level APIs that might make it possible to combine the two somehow.)
>>>>>>
>>>>>> You might ask why pyarrow.flight didn't use grpcio directly (with
>>>>>> bindings to transmute from FlightData to RecordBatch). However, at the
>>>>>> time the thought was that we would also have non-gRPC transports (which
>>>>>> are finally being worked on), so a from-scratch grpcio/Python
>>>>>> implementation was not desirable.
>>>>>>
>>>>>> That said there are issues filed about better documenting FlightData.
>>>>>> See ARROW-15287[1] which links a StackOverflow answer that demonstrates
>>>>>> how to glue together asyncio/grpcio/PyArrow.
>>>>>>
>>>>>> There's also some previous discussion about adding async to Flight more
>>>>>> generally [2].
>>>>>>
>>>>>> [1]: https://issues.apache.org/jira/browse/ARROW-15287
>>>>>> [2]: "[C++] Async Arrow Flight" 2021/06/02
>>>>>> https://lists.apache.org/thread/jrj6yx53gyj0tr18pfdghtb8krp4gpfv
>>>>>>
>>>>>> -David
>>>>>>
>>>>>> On Tue, Feb 8, 2022, at 13:24, R KB wrote:
>>>>>>> gRPC has pretty good asyncio support at this point, and since Flight is
>>>>>>> essentially a wrapper around some gRPC types, why can't we just expose
>>>>>>> something that generates FlightData gRPC objects?