Hi David,

We believe we have a data race somewhere: stepping through the code quoted below in a debugger causes no issues, but running it without the debugger causes a timeout. We were trying to find out whether putNext keeps a reference to the data in the VectorSchemaRoot. Since the timeout occurs while waiting on the backpressure strategy, we think the client code at https://github.com/dremio-hub/arrow-flight-client-examples/tree/main/java may be the culprit.
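To make the putNext question concrete, here is a minimal sketch in plain Java of the two behaviours described in the quoted reply below: a writer that copies the data on putNext versus one that keeps a reference to it (as with zero-copy writes). It is only a model, not Flight code. The Writer class, its zeroCopy flag, and the run helper are hypothetical names made up for this illustration, and a ByteBuffer stands in for the Arrow buffers behind the VectorSchemaRoot.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class PutNextAliasingSketch {

    /** Hypothetical stand-in for a stream writer; this is NOT a Flight class. */
    static final class Writer {
        private final boolean zeroCopy;
        private final List<ByteBuffer> sent = new ArrayList<>();

        Writer(boolean zeroCopy) {
            this.zeroCopy = zeroCopy;
        }

        void putNext(ByteBuffer source) {
            if (zeroCopy) {
                // Keeps a view over the caller's memory: later writes show through.
                sent.add(source.duplicate());
            } else {
                // Copies the bytes: the caller may reuse its buffer afterwards.
                ByteBuffer copy = ByteBuffer.allocate(source.remaining());
                copy.put(source.duplicate());
                copy.flip();
                sent.add(copy);
            }
        }

        String sentAsString(int index) {
            return StandardCharsets.UTF_8.decode(sent.get(index).duplicate()).toString();
        }
    }

    /** Writes "batch-1", reuses the buffer for "batch-2", returns what was "sent" first. */
    static String run(boolean zeroCopy) {
        ByteBuffer shared = ByteBuffer.allocate(7);
        shared.put("batch-1".getBytes(StandardCharsets.UTF_8));
        shared.flip();

        Writer writer = new Writer(zeroCopy);
        writer.putNext(shared);

        // Reuse the same buffer for the next batch, much like loading new data
        // into the root that was handed to the listener.
        shared.clear();
        shared.put("batch-2".getBytes(StandardCharsets.UTF_8));
        shared.flip();

        return writer.sentAsString(0);
    }

    public static void main(String[] args) {
        System.out.println("copying:   " + run(false)); // prints "copying:   batch-1"
        System.out.println("zero-copy: " + run(true));  // prints "zero-copy: batch-2"
    }
}
```

If Flight behaves like the copying case here, which is how the quoted reply describes the default, then reloading the root after putNext() has returned should not by itself cause the race we are chasing.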
On Fri, Mar 4, 2022 at 2:35 PM David Li <[email protected]> wrote:

> It should be safe. Are you seeing any issues?
>
> Flight waits for an explicit next()/putNext() to actually touch anything.
> And once they return, Flight will not read or mutate your data. So for
> instance, calling putNext() copies the Arrow data into a gRPC buffer, after
> which you can reuse the Arrow buffers. (This is *not* true if you have
> zero-copy writes enabled. In that case we poke a reference to the Arrow
> buffers into the gRPC data structure and so mutating your Arrow buffer will
> mysteriously change "previously written" data.)
>
> It's been years since I've touched this, though, so the details here are
> fuzzy to me...
>
> On Fri, Mar 4, 2022, at 16:38, Alex McRae (CW) wrote:
>
> Hi all,
>
> Related to this issue and solution below we were wondering if it is safe
> to call VectorLoader.load() before checking if a client is ready when using
> back pressure strategy. The thinking is that the client may still be
> reading data from the root and calling load() may cause a data race.
>
> public void getStream(CallContext context, ServerStreamListener listener) {
>     final FlightStream flightStream = this.client.getStream(ticket);
>
>     VectorSchemaRoot clientRoot = flightStream.getRoot();
>     VectorUnloader vectorUnloader = new VectorUnloader(clientRoot);
>
>     final VectorSchemaRoot root = new
>         VectorSchemaRoot(clientRoot.getSchema(), clientRoot.getFieldVectors(),
>         clientRoot.getRowCount());
>     final VectorLoader vectorLoader = new VectorLoader(root);
>     listener.start(root, flightStream.getDictionaryProvider());
>
>     while (flightStream.next()) {
>         if (!flightStream.hasRoot()) {
>             break;
>         }
>
>         if (flightStream.getRoot() != clientRoot) {
>             clientRoot = flightStream.getRoot();
>             vectorUnloader = new VectorUnloader(clientRoot);
>         }
>
>         // is this safe to happen before the client is ready?
>         vectorLoader.load(vectorUnloader.getRecordBatch());
>
>         // this uses the built-in CallBackpressureStrategy
>         final BackpressureStrategy.WaitResult waitResult =
>             backpressureStrategy.waitForListener(timeout);
>         if (waitResult == BackpressureStrategy.WaitResult.READY) {
>             listener.putNext();
>         }
>     }
>
>     listener.completed();
> }
>
> Let me know what you think.
>
> Sincerely,
>
> Alex McRae
>
> On Feb 28, 2022, at 10:39 AM, Alex McRae (CW) <[email protected]>
> wrote:
>
> Absolutely!
>
> On Mon, Feb 28, 2022 at 10:06 AM David Li <[email protected]> wrote:
>
> Just to make sure this doesn't get forgotten: I filed
> https://github.com/apache/arrow-cookbook/issues/158 for providing an
> example of this.
>
> -David
>
> On Tue, Feb 15, 2022, at 13:54, David Li wrote:
>
> It should be safe. The fast-read path has pretty much always been enabled
> and I'm not aware of issues it causes. (The fast-read path simply calls an
> internal gRPC method to avoid bouncing through byte[], but we're still
> copying the data into Arrow, now that I look at it.)
>
> The fast-write path is not relevant here, but that's the one that is
> trickier to use. We should make sure the optimization(s) are properly
> documented, since looking through it's not really explained what the
> consequences are (or at least the flag in ArrowMessage should reference
> setUseZeroCopy, and we should have a doc page for these env vars analogous
> to ARROW-15617 for C++.)
>
> On a side note, digging around to refresh my memory shows that gRPC Java
> *finally* introduced a zero-copy Protobuf deserialization path. I'm not
> sure it's quite relevant for us, since we still need to get the data into
> an off-heap buffer in the end, but I need to take a closer look. (See
> grpc/grpc-java#8102.)
>
> -David
>
> On Tue, Feb 15, 2022, at 13:12, James Duong wrote:
>
> Thanks for the tip David.
>
> Do you know if zero copy can be used safely on the ServerStreamListener
> when using the VectorUnloader/Loader pattern above?
>
> On Mon, Feb 14, 2022 at 9:38 AM David Li <[email protected]> wrote:
>
> Hey Alex,
>
> Basically, you should call start() exactly once; as you noticed, it sends
> the initial schema message.
>
> If the VectorSchemaRoot is not stable, what you should do is create your
> own root with the same schema, and use VectorUnloader/VectorLoader to
> transfer data from the source root to the root used by Flight.
>
> Does that make sense? This would be good to add to the Arrow Java cookbook
> (at least, the VectorLoader/Unloader part).
>
> -David
>
> On Mon, Feb 14, 2022, at 12:27, Alex McRae (CW) wrote:
>
> Hi team,
>
> We are currently building a Flight service which proxies requests in Java.
> We are currently getting getStream working on the FlightProducer.
>
> The code looks similar to this
>
> public void getStream(CallContext context, ServerStreamListener listener) {
>     FlightStream flightStream = this.client.getStream(ticket);
>     while (flightStream.next()) {
>         if (!flightStream.hasRoot()) { break; }
>
>         listener.start(flightStream.getRoot(),
>             flightStream.getDictionaryProvider());
>         listener.putNext();
>     }
>
>     listener.completed();
> }
>
> We are running into issues understanding if this is valid usage? I have
> looked at the OutBoundStreamListenerImpl.java file and it looks like
> calling start() on the listener causes it to resend some schema messages.
> We are trying to understand how to handle the case where
> flightStream.getRoot() returns a different VectorSchemaRoot than the
> previous call.
>
> For more context we have also tried
>
> public void getStream(CallContext context, ServerStreamListener listener) {
>     FlightStream flightStream = this.client.getStream(ticket);
>     listener.start(flightStream.getRoot(),
>         flightStream.getDictionaryProvider());
>     while (flightStream.next()) {
>         if (!flightStream.hasRoot()) { break; }
>
>         listener.putNext();
>     }
>     listener.completed();
> }
>
> But ran into issues with the connection not closing, we believe this to be
> due to the VectorSchemaRoot changing on flightStream.next() calls. We
> believe this is an issue because we are sharing the root with both the
> FlightStream and ServerStreamListener.
> https://github.com/dremio-hub/arrow-flight-client-examples is the client
> we are using to test this end to end.
>
> Please let me know if you can provide any clarity, I would be happy to
> update the documentation afterwards.
>
> Sincerely,
> Alex McRae
> [email protected]
>
> --
> *James Duong*
> Lead Software Developer
> Bit Quill Technologies Inc.
> Direct: +1.604.562.6082 | [email protected]
> https://www.bitquilltech.com
>
> This email message is for the sole use of the intended recipient(s) and
> may contain confidential and privileged information. Any unauthorized
> review, use, disclosure, or distribution is prohibited. If you are not the
> intended recipient, please contact the sender by reply email and destroy
> all copies of the original message. Thank you.
