Yeah, I think I'm getting into the voided-warranty domain.

I've done this:

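import struct
from typing import Iterator

import pyarrow

# Modules generated from Flight.proto by grpcio-tools (names assumed here)
import Flight_pb2 as flight_proto
import Flight_pb2_grpc as flight_grpc


class FlightService(flight_grpc.FlightServiceServicer):
    def DoGet(self, request, ctx) -> Iterator[flight_proto.FlightData]:
        tbl = pyarrow.Table.from_pydict({"hello_world": [1, 2, 3]})
        batches = tbl.to_batches()
        # Encapsulated IPC message: 0xFFFFFFFF continuation token,
        # little-endian int32 metadata length, flatbuffer metadata, body
        buffer_ = batches[0].serialize().to_pybytes()
        header_length = struct.unpack("<I", buffer_[4:8])[0]

        yield flight_proto.FlightData(
            flight_descriptor=flight_proto.FlightDescriptor(),
            # Header: the header_length bytes that follow the 8-byte prefix
            data_header=buffer_[8:8 + header_length],
            # Body: everything after the header
            data_body=buffer_[8 + header_length:],
        )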

Which looks kinda right... but I'm getting:

FlightInternalError: Server never sent a data message

when I try to make the request with the Flight client.

On Tue, 8 Feb 2022 at 20:58, David Li <[email protected]> wrote:

> Ah, right. You'd first serialize the batch to a byte buffer, then read the
> flatbuffer length from the second 4 bytes (since the first 4 bytes are the
> IPC continuation token), slice the buffer, and use that as the message
> header, and the remainder of the buffer as the message body. (But this is
> starting to get into "your warranty is void"/"double-check with the Arrow
> specification" territory.)
>
> -David
>
> On Tue, Feb 8, 2022, at 15:25, R KB wrote:
>
> I'm confident I can do that with a RecordBatchWriter and a BytesIO object
> whose value I can get; however, what do I do about the message header?
>
> On Tue, 8 Feb 2022 at 19:17, David Li <[email protected]> wrote:
>
>
> No, you'd have to serialize the batch to a buffer, chop off the first 8 bytes
> yourself, and generate the Protobuf.
>
> On Tue, Feb 8, 2022, at 13:59, R KB wrote:
>
> Thanks for that; hopefully those links will be fruitful.
>
> Question before I get started: is there an exposed API that would let me do
> the reverse of this:
>
> import asyncio
> import pathlib
> import struct
>
> import grpc
> import pyarrow as pa
> import pyarrow.flight as pf
>
> import Flight_pb2, Flight_pb2_grpc
>
> async def main():
>     ticket = pf.Ticket("tick")
>     async with grpc.aio.insecure_channel("localhost:1234") as channel:
>         stub = Flight_pb2_grpc.FlightServiceStub(channel)
>         schema = None
>         async for data in stub.DoGet(Flight_pb2.Ticket(ticket=ticket.ticket)):
>             # 4 bytes: Need IPC continuation token
>             token = b'\xff\xff\xff\xff'
>             # 4 bytes: message length (little-endian)
>             length = struct.pack('<I', len(data.data_header))
>             buf = pa.py_buffer(token + length + data.data_header + data.data_body)
>             message = pa.ipc.read_message(buf)
>             print(message)
>             if schema is None:
>                 # This should work but is unimplemented
>                 # print(pa.ipc.read_schema(message))
>                 schema = pa.ipc.read_schema(buf)
>                 print(schema)
>             else:
>                 batch = pa.ipc.read_record_batch(message, schema)
>                 print(batch)
>                 print(batch.to_pydict())
>
> asyncio.run(main())
>
>
> On Tue, 8 Feb 2022 at 18:33, David Li <[email protected]> wrote:
>
>
> Unfortunately, pyarrow.flight wraps the C++ Flight implementation, which uses
> gRPC/C++, a library almost entirely separate from grpcio that does
> not benefit from any improvements there. (The two do share a common network
> stack, but that's all; also, grpcio doesn't expose any of the lower level
> APIs that might make it possible to combine the two somehow.)
>
> You might ask why pyarrow.flight didn't use grpcio directly (with bindings
> to transmute from FlightData to RecordBatch). However, at the time the
> thinking was that we would also have non-gRPC transports (which are finally
> being worked on), so a from-scratch grpcio/Python implementation was not
> desirable.
>
> That said, there are issues filed about better documenting FlightData. See
> ARROW-15287 [1], which links a Stack Overflow answer that demonstrates how to
> glue together asyncio/grpcio/PyArrow.
>
> There's also some previous discussion about adding async to Flight more
> generally [2].
>
> [1]: https://issues.apache.org/jira/browse/ARROW-15287
> [2]: "[C++] Async Arrow Flight" 2021/06/02
> https://lists.apache.org/thread/jrj6yx53gyj0tr18pfdghtb8krp4gpfv
>
> -David
>
> On Tue, Feb 8, 2022, at 13:24, R KB wrote:
>
> gRPC has pretty good asyncio support at this point, and Flight is
> essentially a wrapper around some gRPC types, so why can't we just expose
> something that generates FlightData gRPC objects?