So you're finding that if you remove the backpressure handler, there are no problems?
Is the timeout a gRPC timeout? Do you know if any messages are making it through, or is it timing out after a period of no activity at all?

On Mon, Mar 7, 2022, at 19:12, Alex McRae (CW) wrote:
> Hi David,
>
> We believe we have a data race somewhere: stepping through the code below
> in the debugger causes no issues, but running it without the debugger
> causes a timeout. We were trying to find out whether putNext() keeps a
> reference to the data in the VectorSchemaRoot. Given that we get a timeout
> with the backpressure strategy, we think the client code at
> https://github.com/dremio-hub/arrow-flight-client-examples/tree/main/java
> may be the culprit.
>
> On Fri, Mar 4, 2022 at 2:35 PM David Li wrote:
>> It should be safe. Are you seeing any issues?
>>
>> Flight waits for an explicit next()/putNext() to actually touch anything.
>> And once they return, Flight will not read or mutate your data. So for
>> instance, calling putNext() copies the Arrow data into a gRPC buffer, after
>> which you can reuse the Arrow buffers. (This is *not* true if you have
>> zero-copy writes enabled. In that case we poke a reference to the Arrow
>> buffers into the gRPC data structure, and so mutating your Arrow buffer
>> will mysteriously change "previously written" data.)
>>
>> It's been years since I've touched this, though, so the details here are
>> fuzzy to me...
>>
>> On Fri, Mar 4, 2022, at 16:38, Alex McRae (CW) wrote:
>>> Hi all,
>>>
>>> Related to this issue and the solution below, we were wondering whether
>>> it is safe to call VectorLoader.load() before checking whether the client
>>> is ready when using a backpressure strategy. The concern is that the
>>> client may still be reading data from the root, so calling load() may
>>> cause a data race.
>>>
>>> public void getStream(CallContext context, ServerStreamListener listener) {
>>>   final FlightStream flightStream = this.client.getStream(ticket);
>>>
>>>   VectorSchemaRoot clientRoot = flightStream.getRoot();
>>>   VectorUnloader vectorUnloader = new VectorUnloader(clientRoot);
>>>
>>>   final VectorSchemaRoot root = new VectorSchemaRoot(clientRoot.getSchema(),
>>>       clientRoot.getFieldVectors(), clientRoot.getRowCount());
>>>   final VectorLoader vectorLoader = new VectorLoader(root);
>>>   listener.start(root, flightStream.getDictionaryProvider());
>>>
>>>   while (flightStream.next()) {
>>>     if (!flightStream.hasRoot()) {
>>>       break;
>>>     }
>>>
>>>     if (flightStream.getRoot() != clientRoot) {
>>>       clientRoot = flightStream.getRoot();
>>>       vectorUnloader = new VectorUnloader(clientRoot);
>>>     }
>>>
>>>     // is this safe to happen before the client is ready?
>>>     vectorLoader.load(vectorUnloader.getRecordBatch());
>>>
>>>     // this uses the built-in CallBackpressureStrategy
>>>     final BackpressureStrategy.WaitResult waitResult =
>>>         backpressureStrategy.waitForListener(timeout);
>>>     if (waitResult == BackpressureStrategy.WaitResult.READY) {
>>>       listener.putNext();
>>>     }
>>>   }
>>>
>>>   listener.completed();
>>> }
>>>
>>> Let me know what you think.
>>>
>>> Sincerely,
>>>
>>> Alex McRae
>>>
>>>> On Feb 28, 2022, at 10:39 AM, Alex McRae (CW) wrote:
>>>>
>>>> Absolutely!
>>>>
>>>> On Mon, Feb 28, 2022 at 10:06 AM David Li wrote:
>>>>> Just to make sure this doesn't get forgotten: I filed
>>>>> https://github.com/apache/arrow-cookbook/issues/158 for providing an
>>>>> example of this.
>>>>>
>>>>> -David
>>>>>
>>>>> On Tue, Feb 15, 2022, at 13:54, David Li wrote:
>>>>>> It should be safe. The fast-read path has pretty much always been
>>>>>> enabled and I'm not aware of issues it causes.
>>>>>> (The fast-read path simply calls an internal gRPC method to avoid
>>>>>> bouncing through byte[], but we're still copying the data into Arrow,
>>>>>> now that I look at it.)
>>>>>>
>>>>>> The fast-write path is not relevant here, but that's the one that is
>>>>>> trickier to use. We should make sure the optimization(s) are properly
>>>>>> documented, since, looking through the code, it's not really explained
>>>>>> what the consequences are. At the very least, the flag in ArrowMessage
>>>>>> should reference setUseZeroCopy, and we should have a doc page for these
>>>>>> environment variables analogous to ARROW-15617 for C++.
>>>>>>
>>>>>> On a side note, digging around to refresh my memory shows that gRPC Java
>>>>>> *finally* introduced a zero-copy Protobuf deserialization path. I'm not
>>>>>> sure it's quite relevant for us, since we still need to get the data
>>>>>> into an off-heap buffer in the end, but I need to take a closer look.
>>>>>> (See grpc/grpc-java#8102.)
>>>>>>
>>>>>> -David
>>>>>>
>>>>>> On Tue, Feb 15, 2022, at 13:12, James Duong wrote:
>>>>>>> Thanks for the tip, David.
>>>>>>>
>>>>>>> Do you know if zero-copy can be used safely on the ServerStreamListener
>>>>>>> when using the VectorUnloader/VectorLoader pattern you described?
>>>>>>>
>>>>>>> On Mon, Feb 14, 2022 at 9:38 AM David Li wrote:
>>>>>>>> Hey Alex,
>>>>>>>>
>>>>>>>> Basically, you should call start() exactly once; as you noticed, it
>>>>>>>> sends the initial schema message.
>>>>>>>>
>>>>>>>> If the VectorSchemaRoot is not stable, what you should do is create
>>>>>>>> your own root with the same schema, and use
>>>>>>>> VectorUnloader/VectorLoader to transfer data from the source root to
>>>>>>>> the root used by Flight.
>>>>>>>>
>>>>>>>> Does that make sense? This would be good to add to the Arrow Java
>>>>>>>> cookbook (at least, the VectorLoader/Unloader part).
>>>>>>>>
>>>>>>>> -David
>>>>>>>>
>>>>>>>> On Mon, Feb 14, 2022, at 12:27, Alex McRae (CW) wrote:
>>>>>>>>> Hi team,
>>>>>>>>>
>>>>>>>>> We are building a Flight service in Java that proxies requests, and
>>>>>>>>> we are currently working on getting getStream working on the
>>>>>>>>> FlightProducer.
>>>>>>>>>
>>>>>>>>> The code looks similar to this:
>>>>>>>>>
>>>>>>>>> public void getStream(CallContext context, ServerStreamListener listener) {
>>>>>>>>>   FlightStream flightStream = this.client.getStream(ticket);
>>>>>>>>>   while (flightStream.next()) {
>>>>>>>>>     if (!flightStream.hasRoot()) { break; }
>>>>>>>>>
>>>>>>>>>     listener.start(flightStream.getRoot(),
>>>>>>>>>         flightStream.getDictionaryProvider());
>>>>>>>>>     listener.putNext();
>>>>>>>>>   }
>>>>>>>>>   listener.completed();
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> We are having trouble understanding whether this is valid usage. I
>>>>>>>>> have looked at OutboundStreamListenerImpl.java, and it looks like
>>>>>>>>> calling start() on the listener causes it to resend some schema
>>>>>>>>> messages. We are trying to understand how to handle the case where
>>>>>>>>> flightStream.getRoot() returns a different VectorSchemaRoot than the
>>>>>>>>> previous call.
>>>>>>>>>
>>>>>>>>> For more context, we have also tried:
>>>>>>>>>
>>>>>>>>> public void getStream(CallContext context, ServerStreamListener listener) {
>>>>>>>>>   FlightStream flightStream = this.client.getStream(ticket);
>>>>>>>>>   listener.start(flightStream.getRoot(),
>>>>>>>>>       flightStream.getDictionaryProvider());
>>>>>>>>>   while (flightStream.next()) {
>>>>>>>>>     if (!flightStream.hasRoot()) { break; }
>>>>>>>>>
>>>>>>>>>     listener.putNext();
>>>>>>>>>   }
>>>>>>>>>   listener.completed();
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> but ran into issues with the connection not closing. We believe this
>>>>>>>>> is due to the VectorSchemaRoot changing on flightStream.next()
>>>>>>>>> calls.
>>>>>>>>> We believe this is an issue because we are sharing the root
>>>>>>>>> between the FlightStream and the ServerStreamListener.
>>>>>>>>> https://github.com/dremio-hub/arrow-flight-client-examples is the
>>>>>>>>> client we are using to test this end to end.
>>>>>>>>>
>>>>>>>>> Please let me know if you can provide any clarity; I would be happy
>>>>>>>>> to update the documentation afterwards.
>>>>>>>>>
>>>>>>>>> Sincerely,
>>>>>>>>> Alex McRae
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> *James Duong*
>>>>>>> Lead Software Developer
>>>>>>> Bit Quill Technologies Inc.
>>>>>>> Direct: +1.604.562.6082
>>>>>>> https://www.bitquilltech.com
>>>>>>
>>>>>
>>
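
David's Feb 14 advice has three parts: call start() exactly once, own a root with the same schema, and transfer each upstream batch into that root. A small toy model can show the effect of that advice. This is a sketch under stated assumptions, not Flight code. Root and RecordingListener are hypothetical stand-ins for VectorSchemaRoot and ServerStreamListener. The clone() call is a simplification standing in for the VectorUnloader/VectorLoader transfer. The model shows the resulting message order: one schema message, one message per batch, then completion. It also shows that the listener only ever sees the proxy's own root, even when upstream hands back a different root object.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: Root and RecordingListener are hypothetical stand-ins for
// VectorSchemaRoot and ServerStreamListener; no Arrow or Flight classes are used.
public class StableRootProxyModel {

    // A "root": a schema label plus one column of ints.
    static final class Root {
        final String schema;
        int[] values = new int[0];
        Root(String schema) { this.schema = schema; }
    }

    // Records what a real listener would send: schema once, then batches, then end.
    static final class RecordingListener {
        final List<String> events = new ArrayList<>();
        private Root root;

        void start(Root root) {
            this.root = root;
            events.add("schema:" + root.schema);
        }
        void putNext() { events.add("batch:" + Arrays.toString(root.values)); }
        void completed() { events.add("end"); }
        Root root() { return root; }
    }

    // Upstream may return a different Root object on each step. The proxy never hands
    // those objects to the listener; it copies their contents into a root it owns.
    // Assumes a non-empty upstream whose roots all share one schema.
    static void proxy(List<Root> upstream, RecordingListener listener) {
        Root own = new Root(upstream.get(0).schema);
        listener.start(own);                      // exactly once
        for (Root source : upstream) {
            own.values = source.values.clone();   // simplification of the VectorUnloader/VectorLoader step
            listener.putNext();
        }
        listener.completed();
    }

    public static void main(String[] args) {
        Root first = new Root("x:int32");
        first.values = new int[] {1, 2};
        Root second = new Root("x:int32");        // a different root object, same schema
        second.values = new int[] {3};

        RecordingListener listener = new RecordingListener();
        proxy(List.of(first, second), listener);
        System.out.println(listener.events);
    }
}
```

Running main prints [schema:x:int32, batch:[1, 2], batch:[3], end]. Compare this with the first Feb 14 snippet, which calls start() inside the loop and would emit a schema message before every batch.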

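Alex's Mar 4 question asks whether it is safe to call VectorLoader.load() on the listener's root before the backpressure wait returns. The answer depends on whether putNext() copies the data or keeps a reference to it. The toy model below mimics the two behaviours David describes. CopyingWriter and ZeroCopyWriter are hypothetical classes, not part of Flight. An int[] stands in for an Arrow buffer that the proxy reuses for every batch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: these classes are hypothetical and are not part of Arrow Flight.
// int[] stands in for an Arrow buffer; putNext() stands in for ServerStreamListener.putNext().
public class PutNextModel {

    interface Writer {
        void putNext(int[] batch);
        List<int[]> written(); // what has been queued "on the wire" so far
    }

    // Default behaviour as David describes it: data is copied out during putNext().
    static final class CopyingWriter implements Writer {
        private final List<int[]> out = new ArrayList<>();
        public void putNext(int[] batch) { out.add(batch.clone()); }
        public List<int[]> written() { return out; }
    }

    // Zero-copy behaviour as described: only a reference to the caller's buffer is queued.
    static final class ZeroCopyWriter implements Writer {
        private final List<int[]> out = new ArrayList<>();
        public void putNext(int[] batch) { out.add(batch); }
        public List<int[]> written() { return out; }
    }

    // Reuses ONE buffer for every batch, like a proxy that loads each upstream
    // batch into its own stable root before calling putNext().
    static List<int[]> run(Writer writer) {
        int[] buffer = new int[3];
        int[][] incoming = { {1, 2, 3}, {4, 5, 6} };
        for (int[] batch : incoming) {
            System.arraycopy(batch, 0, buffer, 0, batch.length); // plays the role of load()
            writer.putNext(buffer);
        }
        return writer.written();
    }

    public static void main(String[] args) {
        for (int[] b : run(new CopyingWriter())) System.out.println("copy:      " + Arrays.toString(b));
        for (int[] b : run(new ZeroCopyWriter())) System.out.println("zero-copy: " + Arrays.toString(b));
    }
}
```

Running main prints [1, 2, 3] and [4, 5, 6] for the copying writer, then [4, 5, 6] twice for the zero-copy writer. With a reference-keeping writer, reloading the shared buffer rewrites data that was "already sent". That is the hazard David mentions for zero-copy writes. Under copy semantics, the next load() only touches data that putNext() has already copied out.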