We're seeing backpressure-related timeouts. I suspect there's an issue with CallbackBackpressureStrategy relying on ServerStreamListener#isReady(). I've created https://issues.apache.org/jira/browse/ARROW-15876 for this.
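For readers less familiar with the pattern, a callback-based backpressure wait blocks the producer until the transport's readiness flag is true. It re-checks that flag under a lock so that an on-ready notification arriving just before the wait begins is not lost. What follows is a minimal plain-Java sketch of that general technique under stated assumptions. It is not Arrow's CallbackBackpressureStrategy and not the fix tracked in ARROW-15876. `ReadyWaiter`, `onReady`, `onCancel` and `await` are hypothetical names. The readiness check is passed in as a `BooleanSupplier`; in a Flight producer this would presumably be `listener::isReady`, with `onReady`/`onCancel` wired to the listener's on-ready and on-cancel handlers.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;

/**
 * Hypothetical helper (not an Arrow Flight class): blocks a producer thread until
 * the transport reports it is ready, the call is cancelled, or a timeout elapses.
 */
final class ReadyWaiter {
    enum Result { READY, TIMEOUT, CANCELLED, INTERRUPTED }

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition stateChanged = lock.newCondition();
    private final BooleanSupplier isReady; // assumption: e.g. listener::isReady
    private boolean cancelled = false;     // guarded by lock

    ReadyWaiter(BooleanSupplier isReady) {
        this.isReady = isReady;
    }

    /** Invoke from the transport's on-ready callback. */
    void onReady() {
        lock.lock();
        try {
            stateChanged.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Invoke from the transport's on-cancel callback. */
    void onCancel() {
        lock.lock();
        try {
            cancelled = true;
            stateChanged.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Waits up to timeoutMillis; the readiness predicate is re-read after every wakeup. */
    Result await(long timeoutMillis) {
        long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            while (true) {
                // State is checked while holding the lock, so a signal sent after
                // this check cannot be delivered until awaitNanos releases the lock;
                // it therefore wakes this waiter instead of being lost.
                if (cancelled) {
                    return Result.CANCELLED;
                }
                if (isReady.getAsBoolean()) {
                    return Result.READY;
                }
                if (remainingNanos <= 0L) {
                    return Result.TIMEOUT;
                }
                remainingNanos = stateChanged.awaitNanos(remainingNanos);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return Result.INTERRUPTED;
        } finally {
            lock.unlock();
        }
    }
}

final class ReadyWaiterDemo {
    public static void main(String[] args) {
        System.out.println(new ReadyWaiter(() -> true).await(10));  // READY
        System.out.println(new ReadyWaiter(() -> false).await(10)); // TIMEOUT
    }
}
```

The design choice worth noting is that `await` never trusts a single snapshot of the readiness flag. It re-reads the flag after every wakeup, including spurious wakeups and the final timeout.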
We're going to try a fix for this locally and, if it helps, create a PR.

On Mon, Mar 7, 2022 at 4:19 PM David Li wrote:
> So you're finding that if you remove the backpressure handler, there are
> no problems?
>
> Is the timeout a gRPC timeout? Do you know if any messages are making it
> through, or is it timing out after a period of no activity at all?
>
> On Mon, Mar 7, 2022, at 19:12, Alex McRae (CW) wrote:
>
> Hi David,
>
> We believe we have a data race somewhere: running the code above under a
> debugger causes no issues, but running it without the debugger causes a
> timeout. We were trying to investigate whether putNext keeps a reference
> to the data in the VectorSchemaRoot. Given that we are getting a timeout
> with backpressure, we think the code at
> https://github.com/dremio-hub/arrow-flight-client-examples/tree/main/java
> may be the culprit.
>
> On Fri, Mar 4, 2022 at 2:35 PM David Li wrote:
>
> It should be safe. Are you seeing any issues?
>
> Flight waits for an explicit next()/putNext() to actually touch anything.
> And once they return, Flight will not read or mutate your data. So, for
> instance, calling putNext() copies the Arrow data into a gRPC buffer, after
> which you can reuse the Arrow buffers. (This is *not* true if you have
> zero-copy writes enabled. In that case we poke a reference to the Arrow
> buffers into the gRPC data structure, and so mutating your Arrow buffer
> will mysteriously change "previously written" data.)
>
> It's been years since I've touched this, though, so the details here are
> fuzzy to me...
>
> On Fri, Mar 4, 2022, at 16:38, Alex McRae (CW) wrote:
>
> Hi all,
>
> Related to this issue and the solution below, we were wondering whether it
> is safe to call VectorLoader.load() before checking that the client is
> ready when using a backpressure strategy. The thinking is that the client
> may still be reading data from the root, and calling load() may cause a
> data race.
>
>     public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) {
>         final FlightStream flightStream = this.client.getStream(ticket);
>
>         VectorSchemaRoot clientRoot = flightStream.getRoot();
>         VectorUnloader vectorUnloader = new VectorUnloader(clientRoot);
>
>         final VectorSchemaRoot root = new VectorSchemaRoot(clientRoot.getSchema(),
>                 clientRoot.getFieldVectors(), clientRoot.getRowCount());
>         final VectorLoader vectorLoader = new VectorLoader(root);
>         listener.start(root, flightStream.getDictionaryProvider());
>
>         while (flightStream.next()) {
>             if (!flightStream.hasRoot()) {
>                 break;
>             }
>
>             if (flightStream.getRoot() != clientRoot) {
>                 clientRoot = flightStream.getRoot();
>                 vectorUnloader = new VectorUnloader(clientRoot);
>             }
>
>             // is this safe to happen before the client is ready?
>             // (VectorLoader.load() does not close the batch, so close it here.)
>             try (ArrowRecordBatch batch = vectorUnloader.getRecordBatch()) {
>                 vectorLoader.load(batch);
>             }
>
>             // this uses the built-in CallbackBackpressureStrategy
>             final BackpressureStrategy.WaitResult waitResult =
>                     backpressureStrategy.waitForListener(timeout);
>             if (waitResult == BackpressureStrategy.WaitResult.READY) {
>                 listener.putNext();
>             }
>         }
>
>         listener.completed();
>     }
>
> Let me know what you think.
>
> Sincerely,
>
> Alex McRae
>
> On Feb 28, 2022, at 10:39 AM, Alex McRae (CW) wrote:
>
> Absolutely!
>
> On Mon, Feb 28, 2022 at 10:06 AM David Li wrote:
>
> Just to make sure this doesn't get forgotten: I filed
> https://github.com/apache/arrow-cookbook/issues/158 for providing an
> example of this.
>
> -David
>
> On Tue, Feb 15, 2022, at 13:54, David Li wrote:
>
> It should be safe. The fast-read path has pretty much always been enabled,
> and I'm not aware of any issues it causes. (The fast-read path simply calls
> an internal gRPC method to avoid bouncing through byte[], but we're still
> copying the data into Arrow, now that I look at it.)
>
> The fast-write path is not relevant here, but that's the one that is
> trickier to use.
> We should make sure the optimization(s) are properly documented, since,
> looking through it, the consequences are not really explained (or at
> least the flag in ArrowMessage should reference setUseZeroCopy, and we
> should have a doc page for these env vars analogous to ARROW-15617 for
> C++).
>
> On a side note, digging around to refresh my memory shows that gRPC Java
> *finally* introduced a zero-copy Protobuf deserialization path. I'm not
> sure it's quite relevant for us, since we still need to get the data into
> an off-heap buffer in the end, but I need to take a closer look. (See
> grpc/grpc-java#8102.)
>
> -David
>
> On Tue, Feb 15, 2022, at 13:12, James Duong wrote:
>
> Thanks for the tip, David.
>
> Do you know if zero copy can be used safely on the ServerStreamListener
> when using the VectorUnloader/Loader pattern above?
>
> On Mon, Feb 14, 2022 at 9:38 AM David Li wrote:
>
> Hey Alex,
>
> Basically, you should call start() exactly once; as you noticed, it sends
> the initial schema message.
>
> If the VectorSchemaRoot is not stable, what you should do is create your
> own root with the same schema, and use VectorUnloader/VectorLoader to
> transfer data from the source root to the root used by Flight.
>
> Does that make sense? This would be good to add to the Arrow Java cookbook
> (at least, the VectorLoader/Unloader part).
>
> -David
>
> On Mon, Feb 14, 2022, at 12:27, Alex McRae (CW) wrote:
>
> Hi team,
>
> We are currently building a Flight service which proxies requests in Java.
> We are currently working on getting getStream working in the FlightProducer.
>
> The code looks similar to this:
>
>     public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) {
>         FlightStream flightStream = this.client.getStream(ticket);
>         while (flightStream.next()) {
>             if (!flightStream.hasRoot()) { break; }
>
>             listener.start(flightStream.getRoot(),
>                     flightStream.getDictionaryProvider());
>             listener.putNext();
>         }
>
>         listener.completed();
>     }
>
> We are having trouble understanding whether this is valid usage. I have
> looked at the OutboundStreamListenerImpl.java file, and it looks like
> calling start() on the listener causes it to resend some schema messages.
> We are trying to understand how to handle the case where
> flightStream.getRoot() returns a different VectorSchemaRoot than the
> previous call.
>
> For more context, we have also tried:
>
>     public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) {
>         FlightStream flightStream = this.client.getStream(ticket);
>         listener.start(flightStream.getRoot(),
>                 flightStream.getDictionaryProvider());
>         while (flightStream.next()) {
>             if (!flightStream.hasRoot()) { break; }
>
>             listener.putNext();
>         }
>         listener.completed();
>     }
>
> But we ran into issues with the connection not closing; we believe this is
> due to the VectorSchemaRoot changing on flightStream.next() calls. We
> believe this is an issue because we are sharing the root between the
> FlightStream and the ServerStreamListener.
> https://github.com/dremio-hub/arrow-flight-client-examples is the client
> we are using to test this end to end.
>
> Please let me know if you can provide any clarity; I would be happy to
> update the documentation afterwards.
>
> Sincerely,
> Alex McRae
>
> --
> *James Duong*
> Lead Software Developer
> Bit Quill Technologies Inc.
> Direct: +1.604.562.6082
> https://www.bitquilltech.com
>
> This email message is for the sole use of the intended recipient(s) and
> may contain confidential and privileged information. Any unauthorized
> review, use, disclosure, or distribution is prohibited. If you are not the
> intended recipient, please contact the sender by reply email and destroy
> all copies of the original message. Thank you.

--
*James Duong*
Lead Software Developer
Bit Quill Technologies Inc.
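David's explanation of putNext() semantics determines whether loading the next batch into the same VectorSchemaRoot is safe. On the default path, putNext() copies the Arrow data into a gRPC buffer before returning. With zero-copy writes (setUseZeroCopy), only a reference to the Arrow buffers is kept. The sketch below models only that distinction, using plain byte arrays. `OutboundQueue` is a hypothetical stand-in for the gRPC outbound buffer, not a Flight class. Real Flight writes involve Arrow buffers and reference counting, which this model ignores.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of an outbound message queue; not part of Arrow Flight. */
final class OutboundQueue {
    private final boolean zeroCopy;
    private final List<byte[]> pending = new ArrayList<>();

    OutboundQueue(boolean zeroCopy) {
        this.zeroCopy = zeroCopy;
    }

    /** Models putNext(): enqueue the current contents of the caller's buffer. */
    void putNext(byte[] batch) {
        // Copying mode snapshots the bytes, so the caller may reuse `batch` as soon
        // as this returns. Zero-copy mode stores only a reference to the same array.
        pending.add(zeroCopy ? batch : Arrays.copyOf(batch, batch.length));
    }

    /** Models the transport serializing whatever was queued, possibly much later. */
    List<byte[]> drain() {
        List<byte[]> sent = new ArrayList<>(pending);
        pending.clear();
        return sent;
    }
}

final class OutboundQueueDemo {
    public static void main(String[] args) {
        for (boolean zeroCopy : new boolean[] {false, true}) {
            byte[] reused = {1, 2, 3};  // stands in for the reused root's buffer
            OutboundQueue queue = new OutboundQueue(zeroCopy);
            queue.putNext(reused);
            reused[0] = 9;              // "load" the next batch into the same buffer
            System.out.println("zeroCopy=" + zeroCopy + " sent first byte " + queue.drain().get(0)[0]);
        }
        // prints:
        // zeroCopy=false sent first byte 1
        // zeroCopy=true sent first byte 9
    }
}
```

In terms of Alex's loop, this is why, per David, calling VectorLoader.load() after the previous putNext() has returned should be safe on the default write path but not with zero-copy writes enabled.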
