So you're finding that if you remove the backpressure handler, there are no 
problems?

Is the timeout a gRPC timeout? Do you know if any messages are making it 
through, or is it timing out after a period of no activity at all?

On Mon, Mar 7, 2022, at 19:12, Alex McRae (CW) wrote:
> Hi David,
> 
> We believe we have a data race somewhere: running the code above under the
> debugger causes no issues, but running it without the debugger causes a
> timeout. We were trying to determine whether putNext() keeps a reference to
> the data in the VectorSchemaRoot. Since we get a timeout with the
> backpressure strategy in place, we think the client code at
> https://github.com/dremio-hub/arrow-flight-client-examples/tree/main/java may
> be the culprit.
> 
> On Fri, Mar 4, 2022 at 2:35 PM David Li <[email protected]> wrote:
>> It should be safe. Are you seeing any issues?
>> 
>> Flight waits for an explicit next()/putNext() to actually touch anything. 
>> And once they return, Flight will not read or mutate your data. So for 
>> instance, calling putNext() copies the Arrow data into a gRPC buffer, after 
>> which you can reuse the Arrow buffers. (This is *not* true if you have 
>> zero-copy writes enabled. In that case we poke a reference to the Arrow 
>> buffers into the gRPC data structure and so mutating your Arrow buffer will 
>> mysteriously change "previously written" data.)
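A stdlib-only Java model can make the copy-versus-reference distinction above concrete. This is not the Arrow or gRPC API: `WriteModeModel`, its `putNext(int[])`, and `queued()` are hypothetical, an `int[]` stands in for an Arrow buffer, and a list stands in for the messages gRPC has queued.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stdlib-only model, not Arrow or gRPC code. An int[] stands in for an Arrow
// buffer and `outgoing` stands in for the messages queued in gRPC.
final class WriteModeModel {
    private final List<int[]> outgoing = new ArrayList<>();
    private final boolean zeroCopy;

    WriteModeModel(boolean zeroCopy) {
        this.zeroCopy = zeroCopy;
    }

    // Hypothetical putNext: by default the data is copied, so the caller may
    // reuse its buffer right away; with zero-copy only a reference is queued.
    void putNext(int[] buffer) {
        outgoing.add(zeroCopy ? buffer : Arrays.copyOf(buffer, buffer.length));
    }

    // What the transport would eventually see for message i.
    int[] queued(int i) {
        return outgoing.get(i);
    }

    // Writes two batches through one reused buffer and returns the first
    // value of message 0 as the transport would see it.
    static int firstValueAfterReuse(boolean zeroCopy) {
        WriteModeModel writer = new WriteModeModel(zeroCopy);
        int[] reused = {1, 2, 3};
        writer.putNext(reused);
        reused[0] = 99; // refill the same buffer for the next batch
        writer.putNext(reused);
        return writer.queued(0)[0];
    }
}
```

With the default copy, message 0 still starts with 1 after the buffer is refilled; with zero-copy it starts with 99. In the real library the visible effect likely also depends on when gRPC actually writes the message out, which is part of why it looks mysterious.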
>> 
>> It's been years since I've touched this, though, so the details here are 
>> fuzzy to me...
>> 
>> On Fri, Mar 4, 2022, at 16:38, Alex McRae (CW) wrote:
>>> Hi all,
>>> 
>>> Related to the issue and solution below, we were wondering whether it is
>>> safe to call VectorLoader.load() before checking whether the client is ready
>>> when using a backpressure strategy. Our concern is that the client may still
>>> be reading data from the root, so calling load() could cause a data race.
>>> 
>>> public void getStream(CallContext context, ServerStreamListener listener) {
>>>     final FlightStream flightStream = this.client.getStream(ticket);
>>> 
>>>     VectorSchemaRoot clientRoot = flightStream.getRoot();
>>>     VectorUnloader vectorUnloader = new VectorUnloader(clientRoot);
>>> 
>>>     // Note: this constructor wraps clientRoot's own FieldVector instances,
>>>     // so `root` is not an independent copy of the data.
>>>     final VectorSchemaRoot root = new VectorSchemaRoot(
>>>         clientRoot.getSchema(), clientRoot.getFieldVectors(), clientRoot.getRowCount());
>>>     final VectorLoader vectorLoader = new VectorLoader(root);
>>>     listener.start(root, flightStream.getDictionaryProvider());
>>> 
>>>     while (flightStream.next()) {
>>>         if (!flightStream.hasRoot()) {
>>>             break;
>>>         }
>>> 
>>>         if (flightStream.getRoot() != clientRoot) {
>>>             clientRoot = flightStream.getRoot();
>>>             vectorUnloader = new VectorUnloader(clientRoot);
>>>         }
>>> 
>>>         // is this safe to happen before the client is ready?
>>>         vectorLoader.load(vectorUnloader.getRecordBatch());
>>> 
>>>         // this uses the built-in CallbackBackpressureStrategy
>>>         final BackpressureStrategy.WaitResult waitResult =
>>>             backpressureStrategy.waitForListener(timeout);
>>>         if (waitResult == BackpressureStrategy.WaitResult.READY) {
>>>             listener.putNext();
>>>         }
>>>         // Note: any result other than READY skips this batch
>>>         // without reporting an error to the client.
>>>     }
>>> 
>>>     listener.completed();
>>> }
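Part of the question above is what should happen when the wait does not return READY. Below is a stdlib-only sketch of a callback-driven wait with a timeout. The `WaitResult` names are assumed to mirror Arrow Java's `BackpressureStrategy.WaitResult`; `ReadyGate` and `sendAll` are hypothetical and are not library code.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;

// Stdlib-only model, not Arrow code. WaitResult mirrors the outcome names
// assumed from Arrow Java's BackpressureStrategy.WaitResult.
final class BackpressureModel {
    enum WaitResult { READY, TIMEOUT, CANCELLED, OTHER }

    // Hypothetical stand-in for a backpressure strategy: an onReady/onCancel
    // callback sets a flag and wakes any thread blocked in waitForListener.
    static final class ReadyGate {
        private boolean ready;
        private boolean cancelled;

        synchronized void onReady() {
            ready = true;
            notifyAll();
        }

        synchronized void onCancel() {
            cancelled = true;
            notifyAll();
        }

        // Blocks until ready, cancelled, interrupted, or the timeout elapses.
        synchronized WaitResult waitForListener(long timeoutMillis) {
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
            while (!ready && !cancelled) {
                long remaining = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                if (remaining <= 0) {
                    return WaitResult.TIMEOUT;
                }
                try {
                    wait(remaining); // loop guards against spurious wakeups
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return WaitResult.OTHER;
                }
            }
            return cancelled ? WaitResult.CANCELLED : WaitResult.READY;
        }
    }

    // Hands each batch to the sink only after a READY result. Any other
    // result stops the loop and is returned, instead of skipping the batch.
    static WaitResult sendAll(List<String> batches, ReadyGate gate,
                              long timeoutMillis, List<String> sink) {
        for (String batch : batches) {
            WaitResult result = gate.waitForListener(timeoutMillis);
            if (result != WaitResult.READY) {
                return result; // a real producer would report an error to the client here
            }
            sink.add(batch);
        }
        return WaitResult.READY;
    }
}
```

Stopping on a non-READY result, rather than moving on to the next batch, turns a stalled client into an explicit outcome instead of silently missing batches. The gate also stays ready once opened, which is a simplification; a real strategy would presumably re-check the listener's readiness before each batch.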
>>> 
>>> Let me know what you think.
>>> 
>>> Sincerely,
>>> 
>>> Alex McRae
>>> 
>>>> On Feb 28, 2022, at 10:39 AM, Alex McRae (CW) <[email protected]> 
>>>> wrote:
>>>> 
>>>> Absolutely!
>>>> 
>>>> On Mon, Feb 28, 2022 at 10:06 AM David Li <[email protected]> wrote:
>>>>> Just to make sure this doesn't get forgotten: I filed 
>>>>> https://github.com/apache/arrow-cookbook/issues/158 for providing an 
>>>>> example of this.
>>>>> 
>>>>> -David
>>>>> 
>>>>> On Tue, Feb 15, 2022, at 13:54, David Li wrote:
>>>>>> It should be safe. The fast-read path has pretty much always been 
>>>>>> enabled and I'm not aware of issues it causes. (The fast-read path 
>>>>>> simply calls an internal gRPC method to avoid bouncing through byte[], 
>>>>>> but we're still copying the data into Arrow, now that I look at it.)
>>>>>> 
>>>>>> The fast-write path is not relevant here, but that's the one that is 
>>>>>> trickier to use. We should make sure the optimization(s) are properly 
>>>>>> documented, since looking through the code, the consequences aren't
>>>>>> really explained anywhere. At minimum, the flag in ArrowMessage should
>>>>>> reference setUseZeroCopy, and we should have a doc page for these env
>>>>>> vars analogous to ARROW-15617 for C++.
>>>>>> 
>>>>>> On a side note, digging around to refresh my memory shows that gRPC Java 
>>>>>> *finally* introduced a zero-copy Protobuf deserialization path. I'm not 
>>>>>> sure it's quite relevant for us, since we still need to get the data 
>>>>>> into an off-heap buffer in the end, but I need to take a closer look. 
>>>>>> (See grpc/grpc-java#8102.)
>>>>>> 
>>>>>> -David
>>>>>> 
>>>>>> On Tue, Feb 15, 2022, at 13:12, James Duong wrote:
>>>>>>> Thanks for the tip David.
>>>>>>> 
>>>>>>> Do you know if zero copy can be used safely on the ServerStreamListener 
>>>>>>> when using the VectorUnloader/Loader pattern above?
>>>>>>> 
>>>>>>> On Mon, Feb 14, 2022 at 9:38 AM David Li <[email protected]> wrote:
>>>>>>>> Hey Alex,
>>>>>>>> 
>>>>>>>> Basically, you should call start() exactly once; as you noticed, it
>>>>>>>> sends the initial schema message.
>>>>>>>> 
>>>>>>>> If the VectorSchemaRoot is not stable, what you should do is create 
>>>>>>>> your own root with the same schema, and use 
>>>>>>>> VectorUnloader/VectorLoader to transfer data from the source root to 
>>>>>>>> the root used by Flight.
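The recommended pattern can be sketched with a stdlib-only Java model. `StableRootProxy`, `Root`, and `transferAndSend` are hypothetical stand-ins for a destination VectorSchemaRoot created with the same schema, the VectorUnloader/VectorLoader transfer, and putNext(); none of this is Arrow API. The point is that start() runs once on a root that never changes, while the source may hand back a different root object for each batch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stdlib-only model, not Arrow code. `destination` plays the stable root
// handed to the listener; each source Root may be a different object.
final class StableRootProxy {
    static final class Root {
        final int[] values;

        Root(int size) {
            values = new int[size];
        }
    }

    private final Root destination;
    private int startCalls;
    private final List<int[]> sent = new ArrayList<>();

    StableRootProxy(int width) {
        destination = new Root(width); // like creating a root with the same schema
    }

    // Models listener.start(): sends the schema, so it should run once.
    void start() {
        startCalls++;
    }

    // Models unload + load: copy the source's contents into the stable
    // destination, then "putNext" a copy of what the destination holds.
    void transferAndSend(Root source) {
        System.arraycopy(source.values, 0, destination.values, 0, destination.values.length);
        sent.add(Arrays.copyOf(destination.values, destination.values.length));
    }

    static StableRootProxy runProxy(List<Root> sourceBatches, int width) {
        StableRootProxy proxy = new StableRootProxy(width);
        proxy.start(); // once, before the loop
        for (Root source : sourceBatches) {
            proxy.transferAndSend(source);
        }
        return proxy;
    }

    int startCalls() {
        return startCalls;
    }

    List<int[]> sent() {
        return sent;
    }
}
```

Because the listener only ever sees `destination`, it does not matter that the source swaps its root between batches; each batch is copied in before it is sent.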
>>>>>>>> 
>>>>>>>> Does that make sense? This would be good to add to the Arrow Java 
>>>>>>>> cookbook (at least, the VectorLoader/Unloader part).
>>>>>>>> 
>>>>>>>> -David
>>>>>>>> 
>>>>>>>> On Mon, Feb 14, 2022, at 12:27, Alex McRae (CW) wrote:
>>>>>>>>> Hi team,
>>>>>>>>> 
>>>>>>>>> We are building a Flight service in Java that proxies requests, and we
>>>>>>>>> are currently working on getStream in our FlightProducer.
>>>>>>>>> 
>>>>>>>>> The code looks similar to this:
>>>>>>>>> 
>>>>>>>>> public void getStream(CallContext context, ServerStreamListener listener) {
>>>>>>>>>     FlightStream flightStream = this.client.getStream(ticket);
>>>>>>>>>     while (flightStream.next()) {
>>>>>>>>>         if (!flightStream.hasRoot()) { break; }
>>>>>>>>>
>>>>>>>>>         listener.start(flightStream.getRoot(), flightStream.getDictionaryProvider());
>>>>>>>>>         listener.putNext();
>>>>>>>>>     }
>>>>>>>>>     listener.completed();
>>>>>>>>> }
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> We are unsure whether this is valid usage. I have looked at
>>>>>>>>> OutboundStreamListenerImpl.java, and it looks like calling start() on
>>>>>>>>> the listener causes it to send the schema message again.
>>>>>>>>> We are trying to understand how to handle the case where
>>>>>>>>> flightStream.getRoot() returns a different VectorSchemaRoot than it did
>>>>>>>>> on the previous call.
>>>>>>>>> 
>>>>>>>>> For more context, we have also tried:
>>>>>>>>> public void getStream(CallContext context, ServerStreamListener listener) {
>>>>>>>>>     FlightStream flightStream = this.client.getStream(ticket);
>>>>>>>>>     listener.start(flightStream.getRoot(), flightStream.getDictionaryProvider());
>>>>>>>>>     while (flightStream.next()) {
>>>>>>>>>         if (!flightStream.hasRoot()) { break; }
>>>>>>>>>
>>>>>>>>>         listener.putNext();
>>>>>>>>>     }
>>>>>>>>>     listener.completed();
>>>>>>>>> }
>>>>>>>>> However, we ran into issues with the connection not closing. We believe
>>>>>>>>> this is due to the VectorSchemaRoot changing across flightStream.next()
>>>>>>>>> calls, which is a problem because we are sharing the root between the
>>>>>>>>> FlightStream and the ServerStreamListener. We are using
>>>>>>>>> https://github.com/dremio-hub/arrow-flight-client-examples as the
>>>>>>>>> client to test this end to end.
>>>>>>>>> 
>>>>>>>>> Please let me know if you can provide any clarity; I would be happy
>>>>>>>>> to update the documentation afterwards.
>>>>>>>>> 
>>>>>>>>> Sincerely,
>>>>>>>>> Alex McRae
>>>>>>>>> [email protected]
>>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> -- 
>>>>>>> *James Duong*
>>>>>>> Lead Software Developer
>>>>>>> Bit Quill Technologies Inc.
>>>>>>> Direct: +1.604.562.6082 | [email protected]
>>>>>>> https://www.bitquilltech.com
>>>>>>> 
>>>>>> 
>>>>> 
>> 
