Yes, I was referring to https://github.com/apache/beam/blob/7494d1450f2fc19344c385435496634e4b28a997/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L563 since that is responsible for scheduling the bundle finalization. It will only be invoked if the current bundle completes.
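
To illustrate that last point, here is a toy model in plain Java (not Beam code). ToyBundle, ToyCheckpointMark and readInto are hypothetical names invented for this sketch; it only assumes that finalization callbacks are queued while a bundle is processed and are run once the bundle completes, which is roughly the behavior described above:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model, not Beam code: callbacks scheduled during a bundle run only when it completes. */
final class BundleFinalizationSketch {

  /** Hypothetical stand-in for a runner-managed bundle. */
  static final class ToyBundle {
    private final List<Runnable> finalizers = new ArrayList<>();
    private boolean completed = false;

    /** Models the wrapper scheduling a finalization callback while processing. */
    void scheduleFinalization(Runnable callback) {
      finalizers.add(callback);
    }

    /** Models the runner completing the bundle; only now do the callbacks fire. */
    void complete() {
      if (completed) {
        return;
      }
      completed = true;
      for (Runnable callback : finalizers) {
        callback.run();
      }
      finalizers.clear();
    }
  }

  /** Hypothetical checkpoint mark that acknowledges its messages when finalized. */
  static final class ToyCheckpointMark {
    final List<String> pending = new ArrayList<>();
    final List<String> acked = new ArrayList<>();

    void finalizeCheckpoint() {
      acked.addAll(pending);
      pending.clear();
    }
  }

  /** Reads messages into a checkpoint mark and schedules its finalization on the bundle. */
  static ToyCheckpointMark readInto(ToyBundle bundle, List<String> messages) {
    ToyCheckpointMark mark = new ToyCheckpointMark();
    mark.pending.addAll(messages);
    bundle.scheduleFinalization(mark::finalizeCheckpoint);
    return mark;
  }

  public static void main(String[] args) {
    ToyBundle bundle = new ToyBundle();
    ToyCheckpointMark mark = readInto(bundle, Arrays.asList("m1", "m2"));
    // Nothing is acknowledged while the bundle is still open.
    System.out.println("acked before completion: " + mark.acked.size());
    bundle.complete();
    // The scheduled callback has now run and acknowledged both messages.
    System.out.println("acked after completion: " + mark.acked.size());
  }
}
```

In this model, if complete() is never called the messages stay in pending indefinitely, which matches the symptom of finalizeCheckpoint() not being invoked.
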
I would add logging to ParDoEvaluator#finishBundle[1] to see that the bundle is being completed. I would add logging to EvaluationContext#handleResult[2] to see how the bundle completion is being handled and whether the bundle finalization callback is being invoked.

1: https://github.com/apache/beam/blob/7494d1450f2fc19344c385435496634e4b28a997/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java#L267
2: https://github.com/apache/beam/blob/7494d1450f2fc19344c385435496634e4b28a997/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java#L157

On Thu, Jun 17, 2021 at 10:42 AM Alex Koay <alexkoa...@gmail.com> wrote:

> Could you be referring to this part?
> https://github.com/apache/beam/blob/7494d1450f2fc19344c385435496634e4b28a997/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L563
>
> I've tried fiddling with it and it gave some good results if I recall
> correctly.
> I didn't mention it earlier because I thought that maybe a shorter expiry
> actually causes the readers to expire faster and thus release the unacked
> messages for another reader to bundle up.
>
> I can confirm that the CheckpointMark#finalizeCheckpoint() doesn't get
> called for at least some time if the bundle size is not maxed (or close to
> maxed) out.
> I've added logging into finalizeCheckpoint() and don't see it getting
> called. It's where the acknowledgements are happening.
>
> I've actually opened a Google support ticket for this as well, perhaps you
> could take a look at it (Case 28209335).
> Thanks for your reply, I'll try to debug this further too.
>
> On Fri, Jun 18, 2021 at 12:49 AM Luke Cwik <lc...@google.com> wrote:
>
>> Reducing the bundle size/timeout shouldn't be necessary since when the
>> UnboundedSource returns false from advance(), the
>> UnboundedSourceAsSDFWrapperFn will schedule a bundle finalization and
>> return resume for the process continuation.
>> This should cause invokeProcessElement() to complete in
>> OutputAndTimeBoundedSplittableProcessElementInvoker and the runner-specific
>> implementation should finish the current bundle. This will allow the runner
>> to do two things:
>> 1) Finalize the current bundle
>> 2) Schedule the continuation for the checkpoint mark
>>
>> Based upon your description it looks like for some reason the runner is
>> unable to complete the current bundle.
>>
>> On Thu, Jun 17, 2021 at 2:48 AM Alex Koay <alexkoa...@gmail.com> wrote:
>>
>>> Okay, I think I've found the issue, but now I need some help figuring
>>> out how to fix the issue.
>>>
>>> 1. Solace allows a number of unacknowledged messages before it stops
>>> sending more messages. This number just so happens to be 10k messages by
>>> default (in the queue I am using).
>>> 2. The Solace Beam transform (rightly) waits until bundle finalization
>>> before acknowledging the messages.
>>> 3. Bundle finalization doesn't happen until it either reaches 10k
>>> messages or 10s for the DataflowRunner. For the PortableRunner this seems
>>> to be 10k or an unknown timeout. This is related to the
>>> OutputAndTimeBoundedSplittableProcessElementInvoker.
>>> 4. Many readers are created (over and over) due to the
>>> UnboundedSourceAsSDFWrapperFn.
>>> 5. When a lot of readers are created, they would compete for the
>>> messages (in non-exclusive mode), eventually leaving a small number of
>>> unacknowledged messages per bundle.
>>> 6. The readers are then cached in cachedReaders in the
>>> UnboundedSourceAsSDFWrapperFn. A total of 100 readers are cached, and get
>>> evicted after a minute. See https://github.com/apache/beam/pull/13592
>>> 7. The readers each have a small number of unacknowledged messages which
>>> will remain unacknowledged and cannot be given to another consumer until
>>> the bundle finalization happens.
>>> 8. When bundle finalization happens (possibly after the reader gets
>>> evicted), the messages return to the queue, only to get taken by the huge
>>> number of other competing readers.
>>>
>>> At this point, I'm guessing the few methods to fix this are:
>>> a. reduce the bundle size / reduce the bundle timeout (which all seem to
>>> be hardcoded per runner)
>>> b. reduce the number of cached readers / their timeouts (which doesn't
>>> seem to be customizable either) so that there would be less contention
>>> c. somehow reduce the splitting process and instead reuse existing
>>> sources over and over
>>>
>>> I'd be happy to send pull requests to help fix this issue but perhaps
>>> will need some direction as to how I should fix this.
>>>
>>> On Wed, Jun 16, 2021 at 8:32 PM Alex Koay <alexkoa...@gmail.com> wrote:
>>>
>>>> Alright, some updates.
>>>>
>>>> Using DirectRunner helped narrow things down quite a bit. It seems that
>>>> the Solace transform is somewhat buggy when used with the
>>>> UnboundedSourceAsSDFWrapperFn as it doesn't have a proper CheckpointMark.
>>>> Refer to this:
>>>> https://github.com/SolaceProducts/solace-apache-beam/blob/d62f5b8e275902197882e90cdf87346438fae9ac/beam-sdks-java-io-solace/src/main/java/com/solace/connector/beam/UnboundedSolaceSource.java#L40
>>>>
>>>> The source simply creates a new Reader every time createReader() is
>>>> called.
>>>>
>>>> Because of this, the cachedReaders in the
>>>> UnboundedSourceAsSDFRestrictionTracker are never purged, resulting in
>>>> readers not being closed but staying in the cache.
>>>> Changing the timeout causes the pipeline to continue draining but at a
>>>> glacial pace.
>>>>
>>>> I've still not been able to isolate the root cause of why it suddenly
>>>> stops reading more data (could be a Solace issue though).
>>>>
>>>>
>>>> Also, trying the easy way out, I've tried running it with 2.24.0 (the
>>>> last one without the SDF default Read) in Java and it works perfectly.
>>>> Newer versions in Java DirectRunner don't work correctly either.
>>>> Unfortunately Dataflow seems to expand the external transform using the
>>>> SDF Read version even when using 2.24.0 (I'm not entirely sure why this is
>>>> the case).
>>>>
>>>> I feel like I'm almost on the verge of fixing the problem, but at this
>>>> point I'm still far from it.
>>>>
>>>>
>>>> On Wed, Jun 16, 2021 at 11:24 AM Alex Koay <alexkoa...@gmail.com> wrote:
>>>>
>>>>> 1. I'm building a streaming pipeline.
>>>>> 2. For the pure Java transforms pipeline I believe it got substituted
>>>>> with a Dataflow native Solace transform (it isn't using use_runner_v2 as I
>>>>> think Java doesn't support that publicly yet). I used the default Java
>>>>> flags with a DataflowRunner.
>>>>> 3. I believe it's the source reader that is being created en masse.
>>>>>
>>>>> Currently I just tested the Python pipeline (with Java Solace
>>>>> transform) on the DirectRunner without bounds, and it seems that the issue
>>>>> is similarly manifesting. I'm trying to debug it this way for now.
>>>>>
>>>>> On Wed, Jun 16, 2021 at 9:01 AM Boyuan Zhang <boyu...@google.com> wrote:
>>>>>
>>>>>> In terms of the odd case you are experiencing, it seems like you are
>>>>>> comparing a pure Java pipeline with a cross-language pipeline, right? I
>>>>>> want to learn more details on this case:
>>>>>>
>>>>>> - Is this a batch pipeline or a streaming pipeline?
>>>>>> - For your pure Java transforms pipeline, do you run the pipeline
>>>>>> with 'use_runner_v2' or 'beam_fn_api' or 'use_unified_worker'?
>>>>>> - For a large number of consumers, do you mean Dataflow workers
>>>>>> or the source reader?
>>>>>>
>>>>>> If you can share the implementation of the source and the pipeline,
>>>>>> that would be really helpful.
>>>>>>
>>>>>> +Lukasz Cwik <lc...@google.com> for awareness.
>>>>>>
>>>>>> On Tue, Jun 15, 2021 at 9:50 AM Chamikara Jayalath <chamik...@google.com> wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jun 15, 2021 at 3:20 AM Alex Koay <alexkoa...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Several questions:
>>>>>>>>
>>>>>>>> 1. Is there any way to set the log level for the Java workers via a
>>>>>>>> Python Dataflow pipeline?
>>>>>>>>
>>>>>>>
>>>>>>>> 2. What is the easiest way to debug an external transform in Java?
>>>>>>>> My main pipeline code is in Python.
>>>>>>>>
>>>>>>>
>>>>>>> In general, debugging a job should be similar to any other Dataflow
>>>>>>> job [1]. But some of the SDK options available to the main SDK
>>>>>>> environment are currently not available to external SDK environments.
>>>>>>> This includes changing the debug level. So I suggest adding INFO logs
>>>>>>> instead of changing the debug level if possible.
>>>>>>>
>>>>>>> [1]
>>>>>>> https://cloud.google.com/dataflow/docs/guides/troubleshooting-your-pipeline
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>> 3. Are there any edge cases with the UnboundedSourceWrapperFn SDF
>>>>>>>> that I should be wary of? I'm currently encountering an odd case (in
>>>>>>>> Dataflow) where a Java pipeline runs with only one worker all the way
>>>>>>>> reading Solace messages, but with an external transform in Python, it
>>>>>>>> generates a large number of consumers and stops reading messages
>>>>>>>> altogether about 90% of the way.
>>>>>>>>
>>>>>>>
>>>>>>> +Boyuan Zhang <boyu...@google.com> might be able to help.
>>>>>>>
>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>> Cheers
>>>>>>>> Alex
>>>>>>>>
>>>>>>>