It should be safe. Are you seeing any issues?

Flight waits for an explicit next()/putNext() to actually touch anything. And 
once they return, Flight will not read or mutate your data. So for instance, 
calling putNext() copies the Arrow data into a gRPC buffer, after which you can 
reuse the Arrow buffers. (This is *not* true if you have zero-copy writes 
enabled. In that case we poke a reference to the Arrow buffers into the gRPC 
data structure and so mutating your Arrow buffer will mysteriously change 
"previously written" data.)

It's been years since I've touched this, though, so the details here are fuzzy 
to me...
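
To make the copy vs. zero-copy distinction above concrete, here is a minimal 
sketch in plain Java. It is only an analogy: it uses java.nio.ByteBuffer 
rather than Arrow or gRPC, and the class and method names 
(CopyVsZeroCopySketch, writeWithCopy, writeZeroCopy) are made up for 
illustration and are not Flight APIs.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration only; this is not how Flight is implemented.
class CopyVsZeroCopySketch {

    // Copying write: snapshot the bytes, so later changes to the source are not seen.
    static ByteBuffer writeWithCopy(ByteBuffer source) {
        ByteBuffer copy = ByteBuffer.allocate(source.remaining());
        copy.put(source.duplicate()); // duplicate() leaves the caller's position untouched
        copy.flip();
        return copy;
    }

    // "Zero-copy" write: keep a view onto the same memory; no bytes are copied.
    static ByteBuffer writeZeroCopy(ByteBuffer source) {
        return source.duplicate();
    }

    // Decode the buffer's contents without disturbing its position.
    static String text(ByteBuffer buf) {
        return StandardCharsets.US_ASCII.decode(buf.duplicate()).toString();
    }

    public static void main(String[] args) {
        ByteBuffer batch = ByteBuffer.wrap("batch-1".getBytes(StandardCharsets.US_ASCII));

        ByteBuffer copied = writeWithCopy(batch); // analogous to the default putNext()
        ByteBuffer shared = writeZeroCopy(batch); // analogous to zero-copy writes

        // Reuse the source buffer for the next batch.
        batch.put(6, (byte) '2');

        System.out.println(text(copied)); // batch-1
        System.out.println(text(shared)); // batch-2
    }
}
```

The copied write still holds the old contents, while the shared one silently 
reflects the mutation, which is the "previously written data changes" effect 
described above.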

On Fri, Mar 4, 2022, at 16:38, Alex McRae (CW) wrote:
> Hi all,
> 
> Related to this issue and the solution below, we were wondering if it is safe 
> to call VectorLoader.load() before checking if a client is ready when using a 
> backpressure strategy. The thinking is that the client may still be reading 
> data from the root and calling load() may cause a data race.
> 
> public void getStream(CallContext context, ServerStreamListener listener) {
>       final FlightStream flightStream = this.client.getStream(ticket);
> 
>       VectorSchemaRoot clientRoot = flightStream.getRoot();
>       VectorUnloader vectorUnloader = new VectorUnloader(clientRoot);
> 
>       final VectorSchemaRoot root = new VectorSchemaRoot(
>           clientRoot.getSchema(), clientRoot.getFieldVectors(), clientRoot.getRowCount());
>       final VectorLoader vectorLoader = new VectorLoader(root);
>       listener.start(root, flightStream.getDictionaryProvider());
> 
>       while (flightStream.next()) {
>         if (!flightStream.hasRoot()) {
>           break;
>         }
> 
>         if (flightStream.getRoot() != clientRoot) {
>           clientRoot = flightStream.getRoot();
>           vectorUnloader = new VectorUnloader(clientRoot);
>         }
> 
>         // is this safe to happen before the client is ready?
>         vectorLoader.load(vectorUnloader.getRecordBatch());
> 
>         // this uses the built-in CallbackBackpressureStrategy
>         final BackpressureStrategy.WaitResult waitResult =
>             backpressureStrategy.waitForListener(timeout);
>         if (waitResult == BackpressureStrategy.WaitResult.READY) {
>           listener.putNext();
>         }
>       }
> 
>       listener.completed();
> }
> 
> Let me know what you think.
> 
> Sincerely,
> 
> Alex McRae
> 
>> On Feb 28, 2022, at 10:39 AM, Alex McRae (CW) <[email protected]> wrote:
>> 
>> Absolutely!
>> 
>> On Mon, Feb 28, 2022 at 10:06 AM David Li <[email protected]> wrote:
>>> Just to make sure this doesn't get forgotten: I filed 
>>> https://github.com/apache/arrow-cookbook/issues/158 for providing an 
>>> example of this.
>>> 
>>> -David
>>> 
>>> On Tue, Feb 15, 2022, at 13:54, David Li wrote:
>>>> It should be safe. The fast-read path has pretty much always been enabled 
>>>> and I'm not aware of issues it causes. (The fast-read path simply calls an 
>>>> internal gRPC method to avoid bouncing through byte[], but we're still 
>>>> copying the data into Arrow, now that I look at it.)
>>>> 
>>>> The fast-write path is not relevant here, but that's the one that is 
>>>> trickier to use. We should make sure the optimization(s) are properly 
>>>> documented, since, looking through the code, the consequences aren't 
>>>> really explained (or at least the flag in ArrowMessage should reference 
>>>> setUseZeroCopy, and we should have a doc page for these env vars analogous 
>>>> to ARROW-15617 for C++).
>>>> 
>>>> On a side note, digging around to refresh my memory shows that gRPC Java 
>>>> *finally* introduced a zero-copy Protobuf deserialization path. I'm not 
>>>> sure it's quite relevant for us, since we still need to get the data into 
>>>> an off-heap buffer in the end, but I need to take a closer look. (See 
>>>> grpc/grpc-java#8102.)
>>>> 
>>>> -David
>>>> 
>>>> On Tue, Feb 15, 2022, at 13:12, James Duong wrote:
>>>>> Thanks for the tip David.
>>>>> 
>>>>> Do you know if zero copy can be used safely on the ServerStreamListener 
>>>>> when using the VectorUnloader/Loader pattern above?
>>>>> 
>>>>> On Mon, Feb 14, 2022 at 9:38 AM David Li <[email protected]> wrote:
>>>>>> Hey Alex,
>>>>>> 
>>>>>> Basically, you should call start() exactly once; as you noticed, it 
>>>>>> sends the initial schema message.
>>>>>> 
>>>>>> If the VectorSchemaRoot is not stable, what you should do is create your 
>>>>>> own root with the same schema, and use VectorUnloader/VectorLoader to 
>>>>>> transfer data from the source root to the root used by Flight.
>>>>>> 
>>>>>> Does that make sense? This would be good to add to the Arrow Java 
>>>>>> cookbook (at least, the VectorLoader/Unloader part).
>>>>>> 
>>>>>> -David
>>>>>> 
>>>>>> On Mon, Feb 14, 2022, at 12:27, Alex McRae (CW) wrote:
>>>>>>> Hi team,
>>>>>>> 
>>>>>>> We are currently building a Flight service which proxies requests in 
>>>>>>> Java. We are currently getting getStream working on the FlightProducer.
>>>>>>> 
>>>>>>> The code looks similar to this
>>>>>>> 
>>>>>>> public void getStream(CallContext context, ServerStreamListener listener) {
>>>>>>>     FlightStream flightStream = this.client.getStream(ticket);
>>>>>>>     while (flightStream.next()) {
>>>>>>>         if (!flightStream.hasRoot()) { break; }
>>>>>>>         
>>>>>>>         listener.start(flightStream.getRoot(), flightStream.getDictionaryProvider());
>>>>>>>         listener.putNext();
>>>>>>>     }
>>>>>>>     listener.completed();
>>>>>>> }
>>>>>>> 
>>>>>>> 
>>>>>>> We are having trouble understanding whether this is valid usage. I have 
>>>>>>> looked at the OutboundStreamListenerImpl.java file and it looks like 
>>>>>>> calling start() on the listener causes it to resend some schema 
>>>>>>> messages.
>>>>>>> We are trying to understand how to handle the case where 
>>>>>>> flightStream.getRoot() returns a different VectorSchemaRoot than the 
>>>>>>> previous call.
>>>>>>> 
>>>>>>> For more context we have also tried
>>>>>>> public void getStream(CallContext context, ServerStreamListener listener) {
>>>>>>>     FlightStream flightStream = this.client.getStream(ticket);
>>>>>>>     listener.start(flightStream.getRoot(), flightStream.getDictionaryProvider());
>>>>>>>     while (flightStream.next()) {
>>>>>>>         if (!flightStream.hasRoot()) { break; }
>>>>>>>         
>>>>>>>         listener.putNext();
>>>>>>>     }
>>>>>>>     listener.completed();
>>>>>>> }
>>>>>>> But we ran into issues with the connection not closing. We believe this 
>>>>>>> is because the VectorSchemaRoot changes across flightStream.next() calls, 
>>>>>>> which is an issue because we are sharing the root between the 
>>>>>>> FlightStream and the ServerStreamListener.
>>>>>>> https://github.com/dremio-hub/arrow-flight-client-examples is the 
>>>>>>> client we are using to test this end to end.
>>>>>>> 
>>>>>>> Please let me know if you can provide any clarity, I would be happy to 
>>>>>>> update the documentation afterwards.
>>>>>>> 
>>>>>>> Sincerely,
>>>>>>> Alex McRae
>>>>>>> [email protected]
>>>>>> 
>>>>> 
>>>>> 
>>>>> -- 
>>>>> *James Duong*
>>>>> Lead Software Developer
>>>>> Bit Quill Technologies Inc.
>>>>> Direct: +1.604.562.6082 | [email protected]
>>>>> https://www.bitquilltech.com
>>>>> 
>>>>> 
>>>> 
>>> 
