Yes I was referring to
that is responsible for scheduling the bundle finalization. It will only be
invoked if the current bundle completes.
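
To make that ordering concrete, here is a minimal stand-alone sketch in plain Java. It is a model only, not Beam code: `FinalizationSketch`, `Bundle`, `scheduleFinalization` and `finish` are hypothetical names invented for illustration. It shows a callback that is scheduled while elements are processed but only runs once the bundle completes.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-alone model (not Beam code): finalization callbacks run only when a bundle completes. */
final class FinalizationSketch {

  /** Hypothetical stand-in for a runner-side bundle. */
  static final class Bundle {
    private final List<Runnable> finalizers = new ArrayList<>();
    private boolean finished;

    /** Called during processing, e.g. when the source's advance() returns false. */
    void scheduleFinalization(Runnable callback) {
      if (finished) {
        throw new IllegalStateException("bundle already finished");
      }
      finalizers.add(callback);
    }

    /** Completing the bundle is the only thing that runs the callbacks. Returns how many ran. */
    int finish() {
      finished = true;
      int ran = 0;
      for (Runnable callback : finalizers) {
        callback.run();
        ran++;
      }
      finalizers.clear();
      return ran;
    }
  }

  public static void main(String[] args) {
    List<String> acked = new ArrayList<>();
    Bundle bundle = new Bundle();
    // The "ack" is only scheduled here; nothing is acknowledged yet.
    bundle.scheduleFinalization(() -> acked.add("msg-1"));
    System.out.println("acked before finish: " + acked);
    bundle.finish();
    System.out.println("acked after finish: " + acked);
  }
}
```

If `finish()` is never reached, the list stays empty, which would match acknowledgements never being sent.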

I would add logging to ParDoEvaluator#finishBundle[1] to see that the
bundle is being completed.
I would add logging to EvaluationContext#handleResult[2] to see how the
bundle completion is being handled and whether the bundle finalization
callback is being invoked.
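
As a rough illustration of the kind of logging I mean, here is a small plain-Java sketch of a wrapper that logs and counts each finalization call. It does not use Beam's classes: `LoggingFinalizerSketch`, `Finalizer`, and `Logged` are hypothetical names, and the same idea would have to be adapted by hand to the real methods.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

/** Stand-alone sketch (not Beam code): log and count every finalization call. */
final class LoggingFinalizerSketch {
  private static final Logger LOG =
      Logger.getLogger(LoggingFinalizerSketch.class.getName());

  /** Hypothetical stand-in for a finalization callback; not a Beam interface. */
  interface Finalizer {
    void finalizeNow() throws Exception;
  }

  /** Decorator that logs before and after delegating to the wrapped callback. */
  static final class Logged implements Finalizer {
    private final String name;
    private final Finalizer delegate;
    private final AtomicInteger callCount = new AtomicInteger();

    Logged(String name, Finalizer delegate) {
      this.name = name;
      this.delegate = delegate;
    }

    @Override
    public void finalizeNow() throws Exception {
      int n = callCount.incrementAndGet();
      LOG.info(name + ": finalization starting (call #" + n + ")");
      delegate.finalizeNow();
      LOG.info(name + ": finalization done (call #" + n + ")");
    }

    /** Number of times finalization was attempted, whether or not it succeeded. */
    int callCount() {
      return callCount.get();
    }
  }

  public static void main(String[] args) throws Exception {
    Logged logged = new Logged("bundle-1", () -> { /* acknowledge messages here */ });
    logged.finalizeNow();
    System.out.println("calls: " + logged.callCount());
  }
}
```

A count that stays at zero while messages pile up would point at the bundle not completing rather than at the acknowledgement code itself.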


On Thu, Jun 17, 2021 at 10:42 AM Alex Koay <> wrote:

> Could you be referring to this part?
> I've tried fiddling with it and it gave some good results if I recall
> correctly.
> I didn't mention it earlier because I thought that maybe a shorter expiry
> actually causes the readers to expire faster and thus release the unacked
> messages for another reader to bundle up.
> I can confirm that the CheckpointMark#finalizeCheckpoint() doesn't get
> called for at least some time if the bundle size is not maxed (or close to
> maxed) out.
> I've added logging into finalizeCheckpoint() and don't see it getting
> called. It's where the acknowledgements are happening.
> I've actually opened a Google support ticket for this as well, perhaps you
> could take a look at it (Case 28209335).
> Thanks for your reply, I'll try to debug this further too.
> On Fri, Jun 18, 2021 at 12:49 AM Luke Cwik <> wrote:
>> Reducing the bundle size/timeout shouldn't be necessary since when the
>> UnboundedSource returns false from advance(), the
>> UnboundedSourceAsSDFWrapperFn will schedule a bundle finalization and
>> return resume for the process continuation. This should cause
>> invokeProcessElement() to complete in
>> OutputAndTimeBoundedSplittableProcessElementInvoker and the runner specific
>> implementation should finish the current bundle. This will allow the runner
>> to do two things:
>> 1) Finalize the current bundle
>> 2) Schedule the continuation for the checkpoint mark
>> Based upon your description it looks like for some reason the runner is
>> unable to complete the current bundle.
>> On Thu, Jun 17, 2021 at 2:48 AM Alex Koay <> wrote:
>>> Okay, I think I've found the issue, but now I need some help figuring
>>> out how to fix the issue.
>>> 1. Solace allows a number of unacknowledged messages before it stops
>>> sending more messages. This number just so happens to be 10k messages by
>>> default (in the queue I am using).
>>> 2. The Solace Beam transform (rightly) waits until bundle finalization
>>> before acknowledging the messages.
>>> 3. Bundle finalization doesn't happen until it either reaches 10k
>>> messages or 10s for the DataflowRunner. For the PortableRunner this seems
>>> to be 10k or an unknown timeout. This is related to the
>>> OutputAndTimeBoundedSplittableProcessElementInvoker.
>>> 4. Many readers are created (over and over) due to the
>>> UnboundedSourceAsSDFWrapperFn.
>>> 5. When a lot of readers are created, they would compete for the
>>> messages (in non-exclusive mode), eventually leaving a small number of
>>> unacknowledged messages per bundle.
>>> 6. The readers are then cached in cachedReaders in the
>>> UnboundedSourceAsSDFWrapperFn. A total of 100 readers are cached, and get
>>> evicted after a minute. See
>>> 7. The readers each have a small number of unacknowledged messages which
>>> will remain unacknowledged and cannot be given to another consumer until
>>> the bundle finalization happens.
>>> 8. When bundle finalization happens (possibly after the reader gets
>>> evicted), the messages return to the queue, only to get taken by the huge
>>> number of other competing readers.
>>> At this point, I'm guessing the few methods to fix this are:
>>> a. reduce the bundle size / reduce the bundle timeout (which all seem to
>>> be hardcoded per runner)
>>> b. reduce the number of cached readers / their timeouts (which doesn't
>>> seem to be customizable either) so that there would be less contention
>>> c. somehow reduce the splitting and instead reuse existing sources over
>>> and over
>>> I'd be happy to send pull requests to help fix this issue but perhaps
>>> will need some direction as to how I should fix this.
>>> On Wed, Jun 16, 2021 at 8:32 PM Alex Koay <> wrote:
>>>> Alright, some updates.
>>>> Using DirectRunner helped narrow things down quite a bit. It seems that
>>>> the Solace transform is somewhat buggy when used with the
>>>> UnboundedSourceAsSDFWrapperFn as it doesn't have a proper CheckpointMark.
>>>> Refer to this:
>>>> The source simply creates a new Reader every time createReader() is
>>>> called.
>>>> Because of this, the cachedReaders in the
>>>> UnboundedSourceAsSDFRestrictionTracker are never purged, so readers are
>>>> never closed and simply stay in the cache.
>>>> Changing the timeout causes the pipeline to continue draining but at a
>>>> glacial pace.
>>>> I'm still not able to isolate the root cause of why it suddenly stops
>>>> reading more data (could be a Solace issue though).
>>>> Also, trying the easy way out, I've tried running it with 2.24.0 (the
>>>> last one without the SDF default Read) in Java and it works perfectly.
>>>> Newer versions in Java DirectRunner don't work correctly either.
>>>> Unfortunately Dataflow seems to expand the external transform using the
>>>> SDF Read version even when using 2.24.0 (I'm not entirely sure why this is
>>>> the case).
>>>> I feel like I'm almost on the verge of fixing the problem, but at this
>>>> point I'm still far from it.
>>>> On Wed, Jun 16, 2021 at 11:24 AM Alex Koay <>
>>>> wrote:
>>>>> 1. I'm building a streaming pipeline.
>>>>> 2. For the pure Java transforms pipeline I believe it got substituted
>>>>> with a Dataflow native Solace transform (it isn't using use_runner_v2 as I
>>>>> think Java doesn't support that publicly yet). I used the default Java
>>>>> flags with a DataflowRunner.
>>>>> 3. I believe it's the source reader that is being created en masse.
>>>>> Currently I just tested the Python pipeline (with Java Solace
>>>>> transform) on the DirectRunner without bounds, and it seems that the issue
>>>>> is similarly manifesting. I'm trying to debug it this way for now.
>>>>> On Wed, Jun 16, 2021 at 9:01 AM Boyuan Zhang <>
>>>>> wrote:
>>>>>> In terms of the odd case you are experiencing,  it seems like you are
>>>>>> comparing a pure java pipeline with a cross-language pipeline, right? I
>>>>>> want to learn more details on this case:
>>>>>>    - Is this a batch pipeline or a streaming pipeline?
>>>>>>    - For your pure java transforms pipeline, do you run the pipeline
>>>>>>    with 'use_runner_v2' or 'beam_fn_api' or 'use_unified_worker'?
>>>>>>    - For a large number of consumers, do you mean dataflow workers
>>>>>>    or the source reader?
>>>>>> If you can share the implementation of the source and the pipeline,
>>>>>> that would be really helpful.
>>>>>> +Lukasz Cwik <> for awareness.
>>>>>> On Tue, Jun 15, 2021 at 9:50 AM Chamikara Jayalath <
>>>>>>> wrote:
>>>>>>> On Tue, Jun 15, 2021 at 3:20 AM Alex Koay <>
>>>>>>> wrote:
>>>>>>>> Several questions:
>>>>>>>> 1. Is there any way to set the log level for the Java workers via a
>>>>>>>> Python Dataflow pipeline?
>>>>>>>> 2. What is the easiest way to debug an external transform in Java?
>>>>>>>> My main pipeline code is in Python.
>>>>>>> In general, debugging a job should be similar to any other Dataflow
>>>>>>> job [1]. But some of the SDK options available to the main SDK
>>>>>>> environment are currently not available to external SDK environments.
>>>>>>> This includes changing the debug level. So I suggest adding INFO logs
>>>>>>> instead of changing the debug level if possible.
>>>>>>> [1]
>>>>>>>> 3. Are there any edge cases with the UnboundedSourceWrapperFn SDF
>>>>>>>> that I should be wary of? I'm currently encountering an odd case (in
>>>>>>>> Dataflow) where a Java pipeline runs with only one worker all the way
>>>>>>>> reading Solace messages, but with an external transform in Python, it
>>>>>>>> generates a large number of consumers and stops reading messages
>>>>>>>> altogether about 90% of the way.
>>>>>>> +Boyuan Zhang <> might be able to help.
>>>>>>>> Thanks!
>>>>>>>> Cheers
>>>>>>>> Alex
