Hi all,
Related to the issue and solution quoted below, we were wondering whether it
is safe to call VectorLoader.load() before checking that the client is ready
when using a backpressure strategy. Our concern is that the client may still
be reading data from the root, so calling load() could cause a data race.

public void getStream(CallContext context, ServerStreamListener listener) {
    final FlightStream flightStream = this.client.getStream(ticket);
    VectorSchemaRoot clientRoot = flightStream.getRoot();
    VectorUnloader vectorUnloader = new VectorUnloader(clientRoot);
    final VectorSchemaRoot root = new VectorSchemaRoot(
        clientRoot.getSchema(), clientRoot.getFieldVectors(), clientRoot.getRowCount());
    final VectorLoader vectorLoader = new VectorLoader(root);
    listener.start(root, flightStream.getDictionaryProvider());
    while (flightStream.next()) {
        if (!flightStream.hasRoot()) {
            break;
        }
        // The upstream stream may hand back a different root; rebuild the unloader if so.
        if (flightStream.getRoot() != clientRoot) {
            clientRoot = flightStream.getRoot();
            vectorUnloader = new VectorUnloader(clientRoot);
        }
        // Is this safe to happen before the client is ready?
        // The batch is closed after loading so its buffers are released.
        try (ArrowRecordBatch batch = vectorUnloader.getRecordBatch()) {
            vectorLoader.load(batch);
        }
        // This uses the built-in CallbackBackpressureStrategy.
        final BackpressureStrategy.WaitResult waitResult =
            backpressureStrategy.waitForListener(timeout);
        if (waitResult == BackpressureStrategy.WaitResult.READY) {
            listener.putNext();
        }
    }
    listener.completed();
}

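For comparison, here is the alternative ordering we are considering: wait for the listener first, then load, then send. This is only a toy model in plain Java to make the question concrete. SharedRoot, WaitResult, and forward are hypothetical stand-ins for the Arrow classes, not Arrow APIs, and we do not know whether Flight actually requires this ordering. The sketch also stops when the wait does not return READY, whereas the code above would skip that batch and keep looping.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Toy model only: none of these types are Arrow APIs.
final class WaitThenLoadSketch {
    /** Hypothetical stand-in for BackpressureStrategy.WaitResult. */
    enum WaitResult { READY, CANCELLED, TIMEOUT }

    /** Hypothetical stand-in for the root shared with the listener. */
    static final class SharedRoot {
        private int[] values = new int[0];

        // Plays the role of vectorLoader.load(...): overwrites the shared contents.
        void load(int[] batch) { values = batch.clone(); }

        // Plays the role of what putNext() would send: the current contents.
        int[] snapshot() { return values.clone(); }
    }

    /** Forwards batches in the order: wait, then load, then send. */
    static List<int[]> forward(List<int[]> batches, Supplier<WaitResult> waitForListener) {
        SharedRoot root = new SharedRoot();
        List<int[]> sent = new ArrayList<>();
        for (int[] batch : batches) {
            // Wait before touching the shared root, so it is never overwritten
            // while the previous batch might still be in use.
            if (waitForListener.get() != WaitResult.READY) {
                break; // stop rather than silently dropping this batch
            }
            root.load(batch);
            sent.add(root.snapshot());
        }
        return sent;
    }

    public static void main(String[] args) {
        List<int[]> batches = List.of(new int[] {1, 2}, new int[] {3}, new int[] {4, 5, 6});
        Iterator<WaitResult> results =
            List.of(WaitResult.READY, WaitResult.READY, WaitResult.TIMEOUT).iterator();
        List<int[]> sent = forward(batches, results::next);
        System.out.println("forwarded " + sent.size() + " of " + batches.size() + " batches");
        // prints "forwarded 2 of 3 batches"
    }
}
```

In the real getStream this would correspond to moving the waitForListener(timeout) call above vectorLoader.load(...), and deciding explicitly what to do on a timeout or cancellation.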
Let me know what you think.
Sincerely,
Alex McRae
> On Feb 28, 2022, at 10:39 AM, Alex McRae (CW) <[email protected]> wrote:
>
> Absolutely!
>
> On Mon, Feb 28, 2022 at 10:06 AM David Li <[email protected]> wrote:
> Just to make sure this doesn't get forgotten: I filed
> https://github.com/apache/arrow-cookbook/issues/158 for providing an
> example of this.
>
> -David
>
> On Tue, Feb 15, 2022, at 13:54, David Li wrote:
>> It should be safe. The fast-read path has pretty much always been enabled
>> and I'm not aware of issues it causes. (The fast-read path simply calls an
>> internal gRPC method to avoid bouncing through byte[], but we're still
>> copying the data into Arrow, now that I look at it.)
>>
>> The fast-write path is not relevant here, but that's the one that is
>> trickier to use. We should make sure the optimization(s) are properly
>> documented, since, looking through the code, the consequences aren't really
>> explained (or at least the flag in ArrowMessage should reference
>> setUseZeroCopy, and we should have a doc page for these env vars analogous
>> to ARROW-15617 for C++).
>>
>> On a side note, digging around to refresh my memory shows that gRPC Java
>> *finally* introduced a zero-copy Protobuf deserialization path. I'm not sure
>> it's quite relevant for us, since we still need to get the data into an
>> off-heap buffer in the end, but I need to take a closer look. (See
>> grpc/grpc-java#8102.)
>>
>> -David
>>
>> On Tue, Feb 15, 2022, at 13:12, James Duong wrote:
>>> Thanks for the tip David.
>>>
>>> Do you know if zero copy can be used safely on the ServerStreamListener
>>> when using the VectorUnloader/Loader pattern above?
>>>
>>> On Mon, Feb 14, 2022 at 9:38 AM David Li <[email protected]> wrote:
>>>
>>> Hey Alex,
>>>
>>> Basically, you should call start() exactly once; as you noticed, it sends
>>> the initial schema message.
>>>
>>> If the VectorSchemaRoot is not stable, what you should do is create your
>>> own root with the same schema, and use VectorUnloader/VectorLoader to
>>> transfer data from the source root to the root used by Flight.
>>>
>>> Does that make sense? This would be good to add to the Arrow Java cookbook
>>> (at least, the VectorLoader/Unloader part).
>>>
>>> -David
>>>
>>> On Mon, Feb 14, 2022, at 12:27, Alex McRae (CW) wrote:
>>>> Hi team,
>>>>
>>>> We are currently building a Flight service in Java that proxies requests,
>>>> and we are working on getting getStream working in our FlightProducer.
>>>>
>>>> The code looks similar to this:
>>>>
>>>> public void getStream(CallContext context, ServerStreamListener listener) {
>>>>     FlightStream flightStream = this.client.getStream(ticket);
>>>>     while (flightStream.next()) {
>>>>         if (!flightStream.hasRoot()) { break; }
>>>>
>>>>         listener.start(flightStream.getRoot(),
>>>>             flightStream.getDictionaryProvider());
>>>>         listener.putNext();
>>>>     }
>>>>     listener.completed();
>>>> }
>>>>
>>>>
>>>> We are having trouble understanding whether this is valid usage. I have
>>>> looked at the OutboundStreamListenerImpl.java file and it looks like
>>>> calling start() on the listener causes it to resend some schema messages.
>>>> We are trying to understand how to handle the case where
>>>> flightStream.getRoot() returns a different VectorSchemaRoot than the
>>>> previous call.
>>>>
>>>> For more context, we have also tried:
>>>>
>>>> public void getStream(CallContext context, ServerStreamListener listener) {
>>>>     FlightStream flightStream = this.client.getStream(ticket);
>>>>     listener.start(flightStream.getRoot(),
>>>>         flightStream.getDictionaryProvider());
>>>>     while (flightStream.next()) {
>>>>         if (!flightStream.hasRoot()) { break; }
>>>>
>>>>         listener.putNext();
>>>>     }
>>>>     listener.completed();
>>>> }
>>>>
>>>> But we ran into issues with the connection not closing. We believe this is
>>>> due to the VectorSchemaRoot changing on flightStream.next() calls, which is
>>>> a problem because we are sharing the root between the FlightStream and the
>>>> ServerStreamListener.
>>>> https://github.com/dremio-hub/arrow-flight-client-examples is the client
>>>> we are using to test this end to end.
>>>>
>>>> Please let me know if you can provide any clarity; I would be happy to
>>>> update the documentation afterwards.
>>>>
>>>> Sincerely,
>>>> Alex McRae
>>>> [email protected]
>>>
>>>
>>>
>>> --
>>> James Duong
>>> Lead Software Developer
>>> Bit Quill Technologies Inc.
>>> Direct: +1.604.562.6082 | [email protected]
>>> https://www.bitquilltech.com
>>>
>>>
>>
>