Hi David,

We believe we have a data race somewhere: stepping through the code below in
a debugger causes no issues, but running it without the debugger causes a
timeout. We were trying to investigate whether putNext() keeps a reference to
the data in the VectorSchemaRoot. Given that we are getting a timeout with
the backpressure strategy, we think it is possible that the client code at
https://github.com/dremio-hub/arrow-flight-client-examples/tree/main/java
may be the culprit.

On Fri, Mar 4, 2022 at 2:35 PM David Li <[email protected]> wrote:

> It should be safe. Are you seeing any issues?
>
> Flight waits for an explicit next()/putNext() to actually touch anything.
> And once they return, Flight will not read or mutate your data. So for
> instance, calling putNext() copies the Arrow data into a gRPC buffer, after
> which you can reuse the Arrow buffers. (This is *not* true if you have
> zero-copy writes enabled. In that case we poke a reference to the Arrow
> buffers into the gRPC data structure and so mutating your Arrow buffer will
> mysteriously change "previously written" data.)
>
> It's been years since I've touched this, though, so the details here are
> fuzzy to me...
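The copying vs. zero-copy behaviour described above can be illustrated with a toy model in plain Java. This is not Flight code: `PutNextModel`, its `zeroCopy` flag and the `int[]` standing in for an Arrow buffer are all hypothetical, and the model only mimics the two behaviours as David describes them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model (hypothetical, not Flight code) of the two putNext() behaviours. */
class PutNextModel {
  // Stands in for the gRPC-side queue of messages that have been "written".
  private final List<int[]> sent = new ArrayList<>();
  private final boolean zeroCopy;

  PutNextModel(boolean zeroCopy) {
    this.zeroCopy = zeroCopy;
  }

  void putNext(int[] buffer) {
    // Default path: copy the data out, so the caller may reuse its buffer.
    // Zero-copy path: keep a reference to the caller's buffer instead.
    sent.add(zeroCopy ? buffer : Arrays.copyOf(buffer, buffer.length));
  }

  int firstSentValue() {
    return sent.get(0)[0];
  }

  /** Writes a batch, then reuses the same buffer for the next batch. */
  static int firstValueAfterReuse(boolean zeroCopy) {
    PutNextModel out = new PutNextModel(zeroCopy);
    int[] buffer = {1, 2, 3};
    out.putNext(buffer);
    buffer[0] = 42; // overwrite the buffer for the next batch
    out.putNext(buffer);
    return out.firstSentValue();
  }

  public static void main(String[] args) {
    System.out.println("copying:   " + firstValueAfterReuse(false));
    System.out.println("zero-copy: " + firstValueAfterReuse(true));
  }
}
```

With copying, overwriting the buffer for the next batch leaves the first message intact; when only a reference is kept, the "previously written" message changes along with it.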
>
> On Fri, Mar 4, 2022, at 16:38, Alex McRae (CW) wrote:
>
> Hi all,
>
> Related to this issue and the solution below, we were wondering whether it
> is safe to call VectorLoader.load() before checking whether the client is
> ready when using a backpressure strategy. Our concern is that the client
> may still be reading data from the root, so calling load() could cause a
> data race.
>
> public void getStream(CallContext context, ServerStreamListener listener) {
>   final FlightStream flightStream = this.client.getStream(ticket);
>
>   VectorSchemaRoot clientRoot = flightStream.getRoot();
>   VectorUnloader vectorUnloader = new VectorUnloader(clientRoot);
>
>   final VectorSchemaRoot root = new VectorSchemaRoot(
>       clientRoot.getSchema(), clientRoot.getFieldVectors(), clientRoot.getRowCount());
>   final VectorLoader vectorLoader = new VectorLoader(root);
>   listener.start(root, flightStream.getDictionaryProvider());
>
>   while (flightStream.next()) {
>     if (!flightStream.hasRoot()) {
>       break;
>     }
>
>     if (flightStream.getRoot() != clientRoot) {
>       clientRoot = flightStream.getRoot();
>       vectorUnloader = new VectorUnloader(clientRoot);
>     }
>
>     // is this safe to happen before the client is ready?
>     vectorLoader.load(vectorUnloader.getRecordBatch());
>
>     // this uses the built-in CallbackBackpressureStrategy
>     final BackpressureStrategy.WaitResult waitResult =
>         backpressureStrategy.waitForListener(timeout);
>     if (waitResult == BackpressureStrategy.WaitResult.READY) {
>       listener.putNext();
>     }
>   }
>
>   listener.completed();
> }
>
> Let me know what you think.
>
> Sincerely,
>
> Alex McRae
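One detail in the code above may be worth checking: the three-argument `VectorSchemaRoot` constructor wraps whatever vectors it is given, so `root` appears to share the very same `FieldVector` objects as `clientRoot` rather than being a separate root. The sketch below compares that constructor with `VectorSchemaRoot.create`, which creates fresh vectors for the schema. It assumes Arrow Java (`arrow-vector` plus a memory implementation such as `arrow-memory-netty`) on the classpath; `RootAliasing` and the single `x` column are hypothetical.

```java
import java.util.Collections;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;

/** Hypothetical check: does a "new" root actually own separate vectors? */
class RootAliasing {
  static final Schema SCHEMA = new Schema(Collections.singletonList(
      Field.nullable("x", new ArrowType.Int(32, true))));

  /** Mirrors the code above: wrap clientRoot's vectors in a second root. */
  static boolean constructorSharesVectors() {
    try (BufferAllocator allocator = new RootAllocator();
         VectorSchemaRoot clientRoot = VectorSchemaRoot.create(SCHEMA, allocator)) {
      VectorSchemaRoot root = new VectorSchemaRoot(
          clientRoot.getSchema(), clientRoot.getFieldVectors(), clientRoot.getRowCount());
      // `root` is not closed separately: its vectors belong to clientRoot.
      return root.getVector("x") == clientRoot.getVector("x");
    }
  }

  /** Alternative: let Arrow create a fresh set of vectors for the same schema. */
  static boolean createSharesVectors() {
    try (BufferAllocator allocator = new RootAllocator();
         VectorSchemaRoot clientRoot = VectorSchemaRoot.create(SCHEMA, allocator);
         VectorSchemaRoot root = VectorSchemaRoot.create(clientRoot.getSchema(), allocator)) {
      return root.getVector("x") == clientRoot.getVector("x");
    }
  }

  public static void main(String[] args) {
    System.out.println("constructor shares vectors: " + constructorSharesVectors());
    System.out.println("create() shares vectors:    " + createSharesVectors());
  }
}
```

If the identity check holds, the VectorUnloader/VectorLoader step in the code above loads the client root's buffers back into the same vectors, so Flight is effectively reading from the FlightStream's own root, much like the second attempt from Feb 14.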
>
> On Feb 28, 2022, at 10:39 AM, Alex McRae (CW) <[email protected]>
> wrote:
>
> Absolutely!
>
> On Mon, Feb 28, 2022 at 10:06 AM David Li <[email protected]> wrote:
>
>
> Just to make sure this doesn't get forgotten: I filed
> https://github.com/apache/arrow-cookbook/issues/158 for providing an
> example of this.
>
> -David
>
> On Tue, Feb 15, 2022, at 13:54, David Li wrote:
>
> It should be safe. The fast-read path has pretty much always been enabled
> and I'm not aware of issues it causes. (The fast-read path simply calls an
> internal gRPC method to avoid bouncing through byte[], but we're still
> copying the data into Arrow, now that I look at it.)
>
> The fast-write path is not relevant here, but that's the one that is
> trickier to use. We should make sure the optimization(s) are properly
> documented, since, looking through the code, it's not really explained what
> the consequences are (at the very least, the flag in ArrowMessage should
> reference setUseZeroCopy, and we should have a doc page for these env vars
> analogous to ARROW-15617 for C++).
>
> On a side note, digging around to refresh my memory shows that gRPC Java
> *finally* introduced a zero-copy Protobuf deserialization path. I'm not
> sure it's quite relevant for us, since we still need to get the data into
> an off-heap buffer in the end, but I need to take a closer look. (See
> grpc/grpc-java#8102.)
>
> -David
>
> On Tue, Feb 15, 2022, at 13:12, James Duong wrote:
>
> Thanks for the tip David.
>
> Do you know if zero copy can be used safely on the ServerStreamListener
> when using the VectorUnloader/Loader pattern above?
>
> On Mon, Feb 14, 2022 at 9:38 AM David Li <[email protected]> wrote:
>
>
> Hey Alex,
>
> Basically, you should call start() exactly once; as you noticed, it sends
> the initial schema message.
>
> If the VectorSchemaRoot is not stable, what you should do is create your
> own root with the same schema, and use VectorUnloader/VectorLoader to
> transfer data from the source root to the root used by Flight.
>
> Does that make sense? This would be good to add to the Arrow Java cookbook
> (at least, the VectorLoader/Unloader part).
>
> -David
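A minimal sketch of the VectorUnloader/VectorLoader transfer described above, outside of Flight so it can run on its own. It assumes Arrow Java (`arrow-vector` plus a memory implementation such as `arrow-memory-netty`) on the classpath; `LoaderTransfer` and the single `x` column are hypothetical. The `ArrowRecordBatch` returned by `getRecordBatch()` holds references to the source buffers, so it is closed once it has been loaded; as far as we can tell, `load()` also takes references to those buffers rather than deep-copying them, which keeps the transfer cheap.

```java
import java.util.Arrays;
import java.util.Collections;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;

/** Moves one batch from a "source" root into a separately owned "target" root. */
class LoaderTransfer {
  static int[] transfer(int... values) {
    Schema schema = new Schema(Collections.singletonList(
        Field.nullable("x", new ArrowType.Int(32, true))));
    try (BufferAllocator allocator = new RootAllocator();
         // The source stands in for the FlightStream's root.
         VectorSchemaRoot source = VectorSchemaRoot.create(schema, allocator);
         // The target stands in for the root handed to listener.start().
         VectorSchemaRoot target = VectorSchemaRoot.create(schema, allocator)) {
      IntVector in = (IntVector) source.getVector("x");
      in.allocateNew(values.length);
      for (int i = 0; i < values.length; i++) {
        in.set(i, values[i]);
      }
      source.setRowCount(values.length); // also sets each vector's value count

      // Unload from the source and load into the target.
      VectorLoader loader = new VectorLoader(target);
      try (ArrowRecordBatch batch = new VectorUnloader(source).getRecordBatch()) {
        loader.load(batch);
      }

      // Read the values back out of the target.
      IntVector out = (IntVector) target.getVector("x");
      int[] result = new int[target.getRowCount()];
      for (int i = 0; i < result.length; i++) {
        result[i] = out.get(i);
      }
      return result;
    }
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(transfer(1, 2, 3)));
  }
}
```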
>
> On Mon, Feb 14, 2022, at 12:27, Alex McRae (CW) wrote:
>
> Hi team,
>
> We are building a Flight service in Java that proxies requests, and we are
> currently trying to get getStream working on the FlightProducer.
>
> The code looks similar to this:
>
> public void getStream(CallContext context, ServerStreamListener listener) {
>     FlightStream flightStream = this.client.getStream(ticket);
>     while (flightStream.next()) {
>         if (!flightStream.hasRoot()) { break; }
>
>         listener.start(flightStream.getRoot(), flightStream.getDictionaryProvider());
>         listener.putNext();
>     }
>
>     listener.completed();
> }
>
>
> We are having trouble understanding whether this is valid usage. I have
> looked at the OutboundStreamListenerImpl.java file, and it looks like
> calling start() on the listener causes it to resend some schema messages.
> We are trying to understand how to handle the case where
> flightStream.getRoot() returns a different VectorSchemaRoot than the
> previous call.
>
> For more context, we have also tried:
>
> public void getStream(CallContext context, ServerStreamListener listener) {
>     FlightStream flightStream = this.client.getStream(ticket);
>     listener.start(flightStream.getRoot(), flightStream.getDictionaryProvider());
>     while (flightStream.next()) {
>         if (!flightStream.hasRoot()) { break; }
>
>         listener.putNext();
>     }
>     listener.completed();
> }
>
> But we ran into issues with the connection not closing. We believe this is
> due to the VectorSchemaRoot changing on flightStream.next() calls, and that
> it is a problem because we are sharing the root between the FlightStream
> and the ServerStreamListener.
> https://github.com/dremio-hub/arrow-flight-client-examples is the client
> we are using to test this end to end.
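Putting the suggestions in this thread together, a proxying getStream could look roughly like the sketch below: start() is called once with a root owned by the proxy, each incoming batch is moved into it with VectorUnloader/VectorLoader, and the backpressure result is checked before loading. This is a sketch under assumptions, not a tested implementation: `ProxyRelay`, `relay()`, `shouldSend()` and `timeoutMillis` are hypothetical names, giving up on a non-READY result (instead of silently skipping the batch, as the Mar 4 code does) is our choice of policy, and the upstream schema is assumed not to change mid-stream.

```java
import org.apache.arrow.flight.BackpressureStrategy;
import org.apache.arrow.flight.CallStatus;
import org.apache.arrow.flight.FlightProducer;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;

/** Hypothetical helper that relays an upstream FlightStream to a downstream listener. */
class ProxyRelay {

  /** Only a READY result means the client can take another batch. */
  static boolean shouldSend(BackpressureStrategy.WaitResult result) {
    return result == BackpressureStrategy.WaitResult.READY;
  }

  static void relay(FlightStream upstream,
                    FlightProducer.ServerStreamListener listener,
                    BufferAllocator allocator,
                    long timeoutMillis) throws Exception {
    BackpressureStrategy backpressure = new BackpressureStrategy.CallbackBackpressureStrategy();
    backpressure.register(listener);

    // The proxy owns `root`; it never shares vectors with stream.getRoot().
    try (FlightStream stream = upstream;
         VectorSchemaRoot root =
             VectorSchemaRoot.create(stream.getRoot().getSchema(), allocator)) {
      VectorLoader loader = new VectorLoader(root);
      listener.start(root, stream.getDictionaryProvider()); // exactly once

      while (stream.next()) {
        // Wait before touching `root` (the more conservative ordering).
        BackpressureStrategy.WaitResult result = backpressure.waitForListener(timeoutMillis);
        if (!shouldSend(result)) {
          if (result != BackpressureStrategy.WaitResult.CANCELLED) {
            listener.error(CallStatus.TIMED_OUT
                .withDescription("client not ready: " + result)
                .toRuntimeException());
          }
          return; // stop instead of silently dropping the batch
        }
        // Re-create the unloader each time in case upstream swapped its root.
        try (ArrowRecordBatch batch = new VectorUnloader(stream.getRoot()).getRecordBatch()) {
          loader.load(batch);
        }
        listener.putNext(); // copies the data out unless zero-copy writes are enabled
      }
      listener.completed();
    }
  }
}
```

Inside a producer, getStream would then call something like `ProxyRelay.relay(client.getStream(ticket), listener, allocator, timeoutMillis)`, with `client`, `ticket` and `allocator` coming from the proxy's own setup. Per David's Mar 4 reply, loading before waiting should also be safe when zero-copy writes are off; waiting first simply avoids relying on that.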
>
> Please let me know if you can provide any clarity, I would be happy to
> update the documentation afterwards.
>
> Sincerely,
> Alex McRae
> [email protected]
>
>
>
>
> --
> *James Duong*
> Lead Software Developer
> Bit Quill Technologies Inc.
> Direct: +1.604.562.6082 | [email protected]
> https://www.bitquilltech.com
>
>