The use case is a distributed setting: a Flight server at the edge of a
cluster receives Arrow data from remote nodes in IPC format. Rather than
deserializing it and serializing it again to send it out via Flight, it
is better to leave the bytes as they are.
Anyway, thanks. Best, Matt
On 11/1/22 12:14 am, David Li wrote:
Hey Matt,
It's not available out of the box, but I think you're on the right track.
That said, I'm curious about the use case that leaves you with
pre-serialized bytes: is this to avoid using the Arrow reader at some
point?
Descriptor can indeed be ignored here. app_metadata is optional.
The first message in a stream should be an IPC schema message. It
should then be followed by DictionaryBatch messages, then RecordBatch
messages. All of these follow the "encapsulated message format" [1]
where the metadata flatbuffer goes in metadata and the message body
goes in body_buffers. (The continuation token is omitted, but not the
length, IIRC.)
So for schema, you would have the IPC schema flatbuffer in "metadata"
and no body. For RecordBatch/DictionaryBatch, you would have the IPC
record batch/dictionary batch in "metadata" and the data in "body".
This means that you may need to parse your pre-serialized bytes (to
some extent) to separate the two.
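
As an illustration of that split, here is a minimal sketch in plain C++ with no Arrow dependency. It takes the bytes of one encapsulated message as they appear in an IPC stream and returns views of the metadata flatbuffer and the body. The names `MessageParts`, `Load` and `SplitMessage` are hypothetical and not part of Arrow. The sketch assumes a little-endian host and the current stream framing (a 0xFFFFFFFF continuation marker followed by a 32-bit metadata length), and it reads `bodyLength` and the header type by walking the flatbuffer vtable by hand, assuming the field order in Arrow's Message.fbs. Whether Flight wants the length prefix kept in "metadata" is left open by the note above, so the sketch exposes the bare flatbuffer; the prefix sits in the 4 bytes just before it.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <optional>

// Hypothetical result type: views into the caller's bytes, nothing is copied.
struct MessageParts {
  uint8_t header_type = 0;            // 1 = Schema, 2 = DictionaryBatch, 3 = RecordBatch
  const uint8_t* metadata = nullptr;  // Message flatbuffer, for IpcPayload::metadata
  int32_t metadata_length = 0;
  const uint8_t* body = nullptr;      // message body, for IpcPayload::body_buffers
  int64_t body_length = 0;
  size_t total_size = 0;              // bytes consumed, i.e. offset of the next message
};

// Read a scalar without alignment assumptions (little-endian host assumed).
template <typename T>
T Load(const uint8_t* p) {
  T v;
  std::memcpy(&v, p, sizeof(T));
  return v;
}

// Split one encapsulated message. Returns nullopt on the end-of-stream
// marker (metadata length 0), on truncated input, or on malformed framing.
std::optional<MessageParts> SplitMessage(const uint8_t* data, size_t size) {
  if (size < 8 || Load<uint32_t>(data) != 0xFFFFFFFFu) return std::nullopt;
  const int32_t meta_len = Load<int32_t>(data + 4);
  if (meta_len < 4 || static_cast<size_t>(meta_len) > size - 8) return std::nullopt;
  const uint8_t* fb = data + 8;
  const int64_t n = meta_len;

  // Flatbuffer layout: root offset -> table -> (signed offset back to) vtable.
  const int64_t table = Load<uint32_t>(fb);
  if (table > n - 4) return std::nullopt;
  const int64_t vtable = table - Load<int32_t>(fb + table);
  if (vtable < 0 || vtable > n - 4) return std::nullopt;
  const int64_t vtable_size = Load<uint16_t>(fb + vtable);
  if (vtable_size < 4 || vtable + vtable_size > n) return std::nullopt;

  // Look up a field by its vtable slot. Absent (or out-of-range) fields
  // yield nullptr, and the caller keeps the flatbuffer default of 0.
  auto field = [&](int64_t slot, int64_t width) -> const uint8_t* {
    if (slot + 2 > vtable_size) return nullptr;
    const int64_t off = Load<uint16_t>(fb + vtable + slot);
    if (off == 0 || table + off + width > n) return nullptr;
    return fb + table + off;
  };

  MessageParts parts;
  // Assumed slots from Message.fbs: header_type is field 1, bodyLength is field 3.
  if (const uint8_t* p = field(6, 1)) parts.header_type = *p;
  if (const uint8_t* p = field(10, 8)) parts.body_length = Load<int64_t>(p);

  const size_t remaining = size - 8 - static_cast<size_t>(meta_len);
  if (parts.body_length < 0 ||
      static_cast<uint64_t>(parts.body_length) > remaining) {
    return std::nullopt;
  }
  parts.metadata = fb;
  parts.metadata_length = meta_len;
  parts.body = fb + meta_len;
  parts.total_size = 8 + static_cast<size_t>(meta_len) +
                     static_cast<size_t>(parts.body_length);
  return parts;
}
```

Calling this in a loop and advancing by `total_size` each time would yield one (metadata, body) pair per message, each of which would map to one FlightPayload. If linking against Arrow's IPC reader is acceptable, `arrow::ipc::MessageReader` is probably the safer way to get the same split, since it avoids hand-parsing the flatbuffer.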
There are multiple body buffers so that we don't require concatenation.
For instance, a RecordBatch in memory might be backed by multiple
allocations. Only taking one body buffer would mean that we would have
to concatenate everything before sending, which defeats the zero-copy
goal. But if what you have is truly pre-serialized, then you can pass
just the one buffer.
[1]:
https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format
-David
On Mon, Jan 10, 2022, at 02:32, Matt Youill wrote:
Hi,
Have been hacking on this for a while, but wanted to make sure I'm on
the right track.
Is it possible to supply a pre-serialized IPC stream of data from a
Flight server's DoGet function? It looks like a table *object* (or
schema + record batches) can be supplied to the FlightDataStream
parameter (using a RecordBatchStream) but not plain bytes.
I've had a look at implementing a FlightDataStream for plain bytes. I
can see the byte stream needs to be split up into FlightPayloads, but
it's not clear what goes where in each one.
Given the following defs for FlightPayloads...
struct ARROW_FLIGHT_EXPORT FlightPayload {
  std::shared_ptr<Buffer> descriptor;
  std::shared_ptr<Buffer> app_metadata;
  ipc::IpcPayload ipc_message;
};

struct IpcPayload {
  MessageType type = MessageType::NONE;
  std::shared_ptr<Buffer> metadata;
  std::vector<std::shared_ptr<Buffer>> body_buffers;
  int64_t body_length = 0;
};
AFAICT it looks like:
"descriptor" is ignored for DoGet
"app_metadata" can be ignored
"type" is set to whatever message it is (e.g. schema, batch, etc)
"metadata" buffer should contain the schema IPC bytes?
"body_buffers" should contain IPC bytes for each batch? (Why are there
multiple buffers? Is there any reason not to just use the first slot of
the buffers vector?)
Any advice appreciated.
Thanks, Matt