IIRC header_length does not include those first 8 bytes, so the slices need an 
offset: the header should end at header_length + 8, not at header_length.

Also, the first FlightData is expected to contain the schema.
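
Roughly what I mean, as an untested sketch that only touches raw bytes. The 
helper name split_ipc_message is made up for illustration, and it assumes the 
buffer is one encapsulated IPC message (4-byte continuation token, 4-byte 
little-endian metadata length, metadata, then body):

```python
import struct
from typing import Tuple

CONTINUATION = b"\xff\xff\xff\xff"  # IPC continuation token
PREFIX_LEN = 8  # 4-byte token + 4-byte little-endian metadata length


def split_ipc_message(raw: bytes) -> Tuple[bytes, bytes]:
    """Split one encapsulated IPC message into (data_header, data_body).

    Hypothetical helper, not part of pyarrow.
    """
    if raw[:4] != CONTINUATION:
        raise ValueError("missing IPC continuation token")
    # The length counts only the metadata, not the 8-byte prefix.
    (header_length,) = struct.unpack("<I", raw[4:PREFIX_LEN])
    header_end = PREFIX_LEN + header_length
    if header_end > len(raw):
        raise ValueError("metadata length exceeds buffer size")
    # Header is offset by the prefix; the body is everything after it.
    return raw[PREFIX_LEN:header_end], raw[header_end:]
```

In the server I'd expect you to call this first on 
tbl.schema.serialize().to_pybytes() (the body should come back empty) and 
yield that as the first FlightData, then on batch.serialize().to_pybytes() 
for each batch, but please double-check against the IPC format docs.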

On Tue, Feb 8, 2022, at 16:32, R KB wrote:
> Yeah, I think we're getting into the voided warranty domain.
> 
> I've done this:
> 
> class FlightService(flight_grpc.FlightServiceServicer):
>     def DoGet(self, request, ctx) -> Iterator[flight_proto.FlightData]:
>         tbl = pyarrow.Table.from_pydict({"hello_world": [1,2,3]})
>         batches = tbl.to_batches()
>         buf = batches[0].serialize()
> 
>         buffer_ = buf.to_pybytes()
>         header_length = struct.unpack("<I", buffer_[4:8])[0]
> 
>         yield flight_proto.FlightData(
>             flight_descriptor=flight_proto.FlightDescriptor(),
>             data_header=buffer_[8:header_length],
>             data_body=buffer_[header_length+8:]
>         )
> 
> 
> 
> Which looks kinda right... but I'm getting: 
> 
> FlightInternalError: Server never sent a data message
> 
> When I try to request with the flight client.
> 
> 
> 
> On Tue, 8 Feb 2022 at 20:58, David Li <[email protected]> wrote:
>> Ah, right. You'd first serialize the batch to a byte buffer, then read the 
>> flatbuffer length from the second 4 bytes (since the first 4 bytes are the 
>> IPC continuation token), slice the buffer, and use that as the message 
>> header, and the remainder of the buffer as the message body. (But this is 
>> starting to get into "your warranty is void"/"double-check with the Arrow 
>> specification" territory.)
>> 
>> -David
>> 
>> On Tue, Feb 8, 2022, at 15:25, R KB wrote:
>>> I'm confident I can do that with a RecordBatchWriter and a BytesIO object 
>>> that I can get the value of; however, what do I do about the message header?
>>> 
>>> On Tue, 8 Feb 2022 at 19:17, David Li <[email protected]> wrote:
>>>> No, you'd have to serialize the buffer, chop off the first 8 bytes 
>>>> yourself, and generate the Protobuf.
>>>> 
>>>> On Tue, Feb 8, 2022, at 13:59, R KB wrote:
>>>>> Thanks for that; hopefully those links should be fruitful.
>>>>> 
>>>>> Question, before I get started, is there an API exposed so that I could 
>>>>> do the reverse of this: 
>>>>> 
>>>>> `import asyncio
>>>>> import pathlib
>>>>> import struct
>>>>> 
>>>>> import grpc
>>>>> import pyarrow as pa
>>>>> import pyarrow.flight as pf
>>>>> 
>>>>> import Flight_pb2, Flight_pb2_grpc
>>>>> 
>>>>> async def main():
>>>>>     ticket = pf.Ticket("tick")
>>>>>     async with grpc.aio.insecure_channel("localhost:1234") as channel:
>>>>>         stub = Flight_pb2_grpc.FlightServiceStub(channel)
>>>>>         schema = None
>>>>>         async for data in stub.DoGet(Flight_pb2.Ticket(ticket=ticket.ticket)):
>>>>>             # 4 bytes: Need IPC continuation token
>>>>>             token = b'\xff\xff\xff\xff'
>>>>>             # 4 bytes: message length (little-endian)
>>>>>             length = struct.pack('<I', len(data.data_header))
>>>>>             buf = pa.py_buffer(token + length + data.data_header + data.data_body)
>>>>>             message = pa.ipc.read_message(buf)
>>>>>             print(message)
>>>>>             if schema is None:
>>>>>                 # This should work but is unimplemented
>>>>>                 # print(pa.ipc.read_schema(message))
>>>>>                 schema = pa.ipc.read_schema(buf)
>>>>>                 print(schema)
>>>>>             else:
>>>>>                 batch = pa.ipc.read_record_batch(message, schema)
>>>>>                 print(batch)
>>>>>                 print(batch.to_pydict())
>>>>> 
>>>>> asyncio.run(main())`
>>>>> 
>>>>> On Tue, 8 Feb 2022 at 18:33, David Li <[email protected]> wrote:
>>>>>> Unfortunately Flight wraps the C++ Flight implementation, which uses 
>>>>>> gRPC/C++, which is mostly a separate library entirely from grpcio and 
>>>>>> does not benefit from any improvements there. (The two do share a common 
>>>>>> network stack, but that's all; also, grpcio doesn't expose any of the 
>>>>>> lower level APIs that might make it possible to combine the two somehow.)
>>>>>> 
>>>>>> You might ask why pyarrow.flight didn't use grpcio directly (with 
>>>>>> bindings to transmute from FlightData to RecordBatch). However, at the 
>>>>>> time the thought was that we would also have non-gRPC transports (which 
>>>>>> are finally being worked on) and so a from-scratch grpcio/Python 
>>>>>> implementation was not desirable. 
>>>>>> 
>>>>>> That said, there are issues filed about better documenting FlightData. 
>>>>>> See ARROW-15287[1] which links a StackOverflow answer that demonstrates 
>>>>>> how to glue together asyncio/grpcio/PyArrow.
>>>>>> 
>>>>>> There's also some previous discussion about adding async to Flight more 
>>>>>> generally [2].
>>>>>> 
>>>>>> [1]: https://issues.apache.org/jira/browse/ARROW-15287
>>>>>> [2]: "[C++] Async Arrow Flight" 2021/06/02 
>>>>>> https://lists.apache.org/thread/jrj6yx53gyj0tr18pfdghtb8krp4gpfv
>>>>>> 
>>>>>> -David
>>>>>> 
>>>>>> On Tue, Feb 8, 2022, at 13:24, R KB wrote:
>>>>>>> gRPC has pretty good asyncio support at this point, and since Flight is 
>>>>>>> essentially a wrapper around some gRPC types, why can't we just expose 
>>>>>>> something that generates FlightData gRPC objects?
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>> 
>>>> 
>> 
