Thanks David, got it working :)
On Tue, 8 Feb 2022 at 21:41, David Li <[email protected]> wrote:
> IIRC header_length does not include those first 8 bytes, so there's an
> offset needed in all the slices here.
>
> Also, the first FlightData is expected to contain the schema.
>
> On Tue, Feb 8, 2022, at 16:32, R KB wrote:
> Yeah, I think we're getting into the voided-warranty domain.
>
> I've done this:
>
> class FlightService(flight_grpc.FlightServiceServicer):
>     def DoGet(self, request, ctx) -> Iterator[flight_proto.FlightData]:
>         tbl = pyarrow.Table.from_pydict({"hello_world": [1,2,3]})
>         batches = tbl.to_batches()
>         buf = batches[0].serialize()
>
>         buffer_ = buf.to_pybytes()
>         header_length = struct.unpack("<I", buffer_[4:8])[0]
>
>         yield flight_proto.FlightData(
>             flight_descriptor=flight_proto.FlightDescriptor(),
>             data_header=buffer_[8:header_length],
>             data_body=buffer_[header_length+8:]
>         )
>
> It looks kinda right... but when I try to make a request with the Flight
> client, I get:
>
> FlightInternalError: Server never sent a data message
>
> On Tue, 8 Feb 2022 at 20:58, David Li <[email protected]> wrote:
> Ah, right. You'd first serialize the batch to a byte buffer, then read the
> flatbuffer length from the second 4 bytes (since the first 4 bytes are the
> IPC continuation token), slice the buffer, and use that slice as the message
> header and the remainder of the buffer as the message body. (But this is
> starting to get into "your warranty is void"/"double-check with the Arrow
> specification" territory.)
>
> -David
>
> On Tue, Feb 8, 2022, at 15:25, R KB wrote:
> I'm confident I can do that with a RecordBatchWriter and a BytesIO object
> whose value I can then get; however, what do I do about the message header?
>
> On Tue, 8 Feb 2022 at 19:17, David Li <[email protected]> wrote:
> No, you'd have to serialize the buffer, chop off the first 8 bytes
> yourself, and generate the Protobuf.
>
> On Tue, Feb 8, 2022, at 13:59, R KB wrote:
> Thanks for that; hopefully those links will be fruitful.
>
> A question before I get started: is there an exposed API that would let me
> do the reverse of this?
>
> import asyncio
> import pathlib
> import struct
>
> import grpc
> import pyarrow as pa
> import pyarrow.flight as pf
>
> import Flight_pb2, Flight_pb2_grpc
>
> async def main():
>     ticket = pf.Ticket("tick")
>     async with grpc.aio.insecure_channel("localhost:1234") as channel:
>         stub = Flight_pb2_grpc.FlightServiceStub(channel)
>         schema = None
>         async for data in stub.DoGet(Flight_pb2.Ticket(ticket=ticket.ticket)):
>             # 4 bytes: Need IPC continuation token
>             token = b'\xff\xff\xff\xff'
>             # 4 bytes: message length (little-endian)
>             length = struct.pack('<I', len(data.data_header))
>             buf = pa.py_buffer(token + length + data.data_header +
>                                data.data_body)
>             message = pa.ipc.read_message(buf)
>             print(message)
>             if schema is None:
>                 # This should work but is unimplemented
>                 # print(pa.ipc.read_schema(message))
>                 schema = pa.ipc.read_schema(buf)
>                 print(schema)
>             else:
>                 batch = pa.ipc.read_record_batch(message, schema)
>                 print(batch)
>                 print(batch.to_pydict())
>
> asyncio.run(main())
>
> On Tue, 8 Feb 2022 at 18:33, David Li <[email protected]> wrote:
> Unfortunately pyarrow.flight wraps the C++ Flight implementation, which uses
> gRPC/C++. That is almost entirely separate from grpcio and does not benefit
> from any improvements there. (The two do share a common network stack, but
> that's all; also, grpcio doesn't expose any of the lower-level APIs that
> might make it possible to combine the two somehow.)
>
> You might ask why pyarrow.flight didn't use grpcio directly (with bindings
> to transmute from FlightData to RecordBatch). However, at the time the
> thought was that we would also have non-gRPC transports (which are finally
> being worked on), so a from-scratch grpcio/Python implementation was not
> desirable.
>
> That said, there are issues filed about better documenting FlightData. See
> ARROW-15287 [1], which links a StackOverflow answer that demonstrates how to
> glue together asyncio, grpcio, and PyArrow.
>
> There's also some previous discussion about adding async to Flight more
> generally [2].
>
> [1]: https://issues.apache.org/jira/browse/ARROW-15287
> [2]: "[C++] Async Arrow Flight", 2021/06/02,
> https://lists.apache.org/thread/jrj6yx53gyj0tr18pfdghtb8krp4gpfv
>
> -David
>
> On Tue, Feb 8, 2022, at 13:24, R KB wrote:
> gRPC has pretty good asyncio support at this point, and Flight is
> essentially a wrapper around some gRPC types, so why can't we just expose
> something that generates FlightData gRPC objects?
